fluent-bit
fluent-bit copied to clipboard
out_kafka: add timestamp_mode to allow disabling timestamp key
This PR tries to address the feature request raised by #2477 to have
a way to disable timestamp key in kafka plugin. In the past it was
not possible to disable timestamp (default key is @timestamp
)
in output.
This PR adds another option of Timestamp_Mode = on|off
so that it is possible to disable timestamp. This is also with
the intention of not breaking existing user.
This PR fixes #2477.
Signed-off-by: Yong Tang [email protected]
Kafka messages before:
{"@timestamp":1612909053.768296,"path":"lines.txt","log":"{\"log\": \"aaa\"}"}
{"@timestamp":1612909053.774844,"path":"lines.txt","log":"{\"log\": \"aab\"}"}
{"@timestamp":1612909053.774868,"path":"lines.txt","log":"{\"log\": \"bbb\"}"}
{"@timestamp":1612909053.77488,"path":"lines.txt","log":"{\"log\": \"ccc\"}"}
{"@timestamp":1612909053.774892,"path":"lines.txt","log":"{\"log\": \"ddd\"}"}
{"@timestamp":1612909053.774903,"path":"lines.txt","log":"{\"log\": \"eee\"}"}
{"@timestamp":1612909053.774913,"path":"lines.txt","log":"{\"log\": \"fff\"}"}
{"@timestamp":1612909053.774924,"path":"lines.txt","log":"{\"log\": \"ggg\"}"}
Kafka messages after:
{"path":"lines.txt","log":"{\"log\": \"aaa\"}"}
{"path":"lines.txt","log":"{\"log\": \"aab\"}"}
{"path":"lines.txt","log":"{\"log\": \"bbb\"}"}
{"path":"lines.txt","log":"{\"log\": \"ccc\"}"}
{"path":"lines.txt","log":"{\"log\": \"ddd\"}"}
{"path":"lines.txt","log":"{\"log\": \"eee\"}"}
{"path":"lines.txt","log":"{\"log\": \"fff\"}"}
{"path":"lines.txt","log":"{\"log\": \"ggg\"}"}
Example Config:
$ cat fluent-bit.conf
[INPUT]
Name tail
Path lines.txt
Read_from_Head true
Path_Key path
[OUTPUT]
Name kafka
Match *
Brokers 127.0.0.1:9092
Topics test
Timestamp_Mode off
[OUTPUT]
Name stdout
Match *
lines.txt:
$ cat lines.txt
{"log": "aaa"}
{"log": "aab"}
{"log": "bbb"}
{"log": "ccc"}
{"log": "ddd"}
{"log": "eee"}
{"log": "fff"}
{"log": "ggg"}
Vagrind:
$ valgrind --leak-check=full bin/fluent-bit -c fluent-bit.conf
==525220== Memcheck, a memory error detector
==525220== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==525220== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info
==525220== Command: bin/fluent-bit -c fluent-bit.conf
==525220==
Fluent Bit v1.7.0
* Copyright (C) 2019-2020 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io
[2021/02/09 21:58:38] [ info] [engine] started (pid=525220)
[2021/02/09 21:58:39] [ info] [storage] version=1.1.0, initializing...
[2021/02/09 21:58:39] [ info] [storage] in-memory
[2021/02/09 21:58:39] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2021/02/09 21:58:39] [ info] [output:kafka:kafka.0] brokers='127.0.0.1:9092' topics='test'
[2021/02/09 21:58:39] [ info] [sp] stream processor started
[2021/02/09 21:58:39] [ info] [input:tail:tail.0] inotify_fs_add(): inode=2305433 watch_fd=1 name=lines.txt
[0] tail.0: [1612907919.376051134, {"path"=>"lines.txt", "log"=>"{"log": "aaa"}"}]
[1] tail.0: [1612907919.382767006, {"path"=>"lines.txt", "log"=>"{"log": "aab"}"}]
[2] tail.0: [1612907919.382792267, {"path"=>"lines.txt", "log"=>"{"log": "bbb"}"}]
[3] tail.0: [1612907919.382847768, {"path"=>"lines.txt", "log"=>"{"log": "ccc"}"}]
[4] tail.0: [1612907919.382863058, {"path"=>"lines.txt", "log"=>"{"log": "ddd"}"}]
[5] tail.0: [1612907919.382874338, {"path"=>"lines.txt", "log"=>"{"log": "eee"}"}]
[6] tail.0: [1612907919.382885108, {"path"=>"lines.txt", "log"=>"{"log": "fff"}"}]
[7] tail.0: [1612907919.382896099, {"path"=>"lines.txt", "log"=>"{"log": "ggg"}"}]
^C[2021/02/09 21:58:54] [engine] caught signal (SIGINT)
[2021/02/09 21:58:54] [ info] [input] pausing tail.0
[2021/02/09 21:58:54] [ warn] [engine] service will stop in 5 seconds
[2021/02/09 21:58:58] [ info] [engine] service stopped
[2021/02/09 21:58:58] [ info] [input:tail:tail.0] inotify_fs_remove(): inode=2305433 watch_fd=1
==525220==
==525220== HEAP SUMMARY:
==525220== in use at exit: 0 bytes in 0 blocks
==525220== total heap usage: 822 allocs, 822 frees, 1,493,890 bytes allocated
==525220==
==525220== All heap blocks were freed -- no leaks are possible
==525220==
==525220== For lists of detected and suppressed errors, rerun with: -s
==525220== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
Enter [N/A]
in the box, if an item is not applicable to your change.
Testing Before we can approve your change; please submit the following in a comment:
- [x] Example configuration file for the change
- [x] Debug log output from testing the change
- [x] Attached Valgrind output that shows no leaks or memory corruption was found
Documentation
- [x] Documentation required for this feature
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.
@edsiper any chance to take a look?
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.
@edsiper Can you please look into this?
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.
@edsiper Can you please look into this? or @nokute78 can help us out?
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.
+1 This would be a nice addition to the Kafka output plugin.
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.