diff --git a/modules/openstack_project/files/logstash/log-pusher.py b/modules/openstack_project/files/logstash/log-pusher.py index b2f58939cd..c071d3138c 100644 --- a/modules/openstack_project/files/logstash/log-pusher.py +++ b/modules/openstack_project/files/logstash/log-pusher.py @@ -21,6 +21,7 @@ import logging import threading import time import queue +import socket import sys import urllib.error import urllib.request @@ -203,7 +204,7 @@ class LogRetriever(threading.Thread): return -class LogProcessor(object): +class StdOutLogProcessor(object): def __init__(self, logq, pretty_print=False): self.logq = logq self.pretty_print = pretty_print @@ -219,6 +220,37 @@ class LogProcessor(object): sys.stdout.flush() +class INETLogProcessor(object): + socket_type = None + + def __init__(self, logq, host='localhost', port=9999): + self.logq = logq + self.host = host + self.port = port + self._connect_socket() + + def _connect_socket(self): + logging.debug("Creating socket.") + self.socket = socket.socket(socket.AF_INET, self.socket_type) + self.socket.connect((self.host, self.port)) + + def handle_log_event(self): + log = self.logq.get() + try: + self.socket.sendall((json.dumps(log) + '\n').encode('utf-8')) + except: + logging.exception("Exception sending INET event.") + self._connect_socket() + + +class UDPLogProcessor(INETLogProcessor): + socket_type = socket.SOCK_DGRAM + + +class TCPLogProcessor(INETLogProcessor): + socket_type = socket.SOCK_STREAM + + def main(): parser = argparse.ArgumentParser() parser.add_argument("-z", "--zmqaddress", required=True, @@ -234,6 +266,10 @@ def main(): parser.add_argument("-d", "--debuglog", help="Enable debug log. " "Specifies file to write log to.") + parser.add_argument("-u", "--udp", action="store_true", + help="Output to UDP destination.") + parser.add_argument("-t", "--tcp", action="store_true", + help="Output to TCP destination.") args = parser.parse_args() if args.debuglog: @@ -248,7 +284,12 @@ def main(): catcher = EventCatcher(eventqueue, args.zmqaddress) retriever = LogRetriever(eventqueue, logqueue, args.logaddress, args.filename, retry=args.retry) - processor = LogProcessor(logqueue, pretty_print=args.pretty) + if args.tcp: + processor = TCPLogProcessor(logqueue) + elif args.udp: + processor = UDPLogProcessor(logqueue) + else: + processor = StdOutLogProcessor(logqueue, pretty_print=args.pretty) catcher.daemon = True catcher.start()