diff --git a/modules/log_processor/files/log-gearman-client.py b/modules/log_processor/files/log-gearman-client.py index e9d8383838..e699796402 100644 --- a/modules/log_processor/files/log-gearman-client.py +++ b/modules/log_processor/files/log-gearman-client.py @@ -75,7 +75,11 @@ class EventProcessor(threading.Thread): output['source_url'] = source_url output['retry'] = fileopts.get('retry-get', False) output['event'] = out_event - job = gear.Job(b'push-log', json.dumps(output).encode('utf8')) + if 'subunit' in fileopts.get('name'): + job = gear.Job(b'push-subunit', + json.dumps(output).encode('utf8')) + else: + job = gear.Job(b'push-log', json.dumps(output).encode('utf8')) try: self.gearman_client.submitJob(job) except: @@ -146,10 +150,14 @@ class Server(object): gearclient = gear.Client() gearclient.addServer('localhost') gearclient.waitForServer() - processor = EventProcessor( + log_processor = EventProcessor( publisher, gearclient, self.config['source-files'], self.source_url) - self.processors.append(processor) + subunit_processor = EventProcessor( + publisher, gearclient, + self.config['subunit-files'], self.source_url) + self.processors.append(log_processor) + self.processors.append(subunit_processor) def main(self): statsd_host = os.environ.get('STATSD_HOST') diff --git a/modules/openstack_project/files/logstash/jenkins-log-client.yaml b/modules/openstack_project/files/logstash/jenkins-log-client.yaml index 453c122b20..4d872aab51 100644 --- a/modules/openstack_project/files/logstash/jenkins-log-client.yaml +++ b/modules/openstack_project/files/logstash/jenkins-log-client.yaml @@ -11,6 +11,13 @@ zmq-publishers: - tcp://jenkins06.openstack.org:8888 - tcp://jenkins07.openstack.org:8888 +subunit-files: + - name: logs/testrepository.subunit + retry-get: True + job-filter: 'gate-(tempest|grenade)-dsvm.*' + - name: logs/old/testrepository.subunit + job-filter: 'gate-grenade-dsvm.*' + # List of files to source logs from. 
source-files: - name: console.html diff --git a/modules/openstack_project/files/logstash/jenkins-subunit-worker.yaml b/modules/openstack_project/files/logstash/jenkins-subunit-worker.yaml new file mode 100644 index 0000000000..4f1f67ff2b --- /dev/null +++ b/modules/openstack_project/files/logstash/jenkins-subunit-worker.yaml @@ -0,0 +1,3 @@ +gearman-host: logstash.openstack.org +gearman-port: 4730 +config: /etc/logstash/subunit2sql.conf diff --git a/modules/openstack_project/manifests/subunit_worker.pp b/modules/openstack_project/manifests/subunit_worker.pp new file mode 100644 index 0000000000..07f7e04af3 --- /dev/null +++ b/modules/openstack_project/manifests/subunit_worker.pp @@ -0,0 +1,43 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# subunit2sql worker glue class. 
+# +class openstack_project::subunit_worker ( + $sysadmins = [], + $subunit2sql_db_uri +) { + class { 'openstack_project::server': + iptables_public_tcp_ports => [22], + sysadmins => $sysadmins, + } + + include subunit2sql + subunit2sql::worker { 'A': + config_file => 'puppet:///modules/openstack_project/logstash/jenkins-subunit-worker.yaml', + subunit2sql_db_uri => $subunit2sql_db_uri, + } + subunit2sql::worker { 'B': + config_file => 'puppet:///modules/openstack_project/logstash/jenkins-subunit-worker.yaml', + subunit2sql_db_uri => $subunit2sql_db_uri, + } + subunit2sql::worker { 'C': + config_file => 'puppet:///modules/openstack_project/logstash/jenkins-subunit-worker.yaml', + subunit2sql_db_uri => $subunit2sql_db_uri, + } + subunit2sql::worker { 'D': + config_file => 'puppet:///modules/openstack_project/logstash/jenkins-subunit-worker.yaml', + subunit2sql_db_uri => $subunit2sql_db_uri, + } +} diff --git a/modules/subunit2sql/files/subunit-gearman-worker.py b/modules/subunit2sql/files/subunit-gearman-worker.py new file mode 100644 index 0000000000..dd1d125619 --- /dev/null +++ b/modules/subunit2sql/files/subunit-gearman-worker.py @@ -0,0 +1,275 @@ +#!/usr/bin/python2 +# +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import argparse +import cStringIO +import daemon +import gear +import gzip +import json +import logging +import os +import Queue +import socket +import subprocess +import threading +import time +import urllib2 +import yaml + +from subunit2sql import read_subunit +from subunit2sql import shell + + +try: + import daemon.pidlockfile as pidfile_mod +except ImportError: + import daemon.pidfile as pidfile_mod + + +def semi_busy_wait(seconds): + # time.sleep() may return early. If it does sleep() again and repeat + # until at least the number of seconds specified has elapsed. + start_time = time.time() + while True: + time.sleep(seconds) + cur_time = time.time() + seconds = seconds - (cur_time - start_time) + if seconds <= 0.0: + return + + +class FilterException(Exception): + pass + + +class SubunitRetriever(threading.Thread): + def __init__(self, gearman_worker, filters, subunitq): + threading.Thread.__init__(self) + self.gearman_worker = gearman_worker + self.filters = filters + self.subunitq = subunitq + + def run(self): + while True: + try: + self._handle_event() + except: + logging.exception("Exception retrieving log event.") + + def _handle_event(self): + job = self.gearman_worker.getJob() + try: + arguments = json.loads(job.arguments.decode('utf-8')) + source_url = arguments['source_url'] + retry = arguments['retry'] + event = arguments['event'] + logging.debug("Handling event: " + json.dumps(event)) + fields = event.get('fields') or event.get('@fields') + if fields.pop('build_status') != 'ABORTED': + # Handle events ignoring aborted builds. These builds are + # discarded by zuul. 
+ subunit_io = self._retrieve_subunit_v2(source_url, retry) + logging.debug("Pushing subunit files.") + out_event = fields.copy() + out_event["subunit"] = subunit_io + self.subunitq.put(out_event) + job.sendWorkComplete() + except Exception as e: + logging.exception("Exception handling log event.") + job.sendWorkException(str(e).encode('utf-8')) + + def _subunit_1_to_2(self, raw_file): + call = subprocess.Popen('subunit-1to2', stdin=subprocess.PIPE, + stdout=subprocess.PIPE) + output, err = call.communicate(raw_file.read()) + if err: + raise Exception(err) + buf = cStringIO.StringIO(output) + return buf + + def _retrieve_subunit_v2(self, source_url, retry): + # TODO (clarkb): This should check the content type instead of file + # extension for determining if gzip was used. + gzipped = False + raw_buf = b'' + try: + gzipped, raw_buf = self._get_subunit_data(source_url, retry) + except urllib2.HTTPError as e: + if e.code == 404: + logging.info("Unable to retrieve %s: HTTP error 404" % + source_url) + else: + logging.exception("Unable to get log data.") + except Exception: + # Silently drop fatal errors when retrieving logs. + # TODO (clarkb): Handle these errors. + # Perhaps simply add a log message to raw_buf? + logging.exception("Unable to get log data.") + if gzipped: + logging.debug("Decompressing gzipped source file.") + raw_strIO = cStringIO.StringIO(raw_buf) + f = gzip.GzipFile(fileobj=raw_strIO) + buf = self._subunit_1_to_2(f) + raw_strIO.close() + f.close() + else: + logging.debug("Decoding source file.") + raw_strIO = cStringIO.StringIO(raw_buf) + buf = self._subunit_1_to_2(raw_strIO) + return buf + + def _get_subunit_data(self, source_url, retry): + gzipped = False + try: + # TODO(clarkb): We really should be using requests instead + # of urllib2. urllib2 will automatically perform a POST + # instead of a GET if we provide urlencoded data to urlopen + # but we need to do a GET. The parameters are currently + # hardcoded so this should be ok for now. 
+            logging.debug("Retrieving: " + source_url + ".gz") + req = urllib2.Request(source_url + ".gz") + req.add_header('Accept-encoding', 'gzip') + r = urllib2.urlopen(req) + except urllib2.URLError: + try: + # Fallback on GETting unzipped data. + logging.debug("Retrieving: " + source_url) + r = urllib2.urlopen(source_url) + except: + logging.exception("Unable to retrieve source file.") + raise + except: + logging.exception("Unable to retrieve source file.") + raise + if ('gzip' in r.info().get('Content-Type', '') or + 'gzip' in r.info().get('Content-Encoding', '')): + gzipped = True + + raw_buf = r.read() + # Hack to read all of Jenkins console logs as they upload + return gzipped, raw_buf + + +class Subunit2SQLProcessor(object): + def __init__(self, subunitq, subunit2sql_conf): + self.subunitq = subunitq + self.config = subunit2sql_conf + # Initialize subunit2sql settings + shell.cli_opts() + shell.parse_args([], [self.config]) + + def handle_subunit_event(self): + # Pull subunit event from queue and separate stream from metadata + subunit = self.subunitq.get() + subunit_v2 = subunit.pop('subunit') + # Set run metadata from gearman + log_url = subunit.pop('log_url', None) + if log_url: + log_dir = os.path.dirname(os.path.dirname(log_url)) + shell.CONF.set_override('artifacts', log_dir) + shell.CONF.set_override('run_meta', subunit) + # Parse subunit stream and store in DB + logging.debug('Converting Subunit V2 stream to SQL') + stream = read_subunit.ReadSubunit(subunit_v2) + shell.process_results(stream.get_results()) + + +class Server(object): + def __init__(self, config, debuglog): + # Config init. + self.config = config + self.gearman_host = self.config['gearman-host'] + self.gearman_port = self.config['gearman-port'] + # Python logging output file. 
+ self.debuglog = debuglog + self.retriever = None + self.subunitqueue = Queue.Queue(131072) + self.processor = None + self.filter_factories = [] + + def setup_logging(self): + if self.debuglog: + logging.basicConfig(format='%(asctime)s %(message)s', + filename=self.debuglog, level=logging.DEBUG) + else: + # Prevent leakage into the logstash log stream. + logging.basicConfig(level=logging.CRITICAL) + logging.debug("Log pusher starting.") + + def setup_retriever(self): + hostname = socket.gethostname() + gearman_worker = gear.Worker(hostname + b'-pusher') + gearman_worker.addServer(self.gearman_host, + self.gearman_port) + gearman_worker.registerFunction(b'push-subunit') + self.retriever = SubunitRetriever(gearman_worker, + self.filter_factories, + self.subunitqueue) + + def setup_processor(self): + # Note this processor will not work if the process is run as a + # daemon. You must use the --foreground option. + subunit2sql_config = self.config['config'] + self.processor = Subunit2SQLProcessor(self.subunitqueue, + subunit2sql_config) + + def main(self): + self.setup_retriever() + self.setup_processor() + + self.retriever.daemon = True + self.retriever.start() + + while True: + try: + self.processor.handle_subunit_event() + except: + logging.exception("Exception processing log event.") + raise + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("-c", "--config", required=True, + help="Path to yaml config file.") + parser.add_argument("-d", "--debuglog", + help="Enable debug log. 
" + "Specifies file to write log to.") + parser.add_argument("--foreground", action='store_true', + help="Run in the foreground.") + parser.add_argument("-p", "--pidfile", + default="/var/run/jenkins-subunit-pusher/" + "jenkins-subunit-gearman-worker.pid", + help="PID file to lock during daemonization.") + args = parser.parse_args() + + with open(args.config, 'r') as config_stream: + config = yaml.load(config_stream) + server = Server(config, args.debuglog) + + if args.foreground: + server.setup_logging() + server.main() + else: + pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10) + with daemon.DaemonContext(pidfile=pidfile): + server.setup_logging() + server.main() + + +if __name__ == '__main__': + main() diff --git a/modules/subunit2sql/manifests/init.pp b/modules/subunit2sql/manifests/init.pp new file mode 100644 index 0000000000..ad0279cf83 --- /dev/null +++ b/modules/subunit2sql/manifests/init.pp @@ -0,0 +1,52 @@ +# Copyright 2012-2013 Hewlett-Packard Development Company, L.P. +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +# == Class: subunit2sql +# +class subunit2sql ( +) { + include pip + + package {'python-mysqldb': + ensure => present, + } + + package {'python-psycopg2': + ensure => present, + } + + package { 'python-subunit': + ensure => latest, + provider => 'pip', + require => Class['pip'], + } + + package { 'subunit2sql': + ensure => latest, + provider => 'pip', + require => [ + Class['pip'], + Package['python-mysqldb'], + Package['python-psycopg2'] + ], + } + + package { 'testtools': + ensure => latest, + provider => 'pip', + require => Class['pip'], + } + +} diff --git a/modules/subunit2sql/manifests/worker.pp b/modules/subunit2sql/manifests/worker.pp new file mode 100644 index 0000000000..c7de964baa --- /dev/null +++ b/modules/subunit2sql/manifests/worker.pp @@ -0,0 +1,118 @@ +# Copyright 2012-2013 Hewlett-Packard Development Company, L.P. +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +# == Define: subunit2sql::worker +# +define subunit2sql::worker ( + $config_file, + $subunit2sql_db_uri, +) { + $suffix = "-${name}" + + package { 'python-daemon': + ensure => present, + } + + package { 'python-zmq': + ensure => present, + } + + package { 'python-yaml': + ensure => present, + } + + package { 'gear': + ensure => latest, + provider => 'pip', + require => Class['pip'], + } + + package { 'statsd': + ensure => latest, + provider => 'pip', + require => Class['pip'] + } + + file { '/usr/local/bin/subunit-gearman-worker.py': + ensure => present, + owner => 'root', + group => 'root', + mode => '0755', + source => 'puppet:///modules/subunit2sql/subunit-gearman-worker.py', + require => [ + Package['python-daemon'], + Package['python-zmq'], + Package['python-yaml'], + Package['gear'], + Package['subunit2sql'], + Package['python-subunit'], + Package['testtools'] + ], + } + + file { '/etc/logstash/subunit2sql.conf': + ensure => present, + owner => 'root', + group => 'root', + mode => '0555', + content => template('subunit2sql/subunit2sql.conf.erb'), + require => Class['logstash::indexer'], + } + + file { "/etc/logstash/jenkins-subunit-worker${suffix}.yaml": + ensure => present, + owner => 'root', + group => 'root', + mode => '0555', + source => $config_file, + require => Class['logstash::indexer'], + } + + file { "/etc/init.d/jenkins-subunit-worker${suffix}": + ensure => present, + owner => 'root', + group => 'root', + mode => '0555', + content => template('subunit2sql/jenkins-subunit-worker.init.erb'), + require => [ + File['/usr/local/bin/subunit-gearman-worker.py'], + File["/etc/logstash/jenkins-subunit-worker${suffix}.yaml"], + ], + } + + service { "jenkins-subunit-worker${suffix}": + enable => true, + hasrestart => true, + subscribe => File["/etc/logstash/jenkins-subunit-worker${suffix}.yaml"], + require => [ + Class['logstash::indexer'], + File["/etc/init.d/jenkins-subunit-worker${suffix}"], + ], + } + + include logrotate + logrotate::file { 
"subunit-worker${suffix}-debug.log": + log => "/var/log/logstash/subunit-worker${suffix}-debug.log", + options => [ + 'compress', + 'copytruncate', + 'missingok', + 'rotate 7', + 'daily', + 'notifempty', + ], + require => Service["jenkins-subunit-worker${suffix}"], + } +} diff --git a/modules/subunit2sql/templates/jenkins-subunit-worker.init.erb b/modules/subunit2sql/templates/jenkins-subunit-worker.init.erb new file mode 100755 index 0000000000..0b0fdd9462 --- /dev/null +++ b/modules/subunit2sql/templates/jenkins-subunit-worker.init.erb @@ -0,0 +1,158 @@ +#! /bin/sh +### BEGIN INIT INFO +# Provides: jenkins-subunit-worker +# Required-Start: $remote_fs $syslog +# Required-Stop: $remote_fs $syslog +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Short-Description: Jenkins Subunit Worker +# Description: Service to push Jenkins subunit files into a SQL DB. +### END INIT INFO + +# Do NOT "set -e" + +# PATH should only include /usr/* if it runs after the mountnfs.sh script +PATH=/sbin:/usr/sbin:/bin:/usr/bin +DESC="Jenkins Subunit Worker" +NAME=jenkins-subunit-worker<%= suffix %> +DAEMON=/usr/local/bin/subunit-gearman-worker.py +PIDFILE=/var/run/$NAME/$NAME.pid +DAEMON_ARGS="-c /etc/logstash/jenkins-subunit-worker<%= suffix %>.yaml -d /var/log/logstash/subunit-worker<%= suffix %>-debug.log -p $PIDFILE" +SCRIPTNAME=/etc/init.d/$NAME +USER=logstash + +# Exit if the package is not installed +[ -x "$DAEMON" ] || exit 0 + +# Read configuration variable file if it is present +[ -r /etc/default/$NAME ] && . /etc/default/$NAME + +# Load the VERBOSE setting and other rcS variables +. /lib/init/vars.sh + +# Define LSB log_* functions. +# Depend on lsb-base (>= 3.0-6) to ensure that this file is present. +. 
/lib/lsb/init-functions + +# +# Function that starts the daemon/service +# +do_start() +{ + # Return + # 0 if daemon has been started + # 1 if daemon was already running + # 2 if daemon could not be started + + mkdir -p /var/run/$NAME + chown $USER /var/run/$NAME + start-stop-daemon --start --quiet --pidfile $PIDFILE -c $USER --exec $DAEMON --test > /dev/null \ + || return 1 + start-stop-daemon --start --quiet --pidfile $PIDFILE -c $USER --exec $DAEMON -- \ + $DAEMON_ARGS \ + || return 2 + # Add code here, if necessary, that waits for the process to be ready + # to handle requests from services started subsequently which depend + # on this one. As a last resort, sleep for some time. +} + +# +# Function that stops the daemon/service +# +do_stop() +{ + # Return + # 0 if daemon has been stopped + # 1 if daemon was already stopped + # 2 if daemon could not be stopped + # other if a failure occurred + start-stop-daemon --stop --signal 9 --pidfile $PIDFILE + RETVAL="$?" + [ "$RETVAL" = 2 ] && return 2 + rm -f /var/run/$NAME/* + return "$RETVAL" +} + +# +# Function that stops the daemon/service +# +#do_graceful_stop() +#{ +# PID=`cat $PIDFILE` +# kill -USR1 $PID +# +# # wait until really stopped +# if [ -n "${PID:-}" ]; then +# i=0 +# while kill -0 "${PID:-}" 2> /dev/null; do +# if [ $i -eq '0' ]; then +# echo -n " ... waiting " +# else +# echo -n "." +# fi +# i=$(($i+1)) +# sleep 1 +# done +# fi +# +# rm -f /var/run/$NAME/* +#} + +# +# Function that sends a SIGHUP to the daemon/service +# +#do_reload() { +# # +# # If the daemon can reload its configuration without +# # restarting (for example, when it is sent a SIGHUP), +# # then implement that here. +# # +# start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE --name zuul-server +# return 0 +#} + +case "$1" in + start) + [ "$VERBOSE" != no ] && log_daemon_msg "Starting $DESC" "$NAME" + do_start + case "$?" 
in + 0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;; + 2) [ "$VERBOSE" != no ] && log_end_msg 1 ;; + esac + ;; + stop) + [ "$VERBOSE" != no ] && log_daemon_msg "Stopping $DESC" "$NAME" + do_stop + case "$?" in + 0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;; + 2) [ "$VERBOSE" != no ] && log_end_msg 1 ;; + esac + ;; + status) + status_of_proc "$DAEMON" "$NAME" && exit 0 || exit $? + ;; +# reload) +# # +# # If do_reload() is not implemented then leave this commented out +# # and leave 'force-reload' as an alias for 'restart'. +# # +# log_daemon_msg "Reloading $DESC" "$NAME" +# do_reload +# log_end_msg $? +# ;; + restart|force-reload) + # + # If the "reload" option is implemented then remove the + # 'force-reload' alias + # + log_daemon_msg "Restarting $DESC" "$NAME" + do_stop + do_start + ;; + *) + echo "Usage: $SCRIPTNAME {start|stop|status|restart|force-reload}" >&2 + exit 3 + ;; +esac + +: diff --git a/modules/subunit2sql/templates/subunit2sql.conf.erb b/modules/subunit2sql/templates/subunit2sql.conf.erb new file mode 100644 index 0000000000..3d16db6db1 --- /dev/null +++ b/modules/subunit2sql/templates/subunit2sql.conf.erb @@ -0,0 +1,4 @@ +[DEFAULT] + +[database] +connection = <%= subunit2sql_db_uri %>