
opentelemetry: refactor json logs input and set metadata using record accessor

Open Syn3rman opened this issue 2 years ago • 9 comments

in_opentelemetry: convert json to msgpack using Fluent Bit API to make parsing easier (instead of directly using jsmn)

out_opentelemetry: use record accessor to set metadata fields such as trace_id, span_id, etc.
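
As a rough illustration of what record accessor lookup means for the `*_key` options in the test configuration, here is a minimal Python sketch. It is not Fluent Bit code: the `resolve` helper and its regular expressions are hypothetical and only handle the simple `$key` and `$key['subkey']` forms. The record and the option values are copied from the test configuration in this PR.

```python
import json
import re

# Hypothetical helper, not part of Fluent Bit: resolves a simplified
# record accessor pattern ("$key" or "$key['subkey']") against a dict.
_PATTERN = re.compile(r"^\$([A-Za-z_][\w\-]*)((?:\['[^']*'\])*)$")
_SUBKEY = re.compile(r"\['([^']*)'\]")


def resolve(pattern, record):
    """Return the value the pattern points at, or None if it is absent."""
    match = _PATTERN.match(pattern)
    if match is None:
        raise ValueError(f"unsupported pattern: {pattern!r}")
    keys = [match.group(1)] + _SUBKEY.findall(match.group(2))
    value = record
    for key in keys:
        # Stop as soon as the path leaves a map or a key is missing.
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


# The record emitted by the dummy input in the test configuration.
record = json.loads(
    '{"severity_text":"very severe text", "severity_number": "2", '
    '"resource":[{"resource-attr":"resource-val1"}],'
    '"body":{"testbody": "bod"},'
    '"traceid":"2e9ab9abb4fca54cfda61ffea37e429d",'
    '"spanid":"804df3bd74eb09e2",'
    '"attributes":[{"testkey":"testval"}]}'
)

# Option name -> pattern, as set in the opentelemetry [OUTPUT] section.
options = {
    "attributes_key": "$attributes",
    "resource_key": "$resource",
    "trace_id_key": "$traceid",
    "span_id_key": "$spanid",
    "body_key": "$body",
    "severity_number_key": "$severity_number",
    "severity_text_key": "$severity_text",
}

metadata = {name: resolve(pattern, record) for name, pattern in options.items()}
print(metadata["trace_id_key"])
```

Returning `None` for a missing key is a choice made for this sketch only; how the plugin itself treats absent fields is not shown here.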

Addresses #6323


Enter [N/A] in the box, if an item is not applicable to your change.

Testing

Before we can approve your change, please submit the following in a comment:

  • [x] Example configuration file for the change
[INPUT]
  Name   dummy
  Tag    dummy.log
  Rate 1
  Samples 1
  Dummy {"severity_text":"very severe text", "severity_number": "2", "resource":[{"resource-attr":"resource-val1"}],"body":{"testbody": "bod"},"traceid":"2e9ab9abb4fca54cfda61ffea37e429d","spanid":"804df3bd74eb09e2","attributes":[{"testkey":"testval"}]}

[OUTPUT]
  Name stdout
  Match *

[OUTPUT]
  Match *
  Name opentelemetry
  Host 0.0.0.0
  Port 3434
  Logs_uri /v1/logs
  attributes_key $attributes
  resource_key $resource
  trace_id_key $traceid
  span_id_key $spanid
  body_key $body
  severity_number_key $severity_number
  severity_text_key $severity_text
  • [x] Debug log output from testing the change
root@72be08b8372b:/fluent-bit/build# ./bin/fluent-bit -c  ../dev-files/confs/ra_test.conf -v
Fluent Bit v2.0.9
* Copyright (C) 2015-2022 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2023/02/14 11:00:31] [ info] Configuration:
[2023/02/14 11:00:31] [ info]  flush time     | 1.000000 seconds
[2023/02/14 11:00:31] [ info]  grace          | 5 seconds
[2023/02/14 11:00:31] [ info]  daemon         | 0
[2023/02/14 11:00:31] [ info] ___________
[2023/02/14 11:00:31] [ info]  inputs:
[2023/02/14 11:00:31] [ info]      dummy
[2023/02/14 11:00:31] [ info] ___________
[2023/02/14 11:00:31] [ info]  filters:
[2023/02/14 11:00:31] [ info] ___________
[2023/02/14 11:00:31] [ info]  outputs:
[2023/02/14 11:00:31] [ info]      stdout.0
[2023/02/14 11:00:31] [ info]      opentelemetry.1
[2023/02/14 11:00:31] [ info] ___________
[2023/02/14 11:00:31] [ info]  collectors:
[2023/02/14 11:00:31] [ info] [fluent bit] version=2.0.9, commit=7bcb502ebd, pid=252312
[2023/02/14 11:00:31] [debug] [engine] coroutine stack size: 196608 bytes (192.0K)
[2023/02/14 11:00:31] [ info] [storage] ver=1.4.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/02/14 11:00:31] [ info] [cmetrics] version=0.5.8
[2023/02/14 11:00:31] [ info] [ctraces ] version=0.2.7
[2023/02/14 11:00:31] [ info] [input:dummy:dummy.0] initializing
[2023/02/14 11:00:31] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2023/02/14 11:00:31] [debug] [dummy:dummy.0] created event channels: read=21 write=22
[2023/02/14 11:00:31] [debug] [stdout:stdout.0] created event channels: read=23 write=24
[2023/02/14 11:00:31] [debug] [opentelemetry:opentelemetry.1] created event channels: read=30 write=31
[2023/02/14 11:00:31] [ info] [output:stdout:stdout.0] worker #0 started
[2023/02/14 11:00:31] [debug] [router] match rule dummy.0:stdout.0
[2023/02/14 11:00:31] [debug] [router] match rule dummy.0:opentelemetry.1
[2023/02/14 11:00:31] [ info] [sp] stream processor started
[2023/02/14 11:00:31] [debug] [input chunk] update output instances with new chunk size diff=214
[2023/02/14 11:00:32] [debug] [task] created task=0xffff9401de40 id=0 OK
[2023/02/14 11:00:32] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] dummy.log: [1676372431.277538465, {"severity_text"=>"very severe text", "severity_number"=>"2", "resource"=>[{"resource-attr"=>"resource-val1"}], "body"=>{"testbody"=>"bod"}, "traceid"=>"2e9ab9abb4fca54cfda61ffea37e429d", "spanid"=>"804df3bd74eb09e2", "attributes"=>[{"testkey"=>"testval"}]}]
[2023/02/14 11:00:32] [debug] [out flush] cb_destroy coro_id=0
[2023/02/14 11:00:32] [debug] [http_client] not using http_proxy for header
[2023/02/14 11:00:32] [ info] [output:opentelemetry:opentelemetry.1] 0.0.0.0:3434, HTTP status=200


[2023/02/14 11:00:32] [debug] [out flush] cb_destroy coro_id=0
[2023/02/14 11:00:32] [debug] [task] destroy task=0xffff9401de40 (task_id=0)
^C[2023/02/14 11:00:36] [engine] caught signal (SIGINT)
[2023/02/14 11:00:36] [ warn] [engine] service will shutdown in max 5 seconds
[2023/02/14 11:00:36] [ info] [input] pausing dummy.0
[2023/02/14 11:00:37] [ info] [engine] service has stopped (0 pending tasks)
[2023/02/14 11:00:37] [ info] [input] pausing dummy.0
[2023/02/14 11:00:37] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2023/02/14 11:00:37] [ info] [output:stdout:stdout.0] thread worker #0 stopped
  • [x] Attached Valgrind output that shows no leaks or memory corruption was found
root@72be08b8372b:/fluent-bit/build# valgrind --leak-check=full ./bin/fluent-bit -c  ../dev-files/confs/ra_test.conf
==252308== Memcheck, a memory error detector
==252308== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==252308== Using Valgrind-3.18.1 and LibVEX; rerun with -h for copyright info
==252308== Command: ./bin/fluent-bit -c ../dev-files/confs/ra_test.conf
==252308==
Fluent Bit v2.0.9
* Copyright (C) 2015-2022 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2023/02/14 10:59:39] [ info] [fluent bit] version=2.0.9, commit=7bcb502ebd, pid=252308
[2023/02/14 10:59:39] [ info] [storage] ver=1.4.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/02/14 10:59:39] [ info] [output:stdout:stdout.0] worker #0 started
[2023/02/14 10:59:39] [ info] [cmetrics] version=0.5.8
[2023/02/14 10:59:39] [ info] [ctraces ] version=0.2.7
[2023/02/14 10:59:39] [ info] [input:dummy:dummy.0] initializing
[2023/02/14 10:59:39] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2023/02/14 10:59:39] [ info] [sp] stream processor started
[0] dummy.log: [1676372380.321719136, {"severity_text"=>"very severe text", "severity_number"=>"2", "resource"=>[{"resource-attr"=>"resource-val1"}], "body"=>{"testbody"=>"bod"}, "traceid"=>"2e9ab9abb4fca54cfda61ffea37e429d", "spanid"=>"804df3bd74eb09e2", "attributes"=>[{"testkey"=>"testval"}]}]
[2023/02/14 10:59:41] [ info] [output:opentelemetry:opentelemetry.1] 0.0.0.0:3434, HTTP status=200


^C[2023/02/14 10:59:42] [engine] caught signal (SIGINT)
[2023/02/14 10:59:42] [ warn] [engine] service will shutdown in max 5 seconds
[2023/02/14 10:59:42] [ info] [input] pausing dummy.0
[2023/02/14 10:59:43] [ info] [engine] service has stopped (0 pending tasks)
[2023/02/14 10:59:43] [ info] [input] pausing dummy.0
[2023/02/14 10:59:43] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2023/02/14 10:59:43] [ info] [output:stdout:stdout.0] thread worker #0 stopped
clear array: 0x5479b80==252308==
==252308== HEAP SUMMARY:
==252308==     in use at exit: 0 bytes in 0 blocks
==252308==   total heap usage: 2,103 allocs, 2,103 frees, 1,466,157 bytes allocated
==252308==
==252308== All heap blocks were freed -- no leaks are possible
==252308==
==252308== For lists of detected and suppressed errors, rerun with: -s
==252308== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [ ] Run local packaging test showing all targets (including any new ones) build.
  • [ ] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • [ ] Documentation required for this feature

Backporting

  • [ ] Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Syn3rman avatar Feb 14 '23 11:02 Syn3rman

~~Will link a docs PR in a while~~ docs PR: https://github.com/fluent/fluent-bit-docs/pull/1049

Syn3rman avatar Feb 14 '23 11:02 Syn3rman

@Syn3rman please take a look at the reported CI issues; it seems this PR breaks record accessor functionality:

https://github.com/fluent/fluent-bit/actions/runs/4173099707/jobs/7225026177#step:5:2399

edsiper avatar Feb 14 '23 21:02 edsiper

Assigned to @leonardo-albertovich for review.

edsiper avatar Feb 24 '23 15:02 edsiper

How long will it take to merge?

ZpitQ avatar May 04 '23 07:05 ZpitQ

This PR will not be merged. If there is anything the plugin does not cover in its current state, we should resume the conversation. However, in March, while working on the log abstraction layer, I found that the opentelemetry input plugin (as it was) did not implement the JSON format properly, so I refactored it in that PR, and there are a lot of conflicts between the two.

I noticed that this PR changes the record accessor component and uses it, so I assume it doesn't just revamp the JSON decoder as it was but also implements additional features, which, if appropriate, should be re-submitted in a new PR.

I'm out of the loop, but maybe @Syn3rman can chime in and we can get something rolling so that as little effort as possible is wasted.

leonardo-albertovich avatar May 04 '23 08:05 leonardo-albertovich

Currently, one of my programs cannot integrate with the OTEL SDK for some reason. Therefore, in order to implement tracing, I have modified my program's logs to comply with the OTEL trace.proto specification. Initially, I intended to use an OpenTelemetry output plugin to forward the logs to OTEL, but it seems that this is not possible.

However, I was able to implement this functionality through an HTTP output plugin. Unfortunately, OTEL does not accept aggregated data passed through the HTTP plugin, so I am currently very troubled by this.

ZpitQ avatar May 04 '23 08:05 ZpitQ

Would you mind elaborating on your specific use case? I'd love to hear about it and see if there's anything we can do to help you.

Feel free to reach out on Slack if you're more comfortable that way.

leonardo-albertovich avatar May 04 '23 09:05 leonardo-albertovich

I've signed up to join the Slack channel. My email address for application is: [email protected]

ZpitQ avatar May 04 '23 09:05 ZpitQ

Great, my name there is Leonardo Almiñana; you can find me in the fluent-bit channel. Feel free to send me a private message.

leonardo-albertovich avatar May 04 '23 09:05 leonardo-albertovich