fluent-bit
out_kafka: Introduce raw_log_key to write a single value to kafka
Implement a new option called raw_log_key that allows writing a single field's value to Kafka as the message payload. Combined with message_key, one field can be placed into the Kafka message key while another field becomes the payload. This is implemented as an additional format (raw).
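As a rough illustration of the lookup this option performs, here is a minimal, self-contained C sketch; it is not the plugin's actual code, and emit_raw_value plus the sample record are made up for this example. It builds a msgpack map like the records in the debug output below and prints only the value of the configured key, which is what ends up as the Kafka payload when Format is raw:

/* Sketch: select one field of a msgpack map as the raw payload.
 * Hypothetical, simplified illustration -- not the out_kafka source.
 * Build: cc sketch.c -lmsgpackc */
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <msgpack.h>

/* Print the value of `raw_log_key` from a msgpack map, i.e. what
 * would become the Kafka payload when Format is 'raw'. */
static int emit_raw_value(msgpack_object root, const char *raw_log_key)
{
    uint32_t i;

    if (root.type != MSGPACK_OBJECT_MAP) {
        return -1;
    }
    for (i = 0; i < root.via.map.size; i++) {
        msgpack_object key = root.via.map.ptr[i].key;
        msgpack_object val = root.via.map.ptr[i].val;

        if (key.type != MSGPACK_OBJECT_STR) {
            continue;
        }
        if (key.via.str.size == strlen(raw_log_key) &&
            memcmp(key.via.str.ptr, raw_log_key, key.via.str.size) == 0 &&
            val.type == MSGPACK_OBJECT_STR) {
            printf("%.*s\n", (int) val.via.str.size, val.via.str.ptr);
            return 0;
        }
    }
    /* key missing or not a string: the real plugin must pick a fallback */
    return -1;
}

int main(void)
{
    /* Build a record like the one produced by the test pipeline:
     * {"rand_value": "12345", "message": "Foo123"} */
    msgpack_sbuffer sbuf;
    msgpack_packer pk;
    msgpack_unpacked result;
    size_t off = 0;

    msgpack_sbuffer_init(&sbuf);
    msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
    msgpack_pack_map(&pk, 2);
    msgpack_pack_str(&pk, 10);
    msgpack_pack_str_body(&pk, "rand_value", 10);
    msgpack_pack_str(&pk, 5);
    msgpack_pack_str_body(&pk, "12345", 5);
    msgpack_pack_str(&pk, 7);
    msgpack_pack_str_body(&pk, "message", 7);
    msgpack_pack_str(&pk, 6);
    msgpack_pack_str_body(&pk, "Foo123", 6);

    msgpack_unpacked_init(&result);
    if (msgpack_unpack_next(&result, sbuf.data, sbuf.size, &off) ==
        MSGPACK_UNPACK_SUCCESS) {
        emit_raw_value(result.data, "rand_value");  /* prints: 12345 */
    }
    msgpack_unpacked_destroy(&result);
    msgpack_sbuffer_destroy(&sbuf);
    return 0;
}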
Enter [N/A] in the box if an item is not applicable to your change.
Testing
Before we can approve your change, please submit the following in a comment:
- [x] Example configuration file for the change
- [x] Debug log output from testing the change
- [x] Attached Valgrind output that shows no leaks or memory corruption was found
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
- [N/A] Run local packaging test showing all targets (including any new ones) build.
- [N/A] Set the ok-package-test label to test for all targets (requires a maintainer to do).
Documentation
- [x] Documentation required for this feature
Backporting
- [ ] Backport to latest stable release.
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Example config used for testing this (the type_converter filter duplicates rand_value as a string, and the modify filter adds a static message field that is used as the Kafka message key):
[INPUT]
Name random
[FILTER]
Name type_converter
Match *
uint_key rand_value rand_value string
[FILTER]
Name modify
Match *
Add message Foo123
[OUTPUT]
Name kafka
Match *
Brokers 127.0.0.1:9092
Topics test-foo
Format raw
raw_log_key rand_value
message_key message
Using kcat to read from Kafka:
$ kcat -C -b localhost -t test-foo
% Reached end of topic test-foo [0] at offset 470
12174776462032818091
% Reached end of topic test-foo [0] at offset 471
3397822772916645746
% Reached end of topic test-foo [0] at offset 472
3284960532694686745
% Reached end of topic test-foo [0] at offset 473
18034299562221828230
% Reached end of topic test-foo [0] at offset 474
2331075259719154332
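To also check the message key set via message_key, kcat can print key and payload together; an illustrative invocation (%k is the key, %s the payload):
$ kcat -C -b localhost -t test-foo -f '%k: %s\n'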
Fluent Bit output:
./bin/fluent-bit -c ~/source/sre/fb_kafka.ini -vv
Fluent Bit v3.0.1
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io
___________.__ __ __________.__ __ ________
\_ _____/| | __ __ ____ _____/ |_ \______ \__|/ |_ ___ _\_____ \
| __) | | | | \_/ __ \ / \ __\ | | _/ \ __\ \ \/ / _(__ <
| \ | |_| | /\ ___/| | \ | | | \ || | \ / / \
\___ / |____/____/ \___ >___| /__| |______ /__||__| \_/ /______ /
\/ \/ \/ \/ \/
[2024/04/01 19:55:52] [ info] Configuration:
[2024/04/01 19:55:52] [ info] flush time | 1.000000 seconds
[2024/04/01 19:55:52] [ info] grace | 5 seconds
[2024/04/01 19:55:52] [ info] daemon | 0
[2024/04/01 19:55:52] [ info] ___________
[2024/04/01 19:55:52] [ info] inputs:
[2024/04/01 19:55:52] [ info] random
[2024/04/01 19:55:52] [ info] ___________
[2024/04/01 19:55:52] [ info] filters:
[2024/04/01 19:55:52] [ info] type_converter.0
[2024/04/01 19:55:52] [ info] modify.1
[2024/04/01 19:55:52] [ info] ___________
[2024/04/01 19:55:52] [ info] outputs:
[2024/04/01 19:55:52] [ info] kafka.0
[2024/04/01 19:55:52] [ info] ___________
[2024/04/01 19:55:52] [ info] collectors:
[2024/04/01 19:55:52] [ info] [fluent bit] version=3.0.1, commit=cf3e53ce03, pid=81063
[2024/04/01 19:55:52] [debug] [engine] coroutine stack size: 36864 bytes (36.0K)
[2024/04/01 19:55:52] [ info] [storage] ver=1.5.1, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/04/01 19:55:52] [ info] [cmetrics] version=0.7.1
[2024/04/01 19:55:52] [ info] [ctraces ] version=0.4.0
[2024/04/01 19:55:52] [ info] [input:random:random.0] initializing
[2024/04/01 19:55:52] [ info] [input:random:random.0] storage_strategy='memory' (memory only)
[2024/04/01 19:55:52] [debug] [random:random.0] created event channels: read=21 write=22
[2024/04/01 19:55:52] [debug] [input:random:random.0] interval_sec=1 interval_nsec=0
[2024/04/01 19:55:52] [debug] [filter:modify:modify.1] Initialized modify filter with 0 conditions and 1 rules
[2024/04/01 19:55:52] [debug] [kafka:kafka.0] created event channels: read=23 write=24
[2024/04/01 19:55:52] [ info] [output:kafka:kafka.0] brokers='127.0.0.1:9092' topics='test-foo'
[2024/04/01 19:55:52] [ info] [sp] stream processor started
[2024/04/01 19:55:53] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:53] [debug] [input chunk] update output instances with new chunk size diff=89, records=1, input=random.0
[2024/04/01 19:55:54] [trace] [task 0x600000dbc000] created (id=0)
[2024/04/01 19:55:54] [debug] [task] created task=0x600000dbc000 id=0 OK
[2024/04/01 19:55:54] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:54] [debug] [input chunk] update output instances with new chunk size diff=89, records=1, input=random.0
{"rand_value"[2024/04/01 19:55:54] [debug] in produce_message
=>17896051301435224995, "rand_value"=>"17896051301435224995", "message"=>"Foo123"}[2024/04/01 19:55:54] [debug] [output:kafka:kafka.0] enqueued message (20 bytes) for topic 'test-foo'
[2024/04/01 19:55:54] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/04/01 19:55:54] [debug] [out flush] cb_destroy coro_id=0
[2024/04/01 19:55:54] [trace] [coro] destroy coroutine=0x600003aa0040 data=0x600003aa0058
[2024/04/01 19:55:54] [debug] [task] destroy task=0x600000dbc000 (task_id=0)
[2024/04/01 19:55:55] [trace] [task 0x600000da0000] created (id=0)
[2024/04/01 19:55:55] [debug] [task] created task=0x600000da0000 id=0 OK
[2024/04/01 19:55:55] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:55] [debug] [input chunk] update output instances with new chunk size diff=88, records=1, input=random.0
{"rand_value[2024/04/01 19:55:55] [debug] in produce_message
"=>15726046629086841672, "rand_value"=>"15726046629086841672", "message"=>"Foo123"}[2024/04/01 19:55:55] [debug] [output:kafka:kafka.0] enqueued message (20 bytes) for topic 'test-foo'
[2024/04/01 19:55:55] [debug] [output:kafka:kafka.0] message delivered (20 bytes, partition 0)
[2024/04/01 19:55:55] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/04/01 19:55:55] [debug] [out flush] cb_destroy coro_id=1
[2024/04/01 19:55:55] [trace] [coro] destroy coroutine=0x600003aa4040 data=0x600003aa4058
[2024/04/01 19:55:55] [debug] [task] destroy task=0x600000da0000 (task_id=0)
[2024/04/01 19:55:56] [trace] [task 0x600000da4000] created (id=0)
[2024/04/01 19:55:56] [debug] [task] created task=0x600000da4000 id=0 OK
[2024/04/01 19:55:56] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:56] [debug] [input chunk] update output instances with new chunk size diff=87, records=1, input=random.0
{"rand_value"=>1477704631817544547, "rand_value"=>"1477704631817544547", "message[2024/04/01 19:55:56] [debug] in produce_message
"=>"Foo123"}[2024/04/01 19:55:56] [debug] [output:kafka:kafka.0] enqueued message (19 bytes) for topic 'test-foo'
[2024/04/01 19:55:56] [debug] [output:kafka:kafka.0] message delivered (20 bytes, partition 0)
[2024/04/01 19:55:56] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/04/01 19:55:56] [debug] [out flush] cb_destroy coro_id=2
[2024/04/01 19:55:56] [trace] [coro] destroy coroutine=0x600003aa8040 data=0x600003aa8058
[2024/04/01 19:55:56] [debug] [task] destroy task=0x600000da4000 (task_id=0)
[2024/04/01 19:55:57] [trace] [task 0x600000da4000] created (id=0)
[2024/04/01 19:55:57] [debug] [task] created task=0x600000da4000 id=0 OK
[2024/04/01 19:55:57] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:57] [debug] [input chunk] update output instances with new chunk size diff=88, records=1, input=random.0
{"rand_value"=>567065323002200912[2024/04/01 19:55:57] [debug] in produce_message
, "rand_value"=>"567065323002200912", "message"=>"Foo123"}[2024/04/01 19:55:57] [debug] [output:kafka:kafka.0] enqueued message (18 bytes) for topic 'test-foo'
[2024/04/01 19:55:57] [debug] [output:kafka:kafka.0] message delivered (19 bytes, partition 0)
[2024/04/01 19:55:57] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/04/01 19:55:57] [debug] [out flush] cb_destroy coro_id=3
[2024/04/01 19:55:57] [trace] [coro] destroy coroutine=0x600003aa8040 data=0x600003aa8058
[2024/04/01 19:55:57] [debug] [task] destroy task=0x600000da4000 (task_id=0)
[2024/04/01 19:55:58] [trace] [task 0x600000dbc000] created (id=0)
[2024/04/01 19:55:58] [debug] [task] created task=0x600000dbc000 id=0 OK
[2024/04/01 19:55:58] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:58] [debug] [input chunk] update output instances with new chunk size diff=88, records=1, input=random.0
{"rand_value"[2024/04/01 19:55:58] [debug] in produce_message
=>3899690440799581283, "rand_value"=>"3899690440799581283", "message"=>"Foo123"}[2024/04/01 19:55:58] [debug] [output:kafka:kafka.0] enqueued message (19 bytes) for topic 'test-foo'
[2024/04/01 19:55:58] [debug] [output:kafka:kafka.0] message delivered (18 bytes, partition 0)
[2024/04/01 19:55:58] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/04/01 19:55:58] [debug] [out flush] cb_destroy coro_id=4
[2024/04/01 19:55:58] [trace] [coro] destroy coroutine=0x600003aa0040 data=0x600003aa0058
[2024/04/01 19:55:58] [debug] [task] destroy task=0x600000dbc000 (task_id=0)
^C[2024/04/01 19:55:58] [engine] caught signal (SIGINT)
[2024/04/01 19:55:58] [ info] [input] pausing random.0
[2024/04/01 19:55:58] [debug] [output:kafka:kafka.0] message delivered (19 bytes, partition 0)
Fixed an unrelated issue found by Valgrind (added as a dedicated commit). The remaining still-reachable allocations belong to global state created by curl (and then two dependencies down, libgcrypt via libssh2):
==4403==
==4403== HEAP SUMMARY:
==4403== in use at exit: 192 bytes in 12 blocks
==4403== total heap usage: 7,620 allocs, 7,608 frees, 5,391,102 bytes allocated
==4403==
==4403== 48 bytes in 6 blocks are still reachable in loss record 1 of 2
==4403== at 0x48850C8: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-arm64-linux.so)
==4403== by 0x576F83F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403== by 0x5770E2F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403== by 0x57D7C5B: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403== by 0x576F73F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403== by 0x57708DB: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403== by 0x576C443: gcry_control (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403== by 0x50F2BC3: libssh2_init (in /usr/lib/aarch64-linux-gnu/libssh2.so.1.0.1)
==4403== by 0x4DDF8CF: ??? (in /usr/lib/aarch64-linux-gnu/libcurl.so.4.7.0)
==4403== by 0x4D8D1FB: curl_global_init (in /usr/lib/aarch64-linux-gnu/libcurl.so.4.7.0)
==4403== by 0x104FF87: rd_http_global_init (rdhttp.c:443)
==4403== by 0xEC4177: rd_kafka_global_init0 (rdkafka.c:160)
==4403==
==4403== 144 bytes in 6 blocks are still reachable in loss record 2 of 2
==4403== at 0x48850C8: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-arm64-linux.so)
==4403== by 0x576F83F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403== by 0x5770E2F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403== by 0x57D7C4F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403== by 0x576F73F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403== by 0x57708DB: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403== by 0x576C443: gcry_control (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403== by 0x50F2BC3: libssh2_init (in /usr/lib/aarch64-linux-gnu/libssh2.so.1.0.1)
==4403== by 0x4DDF8CF: ??? (in /usr/lib/aarch64-linux-gnu/libcurl.so.4.7.0)
==4403== by 0x4D8D1FB: curl_global_init (in /usr/lib/aarch64-linux-gnu/libcurl.so.4.7.0)
==4403== by 0x104FF87: rd_http_global_init (rdhttp.c:443)
==4403== by 0xEC4177: rd_kafka_global_init0 (rdkafka.c:160)
==4403==
==4403== LEAK SUMMARY:
==4403== definitely lost: 0 bytes in 0 blocks
==4403== indirectly lost: 0 bytes in 0 blocks
==4403== possibly lost: 0 bytes in 0 blocks
==4403== still reachable: 192 bytes in 12 blocks
==4403== suppressed: 0 bytes in 0 blocks
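These reachable blocks are one-time global state (curl_global_init reached through librdkafka), so they are expected rather than a leak. If they become noisy in automated runs, a Valgrind suppression could hide them; an illustrative entry (the name is arbitrary):
{
   curl_global_init_via_librdkafka
   Memcheck:Leak
   match-leak-kinds: reachable
   fun:malloc
   ...
   fun:curl_global_init
   fun:rd_http_global_init
   fun:rd_kafka_global_init0
}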
I have rebased this against the master branch.
@cosmo0920 would you have some time to take a look?
@zecke can you link the docs PR for this as well? It's a new parameter, I believe, so it should be documented here (ideally with an example): https://github.com/fluent/fluent-bit-docs/blob/master/pipeline/outputs/kafka.md
I see you've ticked the backport option as well; that typically means also linking a PR to the branch for the backport version.
> @zecke can you link the docs PR for this as well? It's a new parameter, I believe, so it should be documented here (ideally with an example): https://github.com/fluent/fluent-bit-docs/blob/master/pipeline/outputs/kafka.md
I opened https://github.com/fluent/fluent-bit-docs/pull/1397. I'm not sure how to link the two together.
> I see you've ticked the backport option as well; that typically means also linking a PR to the branch for the backport version.
I have unticked the box. Getting this into any future release will be great.
The doc PR has been merged (https://github.com/fluent/fluent-bit-docs/pull/1397).