in_kafka Fluent-bit kafka consumer can't reach high throughput

Open anosulchik opened this issue 2 years ago • 10 comments

I'm working on replacing the promtail kafka consumer with fluent-bit in a high-load setup. Overall, our fluent-bit kafka consumer configuration works and it consumes messages, but it can't reach the consumption throughput that promtail handles successfully. For example, the promtail consumer reaches 140-150 MB/s, while the most we can squeeze out of the fluent-bit kafka consumer running on the same resources is 10-15 MB/s.

Configuration

Fluent-bit v2.1.8

Kafka Brokers and Topics:

6 brokers, kafka.m5.large (2 vCPU, 8 GB)
Topic with 128 partitions, replication factor = 3

{
    "min.insync.replicas" = 2
    "local.retention.ms"  = "7200000"  // 2h
    "retention.ms"        = "21400000" // 6h
    "segment.ms"          = "21400000" // 6h
    "compression.type"    = "snappy"
}

Fluent-bit Config

      [SERVICE]
          Daemon Off
          Parsers_File parsers.conf
          Parsers_File custom_parsers.conf
          HTTP_Server On
          HTTP_Listen 0.0.0.0
          HTTP_Port 2020
          Health_Check Off

      [INPUT]
          name kafka
          brokers <kafka brokers>
          topics <topic name>
          group_id <fluentbit consumer group name>
          poll_ms 100
          rdkafka.security.protocol ssl
          rdkafka.queued.min.messages 500000
          rdkafka.fetch.wait.max.ms 250
          rdkafka.socket.blocking.max.ms 250
          rdkafka.fetch.error.backoff.ms 1000
          rdkafka.partition.assignment.strategy roundrobin

      [INPUT]
          Name fluentbit_metrics
          Tag internal_metrics

      [FILTER]
          Name parser
          Match kafka*
          Key_Name payload
          Reserve_Data true
          Parser logs-kafka

      [PARSER]
          Name logs-kafka
          Format json

      [OUTPUT]
          name null
          alias devnull
          match *

      [OUTPUT]
          Name prometheus_exporter
          Match internal_metrics
          Port 2020

Fluent-bit Resources:

128 pods running at c5n.large:

    Limits:
      cpu:     2
      memory:  4Gi
    Requests:
      cpu:      1
      memory:   2Gi

Promtail Config:

scrapeConfigs: |
        - job_name: kafka
          kafka:
            brokers:
            - <kafka brokers>
            - ...
            group_id: <group name>
            authentication:
              type: ssl
            topics:
              - mi-logs
            version: 2.8.1
            assignor: roundrobin
            use_incoming_timestamp: false
          relabel_configs:
          - action: replace
            source_labels:
              - __meta_kafka_topic
            target_label: kafka_topic
          - action: replace
            source_labels:
              - __meta_kafka_group_id
            target_label: kafka_group_id
          pipeline_stages:
            - json:
                expressions:
                  job: job
                  instance_id: instance_id
                  instance: instance
                  availability_zone: availability_zone
                  datacenter: datacenter
                  role: role
                  filename: filename
            - labels:
                job:
                instance_id:
                instance:
                availability_zone:
                datacenter:
                role:
                filename:
    clients:
      - url: <loki url>
        backoff_config:
          max_period: 600s
          min_period: 500ms
          max_retries: 144
        timeout: 60s
        batchsize: 95240
        batchwait: 2s

Promtail Resources:

128 pods running at c5n.large:

    Limits:
      cpu:     2
      memory:  4Gi
    Requests:
      cpu:      1
      memory:   2Gi

Our goal is to ensure fluent-bit can reach the throughput that promtail handles. During the POC run (fluent-bit kafka consumer), all kafka partitions in our consumer group are properly distributed between the fluent-bit consumers. We use the fluentbit_input_bytes_total metric to measure the fluent-bit kafka consumer's throughput. Fluent-bit doesn't restart or crash; it just consumes messages slowly.
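For readers who want to reproduce the measurement, here is a minimal sketch of turning two scrapes of a counter such as fluentbit_input_bytes_total into an average rate. The struct and function names are illustrative assumptions, not part of Fluent Bit or Prometheus, and the reset handling is a simplification of what rate-style queries do.

```c
#include <assert.h>
#include <stdint.h>

/* One scrape of a monotonically increasing byte counter.
 * Field names are illustrative, not taken from any real API. */
struct counter_sample {
    double   unix_seconds;  /* time of the scrape */
    uint64_t value;         /* counter value in bytes */
};

/* Average throughput in MB/s (1 MB = 1e6 bytes) between two scrapes. */
double throughput_mb_per_s(struct counter_sample earlier,
                           struct counter_sample later)
{
    assert(later.unix_seconds > earlier.unix_seconds);

    /* If the counter went down, assume the process restarted and the
     * counter began again from zero (a simplifying assumption). */
    uint64_t delta = later.value >= earlier.value
                         ? later.value - earlier.value
                         : later.value;

    return (double) delta / 1e6 / (later.unix_seconds - earlier.unix_seconds);
}
```

With two scrapes 60 seconds apart and a counter increase of 900,000,000 bytes, this gives 15 MB/s.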

We noticed that the CPU usage of the fluent-bit consumers doesn't rise above 0.04-0.05 (in k8s CPU units) even when there are messages available to consume from kafka. It looks as if it waits on some backoff timer and reads messages only occasionally.

[Screenshot: Grafana dashboard, Loki / Fluentbit]

anosulchik avatar Sep 29 '23 21:09 anosulchik

Anyone?

anosulchik avatar Oct 09 '23 15:10 anosulchik

I ran into the same problem. I can't limit or improve consumer throughput through any configuration option; the only thing that helps is adding more fluent-bit instances. Maybe using fluent-bit is not a good idea?

1123183721 avatar Dec 08 '23 02:12 1123183721

I have also encountered this: the resource usage of each Fluent-Bit instance is low, but the consumption rate does not go up.

tianhao98 avatar Dec 19 '23 03:12 tianhao98

I am also experiencing the same problem. I have tried and tested many properties of librdkafka, but there hasn't been any improvement. Here is my configuration:

[INPUT]
    Name kafka
    brokers kafka:9092
    topics test-inter
    poll_ms 100
    group_id test004
    rdkafka.queued.min.messages 500000
    rdkafka.partition.assignment.strategy roundrobin

    rdkafka.auto.commit.interval.ms 1
    rdkafka.fetch.wait.max.ms 100
    rdkafka.enable.partition.eof false
    rdkafka.enable.auto.commit true
    rdkafka.enable.auto.offset.store true
    rdkafka.statistics.interval.ms 1000
    rdkafka.fetch.message.max.bytes 10000000
    rdkafka.max.partition.fetch.bytes 10000000
    rdkafka.socket.send.buffer.bytes 100
    rdkafka.fetch.error.backoff.ms 1000

[OUTPUT]
    Name null
    Match *

aliSadegh avatar Dec 26 '23 10:12 aliSadegh

The problem seems linked to the fact that rd_kafka_commit is called for every single message. In the example at https://docs.huihoo.com/apache/kafka/confluent/3.1/clients/consumer.html, rd_kafka_commit is called only once every 1000 messages.
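A rough way to see why a blocking commit per message would cap throughput, and why CPU usage would stay low while the thread waits on the broker, is the back-of-the-envelope model below. It is only a sketch: the function name and both timing parameters are assumptions for illustration, not measurements from this setup.

```c
#include <assert.h>

/* Upper bound on messages per second for a single consumer thread that
 * blocks on one synchronous commit after every `batch` messages.
 *   commit_rtt_ms: assumed round-trip time of one commit to the broker
 *   per_msg_ms:    assumed time to poll and process one message */
double max_msgs_per_sec(double commit_rtt_ms, double per_msg_ms, int batch)
{
    assert(batch > 0);
    assert(commit_rtt_ms >= 0.0 && per_msg_ms >= 0.0);

    /* Time spent on one batch: processing plus one blocking commit. */
    double ms_per_batch = batch * per_msg_ms + commit_rtt_ms;
    assert(ms_per_batch > 0.0);

    return 1000.0 * batch / ms_per_batch;
}
```

Assuming a 2 ms commit round trip and 0.01 ms of work per message, committing after every message caps the thread at roughly 500 messages per second, while committing every 1000 messages allows roughly 83,000 per second.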

The following change, taken from the site above, dramatically improves performance:

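#include <stddef.h>  /* offsetof */

static void in_kafka_callback(int write_fd, void *data)
{
    struct flb_input_thread *it = data;
    /* Recover the plugin context from its embedded `it` member. Cast to
     * char * first: arithmetic on void * is a GNU extension. */
    struct flb_in_kafka_config *ctx = (struct flb_in_kafka_config *)
        ((char *) data - offsetof(struct flb_in_kafka_config, it));
    mpack_writer_t *writer = &ctx->it.writer;
    /* Commit once per this many messages instead of after every message. */
    static const int MIN_COMMIT_COUNT = 1000;

    int msg_count = 0;

    while (!flb_input_thread_exited(it)) {
        /* Wait up to 500 ms for the next message. */
        rd_kafka_message_t *rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 500);

        if (rkm) {
            process_message(writer, rkm);
            fflush(ctx->it.write_file);
            rd_kafka_message_destroy(rkm);

            /* Synchronous commit (async = 0) of the latest offsets. */
            if ((++msg_count % MIN_COMMIT_COUNT) == 0) {
                rd_kafka_commit(ctx->kafka.rk, NULL, 0);
            }
        }
    }
}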

The page at https://docs.huihoo.com/apache/kafka/confluent/3.1/clients/consumer.html says: "In this example, we trigger a synchronous commit every 1000 messages. The second argument to rd_kafka_commit is the list of offsets to be committed; if set to NULL, librdkafka will commit the latest offsets for the assigned positions. The third argument in rd_kafka_commit is a flag which controls whether this call is asynchronous. We could also trigger the commit on expiration of a timeout to ensure there the committed position is updated regularly."

I don't know Kafka well enough to say what the best fix would be (time-based vs. message-count commits, asynchronous vs. synchronous). Hope this helps anyway.
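The count-based and time-based triggers mentioned above do not have to be exclusive. The sketch below combines them in a small helper that only decides when a commit is due; it is a hypothetical illustration, not Fluent Bit or librdkafka code, and every name in it is an assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper: tracks uncommitted messages and elapsed time. */
struct commit_policy {
    uint32_t max_uncommitted;  /* commit after this many messages... */
    int64_t  max_interval_ms;  /* ...or once this much time has passed */
    uint32_t uncommitted;      /* messages consumed since last commit */
    int64_t  last_commit_ms;   /* time of the last commit decision */
};

void commit_policy_init(struct commit_policy *p, uint32_t max_uncommitted,
                        int64_t max_interval_ms, int64_t now_ms)
{
    assert(max_uncommitted > 0 && max_interval_ms > 0);
    p->max_uncommitted = max_uncommitted;
    p->max_interval_ms = max_interval_ms;
    p->uncommitted = 0;
    p->last_commit_ms = now_ms;
}

/* Returns true when the caller should commit now, and resets the state.
 * Can also be called when a poll times out, so that the tail of a quiet
 * topic still gets committed. */
bool commit_policy_due(struct commit_policy *p, int64_t now_ms)
{
    if (p->uncommitted == 0) {
        return false;  /* nothing new to commit */
    }
    if (p->uncommitted >= p->max_uncommitted ||
        now_ms - p->last_commit_ms >= p->max_interval_ms) {
        p->uncommitted = 0;
        p->last_commit_ms = now_ms;
        return true;
    }
    return false;
}

/* Call once per consumed message. */
bool commit_policy_on_message(struct commit_policy *p, int64_t now_ms)
{
    p->uncommitted++;
    return commit_policy_due(p, now_ms);
}
```

In a consumer loop like the one above, a true return would be followed by rd_kafka_commit(rk, NULL, 0) for a synchronous commit, or by the same call with 1 as the last argument for an asynchronous one. Which of the two is appropriate for in_kafka is left open here.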

It seems related to https://github.com/fluent/fluent-bit/issues/8400

simontilmant avatar Mar 05 '24 10:03 simontilmant

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 5 days. Maintainers can add the exempt-stale label.

github-actions[bot] avatar Jun 08 '24 01:06 github-actions[bot]

This issue was closed because it has been stalled for 5 days with no activity.

github-actions[bot] avatar Jun 13 '24 01:06 github-actions[bot]

cc: @lecaros

edsiper avatar Oct 11 '24 20:10 edsiper

Any update?

weily2 avatar Oct 23 '24 06:10 weily2

I have created a PR for this: #9726. It adds an option to enable auto-commit so that the consumer can reach high throughput.

lyhgo avatar Dec 19 '24 04:12 lyhgo

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 5 days. Maintainers can add the exempt-stale label.

github-actions[bot] avatar Mar 22 '25 02:03 github-actions[bot]

This issue was closed because it has been stalled for 5 days with no activity.

github-actions[bot] avatar Mar 27 '25 02:03 github-actions[bot]