in_kafka Fluent-bit kafka consumer can't reach high throughput
I'm working on replacing the promtail kafka consumer with fluent-bit in a high-load setup. Overall, our fluent-bit kafka consumer configuration works and messages are consumed, but it can't reach the consumption throughput that promtail handles without issue. For example, the promtail consumer reaches 140-150 MB/s, while the most we can squeeze out of the fluent-bit kafka consumer on the same resources is 10-15 MB/s.
Configuration
Fluent-bit v2.1.8
Kafka Brokers and Topics:
6 brokers kafka.m5.large (2 vCPU, 8 GB)
Topic with 128 partitions, replication factor = 3
{
  "min.insync.replicas" = 2
  "local.retention.ms"  = "7200000"   // 2h
  "retention.ms"        = "21400000"  // 6h
  "segment.ms"          = "21400000"  // 6h
  "compression.type"    = "snappy"
}
Fluent-bit Config
[SERVICE]
    Daemon        Off
    Parsers_File  parsers.conf
    Parsers_File  custom_parsers.conf
    HTTP_Server   On
    HTTP_Listen   0.0.0.0
    HTTP_Port     2020
    Health_Check  Off

[INPUT]
    name      kafka
    brokers   <kafka brokers>
    topics    <topic name>
    group_id  <fluentbit consumer group name>
    poll_ms   100
    rdkafka.security.protocol              ssl
    rdkafka.queued.min.messages            500000
    rdkafka.fetch.wait.max.ms              250
    rdkafka.socket.blocking.max.ms         250
    rdkafka.fetch.error.backoff.ms         1000
    rdkafka.partition.assignment.strategy  roundrobin

[INPUT]
    Name  fluentbit_metrics
    Tag   internal_metrics

[FILTER]
    Name          parser
    Match         kafka*
    Key_Name      payload
    Reserve_Data  true
    Parser        logs-kafka

[PARSER]
    Name    logs-kafka
    Format  json

[OUTPUT]
    name   null
    alias  devnull
    match  *

[OUTPUT]
    Name   prometheus_exporter
    Match  internal_metrics
    Port   2020
Fluent-bit Resources:
128 pods running on c5n.large nodes:
  Limits:
    cpu: 2
    memory: 4Gi
  Requests:
    cpu: 1
    memory: 2Gi
Promtail Config:
scrapeConfigs: |
  - job_name: kafka
    kafka:
      brokers:
        - <kafka brokers>
        - ...
      group_id: <group name>
      authentication:
        type: ssl
      topics:
        - mi-logs
      version: 2.8.1
      assignor: roundrobin
      use_incoming_timestamp: false
    relabel_configs:
      - action: replace
        source_labels:
          - __meta_kafka_topic
        target_label: kafka_topic
      - action: replace
        source_labels:
          - __meta_kafka_group_id
        target_label: kafka_group_id
    pipeline_stages:
      - json:
          expressions:
            job: job
            instance_id: instance_id
            instance: instance
            availability_zone: availability_zone
            datacenter: datacenter
            role: role
            filename: filename
      - labels:
          job:
          instance_id:
          instance:
          availability_zone:
          datacenter:
          role:
          filename:
clients:
  - url: <loki url>
    backoff_config:
      max_period: 600s
      min_period: 500ms
      max_retries: 144
    timeout: 60s
    batchsize: 95240
    batchwait: 2s
Promtail Resources:
128 pods running on c5n.large nodes:
  Limits:
    cpu: 2
    memory: 4Gi
  Requests:
    cpu: 1
    memory: 2Gi
Our goal is for fluent-bit to reach the throughput that promtail handles. During the POC run (fluent-bit kafka consumer), all kafka partitions in our consumer group are properly distributed across the fluent-bit consumers. We use the fluentbit_input_bytes_total metric to measure the fluent-bit kafka consumer throughput. Fluent-bit doesn't restart or crash; it just consumes messages, but slowly.
We noticed that the CPU usage of the fluent-bit consumers never rises above 0.04-0.05 (in k8s CPU units), even when there are messages available to consume from kafka. It looks as if it waits on some backoff timer and only reads messages occasionally.
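For reference, the consumption rate is derived from that metric with a PromQL query along these lines; the name="kafka.0" selector is an assumption and depends on how the kafka input instance is labelled in your setup:
sum(rate(fluentbit_input_bytes_total{name="kafka.0"}[1m]))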
Anyone?
I ran into the same problem. I can't improve the consumers' throughput with any configuration option; the only thing that helps is adding more fluent-bit instances. Maybe using fluent-bit is not a good idea?
I have also seen this: each Fluent-Bit instance's resource usage stays low, yet the consumption rate never goes up.
I am also experiencing the same problem. I have tried and tuned many librdkafka properties, but there hasn't been any improvement. Here is my configuration:
[INPUT]
    Name      kafka
    brokers   kafka:9092
    topics    test-inter
    poll_ms   100
    group_id  test004
    rdkafka.queued.min.messages            500000
    rdkafka.partition.assignment.strategy  roundrobin
    rdkafka.auto.commit.interval.ms        1
    rdkafka.fetch.wait.max.ms              100
    rdkafka.enable.partition.eof           false
    rdkafka.enable.auto.commit             true
    rdkafka.enable.auto.offset.store       true
    rdkafka.statistics.interval.ms         1000
    rdkafka.fetch.message.max.bytes        10000000
    rdkafka.max.partition.fetch.bytes      10000000
    rdkafka.socket.send.buffer.bytes       100
    rdkafka.fetch.error.backoff.ms         1000

[OUTPUT]
    Name   null
    Match  *
The problem seems linked to the fact that a call to rd_kafka_commit is performed for every single message.
If we look at https://docs.huihoo.com/apache/kafka/confluent/3.1/clients/consumer.html, rd_kafka_commit is called only once every 1000 messages.
The following change, adapted from the example on that site, dramatically improves performance:
static void in_kafka_callback(int write_fd, void *data)
{
    struct flb_input_thread *it = data;
    struct flb_in_kafka_config *ctx = data - offsetof(struct flb_in_kafka_config, it);
    mpack_writer_t *writer = &ctx->it.writer;
    static const int MIN_COMMIT_COUNT = 1000;
    int msg_count = 0;

    while (!flb_input_thread_exited(it)) {
        rd_kafka_message_t *rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 500);

        if (rkm) {
            process_message(writer, rkm);
            fflush(ctx->it.write_file);
            rd_kafka_message_destroy(rkm);

            /* commit offsets every MIN_COMMIT_COUNT messages instead of once per message */
            if ((++msg_count % MIN_COMMIT_COUNT) == 0) {
                rd_kafka_commit(ctx->kafka.rk, NULL, 0);
            }
        }
    }
}
The page at https://docs.huihoo.com/apache/kafka/confluent/3.1/clients/consumer.html explains: "In this example, we trigger a synchronous commit every 1000 messages. The second argument to rd_kafka_commit is the list of offsets to be committed; if set to NULL, librdkafka will commit the latest offsets for the assigned positions. The third argument in rd_kafka_commit is a flag which controls whether this call is asynchronous. We could also trigger the commit on expiration of a timeout to ensure the committed position is updated regularly." I don't have enough Kafka knowledge to say which fix is best (time-based vs. message-count-based commit, asynchronous vs. synchronous), but I hope this helps anyway.
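To illustrate the time-based alternative, here is a minimal, self-contained librdkafka consumer sketch, separate from the fluent-bit plugin code: the broker address, topic, group id, and handle_message() are placeholders, and the 5-second interval is arbitrary. It commits offsets asynchronously on a timer so the poll loop is never blocked:

#include <stdio.h>
#include <time.h>
#include <librdkafka/rdkafka.h>

/* Placeholder for whatever per-message processing the consumer does. */
static void handle_message(const rd_kafka_message_t *rkm)
{
    (void) rkm;
}

int main(void)
{
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    /* Placeholder connection settings. */
    rd_kafka_conf_set(conf, "bootstrap.servers", "kafka:9092", errstr, sizeof(errstr));
    rd_kafka_conf_set(conf, "group.id", "test004", errstr, sizeof(errstr));
    /* Manual commits: the application decides when offsets are committed. */
    rd_kafka_conf_set(conf, "enable.auto.commit", "false", errstr, sizeof(errstr));

    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "failed to create consumer: %s\n", errstr);
        return 1;
    }
    rd_kafka_poll_set_consumer(rk);

    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "test-inter", RD_KAFKA_PARTITION_UA);
    rd_kafka_subscribe(rk, topics);
    rd_kafka_topic_partition_list_destroy(topics);

    const int commit_interval_sec = 5;   /* arbitrary interval for the sketch */
    time_t last_commit = time(NULL);

    for (;;) {
        rd_kafka_message_t *rkm = rd_kafka_consumer_poll(rk, 500);
        if (rkm) {
            if (!rkm->err) {
                handle_message(rkm);
            }
            rd_kafka_message_destroy(rkm);
        }

        /* Time-based commit: asynchronous (third argument = 1) so the poll
         * loop keeps running while librdkafka sends the offsets. */
        if (time(NULL) - last_commit >= commit_interval_sec) {
            rd_kafka_commit(rk, NULL, 1);
            last_commit = time(NULL);
        }
    }

    /* Not reached in this sketch. */
    rd_kafka_consumer_close(rk);
    rd_kafka_destroy(rk);
    return 0;
}

Compared with the per-1000-messages variant above, this keeps the commit frequency bounded even when traffic is low, which is the behaviour the quoted documentation hints at.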
It seems related to https://github.com/fluent/fluent-bit/issues/8400
This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 5 days. Maintainers can add the exempt-stale label.
This issue was closed because it has been stalled for 5 days with no activity.
cc: @lecaros
Any update?
I have created a PR for this: #9726. It adds an option to enable auto-commit so the consumer can reach high throughput.
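Until that option ships, here is a minimal sketch of what the input section could look like with it; the enable_auto_commit name is only a guess based on the PR description (check #9726 for the actual option name and the fluent-bit version that carries it), while rdkafka.auto.commit.interval.ms is a standard librdkafka property passed through by in_kafka:

[INPUT]
    name      kafka
    brokers   <kafka brokers>
    topics    <topic name>
    group_id  <consumer group name>
    # hypothetical option name, taken from the PR description; verify in #9726
    enable_auto_commit               true
    # let librdkafka batch offset commits instead of committing per message
    rdkafka.auto.commit.interval.ms  5000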
This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 5 days. Maintainers can add the exempt-stale label.
This issue was closed because it has been stalled for 5 days with no activity.