diff --git a/modules/openstack_project/files/logstash/log-pusher.py b/modules/openstack_project/files/logstash/log-pusher.py index c0321ec53b..004b7408d6 100644 --- a/modules/openstack_project/files/logstash/log-pusher.py +++ b/modules/openstack_project/files/logstash/log-pusher.py @@ -266,6 +266,104 @@ class TCPLogProcessor(INETLogProcessor): socket_type = socket.SOCK_STREAM +class Server(object): + def __init__(self, config, debuglog): + # Config init. + self.config = config + self.defaults = self.config['source-defaults'] + self.default_source_url = self.defaults['source-url'] + self.default_output_host = self.defaults['output-host'] + self.default_output_port = self.defaults['output-port'] + self.default_output_mode = self.defaults['output-mode'] + self.default_retry = self.defaults['retry-get'] + # Pythong logging output file. + self.debuglog = debuglog + # Input, retriever, output details + self.catchers = [] + self.event_queues = [] + self.retrievers = [] + # TODO(clarkb) support multiple outputs + self.logqueue = queue.Queue() + self.processor = None + + def setup_logging(self): + if self.debuglog: + logging.basicConfig(format='%(asctime)s %(message)s', + filename=self.debuglog, level=logging.DEBUG) + else: + # Prevent leakage into the logstash log stream. + logging.basicConfig(level=logging.CRITICAL) + logging.debug("Log pusher starting.") + + def setup_retrievers(self): + for source_file in self.config['source-files']: + eventqueue = queue.Queue() + self.event_queues.append(eventqueue) + retriever = LogRetriever(eventqueue, self.logqueue, + source_file.get('source-url', + self.default_source_url), + source_file['name'], + retry=source_file.get('retry-get', + self.default_retry), + job_filter=source_file.get('filter', + '')) + self.retrievers.append(retriever) + + def setup_catchers(self): + for zmq_publisher in self.config['zmq-publishers']: + catcher = EventCatcher(self.event_queues, zmq_publisher) + self.catchers.append(catcher) + + def setup_processor(self): + if self.default_output_mode == "tcp": + self.processor = TCPLogProcessor(self.logqueue, + self.default_output_host, + self.default_output_port) + elif self.default_output_mode == "udp": + self.processor = UDPLogProcessor(self.logqueue, + self.default_output_host, + self.default_output_port) + else: + self.processor = StdOutLogProcessor(self.logqueue) + + def main(self): + self.setup_retrievers() + self.setup_catchers() + self.setup_processor() + + for catcher in self.catchers: + catcher.daemon = True + catcher.start() + for retriever in self.retrievers: + retriever.daemon = True + retriever.start() + + while True: + try: + self.processor.handle_log_event() + except: + logging.exception("Exception processing log event.") + raise + + +class DaemonContext(object): + def __init__(self): + # Set pidfile path. + pass + + def __enter__(self): + # change umask + # chdir + # double fork + # redirect stdin, stdout, stderr to /dev/null + # lock pidfile + pass + + def __exit__(self, exc_type, exc_value, traceback): + # remove pidfile + pass + + def main(): parser = argparse.ArgumentParser() parser.add_argument("-c", "--config", required=True, @@ -273,69 +371,21 @@ def main(): parser.add_argument("-d", "--debuglog", help="Enable debug log. " "Specifies file to write log to.") + parser.add_argument("--foreground", action='store_true', + help="Run in the foreground.") args = parser.parse_args() - if args.debuglog: - logging.basicConfig(format='%(asctime)s %(message)s', - filename=args.debuglog, level=logging.DEBUG) + with open(args.config, 'r') as config_stream: + config = yaml.load(config_stream) + server = Server(config, args.debuglog) + + if args.foreground: + server.setup_logging() + server.main() else: - # Prevent leakage into the logstash log stream. - logging.basicConfig(level=logging.CRITICAL) - logging.debug("Log pusher starting.") - - config_stream = open(args.config, 'r') - config = yaml.load(config_stream) - defaults = config['source-defaults'] - default_source_url = defaults['source-url'] - default_output_host = defaults['output-host'] - default_output_port = defaults['output-port'] - default_output_mode = defaults['output-mode'] - default_retry = defaults['retry-get'] - - event_queues = [] - # TODO(clarkb) support multiple outputs - logqueue = queue.Queue() - retrievers = [] - for source_file in config['source-files']: - eventqueue = queue.Queue() - event_queues.append(eventqueue) - retriever = LogRetriever(eventqueue, logqueue, - source_file.get('source-url', - default_source_url), - source_file['name'], - retry=source_file.get('retry-get', - default_retry), - job_filter=source_file.get('filter', - '')) - retrievers.append(retriever) - - catchers = [] - for zmq_publisher in config['zmq-publishers']: - catcher = EventCatcher(event_queues, zmq_publisher) - catchers.append(catcher) - - if default_output_mode == "tcp": - processor = TCPLogProcessor(logqueue, - default_output_host, default_output_port) - elif default_output_mode == "udp": - processor = UDPLogProcessor(logqueue, - default_output_host, default_output_port) - else: - processor = StdOutLogProcessor(logqueue) - - for catcher in catchers: - catcher.daemon = True - catcher.start() - for retriever in retrievers: - retriever.daemon = True - retriever.start() - - while True: - try: - processor.handle_log_event() - except: - logging.exception("Exception processing log event.") - raise + with DaemonContext(): + server.setup_logging() + server.main() if __name__ == '__main__':