Not able to send messages from fluentd to kafka with zstd compression codec
Describe the bug
I am trying to send messages from Fluentd to Kafka with the zstd compression codec, but sending fails with an error saying that libzstd was not available at build time. The error is raised by the rdkafka gem, which I installed as part of the fluent-package v5.0.7 package.
To Reproduce
I am using the config below, with the compression codec set to zstd and @type rdkafka2:
<source>
  @type exec
  tag kafka.message
  command echo '{"message":"Hello there [rdkafka2 - GSSAPI - test3 zstd]!"}'
  run_interval 10s
  <parse>
    @type json
  </parse>
</source>
<match kafka.message>
  @type rdkafka2
  brokers kf-test-kafka-0.kf-test-kafka-headless.preeti-krb-system.svc.cluster.local:9092
  topic_key topic
  default_topic fluentd-zstd
  use_event_time true
  principal <<krb_principle>>
  keytab /etc/kerberos/secret/keytab
  compression_codec zstd
  ssl_ca_cert /etc/kerberos/kafka/tls.crt
  ssl_client_cert /etc/kerberos/kafka/tls.crt
  ssl_client_cert_key /etc/kerberos/kafka/tls.key
  ssl_client_cert_key_password test1234
  # ssl_verify_hostname false
  <format>
    @type json
  </format>
  <buffer topic>
    @type file
    path /var/log/fluentd-buffers/kafka
    flush_mode interval
    flush_interval 5s
    chunk_limit_size 1m
    queue_limit_length 32
    retry_max_interval 30
    retry_forever true
  </buffer>
  log_level debug
</match>
I have set up a Kafka broker to receive the messages.
Expected behavior
rdkafka supports the gzip, snappy and lz4 compression codecs, but not zstd. I would expect all compression codecs to be supported. fluent-plugin-kafka needs additional gems to support compression codecs other than gzip, and that is properly documented. Is the missing zstd support expected behavior, given that the gem is installed with fluent-package 5.0.7? I have also tried fluent-package 6.0.0, which ships rdkafka 0.21.0 (newer than the 0.12.0 in the 5.0.7 package), and zstd does not work there either.
Your Environment
- Fluentd version: 1.16.9
- fluent-plugin-kafka version: 0.19.3
- ruby-kafka version: 1.5.0
- rdkafka version: 0.12.0
- Operating system: Rocky 8 Linux
- Kernel version: 5.14
Your Configuration
<match kafka.message>
  @type rdkafka2
  brokers kf-test-kafka-0.kf-test-kafka-headless.preeti-krb-system.svc.cluster.local:9092
  topic_key topic
  default_topic fluentd-zstd
  use_event_time true
  principal <<krb_principle>>
  keytab /etc/kerberos/secret/keytab
  compression_codec zstd
  ssl_ca_cert /etc/kerberos/kafka/tls.crt
  ssl_client_cert /etc/kerberos/kafka/tls.crt
  ssl_client_cert_key /etc/kerberos/kafka/tls.key
  ssl_client_cert_key_password test1234
  # ssl_verify_hostname false
  <format>
    @type json
  </format>
  <buffer topic>
    @type file
    path /var/log/fluentd-buffers/kafka
    flush_mode interval
    flush_interval 5s
    chunk_limit_size 1m
    queue_limit_length 32
    retry_max_interval 30
    retry_forever true
  </buffer>
  log_level debug
</match>
Your Error Log
#0 Send exception occurred: Unsupported value "zstd" for configuration property "compression.codec": libzstd not available at build time at /opt/fluent/lib/ruby/gems/3.2.0/gems/rdkafka-0.12.0/lib/rdkafka/config.rb:226:in `block (2 levels) in native_config'
I get this error when I try to send messages to Kafka using the rdkafka plugin with the zstd compression codec.
Additional context
I understand that the error occurs because rdkafka 0.12.0 (shipped with fluent-package v5.0.7) was not built with zstd support: when I install the rdkafka gem explicitly with the libzstd-devel package present, it works and the error above is no longer raised.
I want to confirm whether that is the case: does the rdkafka gem in fluent-package v5.0.7 not support zstd by default?
According to the test code of rdkafka-ruby, it seems necessary to set Rdkafka::Config.new('compression.codec' => 'zstd').
https://github.com/karafka/rdkafka-ruby/blob/4802abf1c7dd043ef418888ebbe8adddc49ade59/spec/lib/rdkafka/config_spec.rb#L227
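To check whether a given rdkafka install was built with zstd, the idea from that spec can be turned into a small standalone probe. This is only a sketch: the method name `zstd_support_status` and its return symbols are made up for illustration, and it assumes that building a producer is enough to make librdkafka validate `compression.codec` (which matches the `native_config` frame in the stack trace above).

```ruby
# Sketch: probe whether the rdkafka gem visible to this Ruby accepts zstd.
# Returns :gem_missing, :supported or :unsupported (hypothetical names).
def zstd_support_status
  begin
    require "rdkafka"
  rescue LoadError
    # The gem (or its native library) cannot be loaded by this Ruby.
    return :gem_missing
  end

  begin
    # Building a producer makes librdkafka validate the configuration.
    producer = Rdkafka::Config.new("compression.codec" => "zstd").producer
    producer.close
    :supported
  rescue Rdkafka::Config::ConfigError
    # Raised for an unsupported value, e.g. "libzstd not available at build time".
    :unsupported
  end
end

puts zstd_support_status
```

It should be run with the same Ruby that Fluentd uses, otherwise a different rdkafka install may be probed. With fluent-package that is presumably /opt/fluent/bin/ruby, an assumption based on the /opt/fluent prefix in the error log.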
The rdkafka gem bundled in fluent-package does not enable the zstd feature at build time, so you need to reinstall the rdkafka gem.
$ fluent-gem install rdkafka
Then, you need to add rdkafka_options { "compression.codec" : "zstd" } to your Fluentd configuration, like:
<source>
  @type sample
  sample { "message" : "hello" }
  tag sample
</source>
<match sample>
  @type rdkafka2
  # broker
  brokers "10.66.228.1:9092"
  # topic settings
  default_topic test_topic
  # Specify compression.codec to use zstd feature.
  rdkafka_options { "compression.codec" : "zstd" }
  ...
</match>
Thank you @Watson1978 for confirming this.
If you can install rdkafka 0.22.0 or above, I think it is precompiled with libraries such as librdkafka and libzstd.
During installation, a platform name such as XXX-linux-gnu should be displayed.
$ fluent-gem install rdkafka
Fetching rdkafka-0.23.1-x86_64-linux-gnu.gem
Successfully installed rdkafka-0.23.1-x86_64-linux-gnu
1 gem installed
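If the install output is no longer at hand, the platform of the installed gem can also be read back through the RubyGems API. A minimal sketch, where `installed_variants` is a hypothetical helper name: a platform of "ruby" would indicate a gem built from source on the machine, while something like x86_64-linux-gnu would indicate a precompiled one.

```ruby
# Sketch: list installed versions of a gem together with their platform.
# installed_variants is a hypothetical helper, not part of any library.
def installed_variants(gem_name)
  Gem::Specification.find_all_by_name(gem_name).map do |spec|
    "#{spec.name} #{spec.version} (#{spec.platform})"
  end
end

puts installed_variants("rdkafka")
```

As with any check here, it only reflects the gems visible to the Ruby that runs it, so it should be run with the Ruby bundled with fluent-package.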
If you can install rdkafka 0.22.0 or above, I think it is precompiled with libraries such as librdkafka and libzstd.
Yes it is (author here)