fluent-plugin-kafka
SCRAM mechanism is not supported
Describe the bug
I've configured fluentd to fetch data from AWS MSK and transfer it to Coralogix. Unfortunately, fluentd fails with the error "SCRAM-SHA-512 is not supported".
Note: the MSK cluster has been tested successfully using kcat from within pods.
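For reference, a probe of the kind described, sketched with kcat; the broker host and password are placeholders, and port 9096 (MSK's usual SASL/SCRAM listener) is an assumption:

kcat -L \
  -b <broker>:9096 \
  -X security.protocol=sasl_ssl \
  -X sasl.mechanism=SCRAM-SHA-512 \
  -X sasl.username=msk_username \
  -X sasl.password=<msk_password>

A successful metadata listing (-L) confirms the brokers accept SCRAM-SHA-512 credentials, which is what makes the fluentd failure below surprising.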
To Reproduce
- Build the custom image (a sample build command follows the Dockerfile):
Dockerfile:
FROM coralogixrepo/coralogix-fluentd-multiarch:v0.0.9
USER root
RUN apt update \
    && apt -y upgrade \
    && apt install -y ruby-dev kafkacat \
    && gem install --no-document fluent-plugin-grok-parser fluent-plugin-kafka \
    && gem sources --clear-all \
    && rm -rf /tmp/* /var/tmp/* /usr/lib/ruby/gems/*/cache/*.gem \
    && ulimit -n 65536
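A minimal build sketch; the image name and tag come from the values file below, while the registry prefix is a placeholder for whatever registry the cluster pulls from:

docker build -t fluentd-test:0.0.1 .
docker tag fluentd-test:0.0.1 <registry>/fluentd-test:0.0.1
docker push <registry>/fluentd-test:0.0.1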
- Pull the Coralogix helm chart and: a) create secrets providing the private key and MSK password, or specify them directly in the fluentd.conf file (a kubectl sketch follows the link below); b) update the image to the one built previously, as well as fluentd.conf:
https://coralogix.com/docs/fluentd-helm-chart-for-kubernetes/
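A hedged sketch of the secret creation in a); the secret name coralogix-keys matches the envFrom reference in the values below and the key names line up with the ENV lookups in the config, but the namespace is a placeholder:

kubectl create secret generic coralogix-keys \
  --from-literal=PRIVATE_KEY='<coralogix-private-key>' \
  --from-literal=msk_password='<msk-password>' \
  -n <namespace>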
fluentd:
  fullnameOverride: "fluentd-coralogix"
  image:
    repository: fluentd-test
    tag: 0.0.1
  resources:
    requests:
      cpu: 800m
      memory: 900Mi
    limits:
      cpu: 800m
      memory: 900Mi
  configMapConfigs:
    - fluentd-prometheus-conf
    # - fluentd-systemd-conf
  tolerations:
    - operator: Exists
  dashboards:
    enabled: false
  env:
    - name: APP_NAME
      value: $kubernetes.namespace_name
    - name: SUB_SYSTEM
      value: $kubernetes.container_name
    - name: APP_NAME_SYSTEMD
      value: systemd
    - name: SUB_SYSTEM_SYSTEMD
      value: kubelet.service
    - name: ENDPOINT
      value: api.coralogix.com
    - name: "FLUENTD_CONF"
      value: "../../etc/fluent/fluent.conf"
    - name: LOG_LEVEL
      value: error
    - name: MAX_LOG_BUFFER_SIZE
      value: "12582912"
    - name: K8S_NODE_NAME
      valueFrom:
        fieldRef:
          fieldPath: spec.nodeName
  envFrom:
    - secretRef:
        name: coralogix-keys
  metrics:
    serviceMonitor:
      enabled: false
  fileConfigs:
    coralogix.conf: |-
      <source>
        @type kafka_group
        brokers broker1,broker2,broker3
        topics fluentd-topic
        format json
        consumer_group fluentd
        sasl_over_ssl false
        username "msk_username"
        password "#{ENV['msk_password']}"
        scram_mechanism "sha512"
        ssl_ca_certs_from_system true
      </source>
      <match>
        @type copy
        <store ignore_error>
          @type relabel
          @label @coralogix
        </store>
      </match>
      <label @coralogix>
        <filter **>
          @type record_transformer
          enable_ruby
          <record>
            applicationName $${record["app"]}
            subsystemName $${record["subsystem"]}
            text $${record.to_json}
          </record>
        </filter>
        <match **>
          @type http
          http_method post
          endpoint https://api.coralogix.com/logs/rest/singles
          headers "{\"private_key\" : \"#{ENV['PRIVATE_KEY']}\"}"
          error_response_as_unrecoverable false
          <buffer>
            @type file
            path /fluentd/log/buffer/coralogix
            queue_limit_length 4
            flush_thread_count 8
            flush_mode interval
            flush_interval 3s
            total_limit_size 5000MB
            chunk_limit_size 8MB
            retry_max_interval 30
            overflow_action throw_exception
          </buffer>
        </match>
      </label>
- Deploy the helm chart (a sample helm command is sketched below).
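A minimal deploy sketch, assuming the chart archive named under "Your Environment" sits in the working directory and the values above are saved as values.yaml; the release name and namespace are placeholders:

helm upgrade --install fluentd-coralogix ./fluentd-coralogix-0.0.6.tgz \
  -f values.yaml \
  -n <namespace>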
Expected behavior
Fluentd should be able to authenticate against MSK using SCRAM-SHA-512.
Your Environment
- Base image: coralogixrepo/coralogix-fluentd-multiarch:v0.0.9
- Fluentd version: 1.14.6
- fluent-plugin-kafka version: 0.18.1
- ruby-kafka version: 1.5.0
- Helm chart version: fluentd-coralogix-0.0.6.tgz
Your Configuration
<source>
  @type kafka_group
  brokers broker1,broker2,broker3
  topics fluentd-topic
  format json
  consumer_group fluentd
  sasl_over_ssl false
  username "msk_username"
  password "#{ENV['msk_password']}"
  scram_mechanism "sha512"
  ssl_ca_certs_from_system true
</source>
<match>
  @type copy
  <store ignore_error>
    @type relabel
    @label @coralogix
  </store>
</match>
<label @coralogix>
  <filter **>
    @type record_transformer
    enable_ruby
    <record>
      applicationName $${record["app"]}
      subsystemName $${record["subsystem"]}
      text $${record.to_json}
    </record>
  </filter>
  <match **>
    @type http
    http_method post
    endpoint https://api.coralogix.com/logs/rest/singles
    headers "{\"private_key\" : \"#{ENV['PRIVATE_KEY']}\"}"
    error_response_as_unrecoverable false
    <buffer>
      @type file
      path /fluentd/log/buffer/coralogix
      queue_limit_length 4
      flush_thread_count 8
      flush_mode interval
      flush_interval 3s
      total_limit_size 5000MB
      chunk_limit_size 8MB
      retry_max_interval 30
      overflow_action throw_exception
    </buffer>
  </match>
</label>
Your Error Log
[ssm-user@ip-10-0-109-149 ~]$ k logs fluentd-coralogix-vsd5z
2023-02-10 15:20:22 +0000 [info]: parsing config file is succeeded path="/fluentd/etc/../../etc/fluent/fluent.conf"
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-concat' version '2.5.0'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-coralogix' version '1.0.9'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-detect-exceptions' version '0.0.14'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-elasticsearch' version '5.2.4'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-grok-parser' version '2.6.2'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-json-in-json-2' version '1.0.2'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-kafka' version '0.18.1'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-kubernetes_metadata_filter' version '2.11.1'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-kubernetes_metadata_filter' version '2.9.5'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-multi-format-parser' version '1.0.0'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-parser-cri' version '0.1.1'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-prometheus' version '2.0.3'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-prometheus' version '2.0.2'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-record-modifier' version '2.1.0'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '2.4.0'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-sampling-filter' version '1.2.0'
2023-02-10 15:20:22 +0000 [info]: gem 'fluent-plugin-systemd' version '1.0.5'
2023-02-10 15:20:22 +0000 [info]: gem 'fluentd' version '1.14.6'
2023-02-10 15:20:23 +0000 [warn]: Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish
2023-02-10 15:20:23 +0000 [warn]: Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish
2023-02-10 15:20:23 +0000 [info]: Will watch for topics fluentd-topic at brokers b-3.broker.c3.kafka.us-east-1.amazonaws.com:9098,b-1.broker.c3.kafka.us-east-1.amazonaws.com:9098,b-2.broker.c3.kafka.us-east-1.amazonaws.com:9098 and 'fluentd' group
2023-02-10 15:20:24 +0000 [info]: using configuration file: <ROOT>
  <label @FLUENT_LOG>
    <match **>
      @type null
      @id ignore_fluent_logs
    </match>
  </label>
  <source>
    @type kafka_group
    brokers "b-3.broker.c3.kafka.us-east-1.amazonaws.com:9098,b-1.broker.c3.kafka.us-east-1.amazonaws.com:9098,b-2.broker.c3.kafka.us-east-1.amazonaws.com:9098"
    topics "fluentd-topic"
    format "json"
    consumer_group "fluentd"
    sasl_over_ssl false
    username "msk_username"
    password xxxxxx
    scram_mechanism "sha512"
    ssl_ca_certs_from_system true
  </source>
  <match>
    @type copy
    <store ignore_error>
      @type "relabel"
      @label @coralogix
    </store>
  </match>
  <label @coralogix>
    <filter **>
      @type record_transformer
      enable_ruby
      <record>
        applicationName ${record["app"]}
        subsystemName ${record["subsystem"]}
        text ${record.to_json}
      </record>
    </filter>
    <match **>
      @type http
      http_method post
      endpoint "https://api.coralogix.com/logs/rest/singles"
      headers {"private_key" : "XXXXXXX"}
      error_response_as_unrecoverable false
      <buffer>
        @type "file"
        path "/fluentd/log/buffer/coralogix"
        queue_limit_length 4
        flush_thread_count 8
        flush_mode interval
        flush_interval 3s
        total_limit_size 5000MB
        chunk_limit_size 8MB
        retry_max_interval 30
        overflow_action throw_exception
      </buffer>
    </match>
  </label>
  <source>
    @type prometheus
    @id in_prometheus
    bind "0.0.0.0"
    port 24231
    metrics_path "/metrics"
  </source>
  <source>
    @type prometheus_monitor
    @id in_prometheus_monitor
  </source>
  <source>
    @type prometheus_output_monitor
    @id in_prometheus_output_monitor
  </source>
</ROOT>
2023-02-10 15:20:24 +0000 [info]: starting fluentd-1.14.6 pid=6 ruby="2.7.5"
2023-02-10 15:20:24 +0000 [info]: spawn command to main: cmdline=["/usr/local/bin/ruby", "-Eascii-8bit:ascii-8bit", "/fluentd/vendor/bundle/ruby/2.7.0/bin/fluentd", "-c", "/fluentd/etc/../../etc/fluent/fluent.conf", "-p", "/fluentd/plugins", "--gemfile", "/fluentd/Gemfile", "--under-supervisor"]
2023-02-10 15:20:29 +0000 [info]: adding match in @FLUENT_LOG pattern="**" type="null"
2023-02-10 15:20:29 +0000 [info]: adding filter in @coralogix pattern="**" type="record_transformer"
2023-02-10 15:20:29 +0000 [info]: adding match in @coralogix pattern="**" type="http"
2023-02-10 15:20:30 +0000 [warn]: #0 Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish
2023-02-10 15:20:30 +0000 [warn]: #0 Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish
2023-02-10 15:20:30 +0000 [info]: adding match pattern="**" type="copy"
2023-02-10 15:20:30 +0000 [info]: adding source type="kafka_group"
2023-02-10 15:20:30 +0000 [info]: #0 Will watch for topics fluentd-topic at brokers b-3.broker.c3.kafka.us-east-1.amazonaws.com:9098,b-1.broker.c3.kafka.us-east-1.amazonaws.com:9098,b-2.broker.c3.kafka.us-east-1.amazonaws.com:9098 and 'fluentd' group
2023-02-10 15:20:30 +0000 [info]: adding source type="prometheus"
2023-02-10 15:20:30 +0000 [info]: adding source type="prometheus_monitor"
2023-02-10 15:20:30 +0000 [info]: adding source type="prometheus_output_monitor"
2023-02-10 15:20:31 +0000 [info]: #0 starting fluentd worker pid=11 ppid=6 worker=0
2023-02-10 15:20:31 +0000 [info]: #0 Subscribe to topic fluentd-topic
2023-02-10 15:20:32 +0000 [error]: #0 unexpected error error_class=Kafka::ConnectionError error="Could not connect to any of the seed brokers:\n- kafka://b-3.broker.c3.kafka.us-east-1.amazonaws.com:9098: SCRAM-SHA-512 is not supported.\n- kafka://b-2.broker.c3.kafka.us-east-1.amazonaws.com:9098: SCRAM-SHA-512 is not supported.\n- kafka://b-1.broker.c3.kafka.us-east-1.amazonaws.com:9098: SCRAM-SHA-512 is not supported."
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/cluster.rb:454:in `fetch_cluster_info'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/cluster.rb:405:in `cluster_info'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/cluster.rb:105:in `refresh_metadata!'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/cluster.rb:59:in `add_target_topics'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/consumer_group.rb:32:in `subscribe'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:624:in `subscribe_to_topic'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:608:in `block in scan_for_subscribing'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:601:in `each'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:601:in `scan_for_subscribing'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/ruby-kafka-1.5.0/lib/kafka/consumer.rb:118:in `subscribe'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.1/lib/fluent/plugin/in_kafka_group.rb:232:in `block in setup_consumer'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.1/lib/fluent/plugin/in_kafka_group.rb:224:in `each'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.1/lib/fluent/plugin/in_kafka_group.rb:224:in `setup_consumer'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluent-plugin-kafka-0.18.1/lib/fluent/plugin/in_kafka_group.rb:205:in `start'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/compat/call_super_mixin.rb:42:in `start'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/root_agent.rb:203:in `block in start'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/root_agent.rb:192:in `block (2 levels) in lifecycle'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/root_agent.rb:191:in `each'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/root_agent.rb:191:in `block in lifecycle'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/root_agent.rb:178:in `each'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/root_agent.rb:178:in `lifecycle'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/root_agent.rb:202:in `start'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/engine.rb:248:in `start'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/engine.rb:147:in `run'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/supervisor.rb:720:in `block in run_worker'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/supervisor.rb:971:in `main_process'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/supervisor.rb:711:in `run_worker'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/lib/fluent/command/fluentd.rb:376:in `<top (required)>'
2023-02-10 15:20:32 +0000 [error]: #0 /usr/local/lib/ruby/2.7.0/rubygems/core_ext/kernel_require.rb:83:in `require'
2023-02-10 15:20:32 +0000 [error]: #0 /usr/local/lib/ruby/2.7.0/rubygems/core_ext/kernel_require.rb:83:in `require'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/gems/fluentd-1.14.6/bin/fluentd:15:in `<top (required)>'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/bin/fluentd:23:in `load'
2023-02-10 15:20:32 +0000 [error]: #0 /fluentd/vendor/bundle/ruby/2.7.0/bin/fluentd:23:in `<main>'
2023-02-10 15:20:32 +0000 [error]: #0 unexpected error error_class=Kafka::ConnectionError error="Could not connect to any of the seed brokers:\n- kafka://b-3.broker.c3.kafka.us-east-1.amazonaws.com:9098: SCRAM-SHA-512 is not supported.\n- kafka://b-2.broker.c3.kafka.us-east-1.amazonaws.com:9098: SCRAM-SHA-512 is not supported.\n- kafka://b-1.broker.c3.kafka.us-east-1.amazonaws.com:9098: SCRAM-SHA-512 is not supported."
2023-02-10 15:20:32 +0000 [error]: #0 suppressed same stacktrace
2023-02-10 15:20:32 +0000 [error]: Worker 0 finished unexpectedly with status 1
Additional context
No response
Try using rdkafka instead:
<match file.generic>
  @type rdkafka2
  brokers "<broker>"
  get_kafka_client_log true
  default_topic custom
  use_event_time true
  username "#{ENV["nonprd_ULFF_KAFKA_USER"]}"
  password "#{ENV["nonprd_ULFF_KAFKA_PASS"]}"
  rdkafka_options {
    "log_level": 6,
    "sasl.mechanism": "SCRAM-SHA-512",
    "security.protocol": "sasl_ssl"
  }
</match>
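Note that rdkafka2 is an output plugin, while the report above consumes from Kafka; the rdkafka-based counterpart on the source side is the rdkafka_group input. Below is a hedged sketch of the original source rewritten for it, carrying over the brokers and topic from the report, with credentials as placeholders; it assumes fluent-plugin-kafka's rdkafka consumer parameters and requires the rdkafka gem (librdkafka) in the image:

<source>
  @type rdkafka_group
  topics fluentd-topic
  format json
  kafka_configs {
    "bootstrap.servers": "broker1,broker2,broker3",
    "group.id": "fluentd",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "msk_username",
    "sasl.password": "<msk-password>"
  }
</source>

librdkafka implements SCRAM-SHA-256/512 natively, which is the rationale behind this suggestion.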