openstack-ansible-ops/elk_metrics_6x/templates/logstash-pipelines.yml.j2
Kevin Carter 0d4a4a92c7
Converg the logstash pipelines and enhance memory backed queues
The multi-logstash pipeline setup, while amazingly fast, was crashing
and causing index errors when under high load for a long period of time.
Because of the crashing behavior and the fact that the folks from
Elastic describe multi-pipeline queues to be "beta" at this time the
logstash pipelines have been converted back into a single pipeline.

The memory backed queue options are now limited by a ram disk (tmpfs)
which will ensure that a burst within the queue does not cause OOM
issues and ensures a highly performant deployment and limiting memory
usage at the same time. Memory backed queues will be enabled when the
underlying system is using "rotational" media as detected by ansible
facts. This will ensure a fast and consistent experience across all
deployment types.

Pipeline/ml/template/dashboard setup has been added to the beat
configurations which will ensure beats are properly configured even
when running in an isolated deployment and outside of normal operations
where beats are generally configured on the first data node.

Change-Id: Ie3c775f98b14f71bcbed05db9cb1c5aa46d9c436
Signed-off-by: Kevin Carter <kevin.carter@rackspace.com>
2018-09-16 23:44:58 -05:00

501 lines
22 KiB
Django/Jinja

# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
# The only pipeline defined in this file; its inputs, filters and outputs are
# all carried in the inline config string that follows.
- pipeline.id: "elk_metrics_6x"
# Queue type for this pipeline is set to "persisted".
queue.type: "persisted"
config.string: |
# Beats listener. Every event received here is stamped with a "beats"
# source type under @metadata, which the filter stage later tests.
input {
  beats {
    add_field => { "[@metadata][source_type]" => "beats" }
    port => {{ logstash_beat_input_port }}
    id => "inputBeats"
  }
}
{% if logstash_syslog_input_enabled | bool %}
# Optional syslog listener, rendered only when logstash_syslog_input_enabled
# is truthy. logstash_syslog_input_mode selects a tcp or udp plugin; both use
# the same port variable, set type "syslog" and stamp the "syslog" source type
# under @metadata. Any other mode value renders an empty input section.
input {
{% if logstash_syslog_input_mode == 'tcp' %}
tcp {
id => "inputSyslogTcp"
port => {{ logstash_syslog_input_port }}
type => syslog
add_field => {
"[@metadata][source_type]" => "syslog"
}
}
{% elif logstash_syslog_input_mode == 'udp' %}
udp {
id => "inputSyslogUdp"
port => {{ logstash_syslog_input_port }}
type => syslog
add_field => {
"[@metadata][source_type]" => "syslog"
}
}
{% endif %}
}
{% endif %}
filter {
# Events that came in through the syslog input get a "syslog" tag.
if [@metadata][source_type] == "syslog" {
  mutate { add_tag => [ "syslog" ] }
}
if [@metadata][source_type] == "beats" or [@metadata][source_type] == "syslog" {
# Turn the systemd slice an event came from into a tag.
# The slice name is copied into the scratch field "systemd_slice_tag", its
# ".slice" suffix is stripped, and (unless the result is "-") the remainder
# plus "filebeat" are added as tags. The scratch field is then discarded.
if [systemd_slice] {
  mutate {
    copy => { "systemd_slice" => "systemd_slice_tag" }
  }
  # The pattern is a regular expression: match a literal dot and anchor it at
  # the end of the value so only the unit suffix is removed.
  mutate {
    gsub => [ "systemd_slice_tag", "[.]slice$", "" ]
  }
  if [systemd_slice_tag] != "-" {
    mutate {
      add_tag => [
        "%{systemd_slice_tag}"
      ]
    }
    mutate {
      add_tag => [
        "filebeat"
      ]
    }
  }
  # remove_field takes the field NAME. A "%{...}" reference here would expand
  # to the slice value and target a field of that name, leaving the scratch
  # field on the event.
  mutate {
    remove_field => [ "systemd_slice_tag" ]
  }
}
if "filebeat" in [tags] {
# Messages containing "Traceback" are tagged "traceback" and have any
# "_grokparsefailure" tag removed.
if "Traceback" in [message] {
mutate {
add_tag => ["traceback"]
remove_tag => ["_grokparsefailure"]
}
}
if "auth" in [tags] {
# auth: split the syslog-style line into timestamp, logsource, program and
# logmessage, then label the event with module "auth".
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{NOTSPACE:logsource} %{SYSLOGPROG}: (?:%{SPACE})?%{GREEDYDATA:logmessage}" }
}
mutate {
add_field => { "module" => "auth" }
}
} else if "elasticsearch" in [tags] {
# elasticsearch: extract the bracketed timestamp, loglevel and module, then
# prefix the captured module with "elasticsearch.".
grok {
match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{LOGLEVEL:loglevel}\s*\]\[%{NOTSPACE:module}\s*\] %{GREEDYDATA:logmessage}" }
}
mutate {
replace => { "module" => "elasticsearch.%{module}" }
}
} else if "ceph" in [tags] {
# ceph: extract date, osd_epoch, error_bool and logmessage from the line.
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:date} %{NOTSPACE:osd_epoch} ?%{SPACE}?%{NOTSPACE:error_bool} %{GREEDYDATA:logmessage}" }
}
# ceph-osd events additionally get src_host / dst_host pulled from the
# "-- <host:port/nonce> <direction> <host:port/nonce>" part of the message.
if "ceph-osd" in [tags] {
grok {
match => { "message" => "-- (?<src_host>(%{IPORHOST}\:%{POSINT}/%{POSINT})) (?:[<|>]){1,2} (?<dst_host>(%{IPORHOST}\:%{POSINT}/%{POSINT}))" }
}
}
} else if "libvirt" in [tags] {
# libvirt: extract logdate, code, loglevel, module and logmessage; on a
# successful match the event's @timestamp is recorded as received_at.
grok {
match => { "message" => "(?m)^%{TIMESTAMP_ISO8601:logdate}:%{SPACE}%{NUMBER:code}:?%{SPACE}\[?\b%{NOTSPACE:loglevel}\b\]?%{SPACE}?:?%{SPACE}\[?\b%{NOTSPACE:module}\b\]?%{SPACE}?%{GREEDYDATA:logmessage}?" }
add_field => { "received_at" => "%{@timestamp}"}
}
# Normalise the level to upper case.
mutate {
uppercase => [ "loglevel" ]
}
} else if "logstash" in [tags] {
# logstash: parse the "{:timestamp=>..., :message=>...}" log format, with
# optional address/exception/backtrace and level sections.
grok {
match => {
"message" => "\{\:timestamp=>\"%{TIMESTAMP_ISO8601:timestamp}\", \:message=>\"%{DATA:logmessage}\"(;|)(, \:address=>\"%{URIHOST:address}\", \:exception=>#<\"%{DATA:exception}\">, \:backtrace=>\[%{DATA:backtrace}\]|)(, \:level=>:\"%{LOGLEVEL:loglevel}\"|)\}"
}
}
# Label the event and upper-case whatever level was captured.
mutate {
add_field => { "module" => "logstash" }
uppercase => [ "loglevel" ]
}
# "WARN" is rewritten to "WARNING"; events with no level at all are given
# "ERROR".
if [loglevel] == "WARN" {
mutate {
replace => { "loglevel" => "WARNING" }
}
} else if ![loglevel] {
mutate {
add_field => { "loglevel" => "ERROR" }
}
}
} else if "mysql" in [tags] {
# mysql: the groks below each pick one header line out of the event:
# User@Host, Thread_id/Schema/QC_hit, Query_time/Lock_time/Rows_*, and the
# "SET timestamp=" line followed by the statement text.
grok {
  match => { "message" => "# User@Host: %{WORD:user}\[%{WORD}\] @ (%{HOSTNAME:client_hostname}|) \[(%{IP:client_ip}|)\]" }
}
grok {
  match => { "message" => "# Thread_id: %{NUMBER:thread_id:int} \s*Schema: (%{WORD:schema}| ) \s*QC_hit: %{WORD:qc_hit}" }
}
grok {
  match => { "message" => "# Query_time: %{NUMBER:query_time:float} \s*Lock_time: %{NUMBER:lock_time:float} \s*Rows_sent: %{NUMBER:rows_sent:int} \s*Rows_examined: %{NUMBER:rows_examined:int}" }
}
grok {
  match => { "message" => "(?m)SET timestamp=%{NUMBER:timestamp};%{GREEDYDATA:logmessage}" }
}
# Geo-locate the client. The source must be the field the User@Host grok
# actually writes ("client_ip"); the capture is optional there, so the lookup
# runs only when an address was extracted.
if [client_ip] {
  geoip {
    source => "client_ip"
  }
}
# Use the epoch value from "SET timestamp=" as the event time, then drop it.
date {
  match => [ "timestamp", "UNIX" ]
}
mutate {
  remove_field => "timestamp"
}
# Strip a leading newline from the statement text and label the event.
mutate {
  gsub => [ "logmessage", "^\n", "" ]
  add_field => { "module" => "mysql" }
  add_field => { "loglevel" => "WARNING" }
}
} else if "nginx" in [tags] {
# nginx access log: extract client, request, response and size fields using
# the custom patterns under /opt/logstash/patterns.
if "nginx-access" in [tags] {
  grok {
    patterns_dir => ["/opt/logstash/patterns"]
    match => {
      "message" => "%{IP:client_ip} - %{USER:client_user} \[%{NGINX_TIMESTAMP:timestamp}\] \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:http_version}\" %{INT:response_code} %{INT:bytes} %{QUOTEDSTRING:referer} %{QUOTEDSTRING:user_agent} %{QUOTEDSTRING:gzip_ratio}"
    }
  }
  # Geo-locate the client. The source must be the field the grok above
  # writes ("client_ip"); skip the lookup when the grok did not match.
  if [client_ip] {
    geoip {
      source => "client_ip"
    }
  }
}
# nginx error log: extract timestamp, loglevel and the error text.
if "nginx-error" in [tags] {
  grok {
    patterns_dir => ["/opt/logstash/patterns"]
    match => {
      "message" => "%{NGINX_ERROR_TIMESTAMP:timestamp} \[%{LOGLEVEL:loglevel}\] %{GREEDYDATA:error_msg}"
    }
  }
}
} else if "openstack" in [tags] {
# oslofmt-tagged OpenStack logs: drop the "Can not find policy directory"
# noise, then try two layouts in order - the plain oslo log line, and the same
# line preceded by a journald/syslog header. A successful match also records
# the event's @timestamp as received_at.
if "oslofmt" in [tags] {
if "Can not find policy directory: policy.d" in [message] {
drop { }
}
grok {
match => {
"message" => [
"^%{TIMESTAMP_ISO8601:logdate}%{SPACE}%{NUMBER:pid}?%{SPACE}?(?<loglevel>AUDIT|CRITICAL|DEBUG|INFO|TRACE|WARNING|ERROR) \[?\b%{NOTSPACE:module}\b\]?%{SPACE}?%{GREEDYDATA:logmessage}?",
"^%{CISCOTIMESTAMP:journalddate}%{SPACE}%{SYSLOGHOST:host}%{SPACE}%{SYSLOGPROG:prog}%{SPACE}%{TIMESTAMP_ISO8601:logdate}%{SPACE}%{NUMBER:pid}%{SPACE}%{NOTSPACE:loglevel}%{SPACE}%{NOTSPACE:module}%{SPACE}%{GREEDYDATA:logmessage}"
]
}
add_field => { "received_at" => "%{@timestamp}" }
}
}
if "nova" in [tags] {
# nova: strip double quotes from logmessage, then parse API request lines
# according to which module emitted them. Each successful parse tags the
# event "apimetrics".
mutate {
gsub => ["logmessage","\"",""]
}
if [module] == "nova.osapi_compute.wsgi.server" {
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} status\: %{NUMBER:response} len\: %{NUMBER:bytes:int} time\: %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
}
} else if [module] == "nova.api.ec2" {
grok {
match => { "logmessage" => "\[%{GREEDYDATA:requestid}\] %{NUMBER:seconds}s %{NOTSPACE:requesterip} %{NOTSPACE:verb} %{NOTSPACE:url_path} None\:None %{NUMBER:response} %{GREEDYDATA:user_agent}" }
add_tag => ["apimetrics"]
}
} else if [module] == "nova.metadata.wsgi.server" {
grok {
match => { "logmessage" => "\[%{GREEDYDATA:requestid}\] %{NOTSPACE:requesterip} %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} status\: %{NUMBER:response} len\: %{NUMBER:bytes} time\: %{NUMBER:seconds}" }
add_tag => ["apimetrics"]
}
}
} else if "neutron" in [tags] {
# neutron: for neutron.wsgi lines that do not contain "accepted", strip double
# quotes and parse the request line, tagging matches "apimetrics". Events
# whose source contains "neutron-ha-tool" are tagged as such instead and have
# "_grokparsefailure" cleared.
if [module] == "neutron.wsgi" {
if "accepted" not in [logmessage] {
mutate {
gsub => ["logmessage","\"",""]
}
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
}
}
} else if "neutron-ha-tool" in [source] {
mutate {
add_tag => ["neutron-ha-tool"]
remove_tag => ["_grokparsefailure"]
}
}
# Second chance for "starting" messages that failed an earlier grok: extract
# requestid, pid and servicemessage, then clear the failure tag.
if "starting" in [message] and "_grokparsefailure" in [tags] {
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid}|\-)\](%{SPACE}\(%{NUMBER:pid}\)) %{GREEDYDATA:servicemessage}" }
}
mutate {
remove_tag => ["_grokparsefailure"]
}
}
} else if "glance" in [tags] {
# glance: for eventlet.wsgi.server lines, strip double quotes, parse the
# request line (tagging matches "apimetrics") and prefix the module with
# "glance.".
if [module] == "eventlet.wsgi.server" {
mutate {
gsub => ["logmessage","\"",""]
}
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
}
mutate {
replace => { "module" => "glance.%{module}" }
}
}
} else if "cinder" in [tags] {
# cinder: for cinder.eventlet.wsgi.server lines that do not contain
# "accepted", strip double quotes and parse the request line, tagging matches
# "apimetrics". The module is then prefixed with "cinder." either way.
# NOTE(review): the module matched here already starts with "cinder.", so the
# replace yields "cinder.cinder.eventlet.wsgi.server"; sibling services match
# the bare "eventlet.wsgi.server" - confirm which name is intended.
if [module] == "cinder.eventlet.wsgi.server" {
if "accepted" not in [logmessage] {
mutate {
gsub => ["logmessage","\"",""]
}
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
}
}
mutate {
replace => { "module" => "cinder.%{module}" }
}
}
} else if "horizon" in [tags] {
# horizon: try, in order, the common Apache access format, the Apache error
# format, and a syslog-wrapped access line.
grok {
patterns_dir => ["/opt/logstash/patterns"]
match => {
"message" => [
"%{COMMONAPACHELOG}",
"\[%{APACHE_ERROR_TIMESTAMP:timestamp}\] \[%{DATA:module}:%{DATA:loglevel}\] \[pid %{POSINT:apache_pid}\:tid %{POSINT:apache_tid}\] ?(?:\[client %{IP:clientip}:%{POSINT:clientport}\] )?%{GREEDYDATA:logmessage}",
"%{SYSLOGTIMESTAMP:timestamp}%{SPACE}%{SYSLOGHOST:host}%{SPACE}%{PROG:prog}%{SPACE}%{IP:clientip}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{SYSLOG5424SD}%{SPACE}%{QS}%{SPACE}%{NUMBER}%{SPACE}%{NUMBER}%{SPACE}%{QS}%{SPACE}%{QS}"
]
}
}
# Geo-locate the client address captured as "clientip".
geoip {
source => "clientip"
}
# No loglevel means an access-style line matched: fill in logmessage, module
# and level and tag "apache-access". Otherwise treat it as an error line:
# prefix the module, tag "apache-error" and upper-case the level.
if ![loglevel] {
mutate {
add_field => { "logmessage" => "%{request}" }
add_field => { "module" => "horizon.access" }
add_field => { "loglevel" => "INFO" }
add_tag => [ "apache-access" ]
}
} else {
mutate {
replace => { "module" => "horizon.error.%{module}" }
add_tag => [ "apache-error" ]
uppercase => [ "loglevel" ]
}
}
} else if "heat" in [tags] {
# heat: eventlet.wsgi.server lines that do not contain "accepted" are stripped
# of double quotes and parsed as request lines (tagged "apimetrics"); the
# module is prefixed with "heat." either way. heat.engine.service lines get
# requestid, user_id, tenant and servicemessage extracted and are also tagged
# "apimetrics".
if [module] == "eventlet.wsgi.server" {
if "accepted" not in [logmessage] {
mutate {
gsub => ["logmessage","\"",""]
}
grok {
match => { "logmessage" => "\[%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} %{NOTSPACE} %{NOTSPACE} %{NOTSPACE}\] %{NOTSPACE:requesterip} %{NOTSPACE} %{NOTSPACE} \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes} %{BASE10NUM:httptime}" }
add_tag => ["apimetrics"]
}
}
mutate {
replace => { "module" => "heat.%{module}" }
}
} else if [module] == "heat.engine.service" {
grok {
match => { "logmessage" => "\[%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} %{NOTSPACE} %{NOTSPACE} %{NOTSPACE} %{GREEDYDATA:servicemessage}" }
add_tag => ["apimetrics"]
}
}
} else if "swift-account" in [tags] {
# swift-account: shape check only - the pattern has no named captures, so a
# match adds no fields and a non-match is what gets flagged by grok.
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP}%{SPACE}%{HOSTNAME}%{SPACE}%{PROG}%{SPACE}%{SYSLOGTIMESTAMP}%{SPACE}%{S3_REQUEST_LINE}%{SPACE}%{IP}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{SYSLOG5424SD}%{SPACE}%{QS}%{SPACE}%{POSINT}%{SPACE}%{NOTSPACE}%{SPACE}%{QS}%{SPACE}%{QS}%{SPACE}%{QS}%{SPACE}%{SECOND}%{SPACE}%{QS}%{SPACE}%{NUMBER}%{SPACE}%{NOTSPACE}"
}
}
} else if "swift" in [tags] {
# swift: first split the syslog-style envelope into timestamp, logsource,
# module (program) and logmessage.
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{NOTSPACE:logsource} %{SYSLOGPROG:module}: (?:%{SPACE})?%{GREEDYDATA:logmessage}"
}
}
# Then try to refine logmessage as an Apache combined line, a swift proxy
# access line, or text with a trailing "(txn: ...)" id. Failure here is not
# tagged (tag_on_failure is empty) and logmessage may be overwritten.
grok {
patterns_dir => ["/opt/logstash/patterns"]
match => {
"logmessage" => [
"%{COMBINEDAPACHELOG}",
"%{SWIFTPROXY_ACCESS}",
"%{GREEDYDATA:logmessage} \(txn\: %{DATA:swift_txn}\)"
]
}
tag_on_failure => []
overwrite => [ "logmessage" ]
}
# When a request field was extracted, it becomes the logmessage.
if [request] {
mutate {
replace => { "logmessage" => "%{request}" }
}
}
mutate {
replace => { "module" => "swift.%{module}" }
}
# Level is inferred from the file name: NOTICE for names ending in
# "error.log", INFO otherwise.
if [file] =~ "error.log$" {
mutate {
add_field => { "loglevel" => "NOTICE" }
}
} else {
mutate {
add_field => { "loglevel" => "INFO" }
}
}
} else if "keystone-access" in [tags] {
# keystone-access: parse the syslog-wrapped keystone line into host, program,
# timestamps, pid, loglevel, module, requestid, verb and request.
grok {
match => { "message" => "%{CISCOTIMESTAMP:keystone_access_timestamp}%{SPACE}%{SYSLOGHOST:log_host}%{SPACE}%{SYSLOGPROG:prog}%{SPACE}%{TIMESTAMP_ISO8601:keystone_timestmp}%{SPACE}%{NUMBER:pid}%{SPACE}%{NOTSPACE:loglevel}%{SPACE}%{NOTSPACE:module}%{SPACE}%{SYSLOG5424SD:requestid}%{SPACE}%{WORD:verb}%{SPACE}%{NOTSPACE:request}" }
}
} else if "keystone" in [tags] {
# keystone behind Apache: access lines are parsed with the common Apache
# format and labelled keystone.access / INFO; error lines are parsed with the
# custom keystone patterns, get their module prefixed with "keystone.error."
# and their level upper-cased.
if "apache-access" in [tags] {
grok {
match => { "message" => "%{COMMONAPACHELOG}" }
}
mutate {
add_field => { "logmessage" => "%{request}" }
add_field => { "module" => "keystone.access" }
add_field => { "loglevel" => "INFO" }
}
} else if "apache-error" in [tags] {
grok {
patterns_dir => ["/opt/logstash/patterns"]
match => { "message" => "%{KEYSTONE_SUBSECOND_TIMESTAMP:keystone_subsecond_timestamp} %{STANDARD_TIMESTAMP:standard_timestamp} %{NUMBER:pid} %{DATA:loglevel} %{DATA:module} \[%{DATA:requestid}\] %{WORD:verb} %{NOTSPACE:request}" }
}
mutate {
replace => { "module" => "keystone.error.%{module}" }
uppercase => [ "loglevel" ]
}
}
} else if "magnum" in [tags] {
# magnum: for eventlet.wsgi.server lines, strip double quotes, parse the
# request line (tagging matches "apimetrics") and prefix the module with
# "magnum.".
if [module] == "eventlet.wsgi.server" {
mutate {
gsub => ["logmessage","\"",""]
}
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
}
mutate {
replace => { "module" => "magnum.%{module}" }
}
}
} else if "octavia" in [tags] {
# octavia: for eventlet.wsgi.server lines, strip double quotes, parse the
# request line (tagging matches "apimetrics") and prefix the module with
# "octavia.".
if [module] == "eventlet.wsgi.server" {
mutate {
gsub => ["logmessage","\"",""]
}
grok {
match => { "logmessage" => "\[(%{NOTSPACE:requestid} %{NOTSPACE:user_id} %{NOTSPACE:tenant} \- \- \-|\-)\] %{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["apimetrics"]
}
mutate {
replace => { "module" => "octavia.%{module}" }
}
}
}
} else if "rabbitmq" in [tags] {
# rabbitmq: discard empty messages, then parse the "=LEVEL REPORT==== ..."
# header into loglevel, the date/time parts and the remaining logmessage.
if [message] == "" {
drop { }
}
grok {
match => { "message" => "^\=%{LOGLEVEL:loglevel} REPORT\=\=\=\= %{MONTHDAY:event_day}\-%{MONTH:event_month}\-%{YEAR:event_year}\:\:%{TIME:event_time} \=\=\=\n%{GREEDYDATA:logmessage}" }
}
# Label the event and assemble a single timestamp string from the parts.
mutate {
replace => { "module" => "rabbitmq" }
add_field => { "timestamp" => "%{event_day} %{event_month} %{event_year} %{event_time}" }
}
# Parse that string as the event time; the helper fields are removed only
# when the date filter succeeds.
date {
match => [ "timestamp", "dd MMM YYYY HH:mm:ss" ]
remove_field => [ "event_day", "event_month", "event_year", "event_time", "timestamp" ]
}
}
}
}
# Geo-locate the event from "source.ip" when that field is present, otherwise
# from "ip".
# NOTE(review): [source.ip] is written as one bracketed name rather than
# [source][ip]; confirm against the Logstash field-reference syntax that this
# matches how the shippers populate the field (literal dotted key vs. nested).
if [source.ip] {
geoip {
id => "setGeoIpSource"
source => "source.ip"
}
} else if [ip] {
geoip {
id => "setGeoIp"
source => "ip"
}
}
# Compute [@metadata][fingerprint], which the elasticsearch outputs use as the
# document_id. Events with a message get a SHA1 fingerprint keyed with a UUID
# derived from the inventory hostname; events without one get a UUID.
# NOTE(review): no "source" is set, so the plugin default applies - presumably
# the message field; confirm for the deployed plugin version.
if [message] {
fingerprint {
id => "setSHA1"
target => "[@metadata][fingerprint]"
method => "SHA1"
key => "{{ inventory_hostname | to_uuid }}"
}
} else {
fingerprint {
target => "[@metadata][fingerprint]"
method => "UUID"
}
}
}
output {
# Route each event to exactly one Elasticsearch index, chosen by the first
# condition that holds:
#   beat + version metadata -> "<beat>-<version>-<date>"
#   beat metadata only      -> "<beat>-<date>"
#   "syslog" tag            -> "syslog-<date>"
#   anything else           -> "undefined-<date>"
# All four outputs target the local node on elastic_port, use the computed
# fingerprint as document_id, enable sniffing only when data_node is false and
# manage the index template only when data_node is true.
if [@metadata][version] {
elasticsearch {
id => "elasticsearchOutputPipeline"
document_id => "%{[@metadata][fingerprint]}"
hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
sniffing => {{ (not data_node | bool) | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
} else if [@metadata][beat] {
elasticsearch {
id => "elasticsearchLegacyOutputPipeline"
document_id => "%{[@metadata][fingerprint]}"
hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
sniffing => {{ (not data_node | bool) | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
}
} else if "syslog" in [tags] {
elasticsearch {
id => "elasticsearchSyslogOutputPipeline"
document_id => "%{[@metadata][fingerprint]}"
hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
sniffing => {{ (not data_node | bool) | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "syslog-%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
id => "elasticsearchUndefinedOutputPipeline"
document_id => "%{[@metadata][fingerprint]}"
hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
sniffing => {{ (not data_node | bool) | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "undefined-%{+YYYY.MM.dd}"
}
}
{% if logstash_kafka_options is defined %}
# Optional Kafka output, rendered only when logstash_kafka_options is defined.
# Each key/value pair of that mapping becomes one plugin setting:
#   booleans       -> bare lowercase true/false
#   numbers        -> bare value
#   lists (etc.)   -> one quoted, comma-joined string
#   anything else  -> quoted string
# The boolean case is tested first because a boolean also satisfies the
# "number" test and would otherwise be emitted as True/False.
kafka {
{% for key, value in logstash_kafka_options.items() %}
{% if value is sameas true or value is sameas false %}
{{ key }} => {{ value | lower }}
{% elif value is number %}
{{ key }} => {{ value }}
{% elif value is iterable and value is not string %}
{{ key }} => "{{ value | join(',') }}"
{% else %}
{{ key }} => "{{ value }}"
{% endif %}
{% endfor %}
}
{% endif %}
}