
Output with kafka2 cannot dynamically define topic

Open · mdraijer opened this issue on Jul 26, 2022 · 4 comments

Describe the bug

When I add this to the <match> section (@type kafka2):

topic_key topic_${tag[0]}_${tag[1]}

then the topic name is set to the literal string topic_${tag[0]}_${tag[1]}.

Also the following variables do not have the desired effect:

  • ${tag}
  • $.my_record.my_element

To Reproduce

Create a Fluentd configuration with variables in the topic_key.

Expected behavior

Variables are resolved, as they are in other places in the configuration.

Your Environment

- Fluentd version: 1.15.0
- fluent-plugin-kafka version: 0.17.5
- ruby-kafka version: 1.5.0
- TD Agent version, operating system, kernel version: as available in image bitnami/fluentd:1.15.0-debian-11-r3

Your Configuration

      <match kube-audit.**>
        @type route
        remove_tag_prefix kube-audit
        add_tag_prefix kube.audit
        <route **>
          copy
          @label @KAFKA
        </route>
      </match>
      <label @KAFKA>
        <filter kube.audit.**>
          @type parser
          key_name log
          <parse>
            @type json
          </parse>
        </filter>
        # Attempt to set the topic from a value in the record
        <filter **>
          @type record_transformer
          <record>
            topic_name topic_${tag_parts[0]}_${tag_parts[1]}
          </record>
        </filter>
        <match **>
          @type kafka2
          get_kafka_client_log false 
          brokers my_broker:9999
          required_acks 1
          <buffer topic>
            @type file
            chunk_limit_size 10M
            total_limit_size 1G
            path /opt/bitnami/fluentd/logs/buffers/kafka.buffer
            flush_thread_count 1
            flush_mode interval
            flush_interval 5s
            retry_max_times 2
            retry_max_interval 30
          </buffer>
          # topic settings
          #topic_key topic_${tag[0]}_${tag[1]}
          #default_topic topic_${tag[0]}_${tag[1]}
          #topic_key topic_$.topic_name
          #default_topic topic_$.topic_name
          topic_key topic_kube_audit
          default_topic topic_kube_audit
          <format>
            @type json
          </format>
        </match>
      </label>

Your Error Log

2022-07-26 12:41:32 +0000 [warn]: #0 Send exception occurred: Kafka::InvalidTopic
2022-07-26 12:41:32 +0000 [warn]: #0 Exception Backtrace : /opt/bitnami/fluentd/gems/ruby-kafka-1.5.0/lib/kafka/protocol.rb:160:in `handle_error'
/opt/bitnami/fluentd/gems/ruby-kafka-1.5.0/lib/kafka/protocol/metadata_response.rb:134:in `partitions_for'
/opt/bitnami/fluentd/gems/ruby-kafka-1.5.0/lib/kafka/cluster.rb:186:in `partitions_for'
/opt/bitnami/fluentd/gems/fluent-plugin-kafka-0.17.5/lib/fluent/plugin/kafka_producer_ext.rb:248:in `assign_partitions!'
/opt/bitnami/fluentd/gems/fluent-plugin-kafka-0.17.5/lib/fluent/plugin/kafka_producer_ext.rb:211:in `block in deliver_messages_with_retries'
/opt/bitnami/fluentd/gems/fluent-plugin-kafka-0.17.5/lib/fluent/plugin/kafka_producer_ext.rb:202:in `loop'
/opt/bitnami/fluentd/gems/fluent-plugin-kafka-0.17.5/lib/fluent/plugin/kafka_producer_ext.rb:202:in `deliver_messages_with_retries'
/opt/bitnami/fluentd/gems/fluent-plugin-kafka-0.17.5/lib/fluent/plugin/kafka_producer_ext.rb:128:in `deliver_messages'
/opt/bitnami/fluentd/gems/fluent-plugin-kafka-0.17.5/lib/fluent/plugin/out_kafka2.rb:299:in `write'
/opt/bitnami/fluentd/gems/fluentd-1.15.0/lib/fluent/plugin/output.rb:1180:in `try_flush'
/opt/bitnami/fluentd/gems/fluentd-1.15.0/lib/fluent/plugin/output.rb:1501:in `flush_thread_run'
/opt/bitnami/fluentd/gems/fluentd-1.15.0/lib/fluent/plugin/output.rb:501:in `block (2 levels) in start'
/opt/bitnami/fluentd/gems/fluentd-1.15.0/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'

Additional context

Given this issue, it should already have been fixed: #343

So maybe I'm just doing this wrong? In any case, I cannot find how to use this in the documentation, so a docs update would be good.

mdraijer avatar Jul 26 '22 13:07 mdraijer

@repeatedly any tips?

mdraijer avatar Jul 28 '22 07:07 mdraijer

@cosmo0920 @ashie Maybe one of you can shed any light on this?

mdraijer avatar Aug 08 '22 07:08 mdraijer

Hey @mdraijer, make sure to add the chunk key tag in your buffer section, e.g.

<buffer tag>
# etc
</buffer>

raytung avatar Aug 14 '22 05:08 raytung

To use built-in placeholders, you should add tag (and any other custom metadata created from records) as chunk keys in the <buffer ...> directive; that place accepts an array of attributes.

ref: https://docs.fluentd.org/configuration/buffer-section#placeholders
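As a minimal sketch of the mechanism (the file output and paths here are just an example, not from this issue): any chunk key listed in <buffer> becomes a placeholder that placeholder-aware parameters in the same <match> can use.

<match app.**>
  @type file
  # ${tag} resolves here only because "tag" is listed as a chunk key below
  path /var/log/fluent/${tag}
  <buffer tag>
    @type memory
  </buffer>
</match>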

cosmo0920 avatar Aug 15 '22 02:08 cosmo0920

@raytung @cosmo0920 thanks for the tips.

However, when I tried it I got this logging (irrelevant/sensitive parts left out):

2022-08-15 09:05:51 +0000 [debug]: #0 36 messages send.
2022-08-15 09:05:51 +0000 [info]: #0 New topics added to target list: sp_${tag[0]}_${tag[1]}_${tag[2]}
2022-08-15 09:05:51 +0000 [info]: #0 Fetching cluster metadata from kafka://xxxx
2022-08-15 09:05:51 +0000 [debug]: #0 [topic_metadata] Opening connection to xxxx with client id xxxx...
[...]
2022-08-15 09:05:51 +0000 [debug]: #0 [topic_metadata] Sending topic_metadata API request 1 to xxxx
2022-08-15 09:05:51 +0000 [debug]: #0 [topic_metadata] Waiting for response 1 from xxxx
2022-08-15 09:05:51 +0000 [debug]: #0 Created new chunk chunk_id="5e643f03eb564242946d6bd50170dc83" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="xxxx.yyyy.zzzz.somethingelse", variables=nil, seq=0>
[...]
2022-08-15 09:05:51 +0000 [debug]: #0 [topic_metadata] Received response 1 from xxxx
2022-08-15 09:05:51 +0000 [info]: #0 Discovered cluster metadata; nodes: xxxx (node_id=1001), xxxx (node_id=1003), xxxx (node_id=1002)
2022-08-15 09:05:51 +0000 [debug]: #0 Closing socket to xxxx
2022-08-15 09:05:51 +0000 [error]: #0 Failed to assign partitions to 36 messages in sp_${tag[0]}_${tag[1]}_${tag[2]}
2022-08-15 09:05:51 +0000 [warn]: #0 Failed to send all messages to ; attempting retry 1 of 2 after 1s

based on this config (irrelevant/sensitive parts left out):

      # Send the messages to Kafka
      <match **>
        @type kafka2
        @log_level debug
        get_kafka_client_log true
        brokers xxxx,xxxx,xxxx
        required_acks 1
        <buffer tag>
          @type file
          chunk_limit_size 10M
          total_limit_size 1G
          path /opt/bitnami/fluentd/logs/buffers/kafka.buffer
          flush_thread_count 1
          flush_mode interval
          flush_interval 5s
          retry_max_times 2
          retry_max_interval 30
        </buffer>
        topic_key sp_${tag[0]}_${tag[1]}_${tag[2]}
        default_topic sp_${tag[0]}_${tag[1]}_${tag[2]}

What confuses me a bit is that the buffer definition would impact the behaviour of topic_key and default_topic, since those settings are not inside the buffer section. And indeed, in my test it didn't.

mdraijer avatar Aug 15 '22 09:08 mdraijer

Hey @mdraijer, note that only the topic parameter supports placeholders; topic_key and default_topic don't.
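To illustrate the difference (a minimal sketch; the topic and field names are made up):

# topic is placeholder-aware; placeholders are resolved per buffer chunk:
topic sp_${tag[0]}_${tag[1]}

# topic_key instead names a record field whose value becomes the topic,
# so it takes a field name, not a placeholder (and that field must also
# be listed as a chunk key in <buffer>):
topic_key topic_name

# default_topic is a static fallback used when no topic can be derived;
# placeholders are not resolved here:
default_topic sp_fallback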

raytung avatar Aug 15 '22 23:08 raytung

The documentation (both https://docs.fluentd.org/output/kafka and https://github.com/fluent/fluent-plugin-kafka#output-plugin) says nothing about a topic parameter, just topic_key and default_topic...

mdraijer avatar Aug 16 '22 06:08 mdraijer

@mdraijer Agreed that we should update the documentation, but it is in there: https://github.com/fluent/fluent-plugin-kafka/blob/master/lib/fluent/plugin/out_kafka2.rb#L18

raytung avatar Aug 16 '22 06:08 raytung

Great! Thanks, I'll try that.

mdraijer avatar Aug 16 '22 06:08 mdraijer

Found it now:

  • Use the parameters topic and default_topic, the latter with a fixed value; do not use topic_key.
  • Use the variable from the topic value (in my case the tag) as the chunk key in the buffer section, e.g. <buffer tag>. Note that although the topic uses tag parts (${tag[0]} etc.), the chunk key is still just tag.

Thanks for the help!

mdraijer avatar Aug 16 '22 07:08 mdraijer

Hi @mdraijer. Could you provide the configuration that resolves this issue? Even after reading the documentation, I still don't quite understand.

Thanks in advance.

YuWan1117 avatar Jul 27 '23 06:07 YuWan1117

  <match **>
    @type kafka2
    get_kafka_client_log false

    brokers [snip]

    required_acks 1

    # buffer settings
    <buffer tag>
      @type file
      chunk_limit_size 1MB # relates to max message size in Kafka, do not increase above 1M
      total_limit_size 1G
      path /opt/bitnami/fluentd/logs/buffers/kafka.buffer
      flush_thread_count 8
      flush_mode interval
      flush_interval 5s
      retry_timeout 1h
    </buffer>

    # topic settings
    topic sp_${tag[0]}_${tag[1]}_${tag[2]}
    default_topic sp_unknown_topic
    # data type settings
    <format>
      @type json
    </format>
  </match>

mdraijer avatar Jul 27 '23 07:07 mdraijer

Thank you very much @mdraijer. I will try it later.

And my service is now running OK a different way: if the record has a 'topic' field, the kafka plugin will use it (see the documentation).
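For the record, a minimal sketch of that record-driven approach (the broker address and topic names are made up): set a topic field on each record upstream, then let kafka2 read it via topic_key, which defaults to "topic":

<filter **>
  @type record_transformer
  <record>
    # build the topic name from tag parts and store it in the record
    topic sp_${tag_parts[0]}_${tag_parts[1]}
  </record>
</filter>
<match **>
  @type kafka2
  brokers my_broker:9092     # hypothetical
  # take the topic from each record's "topic" field; it must also be
  # a chunk key so that each chunk maps to a single Kafka topic:
  topic_key topic
  default_topic sp_unknown_topic
  <buffer topic>
    @type memory
  </buffer>
  <format>
    @type json
  </format>
</match>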

YuWan1117 avatar Jul 27 '23 07:07 YuWan1117