From 03408d1d835ce5cd1d37762d29953a3ec7fd2bbd Mon Sep 17 00:00:00 2001 From: Clark Boylan Date: Wed, 1 May 2013 18:13:21 -0700 Subject: [PATCH] Add udp and tcp output to log-pusher.py. * modules/openstack_project/files/logstash/log-pusher.py: The logstash pipe input is unreliable. Extend the log-pusher.py script to support udp and tcp output so that the logstash udp and tcp inputs are options. Change-Id: I84b0dc17f89ee5b551536da33c35c905df444bba Reviewed-on: https://review.openstack.org/27999 Reviewed-by: James E. Blair Approved: Jeremy Stanley Reviewed-by: Jeremy Stanley Tested-by: Jenkins --- .../files/logstash/log-pusher.py | 45 ++++++++++++++++++- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/modules/openstack_project/files/logstash/log-pusher.py b/modules/openstack_project/files/logstash/log-pusher.py index b2f58939cd..c071d3138c 100644 --- a/modules/openstack_project/files/logstash/log-pusher.py +++ b/modules/openstack_project/files/logstash/log-pusher.py @@ -21,6 +21,7 @@ import logging import threading import time import queue +import socket import sys import urllib.error import urllib.request @@ -203,7 +204,7 @@ class LogRetriever(threading.Thread): return -class LogProcessor(object): +class StdOutLogProcessor(object): def __init__(self, logq, pretty_print=False): self.logq = logq self.pretty_print = pretty_print @@ -219,6 +220,37 @@ class LogProcessor(object): sys.stdout.flush() +class INETLogProcessor(object): + socket_type = None + + def __init__(self, logq, host='localhost', port=9999): + self.logq = logq + self.host = host + self.port = port + self._connect_socket() + + def _connect_socket(self): + logging.debug("Creating socket.") + self.socket = socket.socket(socket.AF_INET, self.socket_type) + self.socket.connect((self.host, self.port)) + + def handle_log_event(self): + log = self.logq.get() + try: + self.socket.sendall((json.dumps(log) + '\n').encode('utf-8')) + except: + logging.exception("Exception sending INET event.") + self._connect_socket() + + +class UDPLogProcessor(INETLogProcessor): + socket_type = socket.SOCK_DGRAM + + +class TCPLogProcessor(INETLogProcessor): + socket_type = socket.SOCK_STREAM + + def main(): parser = argparse.ArgumentParser() parser.add_argument("-z", "--zmqaddress", required=True, @@ -234,6 +266,10 @@ def main(): parser.add_argument("-d", "--debuglog", help="Enable debug log. " "Specifies file to write log to.") + parser.add_argument("-u", "--udp", action="store_true", + help="Output to UDP destination.") + parser.add_argument("-t", "--tcp", action="store_true", + help="Output to TCP destination.") args = parser.parse_args() if args.debuglog: @@ -248,7 +284,12 @@ def main(): catcher = EventCatcher(eventqueue, args.zmqaddress) retriever = LogRetriever(eventqueue, logqueue, args.logaddress, args.filename, retry=args.retry) - processor = LogProcessor(logqueue, pretty_print=args.pretty) + if args.tcp: + processor = TCPLogProcessor(logqueue) + elif args.udp: + processor = UDPLogProcessor(logqueue) + else: + processor = StdOutLogProcessor(logqueue, pretty_print=args.pretty) catcher.daemon = True catcher.start()