fluent-plugin-kafka
Output with kafka2 cannot dynamically define topic
Describe the bug
When I add this to the &lt;match&gt; section (@type kafka2):
topic_key topic_${tag[0]}_${tag[1]}
then the topic name is set to the literal string topic_${tag[0]}_${tag[1]}.
Also the following variables do not have the desired effect:
- ${tag}
- $.my_record.my_element
To Reproduce
Create a Fluentd configuration with variables in the topic_key.
Expected behavior
Variables are resolved, as is done in other places in the configuration.
Your Environment
- Fluentd version: 1.15.0
- fluent-plugin-kafka version: 0.17.5
- ruby-kafka version: 1.5.0
- TD Agent version:
- Operating system / Kernel version: as available in image bitnami/fluentd:1.15.0-debian-11-r3
Your Configuration
<match kube-audit.**>
  @type route
  remove_tag_prefix kube-audit
  add_tag_prefix kube.audit
  <route **>
    copy
    @label @KAFKA
  </route>
</match>

<label @KAFKA>
  <filter kube.audit.**>
    @type parser
    key_name log
    <parse>
      @type json
    </parse>
  </filter>

  # Attempt to set the topic from a value in the record
  <filter **>
    @type record_transformer
    <record>
      topic_name topic_${tag_parts[0]}_${tag_parts[1]}
    </record>
  </filter>

  <match **>
    @type kafka2
    get_kafka_client_log false
    brokers my_broker:9999
    required_acks 1
    <buffer topic>
      @type file
      chunk_limit_size 10M
      total_limit_size 1G
      path /opt/bitnami/fluentd/logs/buffers/kafka.buffer
      flush_thread_count 1
      flush_mode interval
      flush_interval 5s
      retry_max_times 2
      retry_max_interval 30
    </buffer>
    # topic settings
    #topic_key topic_${tag[0]}_${tag[1]}
    #default_topic topic_${tag[0]}_${tag[1]}
    #topic_key topic_$.topic_name
    #default_topic topic_$.topic_name
    topic_key topic_kube_audit
    default_topic topic_kube_audit
    <format>
      @type json
    </format>
  </match>
</label>
Your Error Log
2022-07-26 12:41:32 +0000 [warn]: #0 Send exception occurred: Kafka::InvalidTopic
2022-07-26 12:41:32 +0000 [warn]: #0 Exception Backtrace : /opt/bitnami/fluentd/gems/ruby-kafka-1.5.0/lib/kafka/protocol.rb:160:in `handle_error'
/opt/bitnami/fluentd/gems/ruby-kafka-1.5.0/lib/kafka/protocol/metadata_response.rb:134:in `partitions_for'
/opt/bitnami/fluentd/gems/ruby-kafka-1.5.0/lib/kafka/cluster.rb:186:in `partitions_for'
/opt/bitnami/fluentd/gems/fluent-plugin-kafka-0.17.5/lib/fluent/plugin/kafka_producer_ext.rb:248:in `assign_partitions!'
/opt/bitnami/fluentd/gems/fluent-plugin-kafka-0.17.5/lib/fluent/plugin/kafka_producer_ext.rb:211:in `block in deliver_messages_with_retries'
/opt/bitnami/fluentd/gems/fluent-plugin-kafka-0.17.5/lib/fluent/plugin/kafka_producer_ext.rb:202:in `loop'
/opt/bitnami/fluentd/gems/fluent-plugin-kafka-0.17.5/lib/fluent/plugin/kafka_producer_ext.rb:202:in `deliver_messages_with_retries'
/opt/bitnami/fluentd/gems/fluent-plugin-kafka-0.17.5/lib/fluent/plugin/kafka_producer_ext.rb:128:in `deliver_messages'
/opt/bitnami/fluentd/gems/fluent-plugin-kafka-0.17.5/lib/fluent/plugin/out_kafka2.rb:299:in `write'
/opt/bitnami/fluentd/gems/fluentd-1.15.0/lib/fluent/plugin/output.rb:1180:in `try_flush'
/opt/bitnami/fluentd/gems/fluentd-1.15.0/lib/fluent/plugin/output.rb:1501:in `flush_thread_run'
/opt/bitnami/fluentd/gems/fluentd-1.15.0/lib/fluent/plugin/output.rb:501:in `block (2 levels) in start'
/opt/bitnami/fluentd/gems/fluentd-1.15.0/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
Additional context
Given this issue, it should already have been fixed: #343
So maybe I'm just doing this wrong? Anyway, I cannot find in the documentation how to use this, so an update to the docs would be good.
@repeatedly any tips?
@cosmo0920 @ashie Maybe one of you can shed any light on this?
Hey @mdraijer, make sure to add the chunk key tag in your <buffer> section, e.g.
<buffer tag>
# etc
</buffer>
To use built-in placeholders, you should add tag and other custom metadata that are created from records to the <buffer [[THIS PLACE ACCEPTS ARRAYS OF ATTRIBUTES]]> attributes.
ref: https://docs.fluentd.org/configuration/buffer-section#placeholders
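The linked docs illustrate the idea roughly like this (a generic sketch, not specific to kafka2; the tag pattern and path are made up): listing tag as a chunk key is what makes ${tag} resolvable in the other parameters of that output.
<match app.**>
  @type file
  # ${tag} in `path` resolves only because `tag` is listed as a chunk key below
  path /data/${tag}/access.log
  <buffer tag>
    flush_interval 5s
  </buffer>
</match>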
@raytung @cosmo0920 thanks for the tips.
However, I tried it and got this logging (irrelevant/sensitive parts left out):
2022-08-15 09:05:51 +0000 [debug]: #0 36 messages send.
2022-08-15 09:05:51 +0000 [info]: #0 New topics added to target list: sp_${tag[0]}_${tag[1]}_${tag[2]}
2022-08-15 09:05:51 +0000 [info]: #0 Fetching cluster metadata from kafka://xxxx
2022-08-15 09:05:51 +0000 [debug]: #0 [topic_metadata] Opening connection to xxxx with client id xxxx...
[...]
2022-08-15 09:05:51 +0000 [debug]: #0 [topic_metadata] Sending topic_metadata API request 1 to xxxx
2022-08-15 09:05:51 +0000 [debug]: #0 [topic_metadata] Waiting for response 1 from xxxx
2022-08-15 09:05:51 +0000 [debug]: #0 Created new chunk chunk_id="5e643f03eb564242946d6bd50170dc83" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="xxxx.yyyy.zzzz.somethingelse", variables=nil, seq=0>
[...]
2022-08-15 09:05:51 +0000 [debug]: #0 [topic_metadata] Received response 1 from xxxx
2022-08-15 09:05:51 +0000 [info]: #0 Discovered cluster metadata; nodes: xxxx (node_id=1001), xxxx (node_id=1003), xxxx (node_id=1002)
2022-08-15 09:05:51 +0000 [debug]: #0 Closing socket to xxxx
2022-08-15 09:05:51 +0000 [error]: #0 Failed to assign partitions to 36 messages in sp_${tag[0]}_${tag[1]}_${tag[2]}
2022-08-15 09:05:51 +0000 [warn]: #0 Failed to send all messages to ; attempting retry 1 of 2 after 1s
based on this config (irrelevant/sensitive parts left out):
# Send the messages to Kafka
<match **>
  @type kafka2
  @log_level debug
  get_kafka_client_log true
  brokers xxxx,xxxx,xxxx
  required_acks 1
  <buffer tag>
    @type file
    chunk_limit_size 10M
    total_limit_size 1G
    path /opt/bitnami/fluentd/logs/buffers/kafka.buffer
    flush_thread_count 1
    flush_mode interval
    flush_interval 5s
    retry_max_times 2
    retry_max_interval 30
  </buffer>
  topic_key sp_${tag[0]}_${tag[1]}_${tag[2]}
  default_topic sp_${tag[0]}_${tag[1]}_${tag[2]}
What confuses me a bit is that the buffer definition should have any impact on the behaviour of topic_key and default_topic, since those parameters are not inside the buffer section. And indeed, in my test it didn't.
Hey @mdraijer, note that only the topic parameter supports placeholders; topic_key and default_topic don't.
The documentation (both https://docs.fluentd.org/output/kafka and https://github.com/fluent/fluent-plugin-kafka#output-plugin) says nothing about a parameter topic, just topic_key and default_topic...
@mdraijer Agreed that we should update the documentation, but it is in there: https://github.com/fluent/fluent-plugin-kafka/blob/master/lib/fluent/plugin/out_kafka2.rb#L18
Great! Thanks, I'll try that.
Found it now:
- Use the parameters topic and default_topic, the latter with a fixed value; do not use topic_key.
- Whatever variable is used in the value of topic (in my case the tag) must also be used as chunk key in the buffer part, e.g. <buffer tag> (note that the topic uses tag parts such as ${tag[0]}, but that still makes the chunk key tag); see the sketch below.
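In short, a minimal sketch of the combination (broker address taken from my earlier config, other settings trimmed):
<match **>
  @type kafka2
  brokers my_broker:9999
  # the chunk key `tag` is what makes ${tag[...]} resolvable in `topic`
  <buffer tag>
    flush_interval 5s
  </buffer>
  # placeholders work in `topic`; `default_topic` stays a fixed name
  topic sp_${tag[0]}_${tag[1]}_${tag[2]}
  default_topic sp_unknown_topic
  <format>
    @type json
  </format>
</match>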
Thanks for the help!
Hi @mdraijer. Could you provide the configuration that resolved this issue? Even after reading the documentation, I still don't quite understand.
Thanks in advance.
<match **>
  @type kafka2
  get_kafka_client_log false
  brokers [snip]
  required_acks 1
  # buffer settings
  <buffer tag>
    @type file
    chunk_limit_size 1MB # relates to max message size in Kafka, do not increase above 1M
    total_limit_size 1G
    path /opt/bitnami/fluentd/logs/buffers/kafka.buffer
    flush_thread_count 8
    flush_mode interval
    flush_interval 5s
    retry_timeout 1h
  </buffer>
  # topic settings
  topic sp_${tag[0]}_${tag[1]}_${tag[2]}
  default_topic sp_unknown_topic
  # data type settings
  <format>
    @type json
  </format>
</match>
Thank you very much @mdraijer . I will try later.
And my service is now running OK using a different approach: if the record has a 'topic' field, the kafka plugin will use it (see the reference documentation).
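For reference, a rough sketch of that last approach, based on the README's description of topic_key as a record field name (the field value, topic names, and broker address here are made up):
# a filter adds a per-record "topic" field
<filter **>
  @type record_transformer
  <record>
    topic app_${tag_parts[0]}
  </record>
</filter>

<match **>
  @type kafka2
  brokers my_broker:9999
  # read the target topic from the record's "topic" field
  topic_key topic
  # used when a record has no "topic" field
  default_topic app_unknown
  <buffer topic>
    flush_interval 5s
  </buffer>
  <format>
    @type json
  </format>
</match>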