fluent-bit
filter_parser: Hash_key for parsed result
Add Hash key for parsed value in Filter parser (similar to fluentd)
Addresses #8893
Enter [N/A] in the box, if an item is not applicable to your change.
Testing
Before we can approve your change, please submit the following in a comment:
- [x] Example configuration file for the change
- [x] Debug log output from testing the change
- [x] Attached Valgrind output that shows no leaks or memory corruption was found
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
- [N/A] Run local packaging test showing all targets (including any new ones) build.
- [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).
Documentation
- [x] Documentation required for this feature https://github.com/fluent/fluent-bit-docs/pull/1444
Backporting
- [N/A] Backport to latest stable release.
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Configuration used for testing
[SERVICE]
    Flush        1
    Daemon       Off
    Log_Level    debug
    Parsers_File parsers.conf
[INPUT]
    Name stdin
    Tag  myapp.*
[FILTER]
    Name             parser
    Match            myapp.*
    Parser           inner_json_parser
    Key_name         MESSAGE
    Reserve_Data     On
    Hash_Value_Field On
[OUTPUT]
    Name   stdout
    Match  *
    Format json_lines
[PARSER]
    Name   inner_json_parser
    Format regex
    Regex  ^(?<STRING1>[^ ]+) (?<STRING2>.+)$
The manual testing output
root@127f08db3cdb:/workspaces/fluent-bit/build/bin# echo -n "{\"MESSAGE\": \"some value\"}" | ./fluent-bit -c config.conf -q
[2024/08/24 06:10:35] [ info] Configuration:
[2024/08/24 06:10:35] [ info] flush time | 1.000000 seconds
[2024/08/24 06:10:35] [ info] grace | 5 seconds
[2024/08/24 06:10:35] [ info] daemon | 0
[2024/08/24 06:10:35] [ info] ___________
[2024/08/24 06:10:35] [ info] inputs:
[2024/08/24 06:10:35] [ info] stdin
[2024/08/24 06:10:35] [ info] ___________
[2024/08/24 06:10:35] [ info] filters:
[2024/08/24 06:10:35] [ info] parser.0
[2024/08/24 06:10:35] [ info] ___________
[2024/08/24 06:10:35] [ info] outputs:
[2024/08/24 06:10:35] [ info] stdout.0
[2024/08/24 06:10:35] [ info] ___________
[2024/08/24 06:10:35] [ info] collectors:
[2024/08/24 06:10:35] [ info] [fluent bit] version=3.1.7, commit=, pid=40121
[2024/08/24 06:10:35] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2024/08/24 06:10:35] [ info] [storage] ver=1.5.2, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/08/24 06:10:35] [ info] [cmetrics] version=0.9.4
[2024/08/24 06:10:35] [ info] [ctraces ] version=0.5.5
[2024/08/24 06:10:35] [ info] [input:stdin:stdin.0] initializing
[2024/08/24 06:10:35] [ info] [input:stdin:stdin.0] storage_strategy='memory' (memory only)
[2024/08/24 06:10:35] [debug] [stdin:stdin.0] created event channels: read=21 write=22
[2024/08/24 06:10:35] [debug] [input:stdin:stdin.0] buf_size=16000
[2024/08/24 06:10:35] [debug] [stdout:stdout.0] created event channels: read=24 write=25
[2024/08/24 06:10:35] [ info] [sp] stream processor started
[2024/08/24 06:10:35] [ info] [output:stdout:stdout.0] worker #0 started
[2024/08/24 06:10:35] [ warn] [input:stdin:stdin.0] end of file (stdin closed by remote end)
[2024/08/24 06:10:35] [debug] [task] created task=0x7f86b00237d0 id=0 OK
{"date":1724479835.275198,"STRING1":"some","STRING2":"value","parsed":{"STRING1":"some","STRING2":"value"}}
[2024/08/24 06:10:35] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2024/08/24 06:10:35] [ warn] [engine] service will shutdown in max 5 seconds
[2024/08/24 06:10:35] [debug] [out flush] cb_destroy coro_id=0
[2024/08/24 06:10:35] [debug] [task] destroy task=0x7f86b00237d0 (task_id=0)
[2024/08/24 06:10:35] [ info] [engine] service has stopped (0 pending tasks)
[2024/08/24 06:10:35] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2024/08/24 06:10:35] [ info] [output:stdout:stdout.0] thread worker #0 stopped
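For readers who want to see the record transformation in isolation, here is a minimal Python sketch that models the output record printed in the log above. It is not the filter's C implementation. The function name parse_record and its parameters are hypothetical, the key name "parsed" is taken from the observed output only, and the pass-through of non-matching records is assumed from the Valgrind run further down.

```python
import re

# Python spelling of the regex from the [PARSER] section:
# the named group syntax (?<NAME>...) becomes (?P<NAME>...).
PATTERN = re.compile(r"^(?P<STRING1>[^ ]+) (?P<STRING2>.+)$")


def parse_record(record, key_name="MESSAGE", reserve_data=True, hash_field="parsed"):
    """Hypothetical model of the parser filter with the hash value field enabled.

    Assumptions (not confirmed by the PR text): the parsed key/value pairs
    are added at the top level and also nested under `hash_field`, and a
    record whose value does not match the regex passes through unchanged.
    """
    value = record.get(key_name)
    match = PATTERN.match(value) if isinstance(value, str) else None
    if match is None:
        # Nothing parsed: return a copy of the original record.
        return dict(record)

    parsed = match.groupdict()
    out = {}
    if reserve_data:
        # Reserve_Data On: keep the other fields of the original record.
        out.update({k: v for k, v in record.items() if k != key_name})
    out.update(parsed)               # parsed fields at the top level
    out[hash_field] = dict(parsed)   # same fields nested under one key
    return out


print(parse_record({"MESSAGE": "some value"}))
```

Apart from the "date" field, which Fluent Bit adds on output, the dictionary returned for {"MESSAGE": "some value"} has the same keys and values as the record in the log.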
Valgrind output
root@3aeb5205afb0:/workspaces/fluent-bit/build/bin# valgrind ./fluent-bit -c config.conf
==3576== Memcheck, a memory error detector
==3576== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==3576== Using Valgrind-3.19.0 and LibVEX; rerun with -h for copyright info
==3576== Command: ./fluent-bit -c config.conf
==3576==
Fluent Bit v3.1.7
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io
______ _ _ ______ _ _ _____ __
| ___| | | | | ___ (_) | |____ |/ |
| |_ | |_ _ ___ _ __ | |_ | |_/ /_| |_ __ __ / /`| |
| _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / / \ \ | |
| | | | |_| | __/ | | | |_ | |_/ / | |_ \ V /.___/ /_| |_
\_| |_|\__,_|\___|_| |_|\__| \____/|_|\__| \_/ \____(_)___/
[2024/08/26 09:22:44] [ info] Configuration:
[2024/08/26 09:22:44] [ info] flush time | 1.000000 seconds
[2024/08/26 09:22:44] [ info] grace | 5 seconds
[2024/08/26 09:22:44] [ info] daemon | 0
[2024/08/26 09:22:44] [ info] ___________
[2024/08/26 09:22:44] [ info] inputs:
[2024/08/26 09:22:44] [ info] stdin
[2024/08/26 09:22:44] [ info] ___________
[2024/08/26 09:22:44] [ info] filters:
[2024/08/26 09:22:44] [ info] parser.0
[2024/08/26 09:22:44] [ info] ___________
[2024/08/26 09:22:44] [ info] outputs:
[2024/08/26 09:22:44] [ info] stdout.0
[2024/08/26 09:22:44] [ info] ___________
[2024/08/26 09:22:44] [ info] collectors:
[2024/08/26 09:22:44] [ info] [fluent bit] version=3.1.7, commit=, pid=3576
[2024/08/26 09:22:44] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2024/08/26 09:22:44] [ info] [storage] ver=1.5.2, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/08/26 09:22:44] [ info] [cmetrics] version=0.9.4
[2024/08/26 09:22:44] [ info] [ctraces ] version=0.5.5
[2024/08/26 09:22:44] [ info] [input:stdin:stdin.0] initializing
[2024/08/26 09:22:44] [ info] [input:stdin:stdin.0] storage_strategy='memory' (memory only)
[2024/08/26 09:22:44] [ info] [output:stdout:stdout.0] worker #0 started
[2024/08/26 09:22:44] [debug] [stdin:stdin.0] created event channels: read=21 write=22
[2024/08/26 09:22:44] [debug] [input:stdin:stdin.0] buf_size=16000
[2024/08/26 09:22:44] [debug] [stdout:stdout.0] created event channels: read=24 write=25
[2024/08/26 09:22:44] [ info] [sp] stream processor started
s
[2024/08/26 09:23:07] [debug] [input:stdin:stdin.0] invalid JSON message, skipping
{"MESSAGE":"some"}
[2024/08/26 09:23:59] [debug] [task] created task=0x577e5f0 id=0 OK
[2024/08/26 09:23:59] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
{"date":1724664238.915407,"MESSAGE":"some"}
[2024/08/26 09:23:59] [debug] [task] destroy task=0x577e5f0 (task_id=0)
[2024/08/26 09:23:59] [debug] [out flush] cb_destroy coro_id=0
{"MESSAGE":"some value"}
{"date":1724664286.568816,"STRING1":"some","STRING2":"value","parsed":{"STRING1":"some","STRING2":"value"}}
[2024/08/26 09:24:47] [debug] [task] created task=0x7188dd0 id=0 OK
[2024/08/26 09:24:47] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2024/08/26 09:24:47] [debug] [out flush] cb_destroy coro_id=1
[2024/08/26 09:24:47] [debug] [task] destroy task=0x7188dd0 (task_id=0)
^C[2024/08/26 09:24:54] [engine] caught signal (SIGINT)
[2024/08/26 09:24:54] [ warn] [engine] service will shutdown in max 5 seconds
[2024/08/26 09:24:55] [ info] [engine] service has stopped (0 pending tasks)
[2024/08/26 09:24:55] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2024/08/26 09:24:55] [ info] [output:stdout:stdout.0] thread worker #0 stopped
==3576==
==3576== HEAP SUMMARY:
==3576== in use at exit: 8,279 bytes in 3 blocks
==3576== total heap usage: 2,511 allocs, 2,508 frees, 5,707,028 bytes allocated
==3576==
==3576== LEAK SUMMARY:
==3576== definitely lost: 79 bytes in 2 blocks
==3576== indirectly lost: 8,200 bytes in 1 blocks
==3576== possibly lost: 0 bytes in 0 blocks
==3576== still reachable: 0 bytes in 0 blocks
==3576== suppressed: 0 bytes in 0 blocks
==3576== Rerun with --leak-check=full to see details of leaked memory
==3576==
==3576== For lists of detected and suppressed errors, rerun with: -s
==3576== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
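The heap and leak summaries above can be cross-checked with a little arithmetic. The following Python sketch is only an illustration: the helper names to_int and leak_totals are hypothetical, and the numbers are copied from this run. It confirms that the "definitely lost" and "indirectly lost" bytes add up to the bytes in use at exit, and that the number of unfreed blocks equals allocs minus frees.

```python
import re

# Figures copied from the HEAP SUMMARY and LEAK SUMMARY of this run.
SUMMARY = """\
in use at exit: 8,279 bytes in 3 blocks
total heap usage: 2,511 allocs, 2,508 frees, 5,707,028 bytes allocated
definitely lost: 79 bytes in 2 blocks
indirectly lost: 8,200 bytes in 1 blocks
"""


def to_int(text):
    """Convert a Valgrind-style number such as '8,279' to an int."""
    return int(text.replace(",", ""))


def leak_totals(summary):
    """Return ({label: (bytes, blocks)}, allocs - frees) for a summary text."""
    stats = {}
    pattern = r"^(.+?): ([\d,]+) bytes in ([\d,]+) blocks$"
    for label, nbytes, nblocks in re.findall(pattern, summary, re.M):
        stats[label] = (to_int(nbytes), to_int(nblocks))
    allocs, frees = map(
        to_int, re.search(r"([\d,]+) allocs, ([\d,]+) frees", summary).groups()
    )
    return stats, allocs - frees


stats, unfreed = leak_totals(SUMMARY)
lost_bytes = stats["definitely lost"][0] + stats["indirectly lost"][0]
lost_blocks = stats["definitely lost"][1] + stats["indirectly lost"][1]
print(lost_bytes == stats["in use at exit"][0], lost_blocks == unfreed)
```

Both checks hold for this run, so the summary is internally consistent. It also reports a nonzero "definitely lost" count, so rerunning with --leak-check=full, as Valgrind itself suggests, would show where those blocks were allocated.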
@edsiper, @leonardo-albertovich and others: I have added all the necessary logs and content. Could you take a look at the PR?
I will check the failing UTs
@cosmo0920 corrected it, I had missed 2 memory leaks
@cosmo0920 could you please approve the workflow?
@edsiper it seems the checks are passing. Please review the change and let me know if anything needs to be changed.

@cosmo0920 I understand you are all busy. It would be nice if at least someone could reply to say whether they can look at it, or point to someone who can.
@cosmo0920 addressed your comments
@cosmo0920 I guess the tests are passing. I have addressed your comments as well. Is anything else left?
hey @cosmo0920 any further updates?
@cosmo0920 addressed the comments
@cosmo0920 addressed the comments
@cosmo0920 this code PR looked done and was added to 3.2.0 milestone... can you update the status / fix conflicts and merge? This is holding up doc PR https://github.com/fluent/fluent-bit-docs/pull/1444
This was added to the 3.2 milestone, but it is still waiting for a response from the user, so it was moved to the Fluent Bit Next target, which is a placeholder for future releases. As a result, there has been no progress.