From 6501547284426c129f58d96f788f0c92b42f8abc Mon Sep 17 00:00:00 2001
From: "James E. Blair" <jeblair@openstack.org>
Date: Fri, 13 Dec 2013 21:34:16 +0000
Subject: [PATCH] Process logs with CRM114

Try to identify the probability that each log line indicates an error.
Pass that information on to logstash.

Change-Id: I0b298c2e8c00d8fdf1c907215a7bbf27086bc80c
---
 .../files/logstash/classify-log.crm           | 90 +++++++++++++++++++
 .../files/logstash/jenkins-log-worker.yaml    |  2 +
 .../files/logstash/log-gearman-worker.py      | 82 ++++++++++++++++-
 .../manifests/logstash_worker.pp              | 22 +++++
 4 files changed, 194 insertions(+), 2 deletions(-)
 create mode 100755 modules/openstack_project/files/logstash/classify-log.crm

diff --git a/modules/openstack_project/files/logstash/classify-log.crm b/modules/openstack_project/files/logstash/classify-log.crm
new file mode 100755
index 0000000000..a2e7a22945
--- /dev/null
+++ b/modules/openstack_project/files/logstash/classify-log.crm
@@ -0,0 +1,90 @@
+#! /usr/bin/crm
+#
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# This script trains an OSB (Orthogonal Sparse Bigram) Bayesian filter
+# with log lines from test runs and classifies each line according to
+# the likelihood it indicates an error.  Very little experimentation
+# has been done to determine the best classifier and training method;
+# further experimentation may be useful.
+
+# The training method is TET -- Train Every Thing.  This is not
+# normally advised as a training method for Bayesian filters.  In
+# experiments, it identified about twice as many lines as being
+# associated with errors as were indicated by a TOE (Train On Error)
+# method.  Some of them were false positives, but many were not, and
+# of those, it had a much higher (pR ~= 37) confidence in them than
+# TOE.  TET seems to give qualitatively better results when filtering
+# for higher pR values.
+
+# Set unbuffered IO
+window
+
+# Base component of path to data files
+isolate (:prefix:) /:*:_arg2:/
+
+# Whether this run is for a SUCCESS or FAILURE result
+isolate (:target:) /:*:_arg3:/
+
+# Train each file on a newline just to make sure it exists
+learn [:_nl:] <osb unique microgroom> (:*:prefix:/SUCCESS.css)
+learn [:_nl:] <osb unique microgroom> (:*:prefix:/FAILURE.css)
+{
+    # Iterate over each line
+    window <bychar> /\n/ /\n/
+    {
+        isolate (:stats:)
+        isolate (:result:)
+        isolate (:prob:)
+        isolate (:pr:)
+        # Save a copy of this line
+        isolate (:line:) /:*:_dw:/
+        {
+            {
+                # Remove things that look like timestamps from the beginning of the line
+                match (:timestamp:) /^[-.0-9 |:]+/
+                alter (:timestamp:) //
+            }
+            # Train on the line
+            learn <osb unique microgroom> (:*:prefix:/:*:target:.css)
+            # Classify the line to see if it looks more like a SUCCESS or FAILURE line
+            classify <osb unique microgroom> (:*:prefix:/SUCCESS.css :*:prefix:/FAILURE.css) (:stats:)
+            {
+                # The stats variable looks like:
+                #   CLASSIFY succeeds; success probability: 1.0000  pR: 304.6527
+                #   Best match to file #0 (/tmp/crm114/console_html/SUCCESS.css) prob: 0.9933  pR: 2.1720
+                #   Total features in input file: 20
+                #   #0 (/tmp/crm114/console_html/SUCCESS.css): features: 3544235, hits: 901854, prob: 9.93e-01, pR:   2.17
+                #   #1 (/tmp/crm114/console_html/FAILURE.css): features: 1, hits: 0, prob: 6.69e-03, pR:  -2.17
+                # Pull out the filename, probability, and pR (a kind of logarithmic probability, see CRM docs)
+                match [:stats:] <nomultiline> /^Best match to .*\/([A-Za-z]+).css\) prob: ([-.0-9]+)  pR: ([-.0-9]+)/ ( :: :result: :prob: :pr: )
+                {
+                    # If this line is classified as FAILURE, negate
+                    # the pR value (which will always be positive).
+                    # Do this by prepending a '-' or the empty string.
+                    {
+                        match [:result:] /FAILURE/
+                        alter (:result:) /-/
+                    } alius {
+                        alter (:result:) //
+                    }
+                }
+                # Output the sign and pR value for this line.
+                output /:*:result::*:pr:\n/
+            }
+        }
+    }
+    liaf
+}
diff --git a/modules/openstack_project/files/logstash/jenkins-log-worker.yaml b/modules/openstack_project/files/logstash/jenkins-log-worker.yaml
index 5f601601f8..6702f1b972 100644
--- a/modules/openstack_project/files/logstash/jenkins-log-worker.yaml
+++ b/modules/openstack_project/files/logstash/jenkins-log-worker.yaml
@@ -3,3 +3,5 @@ gearman-port: 4730
 output-host: localhost
 output-port: 9999
 output-mode: tcp
+crm114-script: /usr/local/bin/classify-log.crm
+crm114-data: /var/lib/crm114
diff --git a/modules/openstack_project/files/logstash/log-gearman-worker.py b/modules/openstack_project/files/logstash/log-gearman-worker.py
index 39f3d4b2c6..ea8bcd9051 100644
--- a/modules/openstack_project/files/logstash/log-gearman-worker.py
+++ b/modules/openstack_project/files/logstash/log-gearman-worker.py
@@ -21,8 +21,12 @@ import gear
 import gzip
 import json
 import logging
+import os
 import Queue
+import re
+import select
 import socket
+import subprocess
 import sys
 import threading
 import time
@@ -48,10 +52,111 @@ def semi_busy_wait(seconds):
             return
 
 
+class CRM114Filter(object):
+    """Annotate log events with a CRM114 classification score.
+
+    When build_status is SUCCESS or FAILURE a child process running
+    ``script`` is started; each log line is written to its stdin and
+    one reply line is read back from its stdout.  For any other
+    build_status no child is started and process() and close() do
+    nothing.
+    """
+
+    # Seconds to wait for the child to answer one line.
+    TIMEOUT = 20
+
+    def __init__(self, script, path, build_status):
+        """Create ``path`` if it is missing and start the child.
+
+        :param script: executable to run
+        :param path: directory passed to the script as first argument
+        :param build_status: passed to the script as second argument
+        """
+        self.p = None
+        self.script = script
+        self.path = path
+        self.build_status = build_status
+        if build_status not in ['SUCCESS', 'FAILURE']:
+            return
+        if not os.path.exists(path):
+            os.makedirs(path)
+        args = [script, path, build_status]
+        self.p = subprocess.Popen(args,
+                                  stdout=subprocess.PIPE,
+                                  stderr=subprocess.PIPE,
+                                  stdin=subprocess.PIPE)
+
+    def _reap(self, kill=False):
+        """Detach from the child, wait for it and return its stderr.
+
+        communicate() closes stdin and drains stdout and stderr
+        together, so the child cannot block on a full pipe while we
+        wait for it.  self.p is cleared first so that later process()
+        and close() calls are no-ops.
+        """
+        p, self.p = self.p, None
+        if kill:
+            p.kill()
+        return p.communicate()[1]
+
+    def process(self, data):
+        """Classify data['message'] and store the result.
+
+        The child's reply is parsed as a float and stored in
+        data['error_pr'].
+
+        :raises Exception: if the child does not answer within TIMEOUT
+            seconds, or closes its stdout (the message is then the
+            child's stderr output, if any)
+        """
+        if not self.p:
+            return
+        self.p.stdin.write(data['message'].encode('utf-8') + '\n')
+        (r, w, x) = select.select([self.p.stdout], [],
+                                  [self.p.stdin, self.p.stdout],
+                                  self.TIMEOUT)
+        if not r:
+            # Kill and reap the hung child so it is not left as a zombie.
+            self._reap(kill=True)
+            raise Exception('Timeout reading from CRM114')
+        r = self.p.stdout.readline()
+        if not r:
+            err = self._reap()
+            raise Exception(err or 'Early EOF from CRM114')
+        data['error_pr'] = float(r.strip())
+
+    def close(self):
+        """Shut the child down and wait for it to exit."""
+        if not self.p:
+            return
+        self._reap()
+
+
+class CRM114FilterFactory(object):
+    """Create a CRM114Filter for each log file that is processed."""
+    name = "CRM114"
+
+    def __init__(self, script, basepath):
+        self.script = script
+        self.basepath = basepath
+
+    def create(self, fields):
+        """Return a CRM114Filter for the log described by ``fields``.
+
+        The data directory is ``basepath`` joined with
+        fields['filename'] with every '.' replaced by '_';
+        fields['build_status'] is passed through to the filter.
+        """
+        filename = re.sub(r'\.', '_', fields['filename'])
+        path = os.path.join(self.basepath, filename)
+        return CRM114Filter(self.script, path, fields['build_status'])
+
+
 class LogRetriever(threading.Thread):
-    def __init__(self, gearman_worker, logq):
+    def __init__(self, gearman_worker, filters, logq):
         threading.Thread.__init__(self)
         self.gearman_worker = gearman_worker
+        self.filters = filters
         self.logq = logq
 
     def run(self):
@@ -76,6 +138,11 @@ class LogRetriever(threading.Thread):
                 # discarded by zuul.
                 log_lines = self._retrieve_log(source_url, retry)
 
+                filters = []
+                for f in self.filters:
+                    logging.debug("Adding filter: %s" % f.name)
+                    filters.append(f.create(fields))
+
                 logging.debug("Pushing " + str(len(log_lines)) + " log lines.")
                 base_event = {}
                 base_event.update(fields)
@@ -83,7 +150,11 @@ class LogRetriever(threading.Thread):
                 for line in log_lines:
                     out_event = base_event.copy()
                     out_event["message"] = line
+                    for f in filters:
+                        f.process(out_event)
                     self.logq.put(out_event)
+                for f in filters:
+                    f.close()
             job.sendWorkComplete()
         except Exception as e:
             logging.exception("Exception handling log event.")
@@ -248,6 +319,12 @@ class Server(object):
         self.retriever = None
         self.logqueue = Queue.Queue(131072)
         self.processor = None
+        self.filter_factories = []
+        crmscript = self.config.get('crm114-script')
+        crmdata = self.config.get('crm114-data')
+        if crmscript and crmdata:
+            self.filter_factories.append(
+                CRM114FilterFactory(crmscript, crmdata))
 
     def setup_logging(self):
         if self.debuglog:
@@ -264,7 +341,8 @@ class Server(object):
         gearman_worker.addServer(self.gearman_host,
                                  self.gearman_port)
         gearman_worker.registerFunction(b'push-log')
-        self.retriever = LogRetriever(gearman_worker, self.logqueue)
+        self.retriever = LogRetriever(gearman_worker, self.filter_factories,
+                                      self.logqueue)
 
     def setup_processor(self):
         if self.output_mode == "tcp":
diff --git a/modules/openstack_project/manifests/logstash_worker.pp b/modules/openstack_project/manifests/logstash_worker.pp
index 219891358f..bf10f8f93e 100644
--- a/modules/openstack_project/manifests/logstash_worker.pp
+++ b/modules/openstack_project/manifests/logstash_worker.pp
@@ -43,6 +43,10 @@ class openstack_project::logstash_worker (
     ensure => present,
   }
 
+  package { 'crm114':
+    ensure => present,
+  }
+
   include pip
   package { 'gear':
     ensure   => latest,
@@ -50,6 +54,24 @@ class openstack_project::logstash_worker (
     require  => Class['pip'],
   }
 
+  file { '/var/lib/crm114':
+    ensure  => directory,
+    owner   => 'logstash',
+    group   => 'logstash',
+    require => User['logstash'],
+  }
+
+  file { '/usr/local/bin/classify-log.crm':
+    ensure  => present,
+    owner   => 'root',
+    group   => 'root',
+    mode    => '0755',
+    source  => 'puppet:///modules/openstack_project/logstash/classify-log.crm',
+    require => [
+      Package['crm114'],
+    ],
+  }
+
   file { '/usr/local/bin/log-gearman-worker.py':
     ensure  => present,
     owner   => 'root',