Refactor log-pusher.py to support daemonization.

* modules/openstack_project/files/logstash/log-pusher.py: The
log-pusher.py script does not cleanly daemonize on its own. Refactor the
script to make it possible for it to daemonize itself.

Change-Id: I974b8370f4dced357beb92ea8e74b1a60cb148b5
Reviewed-on: https://review.openstack.org/28365
Reviewed-by: Jeremy Stanley <fungi@yuggoth.org>
Approved: James E. Blair <corvus@inaugust.com>
Reviewed-by: James E. Blair <corvus@inaugust.com>
Tested-by: Jenkins
This commit is contained in:
Clark Boylan 2013-05-06 17:21:38 -07:00 committed by Jenkins
parent 054e261dee
commit 1491c3e6c8

View File

@ -266,6 +266,104 @@ class TCPLogProcessor(INETLogProcessor):
socket_type = socket.SOCK_STREAM
class Server(object):
def __init__(self, config, debuglog):
# Config init.
self.config = config
self.defaults = self.config['source-defaults']
self.default_source_url = self.defaults['source-url']
self.default_output_host = self.defaults['output-host']
self.default_output_port = self.defaults['output-port']
self.default_output_mode = self.defaults['output-mode']
self.default_retry = self.defaults['retry-get']
# Pythong logging output file.
self.debuglog = debuglog
# Input, retriever, output details
self.catchers = []
self.event_queues = []
self.retrievers = []
# TODO(clarkb) support multiple outputs
self.logqueue = queue.Queue()
self.processor = None
def setup_logging(self):
if self.debuglog:
logging.basicConfig(format='%(asctime)s %(message)s',
filename=self.debuglog, level=logging.DEBUG)
else:
# Prevent leakage into the logstash log stream.
logging.basicConfig(level=logging.CRITICAL)
logging.debug("Log pusher starting.")
def setup_retrievers(self):
for source_file in self.config['source-files']:
eventqueue = queue.Queue()
self.event_queues.append(eventqueue)
retriever = LogRetriever(eventqueue, self.logqueue,
source_file.get('source-url',
self.default_source_url),
source_file['name'],
retry=source_file.get('retry-get',
self.default_retry),
job_filter=source_file.get('filter',
''))
self.retrievers.append(retriever)
def setup_catchers(self):
for zmq_publisher in self.config['zmq-publishers']:
catcher = EventCatcher(self.event_queues, zmq_publisher)
self.catchers.append(catcher)
def setup_processor(self):
if self.default_output_mode == "tcp":
self.processor = TCPLogProcessor(self.logqueue,
self.default_output_host,
self.default_output_port)
elif self.default_output_mode == "udp":
self.processor = UDPLogProcessor(self.logqueue,
self.default_output_host,
self.default_output_port)
else:
self.processor = StdOutLogProcessor(self.logqueue)
def main(self):
self.setup_retrievers()
self.setup_catchers()
self.setup_processor()
for catcher in self.catchers:
catcher.daemon = True
catcher.start()
for retriever in self.retrievers:
retriever.daemon = True
retriever.start()
while True:
try:
self.processor.handle_log_event()
except:
logging.exception("Exception processing log event.")
raise
class DaemonContext(object):
def __init__(self):
# Set pidfile path.
pass
def __enter__(self):
# change umask
# chdir
# double fork
# redirect stdin, stdout, stderr to /dev/null
# lock pidfile
pass
def __exit__(self, exc_type, exc_value, traceback):
# remove pidfile
pass
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", required=True,
@ -273,69 +371,21 @@ def main():
parser.add_argument("-d", "--debuglog",
help="Enable debug log. "
"Specifies file to write log to.")
parser.add_argument("--foreground", action='store_true',
help="Run in the foreground.")
args = parser.parse_args()
if args.debuglog:
logging.basicConfig(format='%(asctime)s %(message)s',
filename=args.debuglog, level=logging.DEBUG)
with open(args.config, 'r') as config_stream:
config = yaml.load(config_stream)
server = Server(config, args.debuglog)
if args.foreground:
server.setup_logging()
server.main()
else:
# Prevent leakage into the logstash log stream.
logging.basicConfig(level=logging.CRITICAL)
logging.debug("Log pusher starting.")
config_stream = open(args.config, 'r')
config = yaml.load(config_stream)
defaults = config['source-defaults']
default_source_url = defaults['source-url']
default_output_host = defaults['output-host']
default_output_port = defaults['output-port']
default_output_mode = defaults['output-mode']
default_retry = defaults['retry-get']
event_queues = []
# TODO(clarkb) support multiple outputs
logqueue = queue.Queue()
retrievers = []
for source_file in config['source-files']:
eventqueue = queue.Queue()
event_queues.append(eventqueue)
retriever = LogRetriever(eventqueue, logqueue,
source_file.get('source-url',
default_source_url),
source_file['name'],
retry=source_file.get('retry-get',
default_retry),
job_filter=source_file.get('filter',
''))
retrievers.append(retriever)
catchers = []
for zmq_publisher in config['zmq-publishers']:
catcher = EventCatcher(event_queues, zmq_publisher)
catchers.append(catcher)
if default_output_mode == "tcp":
processor = TCPLogProcessor(logqueue,
default_output_host, default_output_port)
elif default_output_mode == "udp":
processor = UDPLogProcessor(logqueue,
default_output_host, default_output_port)
else:
processor = StdOutLogProcessor(logqueue)
for catcher in catchers:
catcher.daemon = True
catcher.start()
for retriever in retrievers:
retriever.daemon = True
retriever.start()
while True:
try:
processor.handle_log_event()
except:
logging.exception("Exception processing log event.")
raise
with DaemonContext():
server.setup_logging()
server.main()
if __name__ == '__main__':