fluent-plugin-kafka

Not able to send messages from fluentd to kafka with zstd compression codec

Open RamHaridas opened this issue 3 months ago • 4 comments

Describe the bug

I am trying to send messages from Fluentd to Kafka with the zstd compression codec, but sending fails with an error saying that libzstd was not available at build time. The error is thrown by the rdkafka plugin. I installed the gem from the fluent-package-v5.0.7 package.

To Reproduce

I am using the config below, with the compression codec set to zstd and @type rdkafka2:

    <source>
      @type exec
      tag kafka.message
      command echo '{"message":"Hello there [rdkafka2 - GSSAPI - test3 zstd]!"}'
      run_interval 10s
      <parse>
        @type json
      </parse>
    </source>
    <match kafka.message>
      @type rdkafka2
      brokers kf-test-kafka-0.kf-test-kafka-headless.preeti-krb-system.svc.cluster.local:9092
      topic_key topic
      default_topic fluentd-zstd
      use_event_time true
      principal <<krb_principle>>
      keytab /etc/kerberos/secret/keytab
      compression_codec zstd
      ssl_ca_cert /etc/kerberos/kafka/tls.crt
      ssl_client_cert /etc/kerberos/kafka/tls.crt
      ssl_client_cert_key /etc/kerberos/kafka/tls.key
      ssl_client_cert_key_password test1234
      # ssl_verify_hostname false
      <format>
        @type json
      </format>
      <buffer topic>
        @type file
        path /var/log/fluentd-buffers/kafka
        flush_mode interval
        flush_interval 5s
        chunk_limit_size 1m
        queue_limit_length 32
        retry_max_interval 30
        retry_forever true
      </buffer>
      log_level debug
    </match>

I have set up a Kafka broker to receive the messages.

Expected behavior

rdkafka supports the gzip, snappy and lz4 compression codecs, but not zstd. Ideally, all compression codecs would be supported. fluent-plugin-kafka needs an additional gem to support compression codecs other than gzip, and that is properly documented. Is this expected behavior, given that the gem is installed with fluent-package-5.0.7? I have also tried fluent-package-6.0.0, which ships rdkafka 0.21.0 (newer than the 0.12.0 in the 5.0.7 package), and zstd does not work there either.

Your Environment

- Fluentd version: 1.16.9
- fluent-plugin-kafka version: 0.19.3
- ruby-kafka version: 1.5.0
- rdkafka version: 0.12.0
- Operating system: Rocky 8 Linux
- Kernel version: 5.14

Your Configuration

    <match kafka.message>
      @type rdkafka2
      brokers kf-test-kafka-0.kf-test-kafka-headless.preeti-krb-system.svc.cluster.local:9092
      topic_key topic
      default_topic fluentd-zstd
      use_event_time true
      principal <<krb_principle>>
      keytab /etc/kerberos/secret/keytab
      compression_codec zstd
      ssl_ca_cert /etc/kerberos/kafka/tls.crt
      ssl_client_cert /etc/kerberos/kafka/tls.crt
      ssl_client_cert_key /etc/kerberos/kafka/tls.key
      ssl_client_cert_key_password test1234
      # ssl_verify_hostname false
      <format>
        @type json
      </format>
      <buffer topic>
        @type file
        path /var/log/fluentd-buffers/kafka
        flush_mode interval
        flush_interval 5s
        chunk_limit_size 1m
        queue_limit_length 32
        retry_max_interval 30
        retry_forever true
      </buffer>
      log_level debug
    </match>

Your Error Log

#0 Send exception occurred: Unsupported value "zstd" for configuration property "compression.codec": libzstd not available at build time at /opt/fluent/lib/ruby/gems/3.2.0/gems/rdkafka-0.12.0/lib/rdkafka/config.rb:226:in `block (2 levels) in native_config'

I get this error when I try to send messages to Kafka using the rdkafka plugin with the zstd compression codec.

Additional context

I understand that the error occurs because rdkafka 0.12.0 (from fluent-package-v5.0.7) was not built with zstd support: when I explicitly install the rdkafka gem with the libzstd-devel package present, it works and does not throw the error above.

I want to confirm that this is the case: does the rdkafka gem in fluent-package-v5.0.7 not support zstd by default?

RamHaridas, Sep 24 '25 13:09

According to the test code of rdkafka-ruby, it seems necessary to set Rdkafka::Config.new('compression.codec' => 'zstd'). https://github.com/karafka/rdkafka-ruby/blob/4802abf1c7dd043ef418888ebbe8adddc49ade59/spec/lib/rdkafka/config_spec.rb#L227
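One way to check whether a given rdkafka build accepts zstd is to follow the same pattern as that spec: build a producer with `compression.codec` set to `zstd` and see whether `Rdkafka::Config::ConfigError` is raised. The following is a minimal sketch, not part of fluent-plugin-kafka; the helper name `zstd_supported?` is hypothetical, and it assumes the script is run with the same Ruby that Fluentd uses (for fluent-package this is presumably the embedded Ruby under /opt/fluent).

```ruby
# Probe whether the installed rdkafka gem was built with zstd support.
# Assumption: run this with the Ruby that Fluentd itself uses, so that the
# same rdkafka gem is loaded.
begin
  require "rdkafka"
rescue LoadError
  # rdkafka is not installed for this Ruby; zstd_supported? will return nil.
end

# Hypothetical helper: returns true if a producer can be created with zstd,
# false if librdkafka rejects the codec, and nil if rdkafka is not loadable.
def zstd_supported?
  return nil unless defined?(Rdkafka::Config)

  # The codec is validated when the native config is built, which happens
  # when the producer is created. No broker connection is needed for that.
  producer = Rdkafka::Config.new("compression.codec" => "zstd").producer
  producer.close
  true
rescue Rdkafka::Config::ConfigError
  false
end

puts "zstd supported: #{zstd_supported?.inspect}"
```

A `false` result corresponds to the "libzstd not available at build time" error in the log above.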

The rdkafka gem bundled in fluent-package is built without the zstd feature enabled, so you need to reinstall the rdkafka gem.

$ fluent-gem install rdkafka

Then add rdkafka_options { "compression.codec" : "zstd" } to your Fluentd configuration, like this:

<source>
  @type sample
  sample { "message" : "hello" }
  tag sample
</source>

<match sample>
  @type rdkafka2

  # broker
  brokers "10.66.228.1:9092"

  # topic settings
  default_topic test_topic

  # Specify compression.codec to use zstd feature.
  rdkafka_options { "compression.codec" : "zstd" }


...

</match>

Watson1978, Oct 06 '25 09:10

Thank you @Watson1978 for confirming this.

RamHaridas, Oct 06 '25 12:10

If you can install rdkafka 0.22.0 or above, I think it is precompiled with libraries such as librdkafka and libzstd.

When a precompiled gem is installed, a platform name such as XXX-linux-gnu should appear in the gem name.

$ fluent-gem install rdkafka
Fetching rdkafka-0.23.1-x86_64-linux-gnu.gem
Successfully installed rdkafka-0.23.1-x86_64-linux-gnu
1 gem installed
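To see afterwards which rdkafka builds are installed, the platform of each gem can be listed through RubyGems. This is a small sketch under the assumption that a source-built gem reports the platform `ruby` while a precompiled one reports a platform such as `x86_64-linux-gnu`; the helper name `rdkafka_gem_builds` is hypothetical, and the script should be run with the Ruby that Fluentd uses.

```ruby
require "rubygems"

# Hypothetical helper: returns [version, platform] pairs for every installed
# rdkafka gem visible to this Ruby. The list is empty if none is installed.
def rdkafka_gem_builds
  Gem::Specification.find_all_by_name("rdkafka").map do |spec|
    [spec.version.to_s, spec.platform.to_s]
  end
end

rdkafka_gem_builds.each do |version, platform|
  # Platform "ruby" means the native extension was compiled locally.
  puts "rdkafka #{version} (#{platform})"
end
```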

Watson1978, Oct 07 '25 01:10

If you can install rdkafka 0.22.0 or above, I think it is precompiled with libraries such as librdkafka and libzstd.

Yes, it is (author here).

mensfeld, Nov 14 '25 07:11