fluent-bit
out_kafka: fix a bug in avro schema_id
The wire format produced by the code did not match the Confluent Schema Registry wire format (https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format). Related issues: fixes #4560, #4488
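For reference, the Confluent wire format prepends a fixed 5-byte header to the Avro binary body: one zero magic byte followed by the 4-byte big-endian schema ID. A minimal sketch of that framing (`frame_confluent` is an illustrative helper, not code from the plugin):

```c
#include <stdint.h>
#include <stddef.h>
#include <string.h>

/* Sketch of Confluent wire-format framing: magic byte 0x00, then the
 * 4-byte big-endian schema ID, then the raw Avro binary body.
 * Assumes `out` has room for body_len + 5 bytes. */
static size_t frame_confluent(uint8_t *out, uint32_t schema_id,
                              const uint8_t *avro_body, size_t body_len)
{
    out[0] = 0x00;                       /* magic byte */
    out[1] = (uint8_t)(schema_id >> 24); /* schema ID, big-endian */
    out[2] = (uint8_t)(schema_id >> 16);
    out[3] = (uint8_t)(schema_id >> 8);
    out[4] = (uint8_t)(schema_id);
    memcpy(out + 5, avro_body, body_len);
    return 5 + body_len;
}
```

With `Schema_id 38` as in the configuration below, the header bytes are `00 00 00 00 26`; any extra bytes before or inside that header make the registry deserializer fail.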
Configuration
[SERVICE]
flush 1
log_level debug
parsers_file parsers.conf
[INPUT]
name tail
path ./data.log
parser json
Read_from_Head true
exit_on_eof off
[OUTPUT]
name stdout
json_date_key false
match *
[OUTPUT]
Name kafka
Match *
Brokers localhost:9092
Topics log_topic
Schema_str {"type":"record","name":"avro_logging","fields":[{"name":"timestamp","type":"string","default":""},{"name":"version","type":"string"},{"name":"message","type":"string"},{"name":"logger_name","type":"string"}]}
Schema_id 38
Timestamp_Key @timestamp
Retry_Limit false
# hides errors "Receive failed: Disconnected" when kafka kills idle connections
rdkafka.log.connection.close false
# producer buffer is not included in http://fluentbit.io/documentation/0.12/configuration/memory_usage.html#estimating
rdkafka.queue.buffering.max.kbytes 10240
Format avro
# for logs you'll probably want this to be 0 or 1, not more
rdkafka.request.required.acks 1
Debug Log
[2022/05/10 09:41:14] [ info] Configuration:
[2022/05/10 09:41:14] [ info] flush time | 1.000000 seconds
[2022/05/10 09:41:14] [ info] grace | 5 seconds
[2022/05/10 09:41:14] [ info] daemon | 0
[2022/05/10 09:41:14] [ info] ___________
[2022/05/10 09:41:14] [ info] inputs:
[2022/05/10 09:41:14] [ info] tail
[2022/05/10 09:41:14] [ info] ___________
[2022/05/10 09:41:14] [ info] filters:
[2022/05/10 09:41:14] [ info] ___________
[2022/05/10 09:41:14] [ info] outputs:
[2022/05/10 09:41:14] [ info] stdout.0
[2022/05/10 09:41:14] [ info] kafka.1
[2022/05/10 09:41:14] [ info] ___________
[2022/05/10 09:41:14] [ info] collectors:
[2022/05/10 09:41:14] [ info] [fluent bit] version=1.9.3, commit=2100bfac09, pid=17689
[2022/05/10 09:41:14] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2022/05/10 09:41:14] [ info] [storage] version=1.2.0, type=memory-only, sync=normal, checksum=disabled, max_chunks_up=128
[2022/05/10 09:41:14] [ info] [cmetrics] version=0.3.1
[2022/05/10 09:41:14] [debug] [tail:tail.0] created event channels: read=21 write=22
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] flb_tail_fs_inotify_init() initializing inotify tail input
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] inotify watch fd=27
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] scanning path ./data.log
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] inode=27246879 with offset=0 appended as ./data.log
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] scan_glob add(): ./data.log, inode 27246879
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] 1 new files found on path './data.log'
[2022/05/10 09:41:14] [debug] [stdout:stdout.0] created event channels: read=29 write=30
[2022/05/10 09:41:14] [debug] [kafka:kafka.1] created event channels: read=36 write=37
[2022/05/10 09:41:14] [ info] [output:stdout:stdout.0] worker #0 started
[2022/05/10 09:41:14] [ info] [output:kafka:kafka.1] brokers='...'
[2022/05/10 09:41:14] [ info] [output:kafka:kafka.1] schemaID='38' schema='{"type":"record","name":"avro_logging","fields":[{"name":"timestamp","type":"string","default":""},{"name":"version","type":"string"},{"name":"message","type":"string"},{"name":"logger_name","type":"string"}]}'
[2022/05/10 09:41:14] [debug] [router] match rule tail.0:stdout.0
[2022/05/10 09:41:14] [debug] [router] match rule tail.0:kafka.1
[2022/05/10 09:41:14] [ info] [sp] stream processor started
[2022/05/10 09:41:14] [debug] [input chunk] update output instances with new chunk size diff=167
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] [static files] processed 172b
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] inode=27246879 file=./data.log promote to TAIL_EVENT
[2022/05/10 09:41:14] [ info] [input:tail:tail.0] inotify_fs_add(): inode=27246879 watch_fd=1 name=./data.log
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] [static files] processed 0b, done
[2022/05/10 09:41:14] [debug] [task] created task=0x7f2c11c3d7e0 id=0 OK
[2022/05/10 09:41:14] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2022/05/10 09:41:14] [debug] in produce_message
[0] tail.0: [1652175674.173015401, {"timestamp"=>"2022-05-03T18:33:28.929+09:00", "version"=>"1", "message"=>"Weights attr: {}", "logger_name"=>"org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter"}]
[2022/05/10 09:41:14] [debug] [out flush] cb_destroy coro_id=0
[2022/05/10 09:41:14] [debug] [output:kafka:kafka.1] avro schema ID:38:
[2022/05/10 09:41:14] [debug] [output:kafka:kafka.1] avro schema string:{"type":"record","name":"avro_logging","fields":[{"name":"timestamp","type":"string","default":""},{"name":"version","type":"string"},{"name":"message","type":"string"},{"name":"logger_name","type":"string"}]}:
[2022/05/10 09:41:14] [debug] [output:kafka:kafka.1] using default buffer AVRO:len:156:limit:2048:schemaID:38:
[2022/05/10 09:41:14] [debug] in flb_msgpack_raw_to_avro_sds
[2022/05/10 09:41:14] [debug] schemaID:38:
[2022/05/10 09:41:14] [debug] schema string:{"type":"record","name":"avro_logging","fields":[{"name":"timestamp","type":"string","default":""},{"name":"version","type":"string"},{"name":"message","type":"string"},{"name":"logger_name","type":"string"}]}:
[2022/05/10 09:41:14] [debug] in flb_avro_init:before error::json len:209:
[2022/05/10 09:41:14] [debug] calling flb_msgpack_to_avro
[2022/05/10 09:41:14] [debug] in msgpack2avro
[2022/05/10 09:41:14] [debug] got a map
[2022/05/10 09:41:14] [debug] got key:timestamp:
[2022/05/10 09:41:14] [debug] avro_value_add:key:timestamp:avro error::
[2022/05/10 09:41:14] [debug] added
[2022/05/10 09:41:14] [debug] calling avro_value_get_by_name
[2022/05/10 09:41:14] [debug] called avro_value_get_by_index
[2022/05/10 09:41:14] [debug] in msgpack2avro
[2022/05/10 09:41:14] [debug] got a string: "
[2022/05/10 09:41:14] [debug] "
[2022/05/10 09:41:14] [debug] setting string:2022-05-03T18:33:28.929+09:00:
[2022/05/10 09:41:14] [debug] set string
[2022/05/10 09:41:14] [debug] got key:version:
[2022/05/10 09:41:14] [debug] avro_value_add:key:version:avro error::
[2022/05/10 09:41:14] [debug] added
[2022/05/10 09:41:14] [debug] calling avro_value_get_by_name
[2022/05/10 09:41:14] [debug] called avro_value_get_by_index
[2022/05/10 09:41:14] [debug] in msgpack2avro
[2022/05/10 09:41:14] [debug] got a string: "
[2022/05/10 09:41:14] [debug] "
[2022/05/10 09:41:14] [debug] setting string:1:
[2022/05/10 09:41:14] [debug] set string
[2022/05/10 09:41:14] [debug] got key:message:
[2022/05/10 09:41:14] [debug] avro_value_add:key:message:avro error::
[2022/05/10 09:41:14] [debug] added
[2022/05/10 09:41:14] [debug] calling avro_value_get_by_name
[2022/05/10 09:41:14] [debug] called avro_value_get_by_index
[2022/05/10 09:41:14] [debug] in msgpack2avro
[2022/05/10 09:41:14] [debug] got a string: "
[2022/05/10 09:41:14] [debug] "
[2022/05/10 09:41:14] [debug] setting string:Weights attr: {}:
[2022/05/10 09:41:14] [debug] set string
[2022/05/10 09:41:14] [debug] got key:logger_name:
[2022/05/10 09:41:14] [debug] avro_value_add:key:logger_name:avro error::
[2022/05/10 09:41:14] [debug] added
[2022/05/10 09:41:14] [debug] calling avro_value_get_by_name
[2022/05/10 09:41:14] [debug] called avro_value_get_by_index
[2022/05/10 09:41:14] [debug] in msgpack2avro
[2022/05/10 09:41:14] [debug] got a string: "
[2022/05/10 09:41:14] [debug] "
[2022/05/10 09:41:14] [debug] setting string:org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter:
[2022/05/10 09:41:14] [debug] set string
[2022/05/10 09:41:14] [debug] before avro_writer_memory
[2022/05/10 09:41:14] [debug] before avro_writer_flush
[2022/05/10 09:41:14] [debug] after memory free:bytes written:123:
[2022/05/10 09:41:14] [debug] [output:kafka:kafka.1] enqueued message (123 bytes) for topic 'log_topic'
[2022/05/10 09:41:14] [debug] [out flush] cb_destroy coro_id=0
Valgrind output
==18112== Memcheck, a memory error detector
==18112== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==18112== Using Valgrind-3.16.1 and LibVEX; rerun with -h for copyright info
==18112== Command: ../build/bin/fluent-bit -c fluent-bit.conf -R ../conf/parsers.conf
==18112==
Fluent Bit v1.9.3
* Copyright (C) 2015-2022 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io
[2022/05/10 09:51:36] [ info] [fluent bit] version=1.9.3, commit=2100bfac09, pid=18112
[2022/05/10 09:51:36] [ info] [storage] version=1.2.0, type=memory-only, sync=normal, checksum=disabled, max_chunks_up=128
[2022/05/10 09:51:36] [ info] [output:stdout:stdout.0] worker #0 started
[2022/05/10 09:51:36] [ info] [cmetrics] version=0.3.1
[2022/05/10 09:51:37] [ info] [output:kafka:kafka.1] brokers='b-1.ohouse-dev-kafka.p6y28m.c3.kafka.ap-northeast-2.amazonaws.com:9092,b-2.ohouse-dev-kafka.p6y28m.c3.kafka.ap-northeast-2.amazonaws.com:9092,b-3.ohouse-dev-kafka.p6y28m.c3.kafka.ap-northeast-2.amazonaws.com:9092' topics='watch-log-dev-test.v2'
[2022/05/10 09:51:37] [ info] [output:kafka:kafka.1] schemaID='38' schema='{"type":"record","name":"avro_logging","fields":[{"name":"timestamp","type":"string","default":""},{"name":"version","type":"string"},{"name":"message","type":"string"},{"name":"logger_name","type":"string"}]}'
[2022/05/10 09:51:37] [ info] [sp] stream processor started
[2022/05/10 09:51:37] [ info] [input:tail:tail.0] inotify_fs_add(): inode=27246879 watch_fd=1 name=./data.log
==18112== Warning: client switching stacks? SP change: 0x6ff5678 --> 0x5ccf800
==18112== to suppress, use: --max-stackframe=20078200 or greater
==18112== Warning: client switching stacks? SP change: 0x5ccf768 --> 0x6ff5678
==18112== to suppress, use: --max-stackframe=20078352 or greater
==18112== Warning: client switching stacks? SP change: 0x6ff5898 --> 0x5ccf768
==18112== to suppress, use: --max-stackframe=20078896 or greater
==18112== further instances of this message will not be shown.
[0] tail.0: [1652176297.260046678, {"timestamp"=>"2022-05-03T18:33:28.929+09:00", "version"=>"1", "message"=>"Weights attr: {}", "logger_name"=>"org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter"}]
^C[2022/05/10 09:51:44] [engine] caught signal (SIGINT)
[2022/05/10 09:51:44] [ info] [input] pausing tail.0
[2022/05/10 09:51:44] [ warn] [engine] service will shutdown in max 5 seconds
[2022/05/10 09:51:45] [ info] [engine] service has stopped (0 pending tasks)
[2022/05/10 09:51:45] [ info] [input:tail:tail.0] inotify_fs_remove(): inode=27246879 watch_fd=1
[2022/05/10 09:51:45] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2022/05/10 09:51:45] [ info] [output:stdout:stdout.0] thread worker #0 stopped
[2022/05/10 09:51:45] [ warn] [output:kafka:kafka.1] fluent-bit#producer-1: [thrd:app]: Producer terminating with 1 message (123 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
==18112==
==18112== HEAP SUMMARY:
==18112== in use at exit: 208,252 bytes in 4,385 blocks
==18112== total heap usage: 9,706 allocs, 5,321 frees, 1,449,684 bytes allocated
==18112==
==18112== LEAK SUMMARY:
==18112== definitely lost: 0 bytes in 0 blocks
==18112== indirectly lost: 0 bytes in 0 blocks
==18112== possibly lost: 0 bytes in 0 blocks
==18112== still reachable: 208,252 bytes in 4,385 blocks
==18112== suppressed: 0 bytes in 0 blocks
==18112== Reachable blocks (those to which a pointer was found) are not shown.
==18112== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==18112==
==18112== For lists of detected and suppressed errors, rerun with: -s
==18112== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
Enter [N/A] in the box if an item is not applicable to your change.
Testing
Before we can approve your change, please submit the following in a comment:
- [x] Example configuration file for the change
- [x] Debug log output from testing the change
- [x] Attached Valgrind output that shows no leaks or memory corruption was found
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
- [N/A] Attached local packaging test output showing all targets (including any new ones) build.
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Not sure why, but the unit tests have not run; possibly an issue with GitHub Actions at the time. Can you rebase to retrigger?
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.
Can we have this PR merged soon? Avro support with the Schema Registry only works with this fix.
The Avro encoding in master is broken and useless with those extra 12 bytes in addition to the actual 5 magic+schemaID bytes. This PR fixes that. But I guess this AVRO_SCHEMA_OVERHEAD in the Kafka plugin should be changed as well? https://github.com/fluent/fluent-bit/blob/5384cc600f5fa23624d9738ac045d5892e481275/plugins/out_kafka/kafka.c#L132-L138
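Consumer-side, the registry deserializer rejects anything that does not begin with that exact 5-byte header, which is why the extra bytes break decoding. An illustrative sketch of the check (not Schema Registry source; `parse_confluent_header` is a hypothetical name):

```c
#include <stdint.h>
#include <stddef.h>

/* Mirrors the check a Confluent deserializer performs: the message
 * must start with magic byte 0x00, followed by the 4-byte big-endian
 * schema ID. Returns 0 on success, -1 on a malformed header. */
static int parse_confluent_header(const uint8_t *msg, size_t len,
                                  uint32_t *schema_id)
{
    if (len < 5 || msg[0] != 0x00)
        return -1; /* unknown magic byte or truncated message */
    *schema_id = ((uint32_t)msg[1] << 24) | ((uint32_t)msg[2] << 16) |
                 ((uint32_t)msg[3] << 8)  |  (uint32_t)msg[4];
    return 0;
}
```

Under this framing the per-message overhead is exactly 5 bytes (1 magic + 4 schema ID), which is what the `AVRO_SCHEMA_OVERHEAD` constant in the plugin would need to reflect.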
Is this PR still relevant?
I believe this is still relevant, unless the schema_id was not intended to be used with Confluent's Schema Registry. The schema_id encoded in a message still can't be recognized by the Schema Registry, for the reason stated at the beginning of the thread.
Can confirm that this patch is required to get messages to be accepted by Confluent Kafka with schema validation enabled.