# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

- pipeline.id: "elk_metrics_6x"
  queue.type: "persisted"
  config.string: |
    input {
      beats {
        id => "inputBeats"
        client_inactivity_timeout => 300
        port => {{ logstash_beat_input_port }}
        add_field => {
          "[@metadata][source_type]" => "beats"
        }
      }
    }
{% if logstash_syslog_input_enabled | bool %}
    input {
{%   if logstash_syslog_input_mode == 'tcp' %}
      tcp {
        id => "inputSyslogTcp"
        port => {{ logstash_syslog_input_port }}
        type => syslog
        add_field => {
          "[@metadata][source_type]" => "syslog"
        }
      }
{%   elif logstash_syslog_input_mode == 'udp' %}
      udp {
        id => "inputSyslogUdp"
        port => {{ logstash_syslog_input_port }}
        type => syslog
        add_field => {
          "[@metadata][source_type]" => "syslog"
        }
      }
{%   endif %}
    }
{% endif %}
    filter {
      if [@metadata][source_type] == "syslog" {
        mutate {
          add_tag => ["syslog"]
        }
      }

      # NOTE(mnaser): Filebeat doesn't support shipping to different outputs
      #               which means we need to parse `auditd` fileset here rather
      #               than rely on ingest.
      if [fileset][module] == "auditd" {
        grok {
          break_on_match => false
          match => {
            message => [
              "type=%{WORD:[auditd][log][record_type]}",
              "msg=audit\(%{NUMBER:timestamp}:%{NUMBER:[auditd][log][sequence]}\)",
              "a0=\"%{DATA:[auditd][log][a0]}\"",
              "acct=\"%{DATA:[auditd][log][acct]}\"",
              "addr=%{IPORHOST:[auditd][log][addr]}"
            ]
          }
        }

        date {
          match => [ "timestamp", "UNIX" ]
          remove_field => "timestamp"
        }

        if [auditd][log][addr] {
          geoip {
            source => "[auditd][log][addr]"
            target => "[auditd][geoip]"
          }
        }

        # NOTE(mnaser): We don't match all fields so `grok` thinks that we
        #               failed.
        mutate {
          remove_tag => ["_grokparsefailure"]
        }
      }

      if [@metadata][source_type] == "beats" or [@metadata][source_type] == "syslog" {
        if [systemd_slice] {
          mutate {
            copy => { "systemd_slice" => "systemd_slice_tag" }
          }
          mutate {
            gsub => [ "systemd_slice_tag", ".slice", "" ]
          }
          if [systemd_slice_tag] != "-" {
            mutate {
              add_tag => [
                "%{systemd_slice_tag}"
              ]
            }
            mutate {
              add_tag => [
                "filebeat"
              ]
            }
          }
          mutate {
            remove_field => [ "%{systemd_slice_tag}" ]
          }
        }
        if "filebeat" in [tags] {
          if "Traceback" in [message] {
            mutate {
              add_tag => ["traceback"]
              remove_tag => ["_grokparsefailure"]
            }
          }

          if "auth" in [tags] {
            grok {
              match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{NOTSPACE:logsource} %{SYSLOGPROG}: (?:%{SPACE})?%{GREEDYDATA:logmessage}" }
            }
            mutate {
              add_field => { "module" => "auth" }
            }
          } else if "elasticsearch" in [tags] {
            grok {
              match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{LOGLEVEL:loglevel}\s*\]\[%{NOTSPACE:module}\s*\] %{GREEDYDATA:logmessage}" }
            }
            mutate {
              replace => { "module" => "elasticsearch.%{module}" }
            }
          } else if "ceph" in [tags] {
            grok {
              match => { "message" => "%{TIMESTAMP_ISO8601:date} %{NOTSPACE:osd_epoch} ?%{SPACE}?%{NOTSPACE:error_bool} %{GREEDYDATA:logmessage}" }
            }
            if "ceph-osd" in [tags] {
              grok {
                match => { "message" => "-- (?<src_host>(%{IPORHOST}\:%{POSINT}/%{POSINT})) (?:[<|>]){1,2} (?<dst_host>(%{IPORHOST}\:%{POSINT}/%{POSINT}))" }
              }
            }
          } else if "libvirt" in [tags] {
            grok {
              match => { "message" => "(?m)^%{TIMESTAMP_ISO8601:logdate}:%{SPACE}%{NUMBER:code}:?%{SPACE}\[?\b%{NOTSPACE:loglevel}\b\]?%{SPACE}?:?%{SPACE}\[?\b%{NOTSPACE:module}\b\]?%{SPACE}?%{GREEDYDATA:logmessage}?" }
              add_field => { "received_at" => "%{@timestamp}"}
            }
            mutate {
              uppercase => [ "loglevel" ]
            }
          } else if "logstash" in [tags] {
            grok {
              match => {
                "message" => "\{\:timestamp=>\"%{TIMESTAMP_ISO8601:timestamp}\", \:message=>\"%{DATA:logmessage}\"(;|)(, \:address=>\"%{URIHOST:address}\", \:exception=>#<\"%{DATA:exception}\">, \:backtrace=>\[%{DATA:backtrace}\]|)(, \:level=>:\"%{LOGLEVEL:loglevel}\"|)\}"
              }
            }
            mutate {
              add_field => { "module" => "logstash" }
              uppercase => [ "loglevel" ]
            }
            if [loglevel] == "WARN" {
              mutate {
                replace => { "loglevel" => "WARNING" }
              }
            } else if ![loglevel] {
              mutate {
                add_field => { "loglevel" => "ERROR" }
              }
            }
          } else if "mysql" in [tags] {
            grok {
              match => { "message" => "# User@Host: %{WORD:user}\[%{WORD}\] @ (%{HOSTNAME:client_hostname}|) \[(%{IP:client_ip}|)\]" }
            }
            grok {
              match => { "message" => "# Thread_id: %{NUMBER:thread_id:int} \s*Schema: (%{WORD:schema}| ) \s*QC_hit: %{WORD:qc_hit}" }
            }
            grok {
              match => { "message" => "# Query_time: %{NUMBER:query_time:float} \s*Lock_time: %{NUMBER:lock_time:float} \s*Rows_sent: %{NUMBER:rows_sent:int} \s*Rows_examined: %{NUMBER:rows_examined:int}" }
            }
            grok {
              match => { "message" => "(?m)SET timestamp=%{NUMBER:timestamp};%{GREEDYDATA:logmessage}" }
            }
            geoip {
              source => "clientip"
            }
            date {
              match => [ "timestamp", "UNIX" ]
            }
            mutate {
              remove_field => "timestamp"
            }
            mutate {
              gsub => [ "logmessage", "^\n", "" ]
              add_field => { "module" => "mysql" }
              add_field => { "loglevel" => "WARNING" }
            }
          } else if "nginx" in [tags] {
            if "nginx-access" in [tags] {
              grok {
                patterns_dir => ["/opt/logstash/patterns"]
                match => {
                  "message" => "%{IP:client_ip} - %{USER:client_user} \[%{NGINX_TIMESTAMP:timestamp}\]  \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:http_version}\" %{INT:response_code} %{INT:bytes} %{QUOTEDSTRING:referer} %{QUOTEDSTRING:user_agent} %{QUOTEDSTRING:gzip_ratio}"
                }
              }
              geoip {
                source => "clientip"
              }
            }
            if "nginx-error" in [tags] {
              grok {
                patterns_dir => ["/opt/logstash/patterns"]
                match => {
                  "message" => "%{NGINX_ERROR_TIMESTAMP:timestamp} \[%{LOGLEVEL:loglevel}\] %{GREEDYDATA:error_msg}"
                }
              }
            }
          } else if "openstack" in [tags] {
            if "Can not find policy directory: policy.d" in [message] {
              drop { }
            }
            grok {
              match => {
                "message" => [
                  "^%{TIMESTAMP_ISO8601:logdate}%{SPACE}%{NUMBER:pid}?%{SPACE}?(?<loglevel>AUDIT|CRITICAL|DEBUG|INFO|TRACE|WARNING|ERROR) \[?\b%{NOTSPACE:module}\b\]?%{SPACE}?%{GREEDYDATA:logmessage}?",
                  "^%{CISCOTIMESTAMP:journalddate}%{SPACE}%{SYSLOGHOST:host}%{SPACE}%{SYSLOGPROG:prog}%{SPACE}%{TIMESTAMP_ISO8601:logdate}%{SPACE}%{NUMBER:pid}%{SPACE}%{NOTSPACE:loglevel}%{SPACE}%{NOTSPACE:module}%{SPACE}%{GREEDYDATA:logmessage}"
                ]
              }
            }
            grok {
              match => {
                "logmessage" => ["\[(%{NOTSPACE:request_id} %{NOTSPACE:user} %{NOTSPACE:tenant} %{NOTSPACE:domain} %{NOTSPACE:user_domain} %{NOTSPACE:project_domain}|\-)\] %{GREEDYDATA:logmessage}?"]
              }
              overwrite => [ "logmessage" ]
            }
            date {
              match => [ "logdate", ISO8601 ]
              remove_field => [ "logdate" ]
            }
            if "nova" in [tags] {
              # Instance ID from logs (i.e. "[instance: 5ee83c6e-3604-467a-be54-e48429086e3f]")
              grok {
                match => {
                  "logmessage" => ["(\[instance\: %{NOTSPACE:instance_id}\] )?%{GREEDYDATA:logmessage}?"]
                }
                overwrite => [ "logmessage" ]
              }

              if [module] == "nova.api.openstack.requestlog" {
                grok {
                  match => { "logmessage" => "%{IPORHOST:client_ip} \"%{WORD:verb} %{NOTSPACE:request}\" status\: %{NUMBER:response} len\: %{NUMBER:bytes} microversion\: (%{NUMBER:microversion}|\-) time\: %{NUMBER:duration:float}" }
                  add_tag => ["api"]
                  remove_field => [ "logmessage", "message" ]
                }
              } else if [module] == "nova.api.openstack.placement.requestlog" {
                grok {
                  match => { "logmessage" => "%{IPORHOST:client_ip} \"%{WORD:verb} %{NOTSPACE:request}\" status\: %{NUMBER:response} len\: %{NUMBER:bytes} microversion\: (%{NUMBER:microversion}|\-)" }
                  add_tag => ["api"]
                  remove_field => [ "logmessage", "message" ]
                }
              }
            } else if "neutron" in [tags] {
              if [module] == "neutron.wsgi" {
                grok {
                  match => { "logmessage" => "%{IPORHOST:client_ip} \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:http_version}\" status\: %{NUMBER:response}  len\: %{NUMBER:bytes} time\: %{NUMBER:duration:float}" }
                  add_tag => ["api"]
                  remove_field => [ "logmessage", "message" ]
                }
              } else if "neutron-ha-tool" in [source] {
                mutate {
                  add_tag => ["neutron-ha-tool"]
                  remove_tag => ["_grokparsefailure"]
                }
              }
              if "starting" in [message] and "_grokparsefailure" in [tags] {
                grok {
                  match => { "logmessage" => "(%{SPACE}\(%{NUMBER:pid}\)) %{GREEDYDATA:servicemessage}" }
                }
                mutate {
                  remove_tag => ["_grokparsefailure"]
                }
              }
            } else if "glance" in [tags] {
              if [module] == "eventlet.wsgi.server" {
                mutate {
                  gsub => ["logmessage","\"",""]
                }
                grok {
                  match => { "logmessage" => "%{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
                  add_tag => ["api"]
                }
                mutate {
                  replace => { "module" => "glance.%{module}" }
                }
              }
            } else if "cinder" in [tags] {
              if [module] == "cinder.eventlet.wsgi.server" {
                if "accepted" not in [logmessage] {
                  mutate {
                    gsub => ["logmessage","\"",""]
                  }
                  grok {
                    match => { "logmessage" => "%{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
                    add_tag => ["api"]
                  }
                }
                mutate {
                  replace => { "module" => "cinder.%{module}" }
                }
              }
            } else if "horizon" in [tags] {
              grok {
                patterns_dir => ["/opt/logstash/patterns"]
                match => {
                  "message" => [
                    "%{COMMONAPACHELOG}",
                    "\[%{APACHE_ERROR_TIMESTAMP:timestamp}\] \[%{DATA:module}:%{DATA:loglevel}\] \[pid %{POSINT:apache_pid}\:tid %{POSINT:apache_tid}\] ?(?:\[client %{IP:clientip}:%{POSINT:clientport}\] )?%{GREEDYDATA:logmessage}",
                    "%{SYSLOGTIMESTAMP:timestamp}%{SPACE}%{SYSLOGHOST:host}%{SPACE}%{PROG:prog}%{SPACE}%{IP:clientip}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{SYSLOG5424SD}%{SPACE}%{QS}%{SPACE}%{NUMBER}%{SPACE}%{NUMBER}%{SPACE}%{QS}%{SPACE}%{QS}"
                  ]
                }
              }
              geoip {
                source => "clientip"
              }
              if ![loglevel]  {
                mutate {
                  add_field => { "logmessage" => "%{request}" }
                  add_field => { "module" => "horizon.access" }
                  add_field => { "loglevel" => "INFO" }
                  add_tag => [ "apache-access" ]
                }
              } else {
                mutate {
                  replace => { "module" => "horizon.error.%{module}" }
                  add_tag => [ "apache-error" ]
                  uppercase => [ "loglevel" ]
                }
              }
            } else if "heat" in [tags] {
              if [module] == "eventlet.wsgi.server" {
                if "accepted" not in [logmessage] {
                  mutate {
                    gsub => ["logmessage","\"",""]
                  }
                  grok {
                    match => { "logmessage" => "%{NOTSPACE:requesterip} %{NOTSPACE} %{NOTSPACE} \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes} %{BASE10NUM:httptime}" }
                    add_tag => ["api"]
                  }
                }
                mutate {
                  replace => { "module" => "heat.%{module}" }
                }
              } else if [module] == "heat.engine.service" {
                grok {
                  match => { "logmessage" => "%{GREEDYDATA:servicemessage}" }
                  add_tag => ["api"]
                }
              }
            } else if "swift-account" in [tags] {
              grok {
                match => {
                  "message" => "%{SYSLOGTIMESTAMP}%{SPACE}%{HOSTNAME}%{SPACE}%{PROG}%{SPACE}%{SYSLOGTIMESTAMP}%{SPACE}%{S3_REQUEST_LINE}%{SPACE}%{IP}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{SYSLOG5424SD}%{SPACE}%{QS}%{SPACE}%{POSINT}%{SPACE}%{NOTSPACE}%{SPACE}%{QS}%{SPACE}%{QS}%{SPACE}%{QS}%{SPACE}%{SECOND}%{SPACE}%{QS}%{SPACE}%{NUMBER}%{SPACE}%{NOTSPACE}"
                }
              }
            } else if "swift" in [tags] {
              grok {
                match => {
                 "message" => "%{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{NOTSPACE:logsource} %{SYSLOGPROG:module}: (?:%{SPACE})?%{GREEDYDATA:logmessage}"
                }
              }
              grok {
                patterns_dir => ["/opt/logstash/patterns"]
                match => {
                  "logmessage" => [
                    "%{COMBINEDAPACHELOG}",
                    "%{SWIFTPROXY_ACCESS}",
                    "%{GREEDYDATA:logmessage} \(txn\: %{DATA:swift_txn}\)"
                  ]
                }
                tag_on_failure => []
                overwrite => [ "logmessage" ]
              }

              if [request] {
                mutate {
                  replace => { "logmessage" => "%{request}" }
                }
              }

              mutate {
                replace => { "module" => "swift.%{module}" }
              }

              if [file] =~ "error.log$" {
                mutate {
                  add_field => { "loglevel" => "NOTICE" }
                }
              } else {
                mutate {
                  add_field => { "loglevel" => "INFO" }
                }
              }
            } else if "keystone" in [tags] {
              if [loglevel] == "INFO" and [module] == "keystone.common.wsgi" {
                grok {
                  match => { "logmessage" => "%{WORD:verb} %{NOTSPACE:request}" }
                  remove_field => [ "logmessage", "message" ]
                }
              }
            } else if "magnum" in [tags] {
              if [module] == "eventlet.wsgi.server" {
                mutate {
                  gsub => ["logmessage","\"",""]
                }
                grok {
                  match => { "logmessage" => "%{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
                  add_tag => ["api"]
                }
                mutate {
                  replace => { "module" => "magnum.%{module}" }
                }
              }
            } else if "octavia" in [tags] {
              if [module] == "eventlet.wsgi.server" {
                mutate {
                  gsub => ["logmessage","\"",""]
                }
                grok {
                  match => { "logmessage" => "%{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
                  add_tag => ["api"]
                }
                mutate {
                  replace => { "module" => "octavia.%{module}" }
                }
              }
            }
          } else if "rabbitmq" in [tags] {
            if [message] == "" {
              drop { }
            }
            grok {
              match => { "message" => "^\=%{LOGLEVEL:loglevel} REPORT\=\=\=\= %{MONTHDAY:event_day}\-%{MONTH:event_month}\-%{YEAR:event_year}\:\:%{TIME:event_time} \=\=\=\n%{GREEDYDATA:logmessage}" }
            }
            mutate {
              replace => { "module" => "rabbitmq" }
              add_field => { "timestamp" => "%{event_day} %{event_month} %{event_year} %{event_time}" }
            }
            date {
              match => [ "timestamp", "dd MMM YYYY HH:mm:ss" ]
              remove_field => [ "event_day", "event_month", "event_year", "event_time", "timestamp" ]
            }
          }
        }
      }
      if [source.ip] {
        geoip {
          id => "setGeoIpSource"
          source => "source.ip"
        }
      } else if [ip] {
        geoip {
          id => "setGeoIp"
          source => "ip"
        }
      }
      if [message] {
        fingerprint {
          id => "setSHA1"
          target => "[@metadata][fingerprint]"
          method => "SHA1"
          key => "{{ inventory_hostname | to_uuid }}"
        }
      }
    }
    output {
      if [@metadata][fingerprint] {
        if [@metadata][version] {
          elasticsearch {
            id => "elasticsearchDocIDOutputPipeline"
            document_id => "%{[@metadata][fingerprint]}"
            hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
            sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
            manage_template => {{ (data_node | bool) | lower }}
            index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
          }
        } else if [@metadata][beat] {
          elasticsearch {
            id => "elasticsearchLegacyDocIDOutputPipeline"
            document_id => "%{[@metadata][fingerprint]}"
            hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
            sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
            manage_template => {{ (data_node | bool) | lower }}
            index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
          }
        } else if "syslog" in [tags] {
          elasticsearch {
            id => "elasticsearchSyslogDocIDOutputPipeline"
            document_id => "%{[@metadata][fingerprint]}"
            hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
            sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
            manage_template => {{ (data_node | bool) | lower }}
            index => "syslog-%{+YYYY.MM.dd}"
          }
        } else {
          elasticsearch {
            id => "elasticsearchUndefinedDocIDOutputPipeline"
            document_id => "%{[@metadata][fingerprint]}"
            hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
            sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
            manage_template => {{ (data_node | bool) | lower }}
            index => "undefined-%{+YYYY.MM.dd}"
          }
        }
      } else {
        if [@metadata][version] {
          elasticsearch {
            id => "elasticsearchOutputPipeline"
            hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
            sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
            manage_template => {{ (data_node | bool) | lower }}
            index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
          }
        } else if [@metadata][beat] {
          elasticsearch {
            id => "elasticsearchLegacyOutputPipeline"
            hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
            sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
            manage_template => {{ (data_node | bool) | lower }}
            index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
          }
        } else if "syslog" in [tags] {
          elasticsearch {
            id => "elasticsearchSyslogOutputPipeline"
            hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
            sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
            manage_template => {{ (data_node | bool) | lower }}
            index => "syslog-%{+YYYY.MM.dd}"
          }
        } else {
          elasticsearch {
            id => "elasticsearchUndefinedOutputPipeline"
            hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
            sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
            manage_template => {{ (data_node | bool) | lower }}
            index => "undefined-%{+YYYY.MM.dd}"
          }
        }
      }

{% if logstash_kafka_options is defined %}
      kafka {
    {% for key, value in logstash_kafka_options.items() %}
    {%   if value is number %}
        {{ key }} => {{ value }}
    {%   elif value is iterable and value is not string %}
        {{ key }} => "{{ value | join(',') }}"
    {%   else %}
        {{ key }} => "{{ value }}"
    {%   endif %}
    {% endfor %}
      }
{% endif %}
    }