
When a BufferOverflowError occurs in the forward output plugin, Fluentd loses data.

Open jianggj opened this issue 2 years ago • 1 comment

Describe the bug

I have a cluster that uses fluentd(collect) -> fluentd(forward) -> Loki, with overflow_action set to throw_exception.

When Loki is disconnected, log events are stored in the fluentd(forward) buffer. Once the fluentd(forward) buffer is full, fluentd(forward) raises a BufferOverflowError to the in_forward plugin.

The output plugin documentation says, "forward input returns an error to forward output."

But the out_forward plugin in fluentd(collect) does nothing with that error: in_tail keeps reading the log file, events are still flushed to fluentd(forward), and each chunk is purged from the fluentd(collect) buffer. Because fluentd(collect) never stops reading and flushing, data loss occurs.

To Reproduce

  • two Fluentd instances (collect and forward)
  • connect them with the forward plugin
  • set overflow_action to throw_exception
  • trigger a BufferOverflowError on fluentd(forward)
  • in_tail does not stop reading

Expected behavior

  • the out_forward plugin handles the BufferOverflowError raised by the in_forward plugin
  • in_tail stops reading
  • no logs are lost (they remain in the buffer file)

Your Environment

- Fluentd version: 1.12.4 (1.12.0 and 1.14.6 have the same problem)
- TD Agent version:
- Operating system: CentOS Linux release 8.3.2011
- Kernel version: 4.18.0-240.1.1.el8_3.x86_64

Your Configuration

fluentd(forward):
  <system>
    log_level trace
  </system>
  <source>
    @type forward
    port 24224
    bind 0.0.0.0
  </source>
  <match **>
    @type loki
    # log_level debug
    url http://loki-distributor:3100
    tenant user1
    extra_labels {"env":"dev"}
    <buffer>
      @type file
      path /var/log/fluent-forward
      retry_forever true
      flush_mode interval
      flush_interval 1s
      flush_at_shutdown true
      chunk_limit_size 4m
      retry_type periodic
      retry_wait 180
      total_limit_size 2m
      overflow_action throw_exception
    </buffer>
  </match>
  
fluentd(collect):
  <system>
    log_level trace
  </system>
  <source>
    @type tail
    @id in_tail_container_logs
    path /var/log/LoadTest/*.log
    pos_file /var/log/fluentd-containers.log.pos
    tag test.*
    read_from_head true
    read_lines_limit 4000
    path_key tailed_file
    <parse>
      @type multi_format
      <pattern>
        format json
        time_key time
        time_type string
        time_format "%Y-%m-%dT%H:%M:%S.%NZ"
        keep_time_key false
      </pattern>
    </parse>
    emit_unmatched_lines true
  </source>
  <match **>
    @type forward
    send_timeout 60s
    recover_wait 10s
    hard_timeout 60s
    <server>
      name myserver1
      host fluent-forward-fluentd
      port 24224
      weight 60
    </server>
    <buffer>
      @type file
      path /var/log/fluent-collect
      retry_forever true
      flush_mode interval
      flush_interval 1s
      flush_at_shutdown true
      retry_type periodic
      retry_wait 180
      overflow_action throw_exception
      total_limit_size 4m
    </buffer>
  </match>

Your Error Log

fluentd(forward):

2022-05-10 10:06:13 +0000 [warn]: #0 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" location="/fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/buffer.rb:271:in 'write'" tag="test.var.log.LoadTest.testlog.log"
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/buffer.rb:271:in 'write'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/output.rb:1009:in 'block in handle_stream_simple'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/output.rb:893:in 'write_guard'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/output.rb:1008:in 'handle_stream_simple'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/output.rb:883:in 'execute_chunking'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/output.rb:813:in 'emit_buffered'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/event_router.rb:97:in 'emit_stream'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/out_relabel.rb:29:in 'process'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/output.rb:802:in 'emit_sync'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/event_router.rb:160:in 'emit_events'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/event_router.rb:97:in 'emit_stream'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/out_relabel.rb:29:in 'process'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/output.rb:802:in 'emit_sync'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/event_router.rb:160:in 'emit_events'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/event_router.rb:97:in 'emit_stream'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/in_forward.rb:318:in 'on_message'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/in_forward.rb:226:in 'block in handle_connection'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/in_forward.rb:263:in 'block (3 levels) in read_messages'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/in_forward.rb:262:in 'feed_each'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/in_forward.rb:262:in 'block (2 levels) in read_messages'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/in_forward.rb:271:in 'block in read_messages'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin_helper/server.rb:613:in 'on_read_without_connection'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/cool.io-1.7.1/lib/cool.io/io.rb:123:in 'on_readable'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/cool.io-1.7.1/lib/cool.io/io.rb:186:in 'on_readable'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in 'run_once'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in 'run'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin_helper/event_loop.rb:93:in 'block in start'
  2022-05-10 10:06:13 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin_helper/thread.rb:78:in 'block in thread_create'
2022-05-10 10:06:13 +0000 [warn]: #0 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" location="/fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/buffer.rb:271:in 'write'" tag="test.var.log.LoadTest.testlog.log"
  2022-05-10 10:06:13 +0000 [warn]: #0 suppressed same stacktrace

...

2022-05-10 10:37:28 +0000 [warn]: #0 failed to write data into buffer by buffer overflow action=:throw_exception
2022-05-10 10:37:28 +0000 [warn]: #0 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" location="/fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/buffer.rb:271:in 'write'" tag="test.var.log.LoadTest.testlog.log"
  2022-05-10 10:37:28 +0000 [warn]: #0 suppressed same stacktrace
2022-05-10 10:37:28 +0000 [warn]: #0 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" location="/fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/buffer.rb:271:in 'write'" tag="test.var.log.LoadTest.testlog.log"
  2022-05-10 10:37:28 +0000 [warn]: #0 suppressed same stacktrace
2022-05-10 10:37:28 +0000 [warn]: #0 emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" location="/fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.12.4/lib/fluent/plugin/buffer.rb:271:in 'write'" tag="test.var.log.LoadTest.testlog.log"
  2022-05-10 10:37:28 +0000 [warn]: #0 suppressed same stacktrace
2022-05-10 10:37:28 +0000 [error]: #0 unexpected error on reading data host="10.42.1.102" port=46470 error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data"
  2022-05-10 10:37:28 +0000 [error]: #0 suppressed same stacktrace


fluentd(collect):

2022-05-10 10:37:26 +0000 [trace]: #0 writing events into buffer instance=69887115737760 metadata_size=1
2022-05-10 10:37:26 +0000 [debug]: #0 Created new chunk chunk_id="5dea5e9267b87ad2ead8d8c8d368ef5e" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="test.var.log.LoadTest.testlog.log", variables=nil, seq=0>
2022-05-10 10:37:26 +0000 [trace]: #0 chunk /var/log/fluent-collect/buffer.b5dea5e9267b87ad2ead8d8c8d368ef5e.log size_added: 182 new_size: 182
2022-05-10 10:37:26 +0000 [trace]: #0 enqueueing all chunks in buffer instance=69887115737760
2022-05-10 10:37:26 +0000 [trace]: #0 sending heartbeat host="fluent-forward-fluentd" port=24224 heartbeat_type=:transport
2022-05-10 10:37:26 +0000 [debug]: #0 connect new socket
2022-05-10 10:37:27 +0000 [trace]: #0 enqueueing all chunks in buffer instance=69887115737760
2022-05-10 10:37:27 +0000 [trace]: #0 enqueueing chunk instance=69887115737760 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="test.var.log.LoadTest.testlog.log", variables=nil, seq=0>
2022-05-10 10:37:27 +0000 [trace]: #0 sending heartbeat host="fluent-forward-fluentd" port=24224 heartbeat_type=:transport
2022-05-10 10:37:27 +0000 [debug]: #0 connect new socket
2022-05-10 10:37:28 +0000 [trace]: #0 dequeueing a chunk instance=69887115737760
2022-05-10 10:37:28 +0000 [trace]: #0 chunk dequeued instance=69887115737760 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="test.var.log.LoadTest.testlog.log", variables=nil, seq=0>
2022-05-10 10:37:28 +0000 [trace]: #0 trying flush for a chunk chunk="5dea5e9267b87ad2ead8d8c8d368ef5e"
2022-05-10 10:37:28 +0000 [trace]: #0 adding write count instance=69887119493940
2022-05-10 10:37:28 +0000 [trace]: #0 executing sync write chunk="5dea5e9267b87ad2ead8d8c8d368ef5e"
2022-05-10 10:37:28 +0000 [debug]: #0 connect new socket
2022-05-10 10:37:28 +0000 [trace]: #0 write operation done, committing chunk="5dea5e9267b87ad2ead8d8c8d368ef5e"
2022-05-10 10:37:28 +0000 [trace]: #0 committing write operation to a chunk chunk="5dea5e9267b87ad2ead8d8c8d368ef5e" delayed=false
2022-05-10 10:37:28 +0000 [trace]: #0 purging a chunk instance=69887115737760 chunk_id="5dea5e9267b87ad2ead8d8c8d368ef5e" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="test.var.log.LoadTest.testlog.log", variables=nil, seq=0>
2022-05-10 10:37:28 +0000 [trace]: #0 chunk purged instance=69887115737760 chunk_id="5dea5e9267b87ad2ead8d8c8d368ef5e" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="test.var.log.LoadTest.testlog.log", variables=nil, seq=0>
2022-05-10 10:37:28 +0000 [trace]: #0 done to commit a chunk chunk="5dea5e9267b87ad2ead8d8c8d368ef5e"
2022-05-10 10:37:28 +0000 [trace]: #0 enqueueing all chunks in buffer instance=69887115737760
2022-05-10 10:37:28 +0000 [trace]: #0 sending heartbeat host="fluent-forward-fluentd" port=24224 heartbeat_type=:transport
2022-05-10 10:37:28 +0000 [debug]: #0 connect new socket
2022-05-10 10:37:29 +0000 [trace]: #0 enqueueing all chunks in buffer instance=69887115737760
2022-05-10 10:37:29 +0000 [trace]: #0 sending heartbeat host="fluent-forward-fluentd" port=24224 heartbeat_type=:transport
2022-05-10 10:37:29 +0000 [debug]: #0 connect new socket
2022-05-10 10:37:30 +0000 [trace]: #0 enqueueing all chunks in buffer instance=69887115737760
2022-05-10 10:37:30 +0000 [trace]: #0 sending heartbeat host="fluent-forward-fluentd" port=24224 heartbeat_type=:transport
2022-05-10 10:37:30 +0000 [debug]: #0 connect new socket
2022-05-10 10:37:31 +0000 [trace]: #0 enqueueing all chunks in buffer instance=69887115737760
2022-05-10 10:37:31 +0000 [trace]: #0 sending heartbeat host="fluent-forward-fluentd" port=24224 heartbeat_type=:transport

Additional context

How to avoid data loss?

jianggj avatar May 13 '22 07:05 jianggj

I can reproduce the same bug on a cluster that uses fluentbit(field collector) -> fluentd(forward) -> OpenSearch.

leowinterde avatar May 30 '22 12:05 leowinterde

This issue has been automatically marked as stale because it has been open 90 days with no activity. Remove stale label or comment or this issue will be closed in 30 days

github-actions[bot] avatar Aug 29 '22 10:08 github-actions[bot]

The out_forward plugin applies at-most-once semantics by default: https://docs.fluentd.org/output/forward

The out_forward plugin supports at-most-once and at-least-once semantics. The default is at-most-once.

This means the out_forward plugin does not guarantee that sent messages actually arrive. If you want delivery guarantees, you need to set require_ack_response: https://docs.fluentd.org/output/forward#require_ack_response
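As a sketch, the collector's match block from the report above would only need one extra line (everything else is unchanged from the reporter's configuration):

```
<match **>
  @type forward
  # Wait for in_forward's ack before committing and purging the chunk;
  # without it, the chunk is purged as soon as the TCP write succeeds.
  require_ack_response
  send_timeout 60s
  <server>
    name myserver1
    host fluent-forward-fluentd
    port 24224
  </server>
  <buffer>
    @type file
    path /var/log/fluent-collect
    retry_forever true
    retry_type periodic
    retry_wait 180
  </buffer>
</match>
```

With this set, if the receiver fails to buffer the data (e.g. on a BufferOverflowError), no ack comes back, the send eventually times out, and out_forward keeps the chunk and retries instead of purging it.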

This output doc said, "forward input returns an error to forward output."

Hmm, sorry for that, the document is wrong. in_forward never sends an error; it sends an ack response only when require_ack_response is set on out_forward and in_forward has stored the data in its buffer. FYI, the protocol specification: https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#response
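The ack handshake from the linked spec can be illustrated with plain data structures (a simplified sketch; the field names follow the Forward Protocol Specification v1, while the tag, timestamp, and record values are made up):

```python
import base64
import os

# A Forward-mode event message has the shape [tag, entries, option].
# When out_forward has require_ack_response enabled, it puts a unique
# "chunk" token into the option map and waits for a matching ack.
chunk_id = base64.b64encode(os.urandom(16)).decode()
message = [
    "test.var.log.LoadTest.testlog.log",      # tag
    [[1652174848, {"log": "example line"}]],  # [time, record] entries
    {"chunk": chunk_id},                      # option carrying the ack token
]

# in_forward replies with this ONLY after the data is safely buffered;
# on a BufferOverflowError no ack is sent, the sender's ack wait times
# out, and the chunk stays in the sender's buffer for retry.
ack = {"ack": chunk_id}

assert ack["ack"] == message[2]["chunk"]
```

On the wire these structures are MessagePack-encoded; the dicts and lists above only show their logical shape.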

I'll fix the document.

ashie avatar Sep 05 '22 02:09 ashie

Hmm, sorry for that, the document is wrong. in_forward never sends an error; it sends an ack response only when require_ack_response is set on out_forward and in_forward has stored the data in its buffer. FYI, the protocol specification: https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#response

I'll fix the document.

Thanks for your answer. After setting require_ack_response on out_forward, the data loss issue is resolved.

jianggj avatar Sep 07 '22 10:09 jianggj

Is the require_ack_response flag needed when I send data from Fluent Bit to Fluentd via forward? Because then it changes to sending data as chunks, right?

leowinterde avatar Sep 07 '22 10:09 leowinterde