fluent-plugin-s3
When match_regexp does not match, the message is discarded
Describe the bug
Although the aws-sdk-sqs QueuePoller does indeed delete a message only at the end of the code block passed to it while polling (assuming :skip_delete is false), next unless @match_regexp.match?(key) exits that block normally, so the poller still treats the message as handled and deletes it.
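For illustration, here is how those semantics play out with a bare QueuePoller (a minimal sketch, not code from in_s3.rb; queue_url and the regex are placeholder values, and real S3 event notifications carry the object key in JSON rather than in the raw body):

require 'aws-sdk-sqs'

# Placeholder values for illustration only.
queue_url    = 'https://sqs.REGION.amazonaws.com/ACCOUNT_ID/QUEUE_NAME'
match_regexp = %r{/logs/HOSTNAME/.*}

poller = Aws::SQS::QueuePoller.new(queue_url)

poller.poll do |msg|
  key = msg.body # the plugin extracts the S3 object key from the event JSON
  # With the default :skip_delete => false, the poller deletes msg as soon as
  # this block returns. `next` ends the block normally, so the message is
  # deleted even though it was never processed:
  #
  #   next unless match_regexp.match?(key)
  #
  # `throw :skip_delete` also ends the block, but tells the poller to leave
  # the message on the queue:
  throw :skip_delete unless match_regexp.match?(key)
  puts "processing #{key}"
end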
My setup involves multiple Fluentd nodes pointing to one SQS queue. Fluentd stores events in S3 using the hostname as part of the path, and a regex matching the hostname is used to pull events back into Fluentd, because I want the node that originally processed an event to process it the second time for sending to OpenSearch.
To Reproduce
Using multiple Fluentd nodes, send events to S3 using the unique hostname as part of the path.
Ingest the events using the S3 input plugin and an appropriately configured SQS queue, using match_regexp to match the hostname part of the path.
If an event processed by host A is picked up by host B, for example, the event won't be processed, but the SQS message will still be deleted, and the event will never make its way to your ultimate destination.
Expected behavior
The event should not be deleted; it should be left on the queue for the appropriate host to process.
Your Environment
- Fluentd version: 1.5.13
- fluent-plugin-s3 version: 1.7.2
- aws-sdk-s3 version: 1.119.1
- aws-sdk-sqs version: 1.53.0
- Operating system:
NAME="Ubuntu"
VERSION="20.04.6 LTS (Focal Fossa)"
ID=ubuntu
ID_LIKE=debian
PRETTY_NAME="Ubuntu 20.04.6 LTS"
VERSION_ID="20.04"
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
VERSION_CODENAME=focal
UBUNTU_CODENAME=focal
- Kernel version: 5.15.0-1031-aws
Your Configuration
## ingest from UDP 25001 on localhost
<source>
@type udp
port 25001
bind localhost
tag port_25001
<parse>
@type json
</parse>
</source>
## ingest from S3 via SQS
<source>
@type s3
tag from_s3
s3_bucket BUCKET
s3_region REGION
add_object_metadata true
store_as json
match_regexp /logs/HOSTNAME/.*
<sqs>
queue_name QUEUE_NAME
</sqs>
</source>
## send traffic from port 25001 to S3, storing it by using HOSTNAME as part of the key
<match port_25001>
@type s3
s3_bucket BUCKET
s3_region REGION
store_as json
path /logs/HOSTNAME/${tag}/%Y-%m-%d/%H/%M%S
s3_object_key_format %{path}-%{uuid_flush}.%{file_extension}
auto_create_bucket false
check_object false
check_bucket false
slow_flush_log_threshold 120s
utc true
<buffer tag,time>
@type file
timekey 5m
timekey_wait 1m
timekey_use_utc true
path /data/var/s3_buffer
flush_thread_count 16
chunk_limit_size 32M
queue_limit_length 16
retry_max_interval 30
retry_forever true
</buffer>
</match>
## send items coming from S3 to stdout
<match from_s3>
@type stdout
format pretty_json
</match>
Your Error Log
There is no error log. The events just don't show up in the output if another host grabs the message from the queue.
Additional context
I have a patch that prevents this issue. You all may prefer a more nuanced approach, but this works for me:
--- lib/fluent/plugin/in_s3_orig.rb	2023-03-29 18:46:58.772216442 +0000
+++ lib/fluent/plugin/in_s3.rb	2023-03-29 18:19:44.867117653 +0000
@@ -211,7 +211,7 @@
           if @match_regexp
             raw_key = get_raw_key(body)
             key = CGI.unescape(raw_key)
-            next unless @match_regexp.match?(key)
+            throw :skip_delete unless @match_regexp.match?(key)
           end
           process(body)
         rescue => e
A caveat about the strategy I used above: if you're facing high log volumes, you may find that lots of events get "stuck" in SQS, since unmatched messages become visible again after their visibility timeout expires and are re-polled. As these unprocessed messages pile up, individual Fluentd node performance drops, likely because each poll returns a large batch of messages that the node is unable to process efficiently.
You probably won't see this adverse reaction in a single-node architecture. My architecture has three Fluentd nodes polling a single SQS queue. Without my patch I was losing data; with my patch I still lost data, because I had to purge the SQS queue periodically.
This patch may be useful for some situations, and it should probably be implemented as a configuration option, something like toss_event_if_no_regexp_match.
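If this were made configurable, the change might look roughly like the following in in_s3.rb (a sketch only; the option name toss_event_if_no_regexp_match and its placement are assumptions, not an existing plugin option):

# Hypothetical option; true preserves the current delete-on-mismatch behavior.
config_param :toss_event_if_no_regexp_match, :bool, default: true

# ... inside the SQS polling block ...
if @match_regexp
  raw_key = get_raw_key(body)
  key = CGI.unescape(raw_key)
  unless @match_regexp.match?(key)
    # Skip the event and let the poller delete the message (current behavior),
    # or leave the message on the queue for another node to claim.
    next if @toss_event_if_no_regexp_match
    throw :skip_delete
  end
end
process(body)

Defaulting the option to true would keep existing deployments unaffected while letting multi-node setups like mine opt out of the delete.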