fluent-plugin-kafka
Newly introduced share_producer setting in 0.18.0 for out_kafka2 plugin results in BufferOverflowError
Describe the bug
Producing to Kafka works as expected with 0.17.5.
However, with 0.18.0, producing to Kafka results in Kafka::BufferOverflow errors (i.e. it is effectively less performant). This happens whether the share_producer setting is set to true or false.
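For context on the error itself: ruby-kafka's producer buffers messages in memory and raises Kafka::BufferOverflow once more messages have been produced than its max_buffer_size (1000 by default) without an intervening deliver_messages call, which matches the "(1000 messages)" in the logs below. A minimal sketch of that mechanism, assuming a hypothetical broker address and topic (not part of this report):

```ruby
require "json"
require "kafka"

# Hypothetical broker/topic, for illustration only.
kafka    = Kafka.new(["localhost:9092"], client_id: "fluentd")
producer = kafka.producer   # max_buffer_size defaults to 1000 messages

begin
  # produce() only appends to the in-memory buffer; nothing is sent until
  # deliver_messages is called. Exceeding max_buffer_size raises the error
  # seen in the logs below.
  1_001.times { |i| producer.produce({ "hello" => "world #{i}" }.to_json, topic: "events") }
rescue Kafka::BufferOverflow => e
  puts e.message  # "Cannot produce to events, max buffer size (1000 messages) reached"
ensure
  producer.shutdown
end
```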
To Reproduce
Flow of traffic ~7000 events/sec
```
@type kafka2
# Hence, decreasing verbosity to warn
# For more info, see https://docs.fluentd.org/deployment/logging
brokers "#{ENV['KAFKA_BOOTSTRAP_SERVER']}"
default_topic events
# authentication settings
ssl_ca_cert ["/code/lumosingest/kafka-certs/cert.2.pem", "/code/lumosingest/kafka-certs/cert.1.pem"]
username "writer"
password "#{ENV['PW']}"
scram_mechanism "sha256"
sasl_over_ssl true
ssl_verify_hostname false
# producer settings
required_acks -1
max_send_retries 10
max_send_limit_bytes 2850000
<buffer events>
flush_at_shutdown true
flush_mode interval
flush_interval 1s
flush_thread_count 16
queued_chunks_limit_size 16
chunk_limit_size 3MB
chunk_full_threshold 0.8
total_limit_size 1024MB
overflow_action block
</buffer>
```
I have tried multiple settings: increasing flush_thread_count, queued_chunks_limit_size, chunk_limit_size, and total_limit_size, and also removing overflow_action.
This fails even in a staging environment with very little traffic.
We are using this in conjunction with the s3-sqs input plugin.
Expected behavior
Expected the share_producer setting to work without any performance impact.
Your Environment
- Fluentd version: 1.15.1
- TD Agent version:
- fluent-plugin-kafka version: 0.18.0
- ruby-kafka version: 1.5.0
- Operating system:
- Kernel version:
Your Configuration
@type kafka2
# Hence, decreasing verbosity to warn
# For more info, see https://docs.fluentd.org/deployment/logging
brokers "#{ENV['KAFKA_BOOTSTRAP_SERVER']}"
default_topic events
# authentication settings
ssl_ca_cert ["/code/lumosingest/kafka-certs/cert.2.pem", "/code/lumosingest/kafka-certs/cert.1.pem"]
username "writer"
password "#{ENV['PW']}"
scram_mechanism "sha256"
sasl_over_ssl true
ssl_verify_hostname false
# producer settings
required_acks -1
max_send_retries 10
<inject>
time_key collected_at
time_type string
time_format %Y-%m-%dT%H:%M:%S.%3NZ
</inject>
<format>
@type json
</format>
get_kafka_client_log true
max_send_limit_bytes 2850000
<buffer events>
flush_at_shutdown true
flush_mode interval
flush_interval 1s
flush_thread_count 16
queued_chunks_limit_size 16
chunk_limit_size 3MB
chunk_full_threshold 0.8
total_limit_size 1024MB
overflow_action block
</buffer>
Your Error Log
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 failed to flush the buffer. retry_times=12 next_retry_time=2022-07-29 23:39:04 +0000 chunk="5e4f82e09e9659921a00f223c259a9ff" error_class=Kafka::BufferOverflow error="Cannot produce to cloudtrail_events, max buffer size (1000 messages) reached"
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 Exception Backtrace : /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/producer.rb:525:in `buffer_overflow'
Jul 29, 2022 @ 15:24:49.811 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/output.rb:1501:in `flush_thread_run'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.0/lib/fluent/plugin/out_kafka2.rb:324:in `write'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 suppressed same stacktrace
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/output.rb:1501:in `flush_thread_run'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.0/lib/fluent/plugin/out_kafka2.rb:363:in `block in write'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 failed to flush the buffer. retry_times=12 next_retry_time=2022-07-29 23:31:13 +0000 chunk="5e4f82d4f428c362d34da82c4fc7a24c" error_class=Kafka::BufferOverflow error="Cannot produce to cloudtrail_events, max buffer size (1000 messages) reached"
Jul 29, 2022 @ 15:24:49.811 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
Jul 29, 2022 @ 15:24:49.811 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/event.rb:314:in `each'
Jul 29, 2022 @ 15:24:49.811 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/output.rb:1180:in `try_flush'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 failed to flush the buffer. retry_times=12 next_retry_time=2022-07-29 23:33:00 +0000 chunk="5e4f82dabac326b0b9bc430e6c802f90" error_class=Kafka::BufferOverflow error="Cannot produce to cloudtrail_events, max buffer size (1000 messages) reached"
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/producer.rb:210:in `produce'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/producer.rb:525:in `buffer_overflow'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [warn]: #0 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/event.rb:314:in `each'
Jul 29, 2022 @ 15:24:49.811 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.0/lib/fluent/plugin/out_kafka2.rb:363:in `block in write'
Jul 29, 2022 @ 15:24:49.811 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/output.rb:501:in `block (2 levels) in start'
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [debug]: #0 taking back chunk for errors. chunk="5e4f82dbda43ccd13a5217dd5fc50351"
Jul 29, 2022 @ 15:24:49.811 2022-07-29 22:24:49 +0000 [info]: #0 initialized kafka producer: fluentd
Jul 29, 2022 @ 15:24:49.810 2022-07-29 22:24:49 +0000 [warn]: #0 suppressed same stacktrace
Jul 29, 2022 @ 15:24:49.810 2022-07-29 22:24:49 +0000 [warn]: #0 Send exception occurred: Cannot produce to cloudtrail_events, max buffer size (1000 messages) reached
Jul 29, 2022 @ 15:24:49.810 2022-07-29 22:24:49 +0000 [info]: #0 initialized kafka producer: fluentd
Jul 29, 2022 @ 15:24:49.810 2022-07-29 22:24:49 +0000 [warn]: #0 Exception Backtrace : /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/producer.rb:525:in `buffer_overflow'
Jul 29, 2022 @ 15:24:49.810 /code/lumosingest/app/fluentd/gems/ruby/2.7.0/gems/fluentd-1.15.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
Additional context
No response
This issue also occurs with an older version of fluentd (1.15.0), if that helps.
Also, when adding chunk_limit_records (set to 800), the following errors appeared:
2022-08-08 16:44:37 +0000 [warn]: #3 chunk size limit exceeds for an emitted event stream: 1016records
2022-08-08 16:44:37 +0000 [warn]: #3 emit transaction failed: error_class=NoMethodError error="undefined method `synchronize' for nil:NilClass" location="/usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/buffer.rb:419:in `block in write'" tag="<log file name>"
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/event_router.rb:198:in `rescue in emit_events'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/event_router.rb:195:in `emit_events'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/event_router.rb:115:in `emit_stream'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:620:in `receive_lines'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1109:in `block in handle_notify'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1145:in `with_io'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1069:in `handle_notify'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1011:in `block in on_notify'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1011:in `synchronize'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:1011:in `on_notify'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:822:in `on_notify'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:452:in `construct_watcher'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:457:in `block in start_watchers'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:456:in `each_value'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:456:in `start_watchers'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:389:in `refresh_watchers'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/plugin/in_tail.rb:259:in `start'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:203:in `block in start'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:192:in `block (2 levels) in lifecycle'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:191:in `each'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:191:in `block in lifecycle'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:178:in `each'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:178:in `lifecycle'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/root_agent.rb:202:in `start'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/engine.rb:248:in `start'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/engine.rb:147:in `run'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/supervisor.rb:760:in `block in run_worker'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/supervisor.rb:1036:in `main_process'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/supervisor.rb:751:in `run_worker'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/lib/fluent/command/fluentd.rb:386:in `<top (required)>'
2022-08-08 16:44:37 +0000 [warn]: #3 <internal:/usr/share/rubygems/rubygems/core_ext/kernel_require.rb>:85:in `require'
2022-08-08 16:44:37 +0000 [warn]: #3 <internal:/usr/share/rubygems/rubygems/core_ext/kernel_require.rb>:85:in `require'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/share/gems/gems/fluentd-1.15.1/bin/fluentd:15:in `<top (required)>'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/bin/fluentd:23:in `load'
2022-08-08 16:44:37 +0000 [warn]: #3 /usr/local/bin/fluentd:23:in `<main>'
Config:
<match **>
@type copy
<store>
@type kafka2
brokers <broker address>
default_topic <kafka topic>
max_send_retries 3
required_acks -1
get_kafka_client_log false
compression_codec gzip
<format>
@type json
</format>
<buffer>
@type file
path /fluentd/buffer
chunk_limit_size 128MB
total_limit_size 2560MB
flush_mode immediate
flush_thread_count 15
flush_thread_interval 1
retry_type exponential_backoff
retry_max_interval 90
chunk_limit_records 800
compress gzip
</buffer>
</store>
<store>
@type prometheus
<metric>
name fluentd_output_status_num_records_total
type counter
desc The total number of outgoing records
<labels>
hostname "#{IPSocket.getaddress(Socket.gethostname)}"
</labels>
</metric>
</store>
</match>
Hey @pchheda-lyft thanks for reporting this. I am able to reproduce this with the following config
<source>
@type sample
sample {"hello": "world"}
rate 7000
tag sample
</source>
<match **>
@type kafka2
brokers "broker:29092"
default_topic events
get_kafka_client_log true
shared_producer true
<format>
@type json
</format>
<buffer events>
flush_at_shutdown true
flush_mode interval
flush_interval 1s
flush_thread_count 16
queued_chunks_limit_size 16
chunk_limit_size 3MB
chunk_full_threshold 0.8
total_limit_size 1024MB
overflow_action block
</buffer>
</match>
Adding chunk_limit_records 1250 to the buffer section (1000 messages / 0.8 threshold = 1250) seems to have "fixed" the issue. I'm guessing this is far from your ideal throughput, however. I have identified the root cause and will be putting up a fix soon.
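For anyone following along, the arithmetic behind that suggestion (assuming ruby-kafka's default producer max_buffer_size of 1000 is the limit being hit) is roughly: with chunk_full_threshold 0.8, a chunk is enqueued for flush at about 80% of chunk_limit_records, so 1250 keeps each flushed chunk at around 1000 records, the most the producer buffer holds per delivery. A quick sketch:

```ruby
# Rough arithmetic behind the chunk_limit_records workaround. The 1000-message
# cap is ruby-kafka's default producer max_buffer_size (an assumption here,
# but it matches the error text in the logs above).
MAX_BUFFER_SIZE      = 1000
CHUNK_FULL_THRESHOLD = 0.8   # from the <buffer> section in the repro config

chunk_limit_records = (MAX_BUFFER_SIZE / CHUNK_FULL_THRESHOLD).floor
puts chunk_limit_records     # => 1250, the value suggested above
```

In other words, the workaround keeps each chunk within a single producer buffer rather than addressing why 0.18.0 exceeds it.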
Thanks for working on this! Yeah, I don't think adding chunk_limit_records should be needed. Looking forward to the fix! Appreciate the work!