Define retry_tag for unprocessed Fluentd logs

The Kolla Ansible Fluentd config is not idempotent. In practice this
can be an issue if some logs are rejected by the OpenSearch /
Elasticsearch bulk API: by default, the unprocessed logs are then
reprocessed from the start of the Fluentd pipeline, leading to error
messages.

The solution proposed here is to explicitly set the retry_tag as
documented in [1,2], and add a dedicated output for retried logs.
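
As a rough sketch (not the Kolla Ansible template itself; the host,
port and tag name below are illustrative), the retry_tag pattern from
[1,2] looks like this. Note that the retry output must come first,
because Fluentd routes each event to the first <match> block whose
pattern matches:

  # Dedicated output for retried records. No retry_tag is set here, so
  # records that fail the bulk API again are retried by this output
  # only, rather than re-entering the full pipeline.
  <match retry_os>
    @type opensearch
    # host/port are placeholders for the real OpenSearch endpoint
    host localhost
    port 9200
  </match>

  # Main output: records rejected by the bulk API are re-emitted with
  # the tag retry_os instead of their original tag.
  <match **>
    @type opensearch
    host localhost
    port 9200
    retry_tag retry_os
  </match>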

An alternative fix would be to make the pipeline idempotent. However,
this would require a compromise: either duplicating content, or using
non-standard fields for the log payload/message.

[1] https://github.com/fluent/fluent-plugin-opensearch?tab=readme-ov-file#retry_tag
[2] https://github.com/uken/fluent-plugin-elasticsearch?tab=readme-ov-file#retry_tag

Closes-Bug: #2064104
Change-Id: I310fc1b8e002ce9f0ba60f8bb67b7f372a589314
Doug Szumski 2024-04-29 16:13:43 +01:00
parent 891568d6d7
commit a2273026d7
4 changed files with 29 additions and 2 deletions

@@ -37,6 +37,7 @@
 logstash_format true
 logstash_prefix {{ opensearch_log_index_prefix }}
 reconnect_on_error true
+retry_tag retry_es
 request_timeout {{ fluentd_elasticsearch_request_timeout }}
 suppress_type_name true
 bulk_message_request_threshold {{ fluentd_bulk_message_request_threshold }}
@@ -70,6 +71,7 @@
 logstash_format true
 logstash_prefix {{ opensearch_log_index_prefix }}
 reconnect_on_error true
+retry_tag retry_os
 request_timeout {{ fluentd_opensearch_request_timeout }}
 suppress_type_name true
 bulk_message_request_threshold {{ fluentd_bulk_message_request_threshold }}

@@ -1,4 +1,5 @@
-<match **>
+{% for match_pattern in ['retry_es', '**',] %}
+<match {{ match_pattern }}>
 @type copy
 <store>
 @type elasticsearch
@@ -22,14 +23,22 @@
 logstash_format true
 logstash_prefix {{ opensearch_log_index_prefix }}
 reconnect_on_error true
+{% if match_pattern != 'retry_es' %}
+retry_tag retry_es
+{% endif %}
 request_timeout {{ fluentd_elasticsearch_request_timeout }}
 suppress_type_name true
 bulk_message_request_threshold {{ fluentd_bulk_message_request_threshold }}
 <buffer>
 @type file
+{% if match_pattern == 'retry_es' %}
+path /var/lib/fluentd/data/elasticsearch.buffer/openstack_retry.*
+{% else %}
 path /var/lib/fluentd/data/elasticsearch.buffer/openstack.*
+{% endif %}
 flush_interval 15s
 chunk_limit_size {{ fluentd_buffer_chunk_limit_size }}
 </buffer>
 </store>
 </match>
+{% endfor %}

@@ -1,4 +1,5 @@
-<match **>
+{% for match_pattern in ['retry_os', '**',] %}
+<match {{ match_pattern }} >
 @type copy
 <store>
 @type opensearch
@@ -22,14 +23,22 @@
 logstash_format true
 logstash_prefix {{ opensearch_log_index_prefix }}
 reconnect_on_error true
+{% if match_pattern != 'retry_os' %}
+retry_tag retry_os
+{% endif %}
 request_timeout {{ fluentd_opensearch_request_timeout }}
 suppress_type_name true
 bulk_message_request_threshold {{ fluentd_bulk_message_request_threshold }}
 <buffer>
 @type file
+{% if match_pattern == 'retry_os' %}
+path /var/lib/fluentd/data/opensearch.buffer/openstack_retry.*
+{% else %}
 path /var/lib/fluentd/data/opensearch.buffer/openstack.*
+{% endif %}
 flush_interval 15s
 chunk_limit_size {{ fluentd_buffer_chunk_limit_size }}
 </buffer>
 </store>
 </match>
+{% endfor %}

@@ -0,0 +1,7 @@
+---
+fixes:
+  - |
+    Set retry_tag in the Elasticsearch/OpenSearch Fluentd output
+    plugins. This prevents log messages from being reprocessed by the
+    non-idempotent Fluentd pipeline configuration. See
+    `LP#2064104 <https://launchpad.net/bugs/2064104>`__.