opentelemetry: refactor json logs input and set metadata using record accessor
in_opentelemetry: convert json to msgpack using Fluent Bit API to make parsing easier (instead of directly using jsmn)
out_opentelemetry: use record accessor to set metadata fields like trace_id, span_id, etc.
Addresses #6323
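To make the record accessor idea above concrete, here is a minimal Python sketch of how patterns such as `$traceid` could select metadata from a record. It is an illustration only, not Fluent Bit's C implementation: the `resolve` helper and the `metadata` dict are hypothetical names, and only the simple `$key` and `$key['subkey']` forms are handled.

```python
import re


def resolve(record, pattern):
    """Resolve a simplified accessor such as "$traceid" or "$body['testbody']".

    Hypothetical helper for illustration; Fluent Bit's real record accessor
    is implemented in C and supports more syntax than this.
    """
    if not pattern.startswith("$"):
        return None
    # Split the pattern into key segments: "$body['testbody']" -> ["body", "testbody"]
    keys = re.findall(r"[^\[\]'$]+", pattern)
    value = record
    for key in keys:
        if not isinstance(value, dict) or key not in value:
            return None  # missing key: this sketch reports it as None
        value = value[key]
    return value


# Same record as the Dummy input in the example configuration.
record = {
    "severity_text": "very severe text",
    "severity_number": "2",
    "resource": [{"resource-attr": "resource-val1"}],
    "body": {"testbody": "bod"},
    "traceid": "2e9ab9abb4fca54cfda61ffea37e429d",
    "spanid": "804df3bd74eb09e2",
    "attributes": [{"testkey": "testval"}],
}

# Same *_key options as the opentelemetry output in the example configuration.
config = {
    "trace_id_key": "$traceid",
    "span_id_key": "$spanid",
    "severity_text_key": "$severity_text",
    "severity_number_key": "$severity_number",
    "body_key": "$body",
}

# Strip the "_key" suffix to name each metadata field, then look it up.
metadata = {
    name[: -len("_key")]: resolve(record, pattern)
    for name, pattern in config.items()
}
print(metadata)
```

The point of the sketch is that each metadata field is configured by a pattern rather than a hard-coded key name, so the same output can be pointed at records with a different layout.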
Enter [N/A] in the box, if an item is not applicable to your change.
Testing
Before we can approve your change; please submit the following in a comment:
- [x] Example configuration file for the change
[INPUT]
Name dummy
Tag dummy.log
Rate 1
Samples 1
Dummy {"severity_text":"very severe text", "severity_number": "2", "resource":[{"resource-attr":"resource-val1"}],"body":{"testbody": "bod"},"traceid":"2e9ab9abb4fca54cfda61ffea37e429d","spanid":"804df3bd74eb09e2","attributes":[{"testkey":"testval"}]}
[OUTPUT]
Name stdout
Match *
[OUTPUT]
Match *
Name opentelemetry
Host 0.0.0.0
Port 3434
Logs_uri /v1/logs
attributes_key $attributes
resource_key $resource
trace_id_key $traceid
span_id_key $spanid
body_key $body
severity_number_key $severity_number
severity_text_key $severity_text
- [x] Debug log output from testing the change
root@72be08b8372b:/fluent-bit/build# ./bin/fluent-bit -c ../dev-files/confs/ra_test.conf -v
Fluent Bit v2.0.9
* Copyright (C) 2015-2022 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io
[2023/02/14 11:00:31] [ info] Configuration:
[2023/02/14 11:00:31] [ info] flush time | 1.000000 seconds
[2023/02/14 11:00:31] [ info] grace | 5 seconds
[2023/02/14 11:00:31] [ info] daemon | 0
[2023/02/14 11:00:31] [ info] ___________
[2023/02/14 11:00:31] [ info] inputs:
[2023/02/14 11:00:31] [ info] dummy
[2023/02/14 11:00:31] [ info] ___________
[2023/02/14 11:00:31] [ info] filters:
[2023/02/14 11:00:31] [ info] ___________
[2023/02/14 11:00:31] [ info] outputs:
[2023/02/14 11:00:31] [ info] stdout.0
[2023/02/14 11:00:31] [ info] opentelemetry.1
[2023/02/14 11:00:31] [ info] ___________
[2023/02/14 11:00:31] [ info] collectors:
[2023/02/14 11:00:31] [ info] [fluent bit] version=2.0.9, commit=7bcb502ebd, pid=252312
[2023/02/14 11:00:31] [debug] [engine] coroutine stack size: 196608 bytes (192.0K)
[2023/02/14 11:00:31] [ info] [storage] ver=1.4.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/02/14 11:00:31] [ info] [cmetrics] version=0.5.8
[2023/02/14 11:00:31] [ info] [ctraces ] version=0.2.7
[2023/02/14 11:00:31] [ info] [input:dummy:dummy.0] initializing
[2023/02/14 11:00:31] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2023/02/14 11:00:31] [debug] [dummy:dummy.0] created event channels: read=21 write=22
[2023/02/14 11:00:31] [debug] [stdout:stdout.0] created event channels: read=23 write=24
[2023/02/14 11:00:31] [debug] [opentelemetry:opentelemetry.1] created event channels: read=30 write=31
[2023/02/14 11:00:31] [ info] [output:stdout:stdout.0] worker #0 started
[2023/02/14 11:00:31] [debug] [router] match rule dummy.0:stdout.0
[2023/02/14 11:00:31] [debug] [router] match rule dummy.0:opentelemetry.1
[2023/02/14 11:00:31] [ info] [sp] stream processor started
[2023/02/14 11:00:31] [debug] [input chunk] update output instances with new chunk size diff=214
[2023/02/14 11:00:32] [debug] [task] created task=0xffff9401de40 id=0 OK
[2023/02/14 11:00:32] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] dummy.log: [1676372431.277538465, {"severity_text"=>"very severe text", "severity_number"=>"2", "resource"=>[{"resource-attr"=>"resource-val1"}], "body"=>{"testbody"=>"bod"}, "traceid"=>"2e9ab9abb4fca54cfda61ffea37e429d", "spanid"=>"804df3bd74eb09e2", "attributes"=>[{"testkey"=>"testval"}]}]
[2023/02/14 11:00:32] [debug] [out flush] cb_destroy coro_id=0
[2023/02/14 11:00:32] [debug] [http_client] not using http_proxy for header
[2023/02/14 11:00:32] [ info] [output:opentelemetry:opentelemetry.1] 0.0.0.0:3434, HTTP status=200
[2023/02/14 11:00:32] [debug] [out flush] cb_destroy coro_id=0
[2023/02/14 11:00:32] [debug] [task] destroy task=0xffff9401de40 (task_id=0)
^C[2023/02/14 11:00:36] [engine] caught signal (SIGINT)
[2023/02/14 11:00:36] [ warn] [engine] service will shutdown in max 5 seconds
[2023/02/14 11:00:36] [ info] [input] pausing dummy.0
[2023/02/14 11:00:37] [ info] [engine] service has stopped (0 pending tasks)
[2023/02/14 11:00:37] [ info] [input] pausing dummy.0
[2023/02/14 11:00:37] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2023/02/14 11:00:37] [ info] [output:stdout:stdout.0] thread worker #0 stopped
- [x] Attached Valgrind output that shows no leaks or memory corruption was found
root@72be08b8372b:/fluent-bit/build# valgrind --leak-check=full ./bin/fluent-bit -c ../dev-files/confs/ra_test.conf
==252308== Memcheck, a memory error detector
==252308== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==252308== Using Valgrind-3.18.1 and LibVEX; rerun with -h for copyright info
==252308== Command: ./bin/fluent-bit -c ../dev-files/confs/ra_test.conf
==252308==
Fluent Bit v2.0.9
* Copyright (C) 2015-2022 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io
[2023/02/14 10:59:39] [ info] [fluent bit] version=2.0.9, commit=7bcb502ebd, pid=252308
[2023/02/14 10:59:39] [ info] [storage] ver=1.4.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/02/14 10:59:39] [ info] [output:stdout:stdout.0] worker #0 started
[2023/02/14 10:59:39] [ info] [cmetrics] version=0.5.8
[2023/02/14 10:59:39] [ info] [ctraces ] version=0.2.7
[2023/02/14 10:59:39] [ info] [input:dummy:dummy.0] initializing
[2023/02/14 10:59:39] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2023/02/14 10:59:39] [ info] [sp] stream processor started
[0] dummy.log: [1676372380.321719136, {"severity_text"=>"very severe text", "severity_number"=>"2", "resource"=>[{"resource-attr"=>"resource-val1"}], "body"=>{"testbody"=>"bod"}, "traceid"=>"2e9ab9abb4fca54cfda61ffea37e429d", "spanid"=>"804df3bd74eb09e2", "attributes"=>[{"testkey"=>"testval"}]}]
[2023/02/14 10:59:41] [ info] [output:opentelemetry:opentelemetry.1] 0.0.0.0:3434, HTTP status=200
^C[2023/02/14 10:59:42] [engine] caught signal (SIGINT)
[2023/02/14 10:59:42] [ warn] [engine] service will shutdown in max 5 seconds
[2023/02/14 10:59:42] [ info] [input] pausing dummy.0
[2023/02/14 10:59:43] [ info] [engine] service has stopped (0 pending tasks)
[2023/02/14 10:59:43] [ info] [input] pausing dummy.0
[2023/02/14 10:59:43] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2023/02/14 10:59:43] [ info] [output:stdout:stdout.0] thread worker #0 stopped
clear array: 0x5479b80==252308==
==252308== HEAP SUMMARY:
==252308== in use at exit: 0 bytes in 0 blocks
==252308== total heap usage: 2,103 allocs, 2,103 frees, 1,466,157 bytes allocated
==252308==
==252308== All heap blocks were freed -- no leaks are possible
==252308==
==252308== For lists of detected and suppressed errors, rerun with: -s
==252308== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
- [ ] Run local packaging test showing all targets (including any new ones) build.
- [ ] Set ok-package-test label to test for all targets (requires maintainer to do).
Documentation
- [ ] Documentation required for this feature
Backporting
- [ ] Backport to latest stable release.
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
~~Will link a docs PR in a while~~ docs PR: https://github.com/fluent/fluent-bit-docs/pull/1049
@Syn3rman pls take a look at the CI issues reported, it seems this PR breaks record accessor functionality:
https://github.com/fluent/fluent-bit/actions/runs/4173099707/jobs/7225026177#step:5:2399
assigned to @leonardo-albertovich for review
How long will it take to merge?
This PR will not be merged. If there is anything the plugin does not cover in its current state, we should resume the conversation. However, in March, while working on the log abstraction layer, I found that the opentelemetry input plugin (as it was) did not implement the JSON format properly, so I refactored it in that PR, and there are a lot of conflicts between the two.
I noticed that this PR changes the record accessor component and uses it, so I assume it doesn't just revamp the JSON decoder as it was but also implements additional features which, if appropriate, should be re-submitted in a new PR.
I'm out of the loop but maybe @Syn3rman can chime in and we can get something rolling so the least amount of effort is wasted.
Currently, one of my programs cannot integrate with the OTEL SDK for some reason. Therefore, in order to implement tracing, I have modified my program's logs to comply with the OTEL trace.proto specification. Initially, I intended to use an OpenTelemetry output plugin to forward the logs to OTEL, but it seems that this is not possible.
However, I was able to implement this functionality through an HTTP output plugin. Unfortunately, OTEL does not accept aggregated data passed through the HTTP plugin, so I am currently very troubled by this.
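For context, a rough Python sketch of the envelope that OTLP/HTTP JSON expects for traces is shown below. It assumes the rejection happens because a generic HTTP output sends a flat JSON array of records, while an OTLP receiver expects spans nested under `resourceSpans` and `scopeSpans`; that cause is an assumption, not something confirmed in this thread. The `wrap_spans` helper, the service name, the scope name, and the span values are all hypothetical.

```python
import json


def wrap_spans(spans, service_name):
    """Wrap already-formatted span dicts in an OTLP/HTTP JSON trace envelope.

    Hypothetical helper; the spans are assumed to already follow the JSON
    mapping of trace.proto (hex traceId/spanId, nanosecond timestamps as strings).
    """
    return {
        "resourceSpans": [
            {
                "resource": {
                    "attributes": [
                        {"key": "service.name", "value": {"stringValue": service_name}}
                    ]
                },
                "scopeSpans": [
                    {"scope": {"name": "manual-logs"}, "spans": spans}
                ],
            }
        ]
    }


# Illustrative span only; the values are made up for the example.
span = {
    "traceId": "2e9ab9abb4fca54cfda61ffea37e429d",
    "spanId": "804df3bd74eb09e2",
    "name": "example-operation",
    "kind": 1,  # SPAN_KIND_INTERNAL
    "startTimeUnixNano": "1676372431000000000",
    "endTimeUnixNano": "1676372432000000000",
}

payload = wrap_spans([span], "my-program")
print(json.dumps(payload, indent=2))
```

A body of this shape is what would be POSTed to an OTLP/HTTP traces endpoint (conventionally `/v1/traces`) with `Content-Type: application/json`.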
Would you mind elaborating on your specific use case? I'd love to hear about it and see if there's anything we can do to help you.
Feel free to reach out on Slack if you feel more comfortable that way.
I've signed up to join the Slack channel. My email address for application is: [email protected]
Great, my name there is Leonardo Almiñana, you can find me in the fluent-bit channel. Feel free to send me a private message.