When BufferOverflowError happens in the forward output plugin, Fluentd loses data.
Describe the bug
I have a cluster that uses fluentd(collect) -> fluentd(forward) -> Loki, with overflow_action set to throw_exception.
When Loki is disconnected, log events are stored in the fluentd(forward) buffer. Once the fluentd(forward) buffer is full, fluentd(forward) throws a BufferOverflowError exception to the in_forward plugin.
The output doc says, "forward input returns an error to forward output."
But the out_forward plugin in fluentd(collect) does nothing: in_tail keeps reading logs from the file, log events are still flushed to fluentd(forward), and each chunk is purged from the fluentd(collect) buffer. Because fluentd(collect) does not stop its read and flush operations, data loss occurs.
To Reproduce
- two fluentd instances (collect and forward)
- use the forward plugin between them
- overflow_action is set to throw_exception
- fluentd(forward) raises BufferOverflowError
- in_tail does not stop reading
Expected behavior
- the out_forward plugin handles/rethrows the BufferOverflowError exception coming from the in_forward plugin
- in_tail stops reading
- logs are not lost (they stay stored in the buffer file)
Your Environment
- Fluentd version: 1.12.4 (1.12.0 and 1.14.6 have the same problem)
- TD Agent version:
- Operating system: CentOS Linux release 8.3.2011
- Kernel version: 4.18.0-240.1.1.el8_3.x86_64
Your Configuration
fluentd(forward):
<system>
  log_level trace
</system>

<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

<match **>
  @type loki
  # log_level debug
  url http://loki-distributor:3100
  tenant user1
  extra_labels {"env":"dev"}
  <buffer>
    @type file
    path /var/log/fluent-forward
    retry_forever true
    flush_mode interval
    flush_interval 1s
    flush_at_shutdown true
    chunk_limit_size 4m
    retry_type periodic
    retry_wait 180
    total_limit_size 2m
    overflow_action throw_exception
  </buffer>
</match>
fluentd(collect):
<system>
  log_level trace
</system>

<source>
  @type tail
  @id in_tail_container_logs
  path /var/log/LoadTest/*.log
  pos_file /var/log/fluentd-containers.log.pos
  tag test.*
  read_from_head true
  read_lines_limit 4000
  path_key tailed_file
  <parse>
    @type multi_format
    <pattern>
      format json
      time_key time
      time_type string
      time_format "%Y-%m-%dT%H:%M:%S.%NZ"
      keep_time_key false
    </pattern>
  </parse>
  emit_unmatched_lines true
</source>

<match **>
  @type forward
  send_timeout 60s
  recover_wait 10s
  hard_timeout 60s
  <server>
    name myserver1
    host fluent-forward-fluentd
    port 24224
    weight 60
  </server>
  <buffer>
    @type file
    path /var/log/fluent-collect
    retry_forever true
    flush_mode interval
    flush_interval 1s
    flush_at_shutdown true
    retry_type periodic
    retry_wait 180
    overflow_action throw_exception
    total_limit_size 4m
  </buffer>
</match>
Your Error Log
fluentd(forward):
2022-05-10 10:06:13 +0000 [warn]: #0 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" location="/fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/buffer.rb:271:in 'write'" tag="test.var.log.LoadTest.testlog.log"
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/buffer.rb:271:in 'write'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/output.rb:1009:in 'block in handle_stream_simple'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/output.rb:893:in 'write_guard'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/output.rb:1008:in 'handle_stream_simple'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/output.rb:883:in 'execute_chunking'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/output.rb:813:in 'emit_buffered'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/event_router.rb:97:in 'emit_stream'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/out_relabel.rb:29:in 'process'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/output.rb:802:in 'emit_sync'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/event_router.rb:160:in 'emit_events'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/event_router.rb:97:in 'emit_stream'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/out_relabel.rb:29:in 'process'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/output.rb:802:in 'emit_sync'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/event_router.rb:160:in 'emit_events'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/event_router.rb:97:in 'emit_stream'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/in_forward.rb:318:in 'on_message'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/in_forward.rb:226:in 'block in handle_connection'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/in_forward.rb:263:in 'block (3 levels) in read_messages'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/in_forward.rb:262:in 'feed_each'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/in_forward.rb:262:in 'block (2 levels) in read_messages'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/in_forward.rb:271:in 'block in read_messages'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin_helper/server.rb:613:in 'on_read_without_connection'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/cool.io-1.7.1/lib/cool.io/io.rb:123:in 'on_readable'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/cool.io-1.7.1/lib/cool.io/io.rb:186:in 'on_readable'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in 'run_once'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in 'run'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin_helper/event_loop.rb:93:in 'block in start'
2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin_helper/thread.rb:78:in 'block in thread_create'
2022-05-10 10:06:13 +0000 [warn]: #0 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" location="/fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/buffer.rb:271:in 'write'" tag="test.var.log.LoadTest.testlog.log"
2022-05-10 10:06:13 +0000 [warn]: #0 suppressed same stacktrace
...
2022-05-10 10:37:28 +0000 [warn]: #0 failed to write data into buffer by buffer overflow action=:throw_exception
2022-05-10 10:37:28 +0000 [warn]: #0 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" location="/fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/buffer.rb:271:in 'write'" tag="test.var.log.LoadTest.testlog.log"
2022-05-10 10:37:28 +0000 [warn]: #0 suppressed same stacktrace
2022-05-10 10:37:28 +0000 [warn]: #0 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" location="/fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/buffer.rb:271:in 'write'" tag="test.var.log.LoadTest.testlog.log"
2022-05-10 10:37:28 +0000 [warn]: #0 suppressed same stacktrace
2022-05-10 10:37:28 +0000 [warn]: #0 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" location="/fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/buffer.rb:271:in 'write'" tag="test.var.log.LoadTest.testlog.log"
2022-05-10 10:37:28 +0000 [warn]: #0 suppressed same stacktrace
2022-05-10 10:37:28 +0000 [error]: #0 unexpected error on reading data host="10.42.1.102" port=46470 error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data"
2022-05-10 10:37:28 +0000 [error]: #0 suppressed same stacktrace
fluentd(collect):
2022-05-10 10:37:26 +0000 [trace]: #0 writing events into buffer instance=69887115737760 metadata_size=1
2022-05-10 10:37:26 +0000 [debug]: #0 Created new chunk chunk_id="5dea5e9267b87ad2ead8d8c8d368ef5e" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="test.var.log.LoadTest.testlog.log", variables=nil, seq=0>
2022-05-10 10:37:26 +0000 [trace]: #0 chunk /var/log/fluent-collect/buffer.b5dea5e9267b87ad2ead8d8c8d368ef5e.log size_added: 182 new_size: 182
2022-05-10 10:37:26 +0000 [trace]: #0 enqueueing all chunks in buffer instance=69887115737760
2022-05-10 10:37:26 +0000 [trace]: #0 sending heartbeat host="fluent-forward-fluentd" port=24224 heartbeat_type=:transport
2022-05-10 10:37:26 +0000 [debug]: #0 connect new socket
2022-05-10 10:37:27 +0000 [trace]: #0 enqueueing all chunks in buffer instance=69887115737760
2022-05-10 10:37:27 +0000 [trace]: #0 enqueueing chunk instance=69887115737760 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="test.var.log.LoadTest.testlog.log", variables=nil, seq=0>
2022-05-10 10:37:27 +0000 [trace]: #0 sending heartbeat host="fluent-forward-fluentd" port=24224 heartbeat_type=:transport
2022-05-10 10:37:27 +0000 [debug]: #0 connect new socket
2022-05-10 10:37:28 +0000 [trace]: #0 dequeueing a chunk instance=69887115737760
2022-05-10 10:37:28 +0000 [trace]: #0 chunk dequeued instance=69887115737760 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="test.var.log.LoadTest.testlog.log", variables=nil, seq=0>
2022-05-10 10:37:28 +0000 [trace]: #0 trying flush for a chunk chunk="5dea5e9267b87ad2ead8d8c8d368ef5e"
2022-05-10 10:37:28 +0000 [trace]: #0 adding write count instance=69887119493940
2022-05-10 10:37:28 +0000 [trace]: #0 executing sync write chunk="5dea5e9267b87ad2ead8d8c8d368ef5e"
2022-05-10 10:37:28 +0000 [debug]: #0 connect new socket
2022-05-10 10:37:28 +0000 [trace]: #0 write operation done, committing chunk="5dea5e9267b87ad2ead8d8c8d368ef5e"
2022-05-10 10:37:28 +0000 [trace]: #0 committing write operation to a chunk chunk="5dea5e9267b87ad2ead8d8c8d368ef5e" delayed=false
2022-05-10 10:37:28 +0000 [trace]: #0 purging a chunk instance=69887115737760 chunk_id="5dea5e9267b87ad2ead8d8c8d368ef5e" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="test.var.log.LoadTest.testlog.log", variables=nil, seq=0>
2022-05-10 10:37:28 +0000 [trace]: #0 chunk purged instance=69887115737760 chunk_id="5dea5e9267b87ad2ead8d8c8d368ef5e" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="test.var.log.LoadTest.testlog.log", variables=nil, seq=0>
2022-05-10 10:37:28 +0000 [trace]: #0 done to commit a chunk chunk="5dea5e9267b87ad2ead8d8c8d368ef5e"
2022-05-10 10:37:28 +0000 [trace]: #0 enqueueing all chunks in buffer instance=69887115737760
2022-05-10 10:37:28 +0000 [trace]: #0 sending heartbeat host="fluent-forward-fluentd" port=24224 heartbeat_type=:transport
2022-05-10 10:37:28 +0000 [debug]: #0 connect new socket
2022-05-10 10:37:29 +0000 [trace]: #0 enqueueing all chunks in buffer instance=69887115737760
2022-05-10 10:37:29 +0000 [trace]: #0 sending heartbeat host="fluent-forward-fluentd" port=24224 heartbeat_type=:transport
2022-05-10 10:37:29 +0000 [debug]: #0 connect new socket
2022-05-10 10:37:30 +0000 [trace]: #0 enqueueing all chunks in buffer instance=69887115737760
2022-05-10 10:37:30 +0000 [trace]: #0 sending heartbeat host="fluent-forward-fluentd" port=24224 heartbeat_type=:transport
2022-05-10 10:37:30 +0000 [debug]: #0 connect new socket
2022-05-10 10:37:31 +0000 [trace]: #0 enqueueing all chunks in buffer instance=69887115737760
2022-05-10 10:37:31 +0000 [trace]: #0 sending heartbeat host="fluent-forward-fluentd" port=24224 heartbeat_type=:transport
Additional context
How can data loss be avoided?
I can reproduce the same bug on a cluster that uses fluentbit (as the collector) -> fluentd(forward) -> OpenSearch.
The out_forward plugin applies at-most-once semantics by default: https://docs.fluentd.org/output/forward
"The out_forward plugin supports at-most-once and at-least-once semantics. The default is at-most-once."
It means that the out_forward plugin doesn't guarantee that sent messages actually arrive.
If you want guaranteed delivery, you need to set require_ack_response:
https://docs.fluentd.org/output/forward#require_ack_response
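A minimal sketch of what that looks like on the fluentd(collect) side, reusing the host and port from the config above (the ack_response_timeout value here is just the documented default, shown for illustration):

```
<match **>
  @type forward
  require_ack_response true    # do not purge a chunk until in_forward acks it
  ack_response_timeout 190     # seconds; a missing ack counts as a failure and triggers retry
  <server>
    name myserver1
    host fluent-forward-fluentd
    port 24224
  </server>
  <buffer>
    @type file
    path /var/log/fluent-collect
    retry_forever true
  </buffer>
</match>
```

With this set, a BufferOverflowError on the forward side means no ack is returned, so the collector keeps the chunk in its own file buffer and retries instead of purging it.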
This output doc said, "forward input returns an error to forward output."
Hmm, sorry for that, the document is wrong.
in_forward never sends an error; it sends an ack response only when require_ack_response is set on out_forward, and only after in_forward has stored the data in its buffer.
FYI: The protocol specification: https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#response
I'll fix the document.
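Per that spec, out_forward attaches a chunk ID in the message's option field, and in_forward echoes the same ID back in an ack only after buffering succeeds. A minimal Python sketch of that handshake (pure illustration; a real implementation exchanges these structures as msgpack over TCP):

```python
import base64
import os

def make_chunk_id() -> str:
    # The spec suggests a unique ID per chunk, commonly a
    # base64-encoded 128-bit random value.
    return base64.b64encode(os.urandom(16)).decode()

def build_message(tag, entries):
    # Forward-mode message: [tag, entries, option].
    # The "chunk" option asks the receiver to ack this chunk ID.
    chunk_id = make_chunk_id()
    return [tag, entries, {"chunk": chunk_id}], chunk_id

def ack_response(option):
    # in_forward replies {"ack": <same chunk id>} only once the data
    # is safely stored in its buffer; on overflow, no ack is sent and
    # the sender retries after ack_response_timeout.
    return {"ack": option["chunk"]}

msg, chunk_id = build_message("test.log", [[1652177848, {"message": "hi"}]])
assert ack_response(msg[2])["ack"] == chunk_id
```

The key point for this issue: without require_ack_response, out_forward considers a chunk delivered as soon as the TCP write succeeds, which is exactly why the collector purged chunks the forwarder then dropped.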
Thanks for your answer. After setting require_ack_response on out_forward, the data loss issue is resolved.
Is the require_ack_response flag also needed when I send data from fluentbit to fluentd via forward? Because then it changes to sending data as a chunk, right?