out_kafka: fix a bug in avro schema_id

Open siji-on opened this issue 2 years ago • 10 comments

The wire format produced by the code did not match the Confluent Schema Registry wire format (https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format). Related issues: fixes #4560, #4488
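For reference, here is a minimal sketch of the framing described in the linked Confluent documentation: one magic byte (0), a 4-byte big-endian schema ID, then the Avro binary payload. This is an illustration in Python, not the plugin's C code; the helper name `frame_confluent` and the sample payload are made up for the example.

```python
import struct

MAGIC_BYTE = 0  # Confluent wire-format marker, always 0


def frame_confluent(schema_id, avro_payload):
    """Prefix an Avro binary payload with the 5-byte Confluent header.

    Hypothetical helper for illustration; it is not part of Fluent Bit.
    """
    # ">bI" = big-endian, no padding: 1 byte (magic) + 4-byte unsigned int (schema ID)
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


# Sample payload: the Avro string "1" (zigzag length 0x02, then the byte '1').
framed = frame_confluent(38, b"\x02" + b"1")
print(framed.hex())  # 00000000260231
```

With schema ID 38, as in the configuration below, the header is the five bytes `00 00 00 00 26`, and the Avro data starts at offset 5.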

Configuration

[SERVICE]
    flush        1
    log_level    debug
    parsers_file parsers.conf

[INPUT]
    name         tail
    path         ./data.log
    parser       json
    Read_from_Head true
    exit_on_eof  off


[OUTPUT]
    name       stdout
    json_date_key false
    match      *


[OUTPUT]
    Name           kafka
    Match          *
    Brokers        localhost:9092
    Topics         log_topic
    Schema_str    {"type":"record","name":"avro_logging","fields":[{"name":"timestamp","type":"string","default":""},{"name":"version","type":"string"},{"name":"message","type":"string"},{"name":"logger_name","type":"string"}]}
    Schema_id      38
    Timestamp_Key  @timestamp
    Retry_Limit    false
    # hides errors "Receive failed: Disconnected" when kafka kills idle connections
    rdkafka.log.connection.close false
    # producer buffer is not included in http://fluentbit.io/documentation/0.12/configuration/memory_usage.html#estimating
    rdkafka.queue.buffering.max.kbytes 10240
    Format avro
    # for logs you'll probably want this to be 0 or 1, not more
    rdkafka.request.required.acks 1

Debug Log

[2022/05/10 09:41:14] [ info] Configuration:
[2022/05/10 09:41:14] [ info]  flush time     | 1.000000 seconds
[2022/05/10 09:41:14] [ info]  grace          | 5 seconds
[2022/05/10 09:41:14] [ info]  daemon         | 0
[2022/05/10 09:41:14] [ info] ___________
[2022/05/10 09:41:14] [ info]  inputs:
[2022/05/10 09:41:14] [ info]      tail
[2022/05/10 09:41:14] [ info] ___________
[2022/05/10 09:41:14] [ info]  filters:
[2022/05/10 09:41:14] [ info] ___________
[2022/05/10 09:41:14] [ info]  outputs:
[2022/05/10 09:41:14] [ info]      stdout.0
[2022/05/10 09:41:14] [ info]      kafka.1
[2022/05/10 09:41:14] [ info] ___________
[2022/05/10 09:41:14] [ info]  collectors:
[2022/05/10 09:41:14] [ info] [fluent bit] version=1.9.3, commit=2100bfac09, pid=17689
[2022/05/10 09:41:14] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2022/05/10 09:41:14] [ info] [storage] version=1.2.0, type=memory-only, sync=normal, checksum=disabled, max_chunks_up=128
[2022/05/10 09:41:14] [ info] [cmetrics] version=0.3.1
[2022/05/10 09:41:14] [debug] [tail:tail.0] created event channels: read=21 write=22
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] flb_tail_fs_inotify_init() initializing inotify tail input
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] inotify watch fd=27
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] scanning path ./data.log
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] inode=27246879 with offset=0 appended as ./data.log
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] scan_glob add(): ./data.log, inode 27246879
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] 1 new files found on path './data.log'
[2022/05/10 09:41:14] [debug] [stdout:stdout.0] created event channels: read=29 write=30
[2022/05/10 09:41:14] [debug] [kafka:kafka.1] created event channels: read=36 write=37
[2022/05/10 09:41:14] [ info] [output:stdout:stdout.0] worker #0 started
[2022/05/10 09:41:14] [ info] [output:kafka:kafka.1] brokers='...'
[2022/05/10 09:41:14] [ info] [output:kafka:kafka.1] schemaID='38' schema='{"type":"record","name":"avro_logging","fields":[{"name":"timestamp","type":"string","default":""},{"name":"version","type":"string"},{"name":"message","type":"string"},{"name":"logger_name","type":"string"}]}'
[2022/05/10 09:41:14] [debug] [router] match rule tail.0:stdout.0
[2022/05/10 09:41:14] [debug] [router] match rule tail.0:kafka.1
[2022/05/10 09:41:14] [ info] [sp] stream processor started
[2022/05/10 09:41:14] [debug] [input chunk] update output instances with new chunk size diff=167
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] [static files] processed 172b
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] inode=27246879 file=./data.log promote to TAIL_EVENT
[2022/05/10 09:41:14] [ info] [input:tail:tail.0] inotify_fs_add(): inode=27246879 watch_fd=1 name=./data.log
[2022/05/10 09:41:14] [debug] [input:tail:tail.0] [static files] processed 0b, done
[2022/05/10 09:41:14] [debug] [task] created task=0x7f2c11c3d7e0 id=0 OK
[2022/05/10 09:41:14] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2022/05/10 09:41:14] [debug] in produce_message

{"timestamp"=>"2022-05-03T18:33:28.929+09:00", "version"=>"1", "message[0] tail.0: [1652175674.173015401, {"timestamp"=>"2022-05-03T18:33:28.929+09:00", "version"=>"1", "message"=>"Weights attr: {}", "logger_name"=>"org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter"}]
"[2022/05/10 09:41:14] [debug] [out flush] cb_destroy coro_id=0
=>"Weights attr: {}", "logger_name"=>"org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter"}[2022/05/10 09:41:14] [debug] [output:kafka:kafka.1] avro schema ID:38:

[2022/05/10 09:41:14] [debug] [output:kafka:kafka.1] avro schema string:{"type":"record","name":"avro_logging","fields":[{"name":"timestamp","type":"string","default":""},{"name":"version","type":"string"},{"name":"message","type":"string"},{"name":"logger_name","type":"string"}]}:

[2022/05/10 09:41:14] [debug] [output:kafka:kafka.1] using default buffer AVRO:len:156:limit:2048:schemaID:38:

[2022/05/10 09:41:14] [debug] in flb_msgpack_raw_to_avro_sds

[2022/05/10 09:41:14] [debug] schemaID:38:

[2022/05/10 09:41:14] [debug] schema string:{"type":"record","name":"avro_logging","fields":[{"name":"timestamp","type":"string","default":""},{"name":"version","type":"string"},{"name":"message","type":"string"},{"name":"logger_name","type":"string"}]}:

[2022/05/10 09:41:14] [debug] in flb_avro_init:before error::json len:209:

2022-05-03T18:33:28.929+09:00[2022/05/10 09:41:14] [debug] calling flb_msgpack_to_avro

[2022/05/10 09:41:14] [debug] in msgpack2avro

[2022/05/10 09:41:14] [debug] got a map

[2022/05/10 09:41:14] [debug] got key:timestamp:

[2022/05/10 09:41:14] [debug] avro_value_add:key:timestamp:avro error::

1[2022/05/10 09:41:14] [debug] added

[2022/05/10 09:41:14] [debug] calling avro_value_get_by_name

[2022/05/10 09:41:14] [debug] called avro_value_get_by_index

[2022/05/10 09:41:14] [debug] in msgpack2avro

[2022/05/10 09:41:14] [debug] got a string: "
[2022/05/10 09:41:14] [debug] "

[2022/05/10 09:41:14] [debug] setting string:2022-05-03T18:33:28.929+09:00:

[2022/05/10 09:41:14] [debug] set string

[2022/05/10 09:41:14] [debug] got key:version:

Weights attr: {}[2022/05/10 09:41:14] [debug] avro_value_add:key:version:avro error::

[2022/05/10 09:41:14] [debug] added

[2022/05/10 09:41:14] [debug] calling avro_value_get_by_name

[2022/05/10 09:41:14] [debug] called avro_value_get_by_index

[2022/05/10 09:41:14] [debug] in msgpack2avro

[2022/05/10 09:41:14] [debug] got a string: "
[2022/05/10 09:41:14] [debug] "

[2022/05/10 09:41:14] [debug] setting string:1:

[2022/05/10 09:41:14] [debug] set string

[2022/05/10 09:41:14] [debug] got key:message:

[2022/05/10 09:41:14] [debug] avro_value_add:key:message:avro error::

[2022/05/10 09:41:14] [debug] added

org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter[2022/05/10 09:41:14] [debug] calling avro_value_get_by_name

[2022/05/10 09:41:14] [debug] called avro_value_get_by_index

[2022/05/10 09:41:14] [debug] in msgpack2avro

[2022/05/10 09:41:14] [debug] got a string: "
[2022/05/10 09:41:14] [debug] "

[2022/05/10 09:41:14] [debug] setting string:Weights attr: {}:

[2022/05/10 09:41:14] [debug] set string

[2022/05/10 09:41:14] [debug] got key:logger_name:

[2022/05/10 09:41:14] [debug] avro_value_add:key:logger_name:avro error::

[2022/05/10 09:41:14] [debug] added

[2022/05/10 09:41:14] [debug] calling avro_value_get_by_name

[2022/05/10 09:41:14] [debug] called avro_value_get_by_index

[2022/05/10 09:41:14] [debug] in msgpack2avro

[2022/05/10 09:41:14] [debug] got a string: "
[2022/05/10 09:41:14] [debug] "

[2022/05/10 09:41:14] [debug] setting string:org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter:

[2022/05/10 09:41:14] [debug] set string

[2022/05/10 09:41:14] [debug] before avro_writer_memory

[2022/05/10 09:41:14] [debug] before avro_writer_flush

[2022/05/10 09:41:14] [debug] after memory free:bytes written:123:

[2022/05/10 09:41:14] [debug] [output:kafka:kafka.1] enqueued message (123 bytes) for topic 'log_topic'
[2022/05/10 09:41:14] [debug] [out flush] cb_destroy coro_id=0

Valgrind output

==18112== Memcheck, a memory error detector
==18112== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==18112== Using Valgrind-3.16.1 and LibVEX; rerun with -h for copyright info
==18112== Command: ../build/bin/fluent-bit -c fluent-bit.conf -R ../conf/parsers.conf
==18112==
Fluent Bit v1.9.3
* Copyright (C) 2015-2022 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2022/05/10 09:51:36] [ info] [fluent bit] version=1.9.3, commit=2100bfac09, pid=18112
[2022/05/10 09:51:36] [ info] [storage] version=1.2.0, type=memory-only, sync=normal, checksum=disabled, max_chunks_up=128
[2022/05/10 09:51:36] [ info] [output:stdout:stdout.0] worker #0 started
[2022/05/10 09:51:36] [ info] [cmetrics] version=0.3.1
[2022/05/10 09:51:37] [ info] [output:kafka:kafka.1] brokers='b-1.ohouse-dev-kafka.p6y28m.c3.kafka.ap-northeast-2.amazonaws.com:9092,b-2.ohouse-dev-kafka.p6y28m.c3.kafka.ap-northeast-2.amazonaws.com:9092,b-3.ohouse-dev-kafka.p6y28m.c3.kafka.ap-northeast-2.amazonaws.com:9092' topics='watch-log-dev-test.v2'
[2022/05/10 09:51:37] [ info] [output:kafka:kafka.1] schemaID='38' schema='{"type":"record","name":"avro_logging","fields":[{"name":"timestamp","type":"string","default":""},{"name":"version","type":"string"},{"name":"message","type":"string"},{"name":"logger_name","type":"string"}]}'
[2022/05/10 09:51:37] [ info] [sp] stream processor started
[2022/05/10 09:51:37] [ info] [input:tail:tail.0] inotify_fs_add(): inode=27246879 watch_fd=1 name=./data.log
==18112== Warning: client switching stacks?  SP change: 0x6ff5678 --> 0x5ccf800
==18112==          to suppress, use: --max-stackframe=20078200 or greater
==18112== Warning: client switching stacks?  SP change: 0x5ccf768 --> 0x6ff5678
==18112==          to suppress, use: --max-stackframe=20078352 or greater
==18112== Warning: client switching stacks?  SP change: 0x6ff5898 --> 0x5ccf768
==18112==          to suppress, use: --max-stackframe=20078896 or greater
==18112==          further instances of this message will not be shown.
[0] tail.0: [1652176297.260046678, {"timestamp"=>"2022-05-03T18:33:28.929+09:00", "version"=>"1", "message"=>"Weights attr: {}", "logger_name"=>"org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter"}]
^C[2022/05/10 09:51:44] [engine] caught signal (SIGINT)
[2022/05/10 09:51:44] [ info] [input] pausing tail.0
[2022/05/10 09:51:44] [ warn] [engine] service will shutdown in max 5 seconds
[2022/05/10 09:51:45] [ info] [engine] service has stopped (0 pending tasks)
[2022/05/10 09:51:45] [ info] [input:tail:tail.0] inotify_fs_remove(): inode=27246879 watch_fd=1
[2022/05/10 09:51:45] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2022/05/10 09:51:45] [ info] [output:stdout:stdout.0] thread worker #0 stopped
[2022/05/10 09:51:45] [ warn] [output:kafka:kafka.1] fluent-bit#producer-1: [thrd:app]: Producer terminating with 1 message (123 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
==18112==
==18112== HEAP SUMMARY:
==18112==     in use at exit: 208,252 bytes in 4,385 blocks
==18112==   total heap usage: 9,706 allocs, 5,321 frees, 1,449,684 bytes allocated
==18112==
==18112== LEAK SUMMARY:
==18112==    definitely lost: 0 bytes in 0 blocks
==18112==    indirectly lost: 0 bytes in 0 blocks
==18112==      possibly lost: 0 bytes in 0 blocks
==18112==    still reachable: 208,252 bytes in 4,385 blocks
==18112==         suppressed: 0 bytes in 0 blocks
==18112== Reachable blocks (those to which a pointer was found) are not shown.
==18112== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==18112==
==18112== For lists of detected and suppressed errors, rerun with: -s
==18112== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

Enter [N/A] in the box, if an item is not applicable to your change.

Testing

Before we can approve your change; please submit the following in a comment:

  • [x] Example configuration file for the change
  • [x] Debug log output from testing the change
  • [x] Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [N/A] Attached local packaging test output showing all targets (including any new ones) build.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

siji-on, May 10 '22 10:05

Not sure why, but the unit tests have not run, possibly because of an issue with GitHub Actions at the time. Can you rebase to retrigger them?

patrick-stephens, Jun 14 '22 20:06

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

github-actions[bot], Sep 14 '22 02:09

Can we have this PR merged soon? AVRO support with Schema-Registry only works with this fix.

flarno11, Sep 27 '22 14:09

The Avro encoding in master is broken and useless with those extra 12 bytes in addition to the actual 5 magic+schemaID bytes. This PR fixes that. But I guess this AVRO_SCHEMA_OVERHEAD in the Kafka plugin should be changed as well? https://github.com/fluent/fluent-bit/blob/5384cc600f5fa23624d9738ac045d5892e481275/plugins/out_kafka/kafka.c#L132-L138
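To illustrate why stray bytes after the header break consumers, here is a small Python sketch of what a Schema Registry aware reader does: strip the 5-byte header, then decode Avro starting at offset 5. The layout of the 12 extra bytes is not stated in this thread, so the example uses zero padding purely as a stand-in; `split_confluent` and `read_avro_string` are hypothetical helpers, not Fluent Bit or Confluent APIs.

```python
import struct
from typing import Tuple

HEADER_LEN = 5  # 1 magic byte + 4-byte big-endian schema ID


def split_confluent(message: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_payload); raise ValueError on a bad header."""
    if len(message) < HEADER_LEN:
        raise ValueError("message shorter than the 5-byte header")
    magic, schema_id = struct.unpack(">bI", message[:HEADER_LEN])
    if magic != 0:
        raise ValueError("unexpected magic byte %d" % magic)
    return schema_id, message[HEADER_LEN:]


def read_avro_string(payload: bytes) -> str:
    """Decode one Avro string: zigzag varint length, then UTF-8 bytes."""
    shift = value = pos = 0
    while True:
        byte = payload[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last varint byte
            break
        shift += 7
    length = (value >> 1) ^ -(value & 1)  # undo zigzag encoding
    return payload[pos:pos + length].decode("utf-8")


header = struct.pack(">bI", 0, 38)
good = header + b"\x02" + b"1"                 # header + Avro string "1"
padded = header + bytes(12) + b"\x02" + b"1"   # ASSUMED: 12 zero bytes after the header

print(repr(read_avro_string(split_confluent(good)[1])))    # '1'
print(repr(read_avro_string(split_confluent(padded)[1])))  # ''
```

In the padded case the header still parses and the schema ID is still 38, but the first field decodes as an empty string because the reader starts on the padding, so every later field is misaligned as well.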

TorsteinOtterlei, Feb 14 '23 14:02

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

github-actions[bot], May 16 '23 01:05

Is this PR still relevant?

leonardo-albertovich, May 16 '23 08:05

I believe this is still relevant, unless the schema_id was not intended to be used with Confluent's Schema Registry. The schema_id encoded in a message still can't be recognized by Schema Registry, for the reason stated at the beginning of the thread.

falau, Jun 01 '23 10:06

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

github-actions[bot], Aug 31 '23 01:08

Can confirm that this patch is required to get messages to be accepted by Confluent Kafka with schema validation enabled.

silverwind, Feb 15 '24 10:02

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

github-actions[bot], May 16 '24 01:05