out_influxdb: allow stripping of tag prefix
This removes a configured prefix from measurement names, which would otherwise be repeated across many measurements in the same data bucket.
When writing to several different buckets, records are routed to the corresponding out_influxdb instance by tag match. This change makes it possible to match on a tag prefix while stripping that prefix from the measurement name, so that the measurement names in one bucket no longer all share an identical prefix.
To achieve this, the measurement name is read from char tag[] at an offset, provided the tag starts with the complete prefix and the prefix covers at most tag_length - 1 characters, so that at least one character of the name remains.
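The offset rule described above can be sketched as follows. This is a minimal illustration, not the code from the patch: the helper name strip_offset and its signature are assumptions made for this example, and strncmp is used only because it is the simplest standard comparison.

```c
#include <string.h>

/*
 * Hypothetical helper (not part of the plugin): returns how many leading
 * bytes of `tag` to skip when the measurement name is written.
 */
static int strip_offset(const char *tag, int tag_len,
                        const char *prefix, int prefix_len)
{
    /* Strip only when the tag starts with the whole prefix and at least
     * one character of the tag would remain afterwards. */
    if (prefix_len > 0 && prefix_len < tag_len &&
        strncmp(tag, prefix, (size_t) prefix_len) == 0) {
        return prefix_len;
    }

    /* No match, empty prefix, or prefix as long as the tag: keep the tag. */
    return 0;
}
```

With the tag bar.stream.somesensor (21 characters) and the prefix bar.stream. (11 characters), the helper returns 11, so the name would be read from tag + 11 with length 10, giving somesensor. A tag equal to the prefix is left untouched, because stripping it would produce an empty measurement name.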
Enter [N/A] in the box if an item is not applicable to your change.
Testing
Before we can approve your change, please submit the following in a comment:
- [x] Example configuration file for the change
- [x] Debug log output from testing the change
- [x] Attached Valgrind output that shows no leaks or memory corruption was found
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
- [N/A] Run local packaging test showing all targets (including any new ones) build.
- [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).
Documentation
https://github.com/fluent/fluent-bit-docs/pull/1468
- [x] Documentation required for this feature
Backporting
- [x] Backport to latest stable release.
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Example configuration file for this change - how to write to different buckets in the same DB without adding measurement name prefixes:
[SERVICE]
flush 1
Daemon off
Log_Level debug
[INPUT]
Name dummy
Tag foo.somedata
Dummy {"msg": "This is foo", "value": 1.3123}
[INPUT]
Name dummy
Tag bar.stream.importantmessage
Dummy {"msg": "completed", "ID": "1234", "tags": ["ID"]}
[INPUT]
Name dummy
Tag bar.stream.somesensor
Dummy {"value": 1, "tags": ["source", "yours"]}
[OUTPUT]
Name influxdb
Match foo.*
strip_prefix foo.
Host localhost
Port 8086
Bucket foo-bucket
Org foobarorg
HTTP_Token my-super-secret-auth-token
[OUTPUT]
Name influxdb
Match bar.*
strip_prefix bar.stream.
Host localhost
Port 8086
Bucket bar-bucket
Org foobarorg
HTTP_Token my-super-secret-auth-token
Debug log output
Fluent Bit v3.2.0
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io
______ _ _ ______ _ _ _____ __
| ___| | | | | ___ (_) | |____ |/ |
| |_ | |_ _ ___ _ __ | |_ | |_/ /_| |_ __ __ / /`| |
| _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / / \ \ | |
| | | | |_| | __/ | | | |_ | |_/ / | |_ \ V /.___/ /_| |_
\_| |_|\__,_|\___|_| |_|\__| \____/|_|\__| \_/ \____(_)___/
[2024/09/30 08:25:33] [ info] Configuration:
[2024/09/30 08:25:33] [ info] flush time | 1.000000 seconds
[2024/09/30 08:25:33] [ info] grace | 5 seconds
[2024/09/30 08:25:33] [ info] daemon | 0
[2024/09/30 08:25:33] [ info] ___________
[2024/09/30 08:25:33] [ info] inputs:
[2024/09/30 08:25:33] [ info] dummy
[2024/09/30 08:25:33] [ info] dummy
[2024/09/30 08:25:33] [ info] dummy
[2024/09/30 08:25:33] [ info] ___________
[2024/09/30 08:25:33] [ info] filters:
[2024/09/30 08:25:33] [ info] ___________
[2024/09/30 08:25:33] [ info] outputs:
[2024/09/30 08:25:33] [ info] influxdb.0
[2024/09/30 08:25:33] [ info] influxdb.1
[2024/09/30 08:25:33] [ info] ___________
[2024/09/30 08:25:33] [ info] collectors:
[2024/09/30 08:25:33] [ info] [fluent bit] version=3.2.0, commit=cb44828011, pid=7064
[2024/09/30 08:25:33] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2024/09/30 08:25:33] [ info] [storage] ver=1.5.2, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/09/30 08:25:33] [ info] [cmetrics] version=0.9.6
[2024/09/30 08:25:33] [ info] [ctraces ] version=0.5.5
[2024/09/30 08:25:33] [ info] [input:dummy:dummy.0] initializing
[2024/09/30 08:25:33] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2024/09/30 08:25:33] [debug] [dummy:dummy.0] created event channels: read=24 write=25
[2024/09/30 08:25:33] [ info] [input:dummy:dummy.1] initializing
[2024/09/30 08:25:33] [ info] [input:dummy:dummy.1] storage_strategy='memory' (memory only)
[2024/09/30 08:25:33] [debug] [dummy:dummy.1] created event channels: read=26 write=27
[2024/09/30 08:25:33] [ info] [input:dummy:dummy.2] initializing
[2024/09/30 08:25:33] [ info] [input:dummy:dummy.2] storage_strategy='memory' (memory only)
[2024/09/30 08:25:33] [debug] [dummy:dummy.2] created event channels: read=28 write=29
[2024/09/30 08:25:33] [debug] [influxdb:influxdb.0] created event channels: read=30 write=31
[2024/09/30 08:25:33] [debug] [output:influxdb:influxdb.0] host=localhost port=8086
[2024/09/30 08:25:33] [debug] [influxdb:influxdb.1] created event channels: read=32 write=33
[2024/09/30 08:25:33] [debug] [output:influxdb:influxdb.1] host=localhost port=8086
[2024/09/30 08:25:33] [debug] [router] match rule dummy.0:influxdb.0
[2024/09/30 08:25:33] [debug] [router] match rule dummy.1:influxdb.1
[2024/09/30 08:25:33] [debug] [router] match rule dummy.2:influxdb.1
[2024/09/30 08:25:33] [ info] [sp] stream processor started
[2024/09/30 08:25:34] [debug] [task] created task=0x7f7e1802d730 id=0 OK
[2024/09/30 08:25:34] [debug] [task] created task=0x7f7e1802d8b0 id=1 OK
[2024/09/30 08:25:34] [debug] [task] created task=0x7f7e1802da00 id=2 OK
[2024/09/30 08:25:34] [debug] [upstream] KA connection #42 to localhost:8086 is connected
[2024/09/30 08:25:34] [debug] [http_client] not using http_proxy for header
[2024/09/30 08:25:34] [debug] [upstream] KA connection #43 to localhost:8086 is connected
[2024/09/30 08:25:34] [debug] [http_client] not using http_proxy for header
[2024/09/30 08:25:34] [debug] [upstream] KA connection #44 to localhost:8086 is connected
[2024/09/30 08:25:34] [debug] [http_client] not using http_proxy for header
[2024/09/30 08:25:34] [debug] [output:influxdb:influxdb.0] http_do=0 OK
[2024/09/30 08:25:34] [debug] [upstream] KA connection #42 to localhost:8086 is now available
[2024/09/30 08:25:34] [debug] [output:influxdb:influxdb.1] http_do=0 OK
[2024/09/30 08:25:34] [debug] [upstream] KA connection #43 to localhost:8086 is now available
[2024/09/30 08:25:34] [debug] [out flush] cb_destroy coro_id=0
[2024/09/30 08:25:34] [debug] [task] destroy task=0x7f7e1802d730 (task_id=0)
[2024/09/30 08:25:34] [debug] [out flush] cb_destroy coro_id=0
[2024/09/30 08:25:34] [debug] [task] destroy task=0x7f7e1802d8b0 (task_id=1)
[2024/09/30 08:25:34] [debug] [output:influxdb:influxdb.1] http_do=0 OK
[2024/09/30 08:25:34] [debug] [upstream] KA connection #44 to localhost:8086 is now available
[2024/09/30 08:25:34] [debug] [out flush] cb_destroy coro_id=1
[2024/09/30 08:25:34] [debug] [task] destroy task=0x7f7e1802da00 (task_id=2)
[2024/09/30 08:25:35] [debug] [task] created task=0x7f7e18039050 id=0 OK
[2024/09/30 08:25:35] [debug] [task] created task=0x7f7e1802d090 id=1 OK
[2024/09/30 08:25:35] [debug] [task] created task=0x7f7e1802d950 id=2 OK
[2024/09/30 08:25:35] [debug] [upstream] KA connection #42 to localhost:8086 has been assigned (recycled)
[2024/09/30 08:25:35] [debug] [http_client] not using http_proxy for header
[2024/09/30 08:25:35] [debug] [upstream] KA connection #43 to localhost:8086 has been assigned (recycled)
[2024/09/30 08:25:35] [debug] [http_client] not using http_proxy for header
[2024/09/30 08:25:35] [debug] [upstream] KA connection #44 to localhost:8086 has been assigned (recycled)
[2024/09/30 08:25:35] [debug] [http_client] not using http_proxy for header
[2024/09/30 08:25:35] [debug] [output:influxdb:influxdb.0] http_do=0 OK
[2024/09/30 08:25:35] [debug] [upstream] KA connection #42 to localhost:8086 is now available
[2024/09/30 08:25:35] [debug] [out flush] cb_destroy coro_id=1
[2024/09/30 08:25:35] [debug] [task] destroy task=0x7f7e18039050 (task_id=0)
[2024/09/30 08:25:35] [debug] [output:influxdb:influxdb.1] http_do=0 OK
[2024/09/30 08:25:35] [debug] [upstream] KA connection #44 to localhost:8086 is now available
[2024/09/30 08:25:35] [debug] [out flush] cb_destroy coro_id=3
[2024/09/30 08:25:35] [debug] [task] destroy task=0x7f7e1802d950 (task_id=2)
[2024/09/30 08:25:35] [debug] [output:influxdb:influxdb.1] http_do=0 OK
[2024/09/30 08:25:35] [debug] [upstream] KA connection #43 to localhost:8086 is now available
[2024/09/30 08:25:35] [debug] [out flush] cb_destroy coro_id=2
[2024/09/30 08:25:35] [debug] [task] destroy task=0x7f7e1802d090 (task_id=1)
valgrind memcheck output
==17905== Memcheck, a memory error detector
==17905== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==17905== Using Valgrind-3.19.0 and LibVEX; rerun with -h for copyright info
==17905== Command: ./build/bin/fluent-bit -c ./test.conf
==17905==
Fluent Bit v3.2.0
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io
______ _ _ ______ _ _ _____ __
| ___| | | | | ___ (_) | |____ |/ |
| |_ | |_ _ ___ _ __ | |_ | |_/ /_| |_ __ __ / /`| |
| _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / / \ \ | |
| | | | |_| | __/ | | | |_ | |_/ / | |_ \ V /.___/ /_| |_
\_| |_|\__,_|\___|_| |_|\__| \____/|_|\__| \_/ \____(_)___/
[2024/09/30 08:36:39] [ info] Configuration:
[2024/09/30 08:36:39] [ info] flush time | 1.000000 seconds
[2024/09/30 08:36:39] [ info] grace | 5 seconds
[2024/09/30 08:36:39] [ info] daemon | 0
[2024/09/30 08:36:39] [ info] ___________
[2024/09/30 08:36:39] [ info] inputs:
[2024/09/30 08:36:39] [ info] dummy
[2024/09/30 08:36:39] [ info] dummy
[2024/09/30 08:36:39] [ info] dummy
[2024/09/30 08:36:39] [ info] ___________
[2024/09/30 08:36:39] [ info] filters:
[2024/09/30 08:36:39] [ info] ___________
[2024/09/30 08:36:39] [ info] outputs:
[2024/09/30 08:36:39] [ info] influxdb.0
[2024/09/30 08:36:39] [ info] influxdb.1
[2024/09/30 08:36:39] [ info] ___________
[.........]
==17905==
==17905== HEAP SUMMARY:
==17905== in use at exit: 0 bytes in 0 blocks
==17905== total heap usage: 5,464 allocs, 5,464 frees, 12,504,833 bytes allocated
==17905==
==17905== All heap blocks were freed -- no leaks are possible
==17905==
==17905== For lists of detected and suppressed errors, rerun with: -s
==17905== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.
This PR is pending review.
Walkthrough
Adds per-instance tag prefix stripping to the InfluxDB output plugin: a new strip_prefix config is stored as prefix/prefix_len, used at format time to optionally remove the prefix from record tags, and freed on plugin exit.
Changes
| Cohort / File(s) | Change Summary |
|---|---|
| InfluxDB output: prefix handling (plugins/out_influxdb/influxdb.c, plugins/out_influxdb/influxdb.h) | Add per-instance strip_prefix support: read/store prefix and prefix_len in init, conditionally strip the prefix when building InfluxDB line protocol (adjust header/key emission), add config_map entry, and free prefix in exit; extend struct flb_influxdb with char *prefix and prefix_len. |
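The init/exit handling summarized in the table might look roughly like the sketch below. It is only an illustration under stated assumptions: struct influx_ctx is a trimmed-down stand-in for struct flb_influxdb, the two function names are hypothetical, and malloc/free stand in for the Fluent Bit allocation helpers (the review mentions flb_free; the real property lookup goes through flb_output_get_property).

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical, reduced stand-in for struct flb_influxdb. */
struct influx_ctx {
    char *prefix;    /* owned copy of strip_prefix; "" when unset */
    int prefix_len;  /* cached length of prefix */
};

/* Init: copy the configured value; NULL means the option was not set. */
static int ctx_set_prefix(struct influx_ctx *ctx, const char *configured)
{
    const char *src = configured ? configured : "";
    size_t len = strlen(src);

    ctx->prefix = malloc(len + 1);
    if (!ctx->prefix) {
        ctx->prefix_len = 0;
        return -1;
    }
    memcpy(ctx->prefix, src, len + 1);  /* includes the terminating NUL */
    ctx->prefix_len = (int) len;
    return 0;
}

/* Exit: release the copy; safe to call when prefix was never set. */
static void ctx_free_prefix(struct influx_ctx *ctx)
{
    if (ctx->prefix) {
        free(ctx->prefix);
        ctx->prefix = NULL;
    }
    ctx->prefix_len = 0;
}
```

Defaulting to an empty string keeps ctx->prefix valid in every instance, so the formatting path needs no NULL check, and caching prefix_len avoids calling strlen for every record.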
Sequence Diagram(s)
sequenceDiagram
autonumber
participant CFG as Config Loader
participant OI as InfluxDB Instance
participant FM as Formatter
participant EM as Emitter
CFG->>OI: cb_influxdb_init (read `strip_prefix`)
Note right of OI: store `prefix` and `prefix_len`
loop For each record
FM->>FM: receive record tag
alt tag startsWith(prefix) and tag.length > prefix_len
FM->>FM: compute offset, produce tag_without_prefix
Note right of FM #dff0d8: (new/changed path)
else
FM->>FM: use original tag
end
FM->>EM: influxdb_bulk_append_header(tag_used, len)
EM-->>EM: emit line protocol
end
OI-->>OI: cb_influxdb_exit
Note right of OI #f8d7da: free(prefix) if allocated
Estimated code review effort
🎯 2 (Simple) | ⏱️ ~10 minutes
Poem
I nibble prefixes from the start,
A tidy tag is little art.
Hop—trimmed names glide into the stream,
Line protocol neat as a dream.
Rabbit cheers: clean data, hop and part! 🐇✨
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 33.33% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title Check | ✅ Passed | The pull request title "out_influxdb: allow stripping of tag prefix" is directly related to the main change in the changeset. The title clearly identifies the component (out_influxdb) and the primary functionality being added (ability to strip tag prefix from measurement names). The changeset confirms this purpose: it introduces a new strip_prefix configuration property and implements the logic to conditionally remove this prefix when constructing InfluxDB line protocol data. The title is concise, specific, and avoids generic or vague terminology. A teammate reviewing the history would immediately understand that this PR adds prefix stripping capability to the InfluxDB output plugin. |
📜 Recent review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📥 Commits
Reviewing files that changed from the base of the PR and between 79bc4852209dbb2d3307c0d8acf1997aba7a7fc7 and 2d115fa7eb4dbfdfe8947731eb57c89a2598d8d8.
📒 Files selected for processing (2)
- plugins/out_influxdb/influxdb.c (5 hunks)
- plugins/out_influxdb/influxdb.h (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
plugins/out_influxdb/influxdb.c (2)
- src/flb_output.c (1): flb_output_get_property (1096-1099)
- include/fluent-bit/flb_mem.h (1): flb_free (126-128)
🔇 Additional comments (4)
plugins/out_influxdb/influxdb.h (1)
- 59-61: LGTM: Fields added to support prefix stripping. The new prefix and prefix_len fields are appropriately placed and will store the configured tag prefix for stripping during InfluxDB line protocol formatting.
plugins/out_influxdb/influxdb.c (3)
- 387-394: LGTM: Initialization handles both configured and default cases. The code correctly reads the strip_prefix configuration property and defaults to an empty string when not specified, ensuring ctx->prefix is always valid.
- 622-624: LGTM: Proper cleanup of allocated prefix. The cleanup correctly frees the allocated ctx->prefix with a NULL check, following the same pattern used for other dynamically allocated fields.
- 728-732: LGTM: Configuration property properly defined. The strip_prefix configuration map entry is correctly defined with an appropriate description and follows the same pattern as sequence_tag.
@ueli-g can you resolve the conflicts shown above please?
@eschabell thanks for the nudge. I have addressed the valid bug: prefix_offset needs to be reset in every iteration.
Could you have another look please and let me know if there is anything else I should address? I deliberately did not switch to memcmp as suggested by the AI, and skipped some of its other nitpicks, to stay in line with the existing style rather than updating this everywhere.
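For readers following the thread, the reset issue can be illustrated with a small sketch. This is not the plugin's loop: compute_offsets and its parameters are hypothetical names for this example, and only the variable name prefix_offset is taken from the comment above. strncmp is assumed here as the comparison, in keeping with the decision not to switch to memcmp.

```c
#include <string.h>

/*
 * Hypothetical loop (not the plugin's code): stores in offsets[i] the number
 * of leading bytes to skip for tags[i].
 */
static void compute_offsets(const char **tags, int n,
                            const char *prefix, int *offsets)
{
    int i;
    int prefix_len = (int) strlen(prefix);
    int prefix_offset;

    for (i = 0; i < n; i++) {
        int tag_len = (int) strlen(tags[i]);

        /* Reset on every iteration; otherwise the offset from a previous
         * matching tag would carry over to a tag without the prefix. */
        prefix_offset = 0;

        if (prefix_len > 0 && prefix_len < tag_len &&
            strncmp(tags[i], prefix, (size_t) prefix_len) == 0) {
            prefix_offset = prefix_len;
        }
        offsets[i] = prefix_offset;
    }
}
```

If prefix_offset were initialized only once before the loop, a non-matching tag that follows a matching one would be truncated by the stale offset, which is the kind of error the reset prevents.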