diff --git a/modules.env b/modules.env
index 3ac8acedd0..59107696f3 100644
--- a/modules.env
+++ b/modules.env
@@ -69,6 +69,7 @@ INTEGRATION_MODULES["https://git.openstack.org/openstack-infra/puppet-iptables"]
 INTEGRATION_MODULES["https://git.openstack.org/openstack-infra/puppet-zuul"]="origin/master"
 INTEGRATION_MODULES["https://git.openstack.org/openstack-infra/puppet-kibana"]="origin/master"
 INTEGRATION_MODULES["https://git.openstack.org/openstack-infra/puppet-lodgeit"]="origin/master"
+INTEGRATION_MODULES["https://git.openstack.org/openstack-infra/puppet-log_processor"]="origin/master"
 INTEGRATION_MODULES["https://git.openstack.org/openstack-infra/puppet-logstash"]="origin/master"
 INTEGRATION_MODULES["https://git.openstack.org/openstack-infra/puppet-meetbot"]="origin/master"
 INTEGRATION_MODULES["https://git.openstack.org/openstack-infra/puppet-mysql_backup"]="origin/master"
diff --git a/modules/log_processor/files/classify-log.crm b/modules/log_processor/files/classify-log.crm
deleted file mode 100755
index 66b02bc48a..0000000000
--- a/modules/log_processor/files/classify-log.crm
+++ /dev/null
@@ -1,123 +0,0 @@
-#! /usr/bin/crm
-#
-# Copyright 2013 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# This script trains an OSB (Orthogonal Sparse Bigram) bayesian filter
-# with log lines from test runs and classifies each line according to
-# the likelihood it indicates an error. Very little experimentation
-# has been done to determine the best classifier and training method;
-# further experimentation may be useful.
-
-# The training method is TET -- Train Every Thing. This is not
-# normally advised as a training method for Bayesian filters. In
-# experiments, it identified about twice as many lines as being
-# associated with errors as were indicated by a TOE (Train On Error)
-# method. Some of them were false positives, but many were not, and
-# of those, it had a much higher (pR ~= 37) confidence in them than
-# TOE. TET seems to give qualitatively better results when filtering
-# for higher pR values.
-
-# Set unbuffered IO
-window
-
-# Base component of path to data files
-isolate (:prefix:) /:*:_arg2:/
-
-# Whether this run is for a SUCCESS or FAILURE result
-isolate (:target:) /:*:_arg3:/
-
-# Train each file on a newline just to make sure it exists
-learn [:_nl:] (:*:prefix:/SUCCESS.css)
-learn [:_nl:] (:*:prefix:/FAILURE.css)
-{
-    # Iterate over each line
-    window /\n/ /\n/
-    {
-        isolate (:stats:)
-        isolate (:result:)
-        isolate (:prob:)
-        isolate (:pr:)
-        # Save a copy of this line
-        isolate (:line:) /:*:_dw:/
-        {
-            {
-                # Remove things that look like timestamps from the beginning of the line
-                match (:timestamp:) /^[-.0-9 |:]+/
-                alter (:timestamp:) //
-            }
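The three token-replacement blocks that follow collapse volatile identifiers (UUIDs, 32-40 character hex IDs, and long numbers) into fixed tokens, so the classifier does not treat every unique ID as a distinct feature. For orientation, a rough Python equivalent of this normalization (an illustrative sketch only, not part of the module):

```python
import re

# Mirrors the CRM114 rules below; hypothetical helper, not shipped code.
UUID_RE = re.compile(r'[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-'
                     r'[0-9a-fA-F]{4}-[0-9a-fA-F]{12}')
HEXID_RE = re.compile(r'[0-9a-fA-F]{32,40}')
NUM_RE = re.compile(r'-[0-9]{7,}')


def normalize(line):
    line = re.sub(r'^[-.0-9 |:]+', '', line)  # strip a leading timestamp
    line = UUID_RE.sub('UUIDTOKEN', line)
    line = HEXID_RE.sub('IDTOKEN', line)
    return NUM_RE.sub('-NUMTOKEN', line)


# normalize('2013-10-01 12:00:00.123 | Deleted '
#           '7d2f3031-29c1-4cdb-8f8d-35cf247f97b5')
# -> 'Deleted UUIDTOKEN'
```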
-            {
-                # Don't treat UUIDs as uniquely special.
-                match (:uuidtoken:) /[[:xdigit:]]{8}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{12}/
-                alter (:uuidtoken:) /UUIDTOKEN/
-                {
-                    match (:uuidtoken:) /[[:xdigit:]]{8}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{12}/
-                    alter (:uuidtoken:) /UUIDTOKEN/
-                    # Loop to replace all TOKENS in line
-                    liaf
-                }
-            }
-            {
-                # Don't treat IDs as uniquely special.
-                match (:idtoken:) /[[:xdigit:]]{32,40}/
-                alter (:idtoken:) /IDTOKEN/
-                {
-                    match (:idtoken:) /[[:xdigit:]]{32,40}/
-                    alter (:idtoken:) /IDTOKEN/
-                    # Loop to replace all TOKENS in line
-                    liaf
-                }
-            }
-            {
-                # Don't treat long numbers as uniquely special.
-                match (:numtoken:) /-[[:digit:]]{7,}/
-                alter (:numtoken:) /-NUMTOKEN/
-                {
-                    match (:numtoken:) /-[[:digit:]]{7,}/
-                    alter (:numtoken:) /-NUMTOKEN/
-                    # Loop to replace all TOKENS in line
-                    liaf
-                }
-            }
-            # Train on the line
-            learn (:*:prefix:/:*:target:.css)
-            # Classify the line to see if it looks more like a SUCCESS or FAILURE line
-            classify (:*:prefix:/SUCCESS.css :*:prefix:/FAILURE.css) (:stats:)
-            {
-                # The stats variable looks like:
-                # CLASSIFY succeeds; success probability: 1.0000 pR: 304.6527
-                # Best match to file #0 (/tmp/crm114/console_html/SUCCESS.css) prob: 0.9933 pR: 2.1720
-                # Total features in input file: 20
-                # #0 (/tmp/crm114/console_html/SUCCESS.css): features: 3544235, hits: 901854, prob: 9.93e-01, pR: 2.17
-                # #1 (/tmp/crm114/console_html/FAILURE.css): features: 1, hits: 0, prob: 6.69e-03, pR: -2.17
-                # Pull out the filename, probability, and pR (a kind of logarithmic probability, see CRM docs)
-                match [:stats:] /^Best match to .*\/([A-Za-z]+).css\) prob: ([-.0-9]+) pR: ([-.0-9]+)/ ( :: :result: :prob: :pr: )
-                {
-                    # If this line is classified as FAILURE, negate
-                    # the pR value (which will always be positive).
-                    # Do this by prepending a '-' or the empty string.
-                    {
-                        match [:result:] /FAILURE/
-                        alter (:result:) /-/
-                    } alius {
-                        alter (:result:) //
-                    }
-                }
-                # Output the sign and pR value for this line.
-                output /:*:result::*:pr:\n/
-            }
-        }
-    }
-    liaf
-}
diff --git a/modules/log_processor/files/jenkins-log-client.init b/modules/log_processor/files/jenkins-log-client.init
deleted file mode 100755
index 04357b3d10..0000000000
--- a/modules/log_processor/files/jenkins-log-client.init
+++ /dev/null
@@ -1,158 +0,0 @@
-#! /bin/sh
-### BEGIN INIT INFO
-# Provides:          jenkins-log-client
-# Required-Start:    $remote_fs $syslog
-# Required-Stop:     $remote_fs $syslog
-# Default-Start:     2 3 4 5
-# Default-Stop:      0 1 6
-# Short-Description: Jenkins Log Client
-# Description:       Service to push Jenkins logs into logstash.
-### END INIT INFO
-
-# Do NOT "set -e"
-
-# PATH should only include /usr/* if it runs after the mountnfs.sh script
-PATH=/sbin:/usr/sbin:/bin:/usr/bin
-DESC="Jenkins Log Client"
-NAME=jenkins-log-client
-DAEMON=/usr/local/bin/log-gearman-client.py
-PIDFILE=/var/run/$NAME/$NAME.pid
-DAEMON_ARGS="-c /etc/logstash/jenkins-log-client.yaml -d /var/log/logstash/log-client-debug.log -p $PIDFILE"
-SCRIPTNAME=/etc/init.d/$NAME
-USER=logstash
-
-# Exit if the package is not installed
-[ -x "$DAEMON" ] || exit 0
-
-# Read configuration variable file if it is present
-[ -r /etc/default/$NAME ] && . /etc/default/$NAME
-
-# Load the VERBOSE setting and other rcS variables
-. /lib/init/vars.sh
-
-# Define LSB log_* functions.
-# Depend on lsb-base (>= 3.0-6) to ensure that this file is present.
-. /lib/lsb/init-functions
-
-#
-# Function that starts the daemon/service
-#
-do_start()
-{
-    # Return
-    #   0 if daemon has been started
-    #   1 if daemon was already running
-    #   2 if daemon could not be started
-
-    mkdir -p /var/run/$NAME
-    chown $USER /var/run/$NAME
-    start-stop-daemon --start --quiet --pidfile $PIDFILE -c $USER --exec $DAEMON --test > /dev/null \
-        || return 1
-    start-stop-daemon --start --quiet --pidfile $PIDFILE -c $USER --exec $DAEMON -- \
-        $DAEMON_ARGS \
-        || return 2
-    # Add code here, if necessary, that waits for the process to be ready
-    # to handle requests from services started subsequently which depend
-    # on this one. As a last resort, sleep for some time.
-}
-
-#
-# Function that stops the daemon/service
-#
-do_stop()
-{
-    # Return
-    #   0 if daemon has been stopped
-    #   1 if daemon was already stopped
-    #   2 if daemon could not be stopped
-    #   other if a failure occurred
-    start-stop-daemon --stop --signal 9 --pidfile $PIDFILE
-    RETVAL="$?"
-    [ "$RETVAL" = 2 ] && return 2
-    rm -f /var/run/$NAME/*
-    return "$RETVAL"
-}
-
-#
-# Function that stops the daemon/service
-#
-#do_graceful_stop()
-#{
-#    PID=`cat $PIDFILE`
-#    kill -USR1 $PID
-#
-#    # wait until really stopped
-#    if [ -n "${PID:-}" ]; then
-#        i=0
-#        while kill -0 "${PID:-}" 2> /dev/null; do
-#            if [ $i -eq '0' ]; then
-#                echo -n " ... waiting "
-#            else
-#                echo -n "."
-#            fi
-#            i=$(($i+1))
-#            sleep 1
-#        done
-#    fi
-#
-#    rm -f /var/run/$NAME/*
-#}
-
-#
-# Function that sends a SIGHUP to the daemon/service
-#
-#do_reload() {
-#    #
-#    # If the daemon can reload its configuration without
-#    # restarting (for example, when it is sent a SIGHUP),
-#    # then implement that here.
-#    #
-#    start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE --name zuul-server
-#    return 0
-#}
-
-case "$1" in
-    start)
-        [ "$VERBOSE" != no ] && log_daemon_msg "Starting $DESC" "$NAME"
-        do_start
-        case "$?" in
-            0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;;
-            2) [ "$VERBOSE" != no ] && log_end_msg 1 ;;
-        esac
-        ;;
-    stop)
-        [ "$VERBOSE" != no ] && log_daemon_msg "Stopping $DESC" "$NAME"
-        do_stop
-        case "$?" in
-            0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;;
-            2) [ "$VERBOSE" != no ] && log_end_msg 1 ;;
-        esac
-        ;;
-    status)
-        status_of_proc "$DAEMON" "$NAME" && exit 0 || exit $?
-        ;;
-#    reload)
-#        #
-#        # If do_reload() is not implemented then leave this commented out
-#        # and leave 'force-reload' as an alias for 'restart'.
-#        #
-#        log_daemon_msg "Reloading $DESC" "$NAME"
-#        do_reload
-#        log_end_msg $?
-#        ;;
-    restart|force-reload)
-        #
-        # If the "reload" option is implemented then remove the
-        # 'force-reload' alias
-        #
-        log_daemon_msg "Restarting $DESC" "$NAME"
-        do_stop
-        do_start
-        ;;
-    *)
-        echo "Usage: $SCRIPTNAME {start|stop|status|restart|force-reload}" >&2
-        exit 3
-        ;;
-esac
-
-:
diff --git a/modules/log_processor/files/log-gearman-client.py b/modules/log_processor/files/log-gearman-client.py
deleted file mode 100644
index 03aa038b51..0000000000
--- a/modules/log_processor/files/log-gearman-client.py
+++ /dev/null
@@ -1,214 +0,0 @@
-#!/usr/bin/python2
-#
-# Copyright 2013 Hewlett-Packard Development Company, L.P.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import argparse
-import daemon
-import gear
-import json
-import logging
-import os
-import os.path
-import re
-import signal
-import threading
-import yaml
-import zmq
-
-
-try:
-    import daemon.pidlockfile as pidfile_mod
-except ImportError:
-    import daemon.pidfile as pidfile_mod
-
-
-class EventProcessor(threading.Thread):
-    def __init__(self, zmq_address, gearman_client, files, source_url):
-        threading.Thread.__init__(self)
-        self.files = files
-        self.source_url = source_url
-        self.gearman_client = gearman_client
-        self.zmq_address = zmq_address
-        self._connect_zmq()
-
-    def run(self):
-        while True:
-            try:
-                self._read_event()
-            except:
-                # Assume that an error reading data from zmq or deserializing
-                # data received from zmq indicates a zmq error and reconnect.
-                logging.exception("ZMQ exception.")
-                self._connect_zmq()
-
-    def _connect_zmq(self):
-        logging.debug("Connecting to zmq endpoint.")
-        self.context = zmq.Context()
-        self.socket = self.context.socket(zmq.SUB)
-        event_filter = b"onFinalized"
-        self.socket.setsockopt(zmq.SUBSCRIBE, event_filter)
-        self.socket.connect(self.zmq_address)
-
-    def _read_event(self):
-        string = self.socket.recv().decode('utf-8')
-        event = json.loads(string.split(None, 1)[1])
-        logging.debug("Jenkins event received: " + json.dumps(event))
-        for fileopts in self.files:
-            output = {}
-            source_url, out_event = self._parse_event(event, fileopts)
-            job_filter = fileopts.get('job-filter')
-            if (job_filter and
-                not re.match(job_filter, out_event['fields']['build_name'])):
-                continue
-            build_queue_filter = fileopts.get('build-queue-filter')
-            if (build_queue_filter and
-                not re.match(build_queue_filter,
-                             out_event['fields']['build_queue'])):
-                continue
-            output['source_url'] = source_url
-            output['retry'] = fileopts.get('retry-get', False)
-            output['event'] = out_event
-            if 'subunit' in fileopts.get('name'):
-                job = gear.Job(b'push-subunit',
-                               json.dumps(output).encode('utf8'))
-            else:
-                job = gear.Job(b'push-log', json.dumps(output).encode('utf8'))
-            try:
-                self.gearman_client.submitJob(job)
-            except:
-                logging.exception("Exception submitting job to Gearman.")
-
-    def _get_log_dir(self, event):
-        parameters = event["build"].get("parameters", {})
-        base = parameters.get('LOG_PATH', 'UNKNOWN')
-        return base
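The `_parse_fields` method below flattens the Jenkins `onFinalized` payload into the flat field set that gets indexed. A hypothetical event (shape inferred from the accessors in this file; every value is invented for illustration) and what it yields:

```python
# Hypothetical Jenkins onFinalized payload; only the keys this file
# actually reads are shown, and all values here are made up.
event = {
    "name": "gate-nova-python27",
    "build": {
        "status": "FAILURE",
        "node_name": "precise-slave-01",
        "host_name": "jenkins01.example.org",
        "parameters": {
            "LOG_PATH": "80/12345/3/gate/gate-nova-python27/7d2f303",
            "ZUUL_PROJECT": "openstack/nova",
            "ZUUL_UUID": "7d2f303129c14cdb8f8d35cf247f97b5",
            "ZUUL_PIPELINE": "gate",
            "ZUUL_CHANGE": "12345",
            "ZUUL_PATCHSET": "3",
        },
    },
}
# _parse_fields(event, 'console.html') would then report
# build_name='gate-nova-python27', build_status='FAILURE',
# build_queue='gate', project='openstack/nova', and
# build_short_uuid='7d2f303' (the first seven chars of ZUUL_UUID).
```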
-
-    def _parse_fields(self, event, filename):
-        fields = {}
-        fields["filename"] = filename
-        fields["build_name"] = event.get("name", "UNKNOWN")
-        fields["build_status"] = event["build"].get("status", "UNKNOWN")
-        fields["build_node"] = event["build"].get("node_name", "UNKNOWN")
-        fields["build_master"] = event["build"].get("host_name", "UNKNOWN")
-        parameters = event["build"].get("parameters", {})
-        fields["project"] = parameters.get("ZUUL_PROJECT", "UNKNOWN")
-        # TODO(clarkb) can we do better without duplicated data here?
-        fields["build_uuid"] = parameters.get("ZUUL_UUID", "UNKNOWN")
-        fields["build_short_uuid"] = fields["build_uuid"][:7]
-        fields["build_queue"] = parameters.get("ZUUL_PIPELINE", "UNKNOWN")
-        fields["build_ref"] = parameters.get("ZUUL_REF", "UNKNOWN")
-        fields["build_branch"] = parameters.get("ZUUL_BRANCH", "UNKNOWN")
-        if parameters.get("ZUUL_CHANGE"):
-            fields["build_change"] = parameters.get("ZUUL_CHANGE", "UNKNOWN")
-            fields["build_patchset"] = parameters.get("ZUUL_PATCHSET",
-                                                      "UNKNOWN")
-        elif parameters.get("ZUUL_NEWREV"):
-            fields["build_newrev"] = parameters.get("ZUUL_NEWREV",
-                                                    "UNKNOWN")
-        return fields
-
-    def _parse_event(self, event, fileopts):
-        fields = self._parse_fields(event, fileopts['name'])
-        log_dir = self._get_log_dir(event)
-        source_url = fileopts.get('source-url', self.source_url) + '/' + \
-            os.path.join(log_dir, fileopts['name'])
-        fields["log_url"] = source_url
-        out_event = {}
-        out_event["fields"] = fields
-        out_event["tags"] = [os.path.basename(fileopts['name'])] + \
-            fileopts.get('tags', [])
-        return source_url, out_event
-
-
-class Server(object):
-    def __init__(self, config, debuglog):
-        # Config init.
-        self.config = config
-        self.source_url = self.config['source-url']
-        # Python logging output file.
-        self.debuglog = debuglog
-        self.processors = []
-
-    def setup_logging(self):
-        if self.debuglog:
-            logging.basicConfig(format='%(asctime)s %(message)s',
-                                filename=self.debuglog, level=logging.DEBUG)
-        else:
-            # Prevent leakage into the logstash log stream.
-            logging.basicConfig(level=logging.CRITICAL)
-        logging.debug("Log pusher starting.")
-
-    def setup_processors(self):
-        for publisher in self.config['zmq-publishers']:
-            gearclient = gear.Client()
-            gearclient.addServer('localhost')
-            gearclient.waitForServer()
-            log_processor = EventProcessor(
-                publisher, gearclient,
-                self.config['source-files'], self.source_url)
-            subunit_processor = EventProcessor(
-                publisher, gearclient,
-                self.config['subunit-files'], self.source_url)
-            self.processors.append(log_processor)
-            self.processors.append(subunit_processor)
-
-    def main(self):
-        statsd_host = os.environ.get('STATSD_HOST')
-        statsd_port = int(os.environ.get('STATSD_PORT', 8125))
-        statsd_prefix = os.environ.get('STATSD_PREFIX', 'logstash.geard')
-        self.gearserver = gear.Server(
-            statsd_host=statsd_host,
-            statsd_port=statsd_port,
-            statsd_prefix=statsd_prefix)
-
-        self.setup_processors()
-        for processor in self.processors:
-            processor.daemon = True
-            processor.start()
-        while True:
-            signal.pause()
-
-
-def main():
-    parser = argparse.ArgumentParser()
-    parser.add_argument("-c", "--config", required=True,
-                        help="Path to yaml config file.")
-    parser.add_argument("-d", "--debuglog",
-                        help="Enable debug log. "
-                             "Specifies file to write log to.")
-    parser.add_argument("--foreground", action='store_true',
-                        help="Run in the foreground.")
-    parser.add_argument("-p", "--pidfile",
-                        default="/var/run/jenkins-log-pusher/"
-                                "jenkins-log-gearman-client.pid",
-                        help="PID file to lock during daemonization.")
-    args = parser.parse_args()
-
-    with open(args.config, 'r') as config_stream:
-        config = yaml.load(config_stream)
-    server = Server(config, args.debuglog)
-
-    if args.foreground:
-        server.setup_logging()
-        server.main()
-    else:
-        pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10)
-        with daemon.DaemonContext(pidfile=pidfile):
-            server.setup_logging()
-            server.main()
-
-
-if __name__ == '__main__':
-    main()
diff --git a/modules/log_processor/files/log-gearman-worker.py b/modules/log_processor/files/log-gearman-worker.py
deleted file mode 100644
index 66007cb770..0000000000
--- a/modules/log_processor/files/log-gearman-worker.py
+++ /dev/null
@@ -1,430 +0,0 @@
-#!/usr/bin/python2
-#
-# Copyright 2013 Hewlett-Packard Development Company, L.P.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import argparse
-import cStringIO
-import daemon
-import gear
-import gzip
-import json
-import logging
-import os
-import Queue
-import re
-import select
-import socket
-import subprocess
-import sys
-import threading
-import time
-import urllib2
-import yaml
-
-
-try:
-    import daemon.pidlockfile as pidfile_mod
-except ImportError:
-    import daemon.pidfile as pidfile_mod
-
-
-def semi_busy_wait(seconds):
-    # time.sleep() may return early. If it does, sleep() again and repeat
-    # until at least the number of seconds specified has elapsed.
-    start_time = time.time()
-    while True:
-        time.sleep(seconds)
-        cur_time = time.time()
-        seconds = seconds - (cur_time - start_time)
-        if seconds <= 0.0:
-            return
-
-
-class FilterException(Exception):
-    pass
-
-
-class CRM114Filter(object):
-    def __init__(self, script, path, build_status):
-        self.p = None
-        self.script = script
-        self.path = path
-        self.build_status = build_status
-        if build_status not in ['SUCCESS', 'FAILURE']:
-            return
-        if not os.path.exists(path):
-            os.makedirs(path)
-        args = [script, path, build_status]
-        self.p = subprocess.Popen(args,
-                                  stdout=subprocess.PIPE,
-                                  stderr=subprocess.PIPE,
-                                  stdin=subprocess.PIPE,
-                                  close_fds=True)
-
-    def process(self, data):
-        if not self.p:
-            return
-        self.p.stdin.write(data['message'].encode('utf-8') + '\n')
-        (r, w, x) = select.select([self.p.stdout], [],
-                                  [self.p.stdin, self.p.stdout], 20)
-        if not r:
-            self.p.kill()
-            raise FilterException('Timeout reading from CRM114')
-        r = self.p.stdout.readline()
-        if not r:
-            err = self.p.stderr.read()
-            if err:
-                raise FilterException(err)
-            else:
-                raise FilterException('Early EOF from CRM114')
-        r = r.strip()
-        data['error_pr'] = float(r)
-
-    def _catchOSError(self, method):
-        try:
-            method()
-        except OSError:
-            logging.exception("Subprocess cleanup failed.")
-
-    def close(self):
-        if not self.p:
-            return
-        # CRM114 should die when its stdin is closed. Close that
-        # fd along with stdout and stderr then return.
-        self._catchOSError(self.p.stdin.close)
-        self._catchOSError(self.p.stdout.close)
-        self._catchOSError(self.p.stderr.close)
-        self._catchOSError(self.p.wait)
-
-
-class CRM114FilterFactory(object):
-    name = "CRM114"
-
-    def __init__(self, script, basepath):
-        self.script = script
-        self.basepath = basepath
-
-    def create(self, fields):
-        filename = re.sub('\.', '_', fields['filename'])
-        path = os.path.join(self.basepath, filename)
-        return CRM114Filter(self.script, path, fields['build_status'])
-
-
-class LogRetriever(threading.Thread):
-    def __init__(self, gearman_worker, filters, logq):
-        threading.Thread.__init__(self)
-        self.gearman_worker = gearman_worker
-        self.filters = filters
-        self.logq = logq
-
-    def run(self):
-        while True:
-            try:
-                self._handle_event()
-            except:
-                logging.exception("Exception retrieving log event.")
-
-    def _handle_event(self):
-        job = self.gearman_worker.getJob()
-        try:
-            arguments = json.loads(job.arguments.decode('utf-8'))
-            source_url = arguments['source_url']
-            retry = arguments['retry']
-            event = arguments['event']
-            logging.debug("Handling event: " + json.dumps(event))
-            fields = event.get('fields') or event.get('@fields')
-            tags = event.get('tags') or event.get('@tags')
-            if fields['build_status'] != 'ABORTED':
-                # Handle events, ignoring aborted builds. These builds are
-                # discarded by zuul.
-                log_lines = self._retrieve_log(source_url, retry)
-
-                try:
-                    all_filters = []
-                    for f in self.filters:
-                        logging.debug("Adding filter: %s" % f.name)
-                        all_filters.append(f.create(fields))
-                    filters = all_filters
-
-                    logging.debug("Pushing " + str(len(log_lines)) +
-                                  " log lines.")
-                    base_event = {}
-                    base_event.update(fields)
-                    base_event["tags"] = tags
-                    for line in log_lines:
-                        out_event = base_event.copy()
-                        out_event["message"] = line
-                        new_filters = []
-                        for f in filters:
-                            try:
-                                f.process(out_event)
-                                new_filters.append(f)
-                            except FilterException:
-                                logging.exception("Exception filtering event: "
-                                                  "%s" % line.encode("utf-8"))
-                        filters = new_filters
-                        self.logq.put(out_event)
-                finally:
-                    for f in all_filters:
-                        f.close()
-            job.sendWorkComplete()
-        except Exception as e:
-            logging.exception("Exception handling log event.")
-            job.sendWorkException(str(e).encode('utf-8'))
-
-    def _retrieve_log(self, source_url, retry):
-        # TODO (clarkb): This should check the content type instead of file
-        # extension for determining if gzip was used.
-        gzipped = False
-        raw_buf = b''
-        try:
-            gzipped, raw_buf = self._get_log_data(source_url, retry)
-        except urllib2.HTTPError as e:
-            if e.code == 404:
-                logging.info("Unable to retrieve %s: HTTP error 404" %
-                             source_url)
-            else:
-                logging.exception("Unable to get log data.")
-        except Exception:
-            # Silently drop fatal errors when retrieving logs.
-            # TODO (clarkb): Handle these errors.
-            # Perhaps simply add a log message to raw_buf?
-            logging.exception("Unable to get log data.")
-        if gzipped:
-            logging.debug("Decompressing gzipped source file.")
-            raw_strIO = cStringIO.StringIO(raw_buf)
-            f = gzip.GzipFile(fileobj=raw_strIO)
-            buf = f.read().decode('utf-8')
-            raw_strIO.close()
-            f.close()
-        else:
-            logging.debug("Decoding source file.")
-            buf = raw_buf.decode('utf-8')
-        return buf.splitlines()
-
-    def _get_log_data(self, source_url, retry):
-        gzipped = False
-        try:
-            # TODO(clarkb): We really should be using requests instead
-            # of urllib2. urllib2 will automatically perform a POST
-            # instead of a GET if we provide urlencoded data to urlopen
-            # but we need to do a GET. The parameters are currently
-            # hardcoded so this should be ok for now.
-            logging.debug("Retrieving: " + source_url + ".gz?level=INFO")
-            req = urllib2.Request(source_url + ".gz?level=INFO")
-            req.add_header('Accept-encoding', 'gzip')
-            r = urllib2.urlopen(req)
-        except urllib2.URLError:
-            try:
-                # Fall back to GETting unzipped data.
-                logging.debug("Retrieving: " + source_url + "?level=INFO")
-                r = urllib2.urlopen(source_url + "?level=INFO")
-            except:
-                logging.exception("Unable to retrieve source file.")
-                raise
-        except:
-            logging.exception("Unable to retrieve source file.")
-            raise
-        if ('gzip' in r.info().get('Content-Type', '') or
-            'gzip' in r.info().get('Content-Encoding', '')):
-            gzipped = True
-
-        raw_buf = r.read()
-        # Hack to read all of Jenkins console logs as they upload
-        # asynchronously. After each attempt do an exponential backoff before
-        # the next request, for up to 255 seconds total, if we have not yet
-        # retrieved the entire file. Short circuit when the end of file string
-        # for console logs, '\n\n', is read.
-        if (retry and not gzipped and
-            raw_buf[-8:].decode('utf-8') != '\n\n'):
-            content_len = len(raw_buf)
-            backoff = 1
-            while backoff < 129:
-                # Try for up to 255 seconds to retrieve the complete log file.
-                try:
-                    logging.debug(str(backoff) + " Retrying fetch of: " +
-                                  source_url + "?level=INFO")
-                    logging.debug("Fetching bytes=" + str(content_len) + '-')
-                    req = urllib2.Request(source_url + "?level=INFO")
-                    req.add_header('Range', 'bytes=' + str(content_len) + '-')
-                    r = urllib2.urlopen(req)
-                    raw_buf += r.read()
-                    content_len = len(raw_buf)
-                except urllib2.HTTPError as e:
-                    if e.code == 416:
-                        logging.exception("Index out of range.")
-                    else:
-                        raise
-                finally:
-                    if raw_buf[-8:].decode('utf-8') == '\n\n':
-                        break
-                    semi_busy_wait(backoff)
-                    backoff += backoff
-
-        return gzipped, raw_buf
-
-
-class StdOutLogProcessor(object):
-    def __init__(self, logq, pretty_print=False):
-        self.logq = logq
-        self.pretty_print = pretty_print
-
-    def handle_log_event(self):
-        log = self.logq.get()
-        if self.pretty_print:
-            print(json.dumps(log, sort_keys=True,
-                             indent=4, separators=(',', ': ')))
-        else:
-            print(json.dumps(log))
-        # Push each log event through to keep logstash up to date.
-        sys.stdout.flush()
-
-
-class INETLogProcessor(object):
-    socket_type = None
-
-    def __init__(self, logq, host, port):
-        self.logq = logq
-        self.host = host
-        self.port = port
-        self._connect_socket()
-
-    def _connect_socket(self):
-        logging.debug("Creating socket.")
-        self.socket = socket.socket(socket.AF_INET, self.socket_type)
-        self.socket.connect((self.host, self.port))
-
-    def handle_log_event(self):
-        log = self.logq.get()
-        try:
-            self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
-        except:
-            logging.exception("Exception sending INET event.")
-            # Logstash seems to take about a minute to start again. Wait 90
-            # seconds before attempting to reconnect. If logstash is not
-            # available after 90 seconds we will throw another exception and
-            # die.
-            semi_busy_wait(90)
-            self._connect_socket()
-            self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
-
-
-class UDPLogProcessor(INETLogProcessor):
-    socket_type = socket.SOCK_DGRAM
-
-
-class TCPLogProcessor(INETLogProcessor):
-    socket_type = socket.SOCK_STREAM
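The Range-request retry loop in `_get_log_data` above doubles its wait after each attempt (1, 2, 4, ... 128 seconds, 255 seconds in total) while Jenkins is still appending to the console log. The schedule in isolation, as a standalone sketch:

```python
def backoff_schedule(limit=129):
    # Yields 1, 2, 4, ..., 128: the waits used between re-fetch
    # attempts above, summing to 255 seconds before giving up.
    backoff = 1
    while backoff < limit:
        yield backoff
        backoff += backoff


assert sum(backoff_schedule()) == 255
```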
-
-
-class Server(object):
-    def __init__(self, config, debuglog):
-        # Config init.
-        self.config = config
-        self.gearman_host = self.config['gearman-host']
-        self.gearman_port = self.config['gearman-port']
-        self.output_host = self.config['output-host']
-        self.output_port = self.config['output-port']
-        self.output_mode = self.config['output-mode']
-        # Python logging output file.
-        self.debuglog = debuglog
-        self.retriever = None
-        self.logqueue = Queue.Queue(131072)
-        self.processor = None
-        self.filter_factories = []
-        crmscript = self.config.get('crm114-script')
-        crmdata = self.config.get('crm114-data')
-        if crmscript and crmdata:
-            self.filter_factories.append(
-                CRM114FilterFactory(crmscript, crmdata))
-
-    def setup_logging(self):
-        if self.debuglog:
-            logging.basicConfig(format='%(asctime)s %(message)s',
-                                filename=self.debuglog, level=logging.DEBUG)
-        else:
-            # Prevent leakage into the logstash log stream.
-            logging.basicConfig(level=logging.CRITICAL)
-        logging.debug("Log pusher starting.")
-
-    def setup_retriever(self):
-        hostname = socket.gethostname()
-        gearman_worker = gear.Worker(hostname + b'-pusher')
-        gearman_worker.addServer(self.gearman_host,
-                                 self.gearman_port)
-        gearman_worker.registerFunction(b'push-log')
-        self.retriever = LogRetriever(gearman_worker, self.filter_factories,
-                                      self.logqueue)
-
-    def setup_processor(self):
-        if self.output_mode == "tcp":
-            self.processor = TCPLogProcessor(self.logqueue,
-                                             self.output_host,
-                                             self.output_port)
-        elif self.output_mode == "udp":
-            self.processor = UDPLogProcessor(self.logqueue,
-                                             self.output_host,
-                                             self.output_port)
-        else:
-            # Note this processor will not work if the process is run as a
-            # daemon. You must use the --foreground option.
-            self.processor = StdOutLogProcessor(self.logqueue)
-
-    def main(self):
-        self.setup_retriever()
-        self.setup_processor()
-
-        self.retriever.daemon = True
-        self.retriever.start()
-
-        while True:
-            try:
-                self.processor.handle_log_event()
-            except:
-                logging.exception("Exception processing log event.")
-                raise
-
-
-def main():
-    parser = argparse.ArgumentParser()
-    parser.add_argument("-c", "--config", required=True,
-                        help="Path to yaml config file.")
-    parser.add_argument("-d", "--debuglog",
-                        help="Enable debug log. "
-                             "Specifies file to write log to.")
-    parser.add_argument("--foreground", action='store_true',
-                        help="Run in the foreground.")
-    parser.add_argument("-p", "--pidfile",
-                        default="/var/run/jenkins-log-pusher/"
-                                "jenkins-log-gearman-worker.pid",
-                        help="PID file to lock during daemonization.")
-    args = parser.parse_args()
-
-    with open(args.config, 'r') as config_stream:
-        config = yaml.load(config_stream)
-    server = Server(config, args.debuglog)
-
-    if args.foreground:
-        server.setup_logging()
-        server.main()
-    else:
-        pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10)
-        with daemon.DaemonContext(pidfile=pidfile):
-            server.setup_logging()
-            server.main()
-
-
-if __name__ == '__main__':
-    main()
diff --git a/modules/log_processor/manifests/client.pp b/modules/log_processor/manifests/client.pp
deleted file mode 100644
index 8081e57153..0000000000
--- a/modules/log_processor/manifests/client.pp
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
-# Copyright 2013 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# == Class: log_processor::client
-#
-class log_processor::client (
-  $config_file,
-  $statsd_host = '',
-) {
-
-  file { '/etc/logstash/jenkins-log-client.yaml':
-    ensure  => present,
-    owner   => 'root',
-    group   => 'root',
-    mode    => '0555',
-    source  => $config_file,
-    require => File['/etc/logstash'],
-  }
-
-  file { '/etc/init.d/jenkins-log-client':
-    ensure  => present,
-    owner   => 'root',
-    group   => 'root',
-    mode    => '0555',
-    source  => 'puppet:///modules/log_processor/jenkins-log-client.init',
-    require => [
-      File['/usr/local/bin/log-gearman-client.py'],
-      File['/etc/logstash/jenkins-log-client.yaml'],
-      File['/etc/default/jenkins-log-client'],
-    ],
-  }
-
-  file { '/etc/default/jenkins-log-client':
-    ensure  => present,
-    owner   => 'root',
-    group   => 'root',
-    mode    => '0444',
-    content => template('log_processor/jenkins-log-client.default.erb'),
-  }
-
-  service { 'jenkins-log-client':
-    enable     => true,
-    hasrestart => true,
-    subscribe  => File['/etc/logstash/jenkins-log-client.yaml'],
-    require    => File['/etc/init.d/jenkins-log-client'],
-  }
-
-  include logrotate
-  logrotate::file { 'log-client-debug.log':
-    log     => '/var/log/logstash/log-client-debug.log',
-    options => [
-      'compress',
-      'copytruncate',
-      'missingok',
-      'rotate 7',
-      'daily',
-      'notifempty',
-    ],
-    require => Service['jenkins-log-client'],
-  }
-}
diff --git a/modules/log_processor/manifests/init.pp b/modules/log_processor/manifests/init.pp
deleted file mode 100644
index 661ca0ed6d..0000000000
--- a/modules/log_processor/manifests/init.pp
+++ /dev/null
@@ -1,94 +0,0 @@
-# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
-# Copyright 2013 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# == Class: log_processor
-#
-class log_processor (
-) {
-  package { 'python-daemon':
-    ensure => present,
-  }
-
-  package { 'python-zmq':
-    ensure => present,
-  }
-
-  package { 'python-yaml':
-    ensure => present,
-  }
-
-  package { 'crm114':
-    ensure => present,
-  }
-
-  include pip
-  package { 'gear':
-    ensure   => latest,
-    provider => 'pip',
-    require  => Class['pip'],
-  }
-
-  package { 'statsd':
-    ensure   => latest,
-    provider => 'pip',
-    require  => Class['pip'],
-  }
-
-  file { '/var/lib/crm114':
-    ensure  => directory,
-    owner   => 'logstash',
-    group   => 'logstash',
-    require => User['logstash'],
-  }
-
-  file { '/usr/local/bin/classify-log.crm':
-    ensure  => present,
-    owner   => 'root',
-    group   => 'root',
-    mode    => '0755',
-    source  => 'puppet:///modules/log_processor/classify-log.crm',
-    require => [
-      Package['crm114'],
-    ],
-  }
-
-  file { '/usr/local/bin/log-gearman-client.py':
-    ensure  => present,
-    owner   => 'root',
-    group   => 'root',
-    mode    => '0755',
-    source  => 'puppet:///modules/log_processor/log-gearman-client.py',
-    require => [
-      Package['python-daemon'],
-      Package['python-zmq'],
-      Package['python-yaml'],
-      Package['gear'],
-    ],
-  }
-
-  file { '/usr/local/bin/log-gearman-worker.py':
-    ensure  => present,
-    owner   => 'root',
-    group   => 'root',
-    mode    => '0755',
-    source  => 'puppet:///modules/log_processor/log-gearman-worker.py',
-    require => [
-      Package['python-daemon'],
-      Package['python-zmq'],
-      Package['python-yaml'],
-      Package['gear'],
-    ],
-  }
-}
diff --git a/modules/log_processor/manifests/worker.pp b/modules/log_processor/manifests/worker.pp
deleted file mode 100644
index ec5653bdac..0000000000
--- a/modules/log_processor/manifests/worker.pp
+++ /dev/null
@@ -1,67 +0,0 @@
-# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
-# Copyright 2013 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# == Class: log_processor::worker
-#
-define log_processor::worker (
-  $config_file,
-) {
-  $suffix = "-${name}"
-
-  file { "/etc/logstash/jenkins-log-worker${suffix}.yaml":
-    ensure  => present,
-    owner   => 'root',
-    group   => 'root',
-    mode    => '0555',
-    source  => $config_file,
-    require => Class['logstash::indexer'],
-  }
-
-  file { "/etc/init.d/jenkins-log-worker${suffix}":
-    ensure  => present,
-    owner   => 'root',
-    group   => 'root',
-    mode    => '0555',
-    content => template('log_processor/jenkins-log-worker.init.erb'),
-    require => [
-      File['/usr/local/bin/log-gearman-worker.py'],
-      File["/etc/logstash/jenkins-log-worker${suffix}.yaml"],
-    ],
-  }
-
-  service { "jenkins-log-worker${suffix}":
-    enable     => true,
-    hasrestart => true,
-    subscribe  => File["/etc/logstash/jenkins-log-worker${suffix}.yaml"],
-    require    => [
-      Class['logstash::indexer'],
-      File["/etc/init.d/jenkins-log-worker${suffix}"],
-    ],
-  }
-
-  include logrotate
-  logrotate::file { "log-worker${suffix}-debug.log":
-    log     => "/var/log/logstash/log-worker${suffix}-debug.log",
-    options => [
-      'compress',
-      'copytruncate',
-      'missingok',
-      'rotate 7',
-      'daily',
-      'notifempty',
-    ],
-    require => Service["jenkins-log-worker${suffix}"],
-  }
-}
diff --git a/modules/log_processor/templates/jenkins-log-client.default.erb b/modules/log_processor/templates/jenkins-log-client.default.erb
deleted file mode 100644
index a048fad87d..0000000000
--- a/modules/log_processor/templates/jenkins-log-client.default.erb
+++ /dev/null
@@ -1,5 +0,0 @@
-<% if scope.lookupvar("log_processor::client::statsd_host") != "" %>
-export STATSD_HOST=<%= scope.lookupvar("log_processor::client::statsd_host") %>
-export STATSD_PORT=8125
-export STATSD_PREFIX="logstash.geard"
-<% end %>
diff --git a/modules/log_processor/templates/jenkins-log-worker.init.erb b/modules/log_processor/templates/jenkins-log-worker.init.erb
deleted file mode 100755
index 152a97b1f8..0000000000
--- a/modules/log_processor/templates/jenkins-log-worker.init.erb
+++ /dev/null
@@ -1,158 +0,0 @@
-#! /bin/sh
-### BEGIN INIT INFO
-# Provides:          jenkins-log-worker
-# Required-Start:    $remote_fs $syslog
-# Required-Stop:     $remote_fs $syslog
-# Default-Start:     2 3 4 5
-# Default-Stop:      0 1 6
-# Short-Description: Jenkins Log Worker
-# Description:       Service to push Jenkins logs into logstash.
-### END INIT INFO
-
-# Do NOT "set -e"
-
-# PATH should only include /usr/* if it runs after the mountnfs.sh script
-PATH=/sbin:/usr/sbin:/bin:/usr/bin
-DESC="Jenkins Log Worker"
-NAME=jenkins-log-worker<%= suffix %>
-DAEMON=/usr/local/bin/log-gearman-worker.py
-PIDFILE=/var/run/$NAME/$NAME.pid
-DAEMON_ARGS="-c /etc/logstash/jenkins-log-worker<%= suffix %>.yaml -d /var/log/logstash/log-worker<%= suffix %>-debug.log -p $PIDFILE"
-SCRIPTNAME=/etc/init.d/$NAME
-USER=logstash
-
-# Exit if the package is not installed
-[ -x "$DAEMON" ] || exit 0
-
-# Read configuration variable file if it is present
-[ -r /etc/default/$NAME ] && . /etc/default/$NAME
-
-# Load the VERBOSE setting and other rcS variables
-. /lib/init/vars.sh
-
-# Define LSB log_* functions.
-# Depend on lsb-base (>= 3.0-6) to ensure that this file is present.
-. /lib/lsb/init-functions
-
-#
-# Function that starts the daemon/service
-#
-do_start()
-{
-    # Return
-    #   0 if daemon has been started
-    #   1 if daemon was already running
-    #   2 if daemon could not be started
-
-    mkdir -p /var/run/$NAME
-    chown $USER /var/run/$NAME
-    start-stop-daemon --start --quiet --pidfile $PIDFILE -c $USER --exec $DAEMON --test > /dev/null \
-        || return 1
-    start-stop-daemon --start --quiet --pidfile $PIDFILE -c $USER --exec $DAEMON -- \
-        $DAEMON_ARGS \
-        || return 2
-    # Add code here, if necessary, that waits for the process to be ready
-    # to handle requests from services started subsequently which depend
-    # on this one. As a last resort, sleep for some time.
-}
-
-#
-# Function that stops the daemon/service
-#
-do_stop()
-{
-    # Return
-    #   0 if daemon has been stopped
-    #   1 if daemon was already stopped
-    #   2 if daemon could not be stopped
-    #   other if a failure occurred
-    start-stop-daemon --stop --signal 9 --pidfile $PIDFILE
-    RETVAL="$?"
-    [ "$RETVAL" = 2 ] && return 2
-    rm -f /var/run/$NAME/*
-    return "$RETVAL"
-}
-
-#
-# Function that stops the daemon/service
-#
-#do_graceful_stop()
-#{
-#    PID=`cat $PIDFILE`
-#    kill -USR1 $PID
-#
-#    # wait until really stopped
-#    if [ -n "${PID:-}" ]; then
-#        i=0
-#        while kill -0 "${PID:-}" 2> /dev/null; do
-#            if [ $i -eq '0' ]; then
-#                echo -n " ... waiting "
-#            else
-#                echo -n "."
-#            fi
-#            i=$(($i+1))
-#            sleep 1
-#        done
-#    fi
-#
-#    rm -f /var/run/$NAME/*
-#}
-
-#
-# Function that sends a SIGHUP to the daemon/service
-#
-#do_reload() {
-#    #
-#    # If the daemon can reload its configuration without
-#    # restarting (for example, when it is sent a SIGHUP),
-#    # then implement that here.
-#    #
-#    start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE --name zuul-server
-#    return 0
-#}
-
-case "$1" in
-    start)
-        [ "$VERBOSE" != no ] && log_daemon_msg "Starting $DESC" "$NAME"
-        do_start
-        case "$?" in
-            0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;;
-            2) [ "$VERBOSE" != no ] && log_end_msg 1 ;;
-        esac
-        ;;
-    stop)
-        [ "$VERBOSE" != no ] && log_daemon_msg "Stopping $DESC" "$NAME"
-        do_stop
-        case "$?" in
-            0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;;
-            2) [ "$VERBOSE" != no ] && log_end_msg 1 ;;
-        esac
-        ;;
-    status)
-        status_of_proc "$DAEMON" "$NAME" && exit 0 || exit $?
-        ;;
-#    reload)
-#        #
-#        # If do_reload() is not implemented then leave this commented out
-#        # and leave 'force-reload' as an alias for 'restart'.
-#        #
-#        log_daemon_msg "Reloading $DESC" "$NAME"
-#        do_reload
-#        log_end_msg $?
-#        ;;
-    restart|force-reload)
-        #
-        # If the "reload" option is implemented then remove the
-        # 'force-reload' alias
-        #
-        log_daemon_msg "Restarting $DESC" "$NAME"
-        do_stop
-        do_start
-        ;;
-    *)
-        echo "Usage: $SCRIPTNAME {start|stop|status|restart|force-reload}" >&2
-        exit 3
-        ;;
-esac
-
-:
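Taken together, the pieces removed here form a simple pipeline: the client turns each interesting log file from a Jenkins event into one gearman job, and a worker consumes the job, fetches the file, and streams JSON lines to logstash. A condensed sketch of that handoff using the gear library, matching the calls made in the two scripts above (server hostname and payload values are placeholders):

```python
import json

import gear

# Producer side (log-gearman-client.py): one job per log file.
client = gear.Client()
client.addServer('localhost')
client.waitForServer()
payload = {
    'source_url': 'http://logs.example.org/80/12345/3/gate/console.html',
    'retry': True,
    'event': {'fields': {'build_status': 'FAILURE'}, 'tags': ['console']},
}
client.submitJob(gear.Job(b'push-log', json.dumps(payload).encode('utf8')))

# Consumer side (log-gearman-worker.py): fetch, filter, forward.
worker = gear.Worker('example-pusher')
worker.addServer('localhost')
worker.registerFunction(b'push-log')
job = worker.getJob()  # blocks until a client submits work
args = json.loads(job.arguments.decode('utf-8'))
# ... retrieve args['source_url'], push lines to logstash ...
job.sendWorkComplete()
```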