in_otel: support to add resource of log
Relates to https://github.com/fluent/fluent-bit/issues/8205. This patch modifies the metadata produced by in_opentelemetry based on the OpenTelemetry logs spec.
From: only the "attributes" map of "logRecords" was stored:
{"logRecords_attributes_key":"value"}
To: a redefined metadata map (this may be a breaking change):
{"ObservedTimestamp":1706312272306657295, "Attributes":{"log.file.name":"a.log"}, "TraceFlags":0, "Resource":{"host.name":"taka-VirtualBox", "os.type":"linux"}, "InstrumentationScope":{"Name":"test_scope", "Version":"1"}}
Enter [N/A] in the box if an item is not applicable to your change.
Testing
Before we can approve your change, please submit the following in a comment:
- [X] Example configuration file for the change
- [X] Debug log output from testing the change
- [X] Attached Valgrind output that shows no leaks or memory corruption was found
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
- [N/A] Run local packaging test showing all targets (including any new ones) build.
- [N/A] Set the `ok-package-test` label to test for all targets (requires maintainer to do).
Documentation
- [N/A] Documentation required for this feature
Backporting
- [ ] Backport to latest stable release.
Configuration
[INPUT]
    Name opentelemetry
    Port 4318

[OUTPUT]
    Name stdout
Debug/Valgrind output
- Run fluent-bit
- Send logs using `telemetrygen logs --otlp-http --otlp-insecure --telemetry-attributes "key=\"value\""`
Note: I installed telemetrygen using the following command.
go install github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen@latest
The reported leak should be fixed by https://github.com/fluent/fluent-bit/pull/8293.
$ valgrind --leak-check=full bin/fluent-bit -c otel.cfg
==152173== Memcheck, a memory error detector
==152173== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==152173== Using Valgrind-3.18.1 and LibVEX; rerun with -h for copyright info
==152173== Command: bin/fluent-bit -c otel.cfg
==152173==
Fluent Bit v2.2.1
* Copyright (C) 2015-2023 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io
[2023/12/17 12:06:01] [ info] [fluent bit] version=2.2.1, commit=2389f44ee4, pid=152173
[2023/12/17 12:06:01] [ info] [storage] ver=1.2.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/12/17 12:06:01] [ info] [cmetrics] version=0.6.5
[2023/12/17 12:06:01] [ info] [ctraces ] version=0.3.1
[2023/12/17 12:06:01] [ info] [input:opentelemetry:opentelemetry.0] initializing
[2023/12/17 12:06:01] [ info] [input:opentelemetry:opentelemetry.0] storage_strategy='memory' (memory only)
[2023/12/17 12:06:01] [ info] [input:opentelemetry:opentelemetry.0] listening on 0.0.0.0:4318
[2023/12/17 12:06:01] [ info] [output:stdout:stdout.0] worker #0 started
[2023/12/17 12:06:01] [ info] [sp] stream processor started
[0] v1_logs: [[1702782363.119620112, {"resource"=>{"attributes"=>{}}, "logRecords"=>{"attributes"=>{"app"=>"server", "value"=>"value"}}}], {"message"=>"the message"}]
^C[2023/12/17 12:06:05] [engine] caught signal (SIGINT)
[2023/12/17 12:06:05] [ warn] [engine] service will shutdown in max 5 seconds
[2023/12/17 12:06:05] [ info] [engine] service has stopped (0 pending tasks)
[2023/12/17 12:06:05] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2023/12/17 12:06:05] [ info] [output:stdout:stdout.0] thread worker #0 stopped
==152173==
==152173== HEAP SUMMARY:
==152173== in use at exit: 688 bytes in 21 blocks
==152173== total heap usage: 1,607 allocs, 1,586 frees, 1,225,800 bytes allocated
==152173==
==152173== 688 (40 direct, 648 indirect) bytes in 1 blocks are definitely lost in loss record 9 of 9
==152173== at 0x4848899: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==152173== by 0x9895E4: system_alloc (protobuf-c.c:154)
==152173== by 0x989634: do_alloc (protobuf-c.c:167)
==152173== by 0x98E534: protobuf_c_message_unpack (protobuf-c.c:3063)
==152173== by 0x9928DB: opentelemetry__proto__collector__logs__v1__export_logs_service_request__unpack (logs_service.pb-c.c:43)
==152173== by 0x4BF43F: binary_payload_to_msgpack (opentelemetry_prot.c:413)
==152173== by 0x4C1B4A: process_payload_logs (opentelemetry_prot.c:1378)
==152173== by 0x4C262E: opentelemetry_prot_handle (opentelemetry_prot.c:1686)
==152173== by 0x4B942E: opentelemetry_conn_event (http_conn.c:99)
==152173== by 0x244C82: flb_engine_start (flb_engine.c:1009)
==152173== by 0x1DE9F4: flb_lib_worker (flb_lib.c:638)
==152173== by 0x4FF4AC2: start_thread (pthread_create.c:442)
==152173==
==152173== LEAK SUMMARY:
==152173== definitely lost: 40 bytes in 1 blocks
==152173== indirectly lost: 648 bytes in 20 blocks
==152173== possibly lost: 0 bytes in 0 blocks
==152173== still reachable: 0 bytes in 0 blocks
==152173== suppressed: 0 bytes in 0 blocks
==152173==
==152173== For lists of detected and suppressed errors, rerun with: -s
==152173== ERROR SUMMARY: 1 errors from 1 contexts (suppressed: 0 from 0)
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
@braydonk Could you check/test this PR, since it relates to https://github.com/fluent/fluent-bit/issues/8205? This patch saves the "resource" field of logs. However, I don't have a tool to send it.
Hi @nokute78, here is a guide so that you can test this and any other OTLP PRs in the future:
Download an OpenTelemetry Collector from https://github.com/open-telemetry/opentelemetry-collector-releases. You will want to download the latest `otelcol-contrib` release for your system (it is crucial that it's `otelcol-contrib`, as otherwise some plugins will be missing).
The way I tested it was by running one instance of `otelcol-contrib` that read from a file and output OTLP data to Fluent Bit, and having Fluent Bit send that back out to another instance of `otelcol-contrib` that read the data and dumped it to a file. Here are the configs I used.
I start these in reverse order. First, I start the collector that will eventually receive OTLP from Fluent Bit.
receivers:
  otlp:
    protocols:
      http:
        endpoint: 127.0.0.1:4318
exporters:
  file:
    path: out.json
service:
  telemetry:
    logs:
      level: debug
  pipelines:
    logs:
      receivers: [otlp]
      exporters: [file]
Start the collector with the command `otelcol-contrib --config <path to config yaml file>`.
Then I start the Fluent Bit that will receive and send OTLP back out.
[SERVICE]
    Log_level debug
    Flush 1

[INPUT]
    Name opentelemetry
    Listen 127.0.0.1
    Port 6969

[OUTPUT]
    Name opentelemetry
    Match *
    Host 127.0.0.1
    Port 4318
    Logs_uri /v1/logs
Create a file called `a.txt` with some log lines.
Finally, start a collector reading from a file and sending to Fluent Bit:
receivers:
  filelog:
    include: [a.txt]
    start_at: beginning
processors:
  resourcedetection/system:
    detectors: ["system"]
    system:
      hostname_sources: ["os"]
  transform/add_scope:
    log_statements:
      - context: scope
        statements:
          - set(name, "test_scope")
          - set(version, "1")
exporters:
  otlphttp:
    endpoint: http://127.0.0.1:6969
  file:
    path: in.json
service:
  telemetry:
    metrics:
      address: ":8889" # This is so the two collectors don't try and bind the same self metrics port
  pipelines:
    logs:
      receivers: [filelog]
      processors: ["resourcedetection/system"]
      exporters: [file, otlphttp]
(The `file` output is optional, but I found it helpful to compare what the OTLP is expected to look like when it comes back out of Fluent Bit.)
This pipeline will produce two log files: `in.json`, which shows what the OTLP looks like coming from the collector, and `out.json`, which shows what it looks like after going through Fluent Bit.
If the goal of this PR is only to resolve the input portion of the problem, then it's a good start. I think it would be a good idea to include Instrumentation Scope as well. In the config for the collector reading from a file, I also added some scope attributes. These need to be preserved as well as resource attributes on input.
For the setup I posted above, this PR will not solve the entire pipeline, since the problem is also that scope and resource are erased on output, but as long as this also preserves scope attributes on input, it's a good start.
@braydonk I think it would be good to get all this into an automated setup: https://github.com/fluent/fluent-bit/pull/8294#issuecomment-1860689476
We can add it to fluent-bit-ci then invoke it as required.
@braydonk Thank you for the information and comments.
I can test using `otelcol-contrib`, and the output is below.
We can see that Fluent Bit saves the attributes of resource and logRecords as metadata.
Output of fluent-bit:
[[1703289374.995356038, {"resource"=>{"attributes"=>{"host.name"=>"taka-VirtualBox", "os.type"=>"linux"}},
"logRecords"=>{"attributes"=>{"log.file.name"=>"a.txt"}}}]
, {"message"=>"{"resourceLogs":[{"resource":{},"scopeLogs":[{"scope":{},"logRecords":[{"timeUnixNano":"1703287606977379083","body":{"kvlistValue":{"values":[{"key":"message","value":{"stringValue":"dummy"}}]}},"traceId":"","spanId":""}]}]}]}"}]
in.json:
{"resourceLogs":[{"resource":{"attributes":[{"key":"host.name","value":{"stringValue":"taka-VirtualBox"}},{"key":"os.type","value":{"stringValue":"linux"}}]},"scopeLogs":[{"scope":{},"logRecords":[{"observedTimeUnixNano":"1703289599189943390","body":{"stringValue":"{\"resourceLogs\":[{\"resource\":{},\"scopeLogs\":[{\"scope\":{},\"logRecords\":[{\"timeUnixNano\":\"1703287606977379083\",\"body\":{\"kvlistValue\":{\"values\":[{\"key\":\"message\",\"value\":{\"stringValue\":\"dummy\"}}]}},\"traceId\":\"\",\"spanId\":\"\"}]}]}]}"},"attributes":[{"key":"log.file.name","value":{"stringValue":"a.txt"}}],"traceId":"","spanId":""}]}],"schemaUrl":"https://opentelemetry.io/schemas/1.6.1"}]}
a.conf:
[SERVICE]
    # Log_level debug
    Flush 1

[INPUT]
    Name opentelemetry
    Listen 127.0.0.1
    Port 6969

[OUTPUT]
    Name stdout
    Match *
Valgrind output:
$ valgrind --leak-check=full bin/fluent-bit -c ~/otel/to_fluentbit/a.conf
==33195== Memcheck, a memory error detector
==33195== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==33195== Using Valgrind-3.18.1 and LibVEX; rerun with -h for copyright info
==33195== Command: bin/fluent-bit -c /home/taka/otel/to_fluentbit/a.conf
==33195==
Fluent Bit v2.2.1
* Copyright (C) 2015-2023 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io
[2023/12/23 09:23:24] [ info] [fluent bit] version=2.2.1, commit=2f9e2c4f6e, pid=33195
[2023/12/23 09:23:25] [ info] [storage] ver=1.2.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/12/23 09:23:25] [ info] [cmetrics] version=0.6.6
[2023/12/23 09:23:25] [ info] [ctraces ] version=0.4.0
[2023/12/23 09:23:25] [ info] [input:opentelemetry:opentelemetry.0] initializing
[2023/12/23 09:23:25] [ info] [input:opentelemetry:opentelemetry.0] storage_strategy='memory' (memory only)
[2023/12/23 09:23:25] [ info] [input:opentelemetry:opentelemetry.0] listening on 127.0.0.1:6969
[2023/12/23 09:23:25] [ info] [output:stdout:stdout.0] worker #0 started
[2023/12/23 09:23:25] [ info] [sp] stream processor started
[0] v1_logs: [[1703291012.513082047, {"resource"=>{"attributes"=>{"host.name"=>"taka-VirtualBox", "os.type"=>"linux"}}, "logRecords"=>{"attributes"=>{"log.file.name"=>"a.txt"}}}], {"message"=>"{"resourceLogs":[{"resource":{},"scopeLogs":[{"scope":{},"logRecords":[{"timeUnixNano":"1703287606977379083","body":{"kvlistValue":{"values":[{"key":"message","value":{"stringValue":"dummy"}}]}},"traceId":"","spanId":""}]}]}]}"}]
^C[2023/12/23 09:23:34] [engine] caught signal (SIGINT)
[2023/12/23 09:23:34] [ warn] [engine] service will shutdown in max 5 seconds
[2023/12/23 09:23:34] [ info] [engine] service has stopped (0 pending tasks)
[2023/12/23 09:23:34] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2023/12/23 09:23:35] [ info] [output:stdout:stdout.0] thread worker #0 stopped
==33195==
==33195== HEAP SUMMARY:
==33195== in use at exit: 0 bytes in 0 blocks
==33195== total heap usage: 1,672 allocs, 1,672 frees, 1,433,078 bytes allocated
==33195==
==33195== All heap blocks were freed -- no leaks are possible
==33195==
==33195== For lists of detected and suppressed errors, rerun with: -s
==33195== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
That's a good start, but that structure in Fluent Bit would be extremely difficult to work with if there were going to be a future implementation of #8206. The hierarchical structure should likely be retained the same way in Fluent Bit as it is when it comes in. Having `resource` at the root, but then also having it in plaintext in `message`, is pretty awkward, especially because `scope` is not at the same level.
Ideally, the structure should be either flat or hierarchical, rather than both. And it should probably be all structured as well, instead of the mixture of JSON string and structured that it is right now.
This structure would also fall apart if there were logs from two different resources, or two different scopes, in the same payload. The only way for this to work is for the Fluent Bit internal representation to match the structure of the OTLP payload, so that all information can be retained upon conversion.
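To make that concrete, here is a loose sketch (hypothetical types, not Fluent Bit code) of an internal representation that mirrors the OTLP hierarchy, so logs from two different resources or scopes stay distinct:

```c
/* Loose sketch with hypothetical types, not Fluent Bit code: an
 * internal representation mirroring the OTLP logs hierarchy
 * (ResourceLogs -> ScopeLogs -> LogRecord), so multiple resources
 * or scopes in one payload remain distinct. */
#include <stddef.h>
#include <stdint.h>

struct otel_log_record {
    uint64_t time_unix_nano;
    char *body;                      /* log body */
    /* per-record attributes would live here */
};

struct otel_scope_logs {
    char *scope_name;                /* e.g. "test_scope" */
    char *scope_version;             /* e.g. "1" */
    struct otel_log_record *records;
    size_t record_count;
};

struct otel_resource_logs {
    /* resource attributes, e.g. host.name, os.type */
    struct otel_scope_logs *scopes;
    size_t scope_count;
};
```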
@braydonk Thank you for the comment.
I considered modeling the metadata structure on the hierarchical OTLP payload, but this patch was meant to prevent breaking changes to the metadata structure, so I ignored the OTLP hierarchy. Note: current Fluent Bit stores only a portion of the OTLP payload as metadata.
plaintext in message is pretty awkward
I tested using an `a.txt` that contained OTLP-like JSON, and it caused a misunderstanding.
a.txt:
{"resourceLogs":[{"resource":{},"scopeLogs":[{"scope":{},"logRecords":[{"timeUnixNano":"1703287606977379083","body":{"kvlistValue":{"values":[{"key":"message","value":{"stringValue":"dummy"}}]}},"traceId":"","spanId":""}]}]}]}
The following output is from a simple text file, `a.log`, which doesn't contain OTLP metadata in its message.
a.log:
sample log messages
output of fluent-bit:
[0] v1_logs: [[1705109463.456682228, {"resource"=>{"attributes"=>{"host.name"=>"taka-VirtualBox", "os.type"=>"linux"}}, "logRecords"=>{"attributes"=>{"log.file.name"=>"a.log"}}}], {"message"=>"sample log messages"}]
https://opentelemetry.io/docs/specs/otel/logs/data-model/ According to the spec, should we support the following fields as metadata?
Timestamp, ObservedTimestamp, TraceId, SpanId, TraceFlags, SeverityText, SeverityNumber, Resource, InstrumentationScope and Attributes.
I updated this PR. This patch stores fields other than the body as metadata. The fields are defined here (the structure is different from OTLP): https://opentelemetry.io/docs/specs/otel/logs/data-model/#definitions-used-in-this-document
Example output:
[0] v1_logs: [[1705715407.696258399, {"ObservedTimestamp"=>1705715407578697276, "Attributes"=>{"log.file.name"=>"a.log"}, "TraceFlags"=>0, "Resource"=>{"host.name"=>"taka-VirtualBox", "os.type"=>"linux"}, "InstrumentationScope"=>{}}], {"message"=>"sample log messages"}]
Valgrind output
$ valgrind --leak-check=full bin/fluent-bit -c ~/otel/to_fluentbit/a.conf
==43832== Memcheck, a memory error detector
==43832== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==43832== Using Valgrind-3.18.1 and LibVEX; rerun with -h for copyright info
==43832== Command: bin/fluent-bit -c /home/taka/otel/to_fluentbit/a.conf
==43832==
Fluent Bit v2.2.1
* Copyright (C) 2015-2023 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io
[2024/01/20 10:50:04] [ info] [fluent bit] version=2.2.1, commit=0f0e64f3df, pid=43832
[2024/01/20 10:50:05] [ info] [storage] ver=1.2.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/01/20 10:50:05] [ info] [cmetrics] version=0.6.6
[2024/01/20 10:50:05] [ info] [ctraces ] version=0.4.0
[2024/01/20 10:50:05] [ info] [input:opentelemetry:opentelemetry.0] initializing
[2024/01/20 10:50:05] [ info] [input:opentelemetry:opentelemetry.0] storage_strategy='memory' (memory only)
[2024/01/20 10:50:05] [ info] [input:opentelemetry:opentelemetry.0] listening on 127.0.0.1:6969
[2024/01/20 10:50:05] [ info] [output:stdout:stdout.0] worker #0 started
[2024/01/20 10:50:05] [ info] [sp] stream processor started
[0] v1_logs: [[1705715407.696258399, {"ObservedTimestamp"=>1705715407578697276, "Attributes"=>{"log.file.name"=>"a.log"}, "TraceFlags"=>0, "Resource"=>{"host.name"=>"taka-VirtualBox", "os.type"=>"linux"}, "InstrumentationScope"=>{}}], {"message"=>"sample log messages"}]
^C[2024/01/20 10:50:09] [engine] caught signal (SIGINT)
[2024/01/20 10:50:09] [ warn] [engine] service will shutdown in max 5 seconds
[2024/01/20 10:50:10] [ info] [engine] service has stopped (0 pending tasks)
[2024/01/20 10:50:10] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2024/01/20 10:50:10] [ info] [output:stdout:stdout.0] thread worker #0 stopped
==43832==
==43832== HEAP SUMMARY:
==43832== in use at exit: 0 bytes in 0 blocks
==43832== total heap usage: 1,659 allocs, 1,659 frees, 1,326,164 bytes allocated
==43832==
==43832== All heap blocks were freed -- no leaks are possible
==43832==
==43832== For lists of detected and suppressed errors, rerun with: -s
==43832== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
@nokute78 @braydonk I am wondering if we can use a specific metadata key to "flag" that we have OTel metadata as part of the record so that out_opentelemetry can decode/encode properly without losses.
What do you think?
@nokute78 I think that structure looks pretty good to me. I am wondering if you could also produce an example that contains Scope Attributes so we could see what that looks like?
@edsiper I see 3 possibilities:
- Specific names that are recognized in metadata as OTLP data fields, so if `out_opentelemetry` finds the literal key `Resource` then it assumes it's looking at OTLP fields. (This is the setup that's currently in this PR.)
- A special metadata key like `OTLPMetadata: true` that `out_opentelemetry` can check for; if found, it will start parsing these fields out.
- A special prefix for the OTLP fields like `otlp.*` that can be used to identify things that should be OTLP metadata. So for example, in the record @nokute78 posted above:
[0] v1_logs: [[1705715407.696258399, {"otlp.ObservedTimestamp"=>1705715407578697276, "otlp.Attributes"=>{"log.file.name"=>"a.log"}, "otlp.TraceFlags"=>0, "otlp.Resource"=>{"host.name"=>"taka-VirtualBox", "os.type"=>"linux"}, "otlp.InstrumentationScope"=>{}}], {"message"=>"sample log messages"}]
I think any of these 3 could work fine, though there is the vague chance that just doing option 1 could lead to clashing fields if for some reason someone put `Resource` in their metadata field but didn't intend for it to be for OTLP.
Really I think any one of these (or a combination of these) would be fine. I personally like the `otlp.*` prefix idea because it's a common pattern to have special prefixes to denote fields in a log that are internally important for some output plugin. But I think it could still work fine without the prefix.
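As a quick sketch of what option 3 could look like on the output side (a hypothetical helper, not actual plugin code):

```c
/* Hypothetical helper, not actual plugin code: recognize metadata
 * keys carrying the "otlp." prefix from option 3 above. */
#include <string.h>

static int is_otlp_metadata_key(const char *key, size_t key_len)
{
    static const char prefix[] = "otlp.";
    const size_t plen = sizeof(prefix) - 1;

    /* the key must be longer than the prefix, e.g. "otlp.Resource" */
    return key_len > plen && strncmp(key, prefix, plen) == 0;
}
```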
@braydonk Thank you for checking.
I am wondering if you could also produce an example that contains Scope Attributes so we could see what that looks like?
I don't know of a way to produce such examples. I'll check the opentelemetry-collector repo.
`otelcol-contrib` generates an empty scope value, and the example of Logs also doesn't contain a value for `scope`:
https://opentelemetry.io/docs/specs/otel/protocol/file-exporter/#examples
@edsiper How about supporting a property to specify the key of the metadata/record?
e.g. out_opentelemetry supports `logs.instrumentationscope_metadata_key`. Default is `InstrumentationScope`.
If the metadata contains the key `InstrumentationScope`, out_otel adds the value as a `scope`.
Too long property name :(
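If something like that were added, it would presumably be registered through the plugin's config map; here is a rough sketch, assuming the usual `flb_config_map` pattern (the option name and the context member are hypothetical):

```c
/* Rough sketch assuming the usual flb_config_map pattern; the
 * option name and the context member are hypothetical. */
static struct flb_config_map config_map[] = {
    {
     FLB_CONFIG_MAP_STR, "logs_instrumentationscope_metadata_key",
     "InstrumentationScope",
     0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_scope_key),
     "Metadata key to read the instrumentation scope from"
    },
    /* EOF */
    {0}
};
```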
In this comment above in this PR, you can borrow the `transform/add_scope` processor that I have in the config, which uses OTTL to directly add Scope Attributes to each log.
@braydonk Thank you for the comment. I tested the following configuration.
output:
[0] v1_logs: [[1706312272.308670763, {"ObservedTimestamp"=>1706312272306657295, "Attributes"=>{"log.file.name"=>"a.log"}, "TraceFlags"=>0, "InstrumentationScope"=>{"Name"=>"test_scope", "Version"=>"1"}}], {"message"=>"sample log messages"}]
receivers:
  filelog:
    include: [a.log]
    start_at: beginning
processors:
  resourcedetection/system:
    detectors: ["system"]
    system:
      hostname_sources: ["os"]
  transform/add_scope:
    log_statements:
      - context: scope
        statements:
          - set(name, "test_scope")
          - set(version, "1")
exporters:
  otlphttp:
    endpoint: http://127.0.0.1:6969
  file:
    path: in.json
service:
  telemetry:
    metrics:
      address: ":8889" # This is so the two collectors don't try and bind the same self metrics port
  pipelines:
    logs:
      receivers: [filelog]
      processors: ["transform/add_scope"]
      exporters: [file, otlphttp]
Thanks @nokute78! I think this looks pretty good.
It's probably worth noting that, unfortunately, because this method packs these labels directly into the metadata of every single log, OTLP Logs are going to be very expensive for Fluent Bit to process. In an OTLP payload there may be one Resource and one Scope with 100 logs, but once flattened to the Fluent Bit schema, that same Resource and Scope data will be on all 100 logs separately. However, I don't really see another easy way to do this, so I think this is good, and that is a tradeoff we might have to accept.
That being said I have no additional comments from my side.
@braydonk Thank you for the comment. I also sent a patch for out_otel: https://github.com/fluent/fluent-bit/pull/8475 (in progress). Could you check it?
@edsiper I think this patch is ready to review/merge. Could you review this?
@edsiper ping
It would be great if we could get this landed - this would close a major gap in making Fluent Bit a viable OTel collector.
thanks everybody.
I think the remaining part which is out of the scope of this PR is to address this:
OTLP Logs are going to be very expensive for Fluent Bit to process. In an OTLP payload there may be one Resource and one Scope with 100 logs, but once flattened to the Fluent Bit schema, that same Resource and Scope data will be on all 100 logs separately. However, I don't really see another easy way to do this, so I think this is good, and that is a tradeoff we might have to accept.
https://github.com/fluent/fluent-bit/pull/8294#issuecomment-1936720638
at a chunk level we support metadata; actually, that's where the Tag is stored. We might think about how to group records that share metadata... not for this PR, just thinking out loud...
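Thinking along the same lines, here is a loose sketch of grouping consecutive records whose metadata compares equal (illustrative only, using msgpack-c's deep object comparison):

```c
/* Illustrative only: count how many consecutive records share the
 * same metadata map, so Resource/Scope data could be stored once
 * per group instead of once per record. */
#include <msgpack.h>
#include <stddef.h>

static size_t group_length(msgpack_object *metas, size_t count)
{
    size_t i;

    if (count == 0) {
        return 0;
    }
    for (i = 1; i < count; i++) {
        /* msgpack_object_equal() performs a deep comparison */
        if (!msgpack_object_equal(metas[0], metas[i])) {
            break;
        }
    }
    return i;
}
```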
For those interested in what's coming in OTel support, here is the next big PR:
https://github.com/fluent/fluent-bit/pull/8898