
out_kafka: Introduce raw_log_key to write a single value to kafka

Open: zecke opened this pull request 1 year ago • 7 comments

Implement a new option called raw_log_key that writes a single record value as the Kafka message payload. Combined with message_key, this allows one field to populate the Kafka message key while another field becomes the payload. The feature is exposed as an additional format (raw).
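
As a minimal illustration of the intended payload behaviour (using a made-up record; the actual test below uses the random input plugin): with Format raw and raw_log_key rand_value, a record such as

    {"rand_value"=>"12345678901234567890", "message"=>"Foo123"}

is produced with the bare string 12345678901234567890 as the Kafka payload, instead of the full JSON-encoded record.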


Enter [N/A] in the box if an item is not applicable to your change.

Testing

Before we can approve your change, please submit the following in a comment:

  • [x] Example configuration file for the change
  • [x] Debug log output from testing the change
  • [x] Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [N/A] Run local packaging test showing all targets (including any new ones) build.
  • [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • [x] Documentation required for this feature

Backporting

  • [ ] Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

zecke, Apr 01 '24 11:04

Example config used for testing this:

[INPUT]
    Name  random

[FILTER]
    Name type_converter
    Match *
    uint_key rand_value rand_value string

[FILTER]
    Name modify
    Match *
    Add message Foo123

[OUTPUT]
    Name        kafka
    Match       *
    Brokers     127.0.0.1:9092
    Topics      test-foo
    Format      raw
    raw_log_key rand_value
    message_key message
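
For reference, after the two filters each record carries the original uint value, its string conversion, and the added message field; this is what the record dumps in the debug output below show, e.g.:

    {"rand_value"=>17896051301435224995, "rand_value"=>"17896051301435224995", "message"=>"Foo123"}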

Using kcat to read from kafka:

$ kcat -C -b localhost -t test-foo
% Reached end of topic test-foo [0] at offset 470
12174776462032818091
% Reached end of topic test-foo [0] at offset 471
3397822772916645746
% Reached end of topic test-foo [0] at offset 472
3284960532694686745
% Reached end of topic test-foo [0] at offset 473
18034299562221828230
% Reached end of topic test-foo [0] at offset 474
2331075259719154332
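
To also check the message key, kcat can print the key alongside each payload via its format string (a hypothetical invocation, not part of the captured output above; %k is the key and %s the payload):

$ kcat -C -b localhost -t test-foo -f 'key=%k value=%s\n'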

Fluent Bit debug output:

./bin/fluent-bit -c ~/source/sre/fb_kafka.ini -vv
Fluent Bit v3.0.1
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

___________.__                        __    __________.__  __          ________  
\_   _____/|  |  __ __   ____   _____/  |_  \______   \__|/  |_  ___  _\_____  \ 
 |    __)  |  | |  |  \_/ __ \ /    \   __\  |    |  _/  \   __\ \  \/ / _(__  < 
 |     \   |  |_|  |  /\  ___/|   |  \  |    |    |   \  ||  |    \   / /       \
 \___  /   |____/____/  \___  >___|  /__|    |______  /__||__|     \_/ /______  /
     \/                     \/     \/               \/                        \/ 

[2024/04/01 19:55:52] [ info] Configuration:
[2024/04/01 19:55:52] [ info]  flush time     | 1.000000 seconds
[2024/04/01 19:55:52] [ info]  grace          | 5 seconds
[2024/04/01 19:55:52] [ info]  daemon         | 0
[2024/04/01 19:55:52] [ info] ___________
[2024/04/01 19:55:52] [ info]  inputs:
[2024/04/01 19:55:52] [ info]      random
[2024/04/01 19:55:52] [ info] ___________
[2024/04/01 19:55:52] [ info]  filters:
[2024/04/01 19:55:52] [ info]      type_converter.0
[2024/04/01 19:55:52] [ info]      modify.1
[2024/04/01 19:55:52] [ info] ___________
[2024/04/01 19:55:52] [ info]  outputs:
[2024/04/01 19:55:52] [ info]      kafka.0
[2024/04/01 19:55:52] [ info] ___________
[2024/04/01 19:55:52] [ info]  collectors:
[2024/04/01 19:55:52] [ info] [fluent bit] version=3.0.1, commit=cf3e53ce03, pid=81063
[2024/04/01 19:55:52] [debug] [engine] coroutine stack size: 36864 bytes (36.0K)
[2024/04/01 19:55:52] [ info] [storage] ver=1.5.1, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/04/01 19:55:52] [ info] [cmetrics] version=0.7.1
[2024/04/01 19:55:52] [ info] [ctraces ] version=0.4.0
[2024/04/01 19:55:52] [ info] [input:random:random.0] initializing
[2024/04/01 19:55:52] [ info] [input:random:random.0] storage_strategy='memory' (memory only)
[2024/04/01 19:55:52] [debug] [random:random.0] created event channels: read=21 write=22
[2024/04/01 19:55:52] [debug] [input:random:random.0] interval_sec=1 interval_nsec=0
[2024/04/01 19:55:52] [debug] [filter:modify:modify.1] Initialized modify filter with 0 conditions and 1 rules
[2024/04/01 19:55:52] [debug] [kafka:kafka.0] created event channels: read=23 write=24
[2024/04/01 19:55:52] [ info] [output:kafka:kafka.0] brokers='127.0.0.1:9092' topics='test-foo'
[2024/04/01 19:55:52] [ info] [sp] stream processor started
[2024/04/01 19:55:53] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:53] [debug] [input chunk] update output instances with new chunk size diff=89, records=1, input=random.0
[2024/04/01 19:55:54] [trace] [task 0x600000dbc000] created (id=0)
[2024/04/01 19:55:54] [debug] [task] created task=0x600000dbc000 id=0 OK
[2024/04/01 19:55:54] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:54] [debug] [input chunk] update output instances with new chunk size diff=89, records=1, input=random.0
{"rand_value"[2024/04/01 19:55:54] [debug] in produce_message

=>17896051301435224995, "rand_value"=>"17896051301435224995", "message"=>"Foo123"}[2024/04/01 19:55:54] [debug] [output:kafka:kafka.0] enqueued message (20 bytes) for topic 'test-foo'
[2024/04/01 19:55:54] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/04/01 19:55:54] [debug] [out flush] cb_destroy coro_id=0
[2024/04/01 19:55:54] [trace] [coro] destroy coroutine=0x600003aa0040 data=0x600003aa0058
[2024/04/01 19:55:54] [debug] [task] destroy task=0x600000dbc000 (task_id=0)
[2024/04/01 19:55:55] [trace] [task 0x600000da0000] created (id=0)
[2024/04/01 19:55:55] [debug] [task] created task=0x600000da0000 id=0 OK
[2024/04/01 19:55:55] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:55] [debug] [input chunk] update output instances with new chunk size diff=88, records=1, input=random.0
{"rand_value[2024/04/01 19:55:55] [debug] in produce_message

"=>15726046629086841672, "rand_value"=>"15726046629086841672", "message"=>"Foo123"}[2024/04/01 19:55:55] [debug] [output:kafka:kafka.0] enqueued message (20 bytes) for topic 'test-foo'
[2024/04/01 19:55:55] [debug] [output:kafka:kafka.0] message delivered (20 bytes, partition 0)
[2024/04/01 19:55:55] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/04/01 19:55:55] [debug] [out flush] cb_destroy coro_id=1
[2024/04/01 19:55:55] [trace] [coro] destroy coroutine=0x600003aa4040 data=0x600003aa4058
[2024/04/01 19:55:55] [debug] [task] destroy task=0x600000da0000 (task_id=0)
[2024/04/01 19:55:56] [trace] [task 0x600000da4000] created (id=0)
[2024/04/01 19:55:56] [debug] [task] created task=0x600000da4000 id=0 OK
[2024/04/01 19:55:56] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:56] [debug] [input chunk] update output instances with new chunk size diff=87, records=1, input=random.0
{"rand_value"=>1477704631817544547, "rand_value"=>"1477704631817544547", "message[2024/04/01 19:55:56] [debug] in produce_message

"=>"Foo123"}[2024/04/01 19:55:56] [debug] [output:kafka:kafka.0] enqueued message (19 bytes) for topic 'test-foo'
[2024/04/01 19:55:56] [debug] [output:kafka:kafka.0] message delivered (20 bytes, partition 0)
[2024/04/01 19:55:56] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/04/01 19:55:56] [debug] [out flush] cb_destroy coro_id=2
[2024/04/01 19:55:56] [trace] [coro] destroy coroutine=0x600003aa8040 data=0x600003aa8058
[2024/04/01 19:55:56] [debug] [task] destroy task=0x600000da4000 (task_id=0)
[2024/04/01 19:55:57] [trace] [task 0x600000da4000] created (id=0)
[2024/04/01 19:55:57] [debug] [task] created task=0x600000da4000 id=0 OK
[2024/04/01 19:55:57] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:57] [debug] [input chunk] update output instances with new chunk size diff=88, records=1, input=random.0
{"rand_value"=>567065323002200912[2024/04/01 19:55:57] [debug] in produce_message

, "rand_value"=>"567065323002200912", "message"=>"Foo123"}[2024/04/01 19:55:57] [debug] [output:kafka:kafka.0] enqueued message (18 bytes) for topic 'test-foo'
[2024/04/01 19:55:57] [debug] [output:kafka:kafka.0] message delivered (19 bytes, partition 0)
[2024/04/01 19:55:57] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/04/01 19:55:57] [debug] [out flush] cb_destroy coro_id=3
[2024/04/01 19:55:57] [trace] [coro] destroy coroutine=0x600003aa8040 data=0x600003aa8058
[2024/04/01 19:55:57] [debug] [task] destroy task=0x600000da4000 (task_id=0)
[2024/04/01 19:55:58] [trace] [task 0x600000dbc000] created (id=0)
[2024/04/01 19:55:58] [debug] [task] created task=0x600000dbc000 id=0 OK
[2024/04/01 19:55:58] [trace] [filter:modify:modify.1 at fluent-bit/plugins/filter_modify/modify.c:1427] Input map size 2 elements, output map size 3 elements
[2024/04/01 19:55:58] [debug] [input chunk] update output instances with new chunk size diff=88, records=1, input=random.0
{"rand_value"[2024/04/01 19:55:58] [debug] in produce_message

=>3899690440799581283, "rand_value"=>"3899690440799581283", "message"=>"Foo123"}[2024/04/01 19:55:58] [debug] [output:kafka:kafka.0] enqueued message (19 bytes) for topic 'test-foo'
[2024/04/01 19:55:58] [debug] [output:kafka:kafka.0] message delivered (18 bytes, partition 0)
[2024/04/01 19:55:58] [trace] [engine] [task event] task_id=0 out_id=0 return=OK
[2024/04/01 19:55:58] [debug] [out flush] cb_destroy coro_id=4
[2024/04/01 19:55:58] [trace] [coro] destroy coroutine=0x600003aa0040 data=0x600003aa0058
[2024/04/01 19:55:58] [debug] [task] destroy task=0x600000dbc000 (task_id=0)
^C[2024/04/01 19:55:58] [engine] caught signal (SIGINT)
[2024/04/01 19:55:58] [ info] [input] pausing random.0
[2024/04/01 19:55:58] [debug] [output:kafka:kafka.0] message delivered (19 bytes, partition 0)

zecke, Apr 01 '24 11:04

Fixed an unrelated issue found by valgrind (and added a dedicated commit for it). The still-reachable allocations belong to global state created by curl (and, two dependencies further down, libgcrypt via libssh2).

==4403== 
==4403== HEAP SUMMARY:
==4403==     in use at exit: 192 bytes in 12 blocks
==4403==   total heap usage: 7,620 allocs, 7,608 frees, 5,391,102 bytes allocated
==4403== 
==4403== 48 bytes in 6 blocks are still reachable in loss record 1 of 2
==4403==    at 0x48850C8: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-arm64-linux.so)
==4403==    by 0x576F83F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x5770E2F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x57D7C5B: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x576F73F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x57708DB: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x576C443: gcry_control (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x50F2BC3: libssh2_init (in /usr/lib/aarch64-linux-gnu/libssh2.so.1.0.1)
==4403==    by 0x4DDF8CF: ??? (in /usr/lib/aarch64-linux-gnu/libcurl.so.4.7.0)
==4403==    by 0x4D8D1FB: curl_global_init (in /usr/lib/aarch64-linux-gnu/libcurl.so.4.7.0)
==4403==    by 0x104FF87: rd_http_global_init (rdhttp.c:443)
==4403==    by 0xEC4177: rd_kafka_global_init0 (rdkafka.c:160)
==4403== 
==4403== 144 bytes in 6 blocks are still reachable in loss record 2 of 2
==4403==    at 0x48850C8: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-arm64-linux.so)
==4403==    by 0x576F83F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x5770E2F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x57D7C4F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x576F73F: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x57708DB: ??? (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x576C443: gcry_control (in /usr/lib/aarch64-linux-gnu/libgcrypt.so.20.2.8)
==4403==    by 0x50F2BC3: libssh2_init (in /usr/lib/aarch64-linux-gnu/libssh2.so.1.0.1)
==4403==    by 0x4DDF8CF: ??? (in /usr/lib/aarch64-linux-gnu/libcurl.so.4.7.0)
==4403==    by 0x4D8D1FB: curl_global_init (in /usr/lib/aarch64-linux-gnu/libcurl.so.4.7.0)
==4403==    by 0x104FF87: rd_http_global_init (rdhttp.c:443)
==4403==    by 0xEC4177: rd_kafka_global_init0 (rdkafka.c:160)
==4403== 
==4403== LEAK SUMMARY:
==4403==    definitely lost: 0 bytes in 0 blocks
==4403==    indirectly lost: 0 bytes in 0 blocks
==4403==      possibly lost: 0 bytes in 0 blocks
==4403==    still reachable: 192 bytes in 12 blocks
==4403==         suppressed: 0 bytes in 0 blocks
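
If one wanted to silence these known still-reachable blocks, a Valgrind suppression along these lines could be used (a sketch, not part of this PR; the frame names are taken from the trace above):

{
   curl_global_init_via_librdkafka_reachable
   Memcheck:Leak
   match-leak-kinds: reachable
   fun:malloc
   ...
   fun:gcry_control
   fun:libssh2_init
   ...
   fun:curl_global_init
   fun:rd_http_global_init
   fun:rd_kafka_global_init0
}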

zecke, Apr 01 '24 13:04

I have rebased this against the master branch.

zecke, Jun 15 '24 02:06

@cosmo0920 would you have some time to take a look?

agup006, Jun 15 '24 03:06

@zecke can you link the docs PR for this as well? It's a new parameter, I believe, so it should be documented here (ideally with an example): https://github.com/fluent/fluent-bit-docs/blob/master/pipeline/outputs/kafka.md

I see you've ticked the backport option as well; that typically means also linking a PR to the branch for the backport version.

patrick-stephens, Jun 17 '24 13:06

> @zecke can you link the docs PR for this as well? It's a new parameter, I believe, so it should be documented here (ideally with an example): https://github.com/fluent/fluent-bit-docs/blob/master/pipeline/outputs/kafka.md

I opened https://github.com/fluent/fluent-bit-docs/pull/1397, but I'm not sure how to link the two together.

> I see you've ticked the backport option as well; that typically means also linking a PR to the branch for the backport version.

I have unticked the box. Getting this into a future release would be great.

zecke, Jun 19 '24 12:06

The docs PR has been merged (https://github.com/fluent/fluent-bit-docs/pull/1397).

zecke, Jul 08 '24 22:07