in_kafka: add enable_auto_commit option for Kafka consumer
This change introduces a new configuration option, enable_auto_commit, that controls how the Kafka consumer commits offsets. When it is disabled (the default), offsets are committed only after messages have been processed successfully, which gives better control over message processing guarantees.
Users who prefer automatic batch commits can set enable_auto_commit true in their configuration. With auto-commit enabled, overall throughput is expected to be higher.
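For reviewers less familiar with the trade-off, here is a toy Python model of the two commit modes. It is not Fluent Bit or librdkafka code: run_until_crash, the offsets, and the assumption that an automatic commit has already covered the whole polled batch when the crash happens are all hypothetical and chosen only to illustrate the worst case.

```python
def run_until_crash(batch, auto_commit, crash_after):
    """Process `batch` (a list of offsets) and crash after `crash_after` messages.

    Returns (processed, committed), where `committed` is the offset the
    consumer would resume from after a restart.
    """
    processed = []
    committed = batch[0]
    if auto_commit:
        # Toy assumption: the background commit has already covered the
        # whole polled batch before processing finishes.
        committed = batch[-1] + 1
    for offset in batch[:crash_after]:
        processed.append(offset)
        if not auto_commit:
            # Commit only after this message was processed successfully.
            committed = offset + 1
    return processed, committed


batch = list(range(100, 110))
for auto_commit in (False, True):
    processed, committed = run_until_crash(batch, auto_commit, crash_after=4)
    # Offsets that were committed but never processed are skipped on restart.
    skipped = [o for o in batch if o < committed and o not in processed]
    print(f"auto_commit={auto_commit}: resume at {committed}, skipped {skipped}")
```

In this model the commit-after-processing mode resumes exactly where processing stopped, while the auto-commit mode can skip messages that were polled but not yet processed. Real behavior depends on commit intervals and batch sizes.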
Fixes #9813
Enter [N/A] in the box if an item is not applicable to your change.
Testing
Tested locally with enable_auto_commit set in the following configuration:
[INPUT]
    Name               kafka
    Tag                kafka-test
    Brokers            <broker endpoint>
    Topics             <topic-name>
    poll_ms            100
    poll_timeout_ms    1000
    enable_auto_commit true
    threaded           true
Results: The Fluent Bit input metrics below were captured at 10-second intervals. (Note: throughput depends on the partition count, compression, and other factors.)
fluentbit_input_bytes_total{name="kafka.0"} 136051025 1748731345204
fluentbit_input_records_total{name="kafka.0"} 82713 1748731345204
fluentbit_input_bytes_total{name="kafka.0"} 536194834 1748731355209
fluentbit_input_records_total{name="kafka.0"} 322963 1748731355209
fluentbit_input_bytes_total{name="kafka.0"} 936335445 1748731365215
fluentbit_input_records_total{name="kafka.0"} 563360 1748731365215
fluentbit_input_bytes_total{name="kafka.0"} 1336472098 1748731375221
fluentbit_input_records_total{name="kafka.0"} 805113 1748731375221
fluentbit_input_bytes_total{name="kafka.0"} 1736605503 1748731385227
fluentbit_input_records_total{name="kafka.0"} 1044700 1748731385227
fluentbit_input_bytes_total{name="kafka.0"} 1975992700 1748731395232
fluentbit_input_records_total{name="kafka.0"} 1188824 1748731395232
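To turn the counters above into rates, a short Python sketch like the following can be used. The samples list is copied by hand from the metrics output; the rates helper is a hypothetical name introduced here and is not part of Fluent Bit.

```python
# (timestamp_ms, bytes_total, records_total) copied from the metrics above
samples = [
    (1748731345204, 136051025, 82713),
    (1748731355209, 536194834, 322963),
    (1748731365215, 936335445, 563360),
    (1748731375221, 1336472098, 805113),
    (1748731385227, 1736605503, 1044700),
    (1748731395232, 1975992700, 1188824),
]


def rates(samples):
    """Return (bytes_per_s, records_per_s) for each pair of consecutive samples."""
    out = []
    for (t0, b0, r0), (t1, b1, r1) in zip(samples, samples[1:]):
        seconds = (t1 - t0) / 1000.0
        out.append(((b1 - b0) / seconds, (r1 - r0) / seconds))
    return out


for bytes_per_s, records_per_s in rates(samples):
    print(f"{bytes_per_s / 1e6:.1f} MB/s, {records_per_s:.0f} records/s")
```

For these samples the first four intervals work out to roughly 40 MB/s and 24,000 records/s each, and the last interval is lower at roughly 24 MB/s and 14,400 records/s.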
Before we can approve your change, please submit the following in a comment:
- [x] Example configuration file for the change
- [ ] Debug log output from testing the change
- [ ] Attached Valgrind output that shows no leaks or memory corruption was found
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
- [ ] Run local packaging test showing all targets (including any new ones) build.
- [x] Set ok-package-test label to test for all targets (requires maintainer to do).
Documentation
- [x] Documentation required for this feature
Backporting
- [ ] Backport to latest stable release.
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
@nareshku Could you use the in_kafka: prefix in your latest commit message? It would be nice to follow the contribution guidelines.
Thanks for this contribution. I have rebased this in a separate PR: https://github.com/fluent/fluent-bit/pull/10558
merged in https://github.com/fluent/fluent-bit/pull/10558, thank you