
out_influxdb: allow stripping of tag prefix

Open ueli-g opened this issue 1 year ago • 3 comments

This change removes a configured prefix from measurement names. Without it, every measurement in the same data bucket may end up carrying the same prefix.

When writing to several different buckets, records are routed to the corresponding out_influxdb instances by tag match. This change lets an instance match on a tag prefix but strip that prefix from the measurement name, so the measurement names in a bucket no longer all share an identical prefix.

To achieve this, the measurement name is read from char tag[] at an offset, provided the tag starts with the complete prefix and the prefix covers at most tag_length - 1 characters, so that at least one character of the tag remains.
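
The condition described above can be sketched in C roughly as follows. This is a minimal illustration and not the code from the PR: the function name measurement_offset and its parameters are hypothetical, and the use of strncmp for the comparison is an assumption.

```c
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper (not the PR's actual code): return how many leading
 * bytes of `tag` to skip when emitting the measurement name.
 * Returns prefix_len when the tag starts with the whole prefix and at least
 * one character remains afterwards; returns 0 otherwise.
 */
static size_t measurement_offset(const char *tag, size_t tag_len,
                                 const char *prefix, size_t prefix_len)
{
    if (prefix == NULL || prefix_len == 0) {
        return 0;   /* no prefix configured */
    }
    if (prefix_len >= tag_len) {
        return 0;   /* stripping would leave an empty measurement name */
    }
    if (strncmp(tag, prefix, prefix_len) != 0) {
        return 0;   /* tag does not start with the prefix */
    }
    return prefix_len;
}
```

With the tag foo.somedata and the prefix foo., this helper returns 4, so the measurement name would be read starting at tag + 4, which is somedata. A tag equal to the prefix itself is left untouched.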


Enter [N/A] in the box, if an item is not applicable to your change.

Testing

Before we can approve your change, please submit the following in a comment:

  • [x] Example configuration file for the change
  • [x] Debug log output from testing the change
  • [x] Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [N/A] Run local packaging test showing all targets (including any new ones) build.
  • [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

https://github.com/fluent/fluent-bit-docs/pull/1468

  • [x] Documentation required for this feature

Backporting

  • [x] Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

ueli-g avatar Sep 26 '24 14:09 ueli-g

Example configuration file for this change - how to write to different buckets in the same DB without adding measurement name prefixes:

[SERVICE]
    flush           1
    Daemon          off
    Log_Level       debug

[INPUT]
    Name        dummy
    Tag         foo.somedata
    Dummy             {"msg": "This is foo", "value": 1.3123}

[INPUT]
    Name        dummy
    Tag         bar.stream.importantmessage
    Dummy             {"msg": "completed", "ID": "1234", "tags": ["ID"]}

[INPUT]
    Name        dummy
    Tag         bar.stream.somesensor
    Dummy             {"value": 1, "tags": ["source", "yours"]}

[OUTPUT]
    Name          influxdb
    Match         foo.*
    strip_prefix  foo.
    Host          localhost
    Port          8086
    Bucket        foo-bucket
    Org           foobarorg
    HTTP_Token    my-super-secret-auth-token

[OUTPUT]
    Name          influxdb
    Match         bar.*
    strip_prefix  bar.stream.
    Host          localhost
    Port          8086
    Bucket        bar-bucket
    Org           foobarorg
    HTTP_Token    my-super-secret-auth-token

ueli-g avatar Sep 30 '24 08:09 ueli-g

Debug log output

Fluent Bit v3.2.0
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

______ _                  _    ______ _ _           _____  __  
|  ___| |                | |   | ___ (_) |         |____ |/  | 
| |_  | |_   _  ___ _ __ | |_  | |_/ /_| |_  __   __   / /`| | 
|  _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / /   \ \ | | 
| |   | | |_| |  __/ | | | |_  | |_/ / | |_   \ V /.___/ /_| |_
\_|   |_|\__,_|\___|_| |_|\__| \____/|_|\__|   \_/ \____(_)___/

[2024/09/30 08:25:33] [ info] Configuration:
[2024/09/30 08:25:33] [ info]  flush time     | 1.000000 seconds
[2024/09/30 08:25:33] [ info]  grace          | 5 seconds
[2024/09/30 08:25:33] [ info]  daemon         | 0
[2024/09/30 08:25:33] [ info] ___________
[2024/09/30 08:25:33] [ info]  inputs:
[2024/09/30 08:25:33] [ info]      dummy
[2024/09/30 08:25:33] [ info]      dummy
[2024/09/30 08:25:33] [ info]      dummy
[2024/09/30 08:25:33] [ info] ___________
[2024/09/30 08:25:33] [ info]  filters:
[2024/09/30 08:25:33] [ info] ___________
[2024/09/30 08:25:33] [ info]  outputs:
[2024/09/30 08:25:33] [ info]      influxdb.0
[2024/09/30 08:25:33] [ info]      influxdb.1
[2024/09/30 08:25:33] [ info] ___________
[2024/09/30 08:25:33] [ info]  collectors:
[2024/09/30 08:25:33] [ info] [fluent bit] version=3.2.0, commit=cb44828011, pid=7064
[2024/09/30 08:25:33] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2024/09/30 08:25:33] [ info] [storage] ver=1.5.2, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/09/30 08:25:33] [ info] [cmetrics] version=0.9.6
[2024/09/30 08:25:33] [ info] [ctraces ] version=0.5.5
[2024/09/30 08:25:33] [ info] [input:dummy:dummy.0] initializing
[2024/09/30 08:25:33] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2024/09/30 08:25:33] [debug] [dummy:dummy.0] created event channels: read=24 write=25
[2024/09/30 08:25:33] [ info] [input:dummy:dummy.1] initializing
[2024/09/30 08:25:33] [ info] [input:dummy:dummy.1] storage_strategy='memory' (memory only)
[2024/09/30 08:25:33] [debug] [dummy:dummy.1] created event channels: read=26 write=27
[2024/09/30 08:25:33] [ info] [input:dummy:dummy.2] initializing
[2024/09/30 08:25:33] [ info] [input:dummy:dummy.2] storage_strategy='memory' (memory only)
[2024/09/30 08:25:33] [debug] [dummy:dummy.2] created event channels: read=28 write=29
[2024/09/30 08:25:33] [debug] [influxdb:influxdb.0] created event channels: read=30 write=31
[2024/09/30 08:25:33] [debug] [output:influxdb:influxdb.0] host=localhost port=8086
[2024/09/30 08:25:33] [debug] [influxdb:influxdb.1] created event channels: read=32 write=33
[2024/09/30 08:25:33] [debug] [output:influxdb:influxdb.1] host=localhost port=8086
[2024/09/30 08:25:33] [debug] [router] match rule dummy.0:influxdb.0
[2024/09/30 08:25:33] [debug] [router] match rule dummy.1:influxdb.1
[2024/09/30 08:25:33] [debug] [router] match rule dummy.2:influxdb.1
[2024/09/30 08:25:33] [ info] [sp] stream processor started
[2024/09/30 08:25:34] [debug] [task] created task=0x7f7e1802d730 id=0 OK
[2024/09/30 08:25:34] [debug] [task] created task=0x7f7e1802d8b0 id=1 OK
[2024/09/30 08:25:34] [debug] [task] created task=0x7f7e1802da00 id=2 OK
[2024/09/30 08:25:34] [debug] [upstream] KA connection #42 to localhost:8086 is connected
[2024/09/30 08:25:34] [debug] [http_client] not using http_proxy for header
[2024/09/30 08:25:34] [debug] [upstream] KA connection #43 to localhost:8086 is connected
[2024/09/30 08:25:34] [debug] [http_client] not using http_proxy for header
[2024/09/30 08:25:34] [debug] [upstream] KA connection #44 to localhost:8086 is connected
[2024/09/30 08:25:34] [debug] [http_client] not using http_proxy for header
[2024/09/30 08:25:34] [debug] [output:influxdb:influxdb.0] http_do=0 OK
[2024/09/30 08:25:34] [debug] [upstream] KA connection #42 to localhost:8086 is now available
[2024/09/30 08:25:34] [debug] [output:influxdb:influxdb.1] http_do=0 OK
[2024/09/30 08:25:34] [debug] [upstream] KA connection #43 to localhost:8086 is now available
[2024/09/30 08:25:34] [debug] [out flush] cb_destroy coro_id=0
[2024/09/30 08:25:34] [debug] [task] destroy task=0x7f7e1802d730 (task_id=0)
[2024/09/30 08:25:34] [debug] [out flush] cb_destroy coro_id=0
[2024/09/30 08:25:34] [debug] [task] destroy task=0x7f7e1802d8b0 (task_id=1)
[2024/09/30 08:25:34] [debug] [output:influxdb:influxdb.1] http_do=0 OK
[2024/09/30 08:25:34] [debug] [upstream] KA connection #44 to localhost:8086 is now available
[2024/09/30 08:25:34] [debug] [out flush] cb_destroy coro_id=1
[2024/09/30 08:25:34] [debug] [task] destroy task=0x7f7e1802da00 (task_id=2)
[2024/09/30 08:25:35] [debug] [task] created task=0x7f7e18039050 id=0 OK
[2024/09/30 08:25:35] [debug] [task] created task=0x7f7e1802d090 id=1 OK
[2024/09/30 08:25:35] [debug] [task] created task=0x7f7e1802d950 id=2 OK
[2024/09/30 08:25:35] [debug] [upstream] KA connection #42 to localhost:8086 has been assigned (recycled)
[2024/09/30 08:25:35] [debug] [http_client] not using http_proxy for header
[2024/09/30 08:25:35] [debug] [upstream] KA connection #43 to localhost:8086 has been assigned (recycled)
[2024/09/30 08:25:35] [debug] [http_client] not using http_proxy for header
[2024/09/30 08:25:35] [debug] [upstream] KA connection #44 to localhost:8086 has been assigned (recycled)
[2024/09/30 08:25:35] [debug] [http_client] not using http_proxy for header
[2024/09/30 08:25:35] [debug] [output:influxdb:influxdb.0] http_do=0 OK
[2024/09/30 08:25:35] [debug] [upstream] KA connection #42 to localhost:8086 is now available
[2024/09/30 08:25:35] [debug] [out flush] cb_destroy coro_id=1
[2024/09/30 08:25:35] [debug] [task] destroy task=0x7f7e18039050 (task_id=0)
[2024/09/30 08:25:35] [debug] [output:influxdb:influxdb.1] http_do=0 OK
[2024/09/30 08:25:35] [debug] [upstream] KA connection #44 to localhost:8086 is now available
[2024/09/30 08:25:35] [debug] [out flush] cb_destroy coro_id=3
[2024/09/30 08:25:35] [debug] [task] destroy task=0x7f7e1802d950 (task_id=2)
[2024/09/30 08:25:35] [debug] [output:influxdb:influxdb.1] http_do=0 OK
[2024/09/30 08:25:35] [debug] [upstream] KA connection #43 to localhost:8086 is now available
[2024/09/30 08:25:35] [debug] [out flush] cb_destroy coro_id=2
[2024/09/30 08:25:35] [debug] [task] destroy task=0x7f7e1802d090 (task_id=1)

ueli-g avatar Sep 30 '24 08:09 ueli-g

valgrind memcheck output

==17905== Memcheck, a memory error detector
==17905== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==17905== Using Valgrind-3.19.0 and LibVEX; rerun with -h for copyright info
==17905== Command: ./build/bin/fluent-bit -c ./test.conf
==17905== 
Fluent Bit v3.2.0
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

______ _                  _    ______ _ _           _____  __  
|  ___| |                | |   | ___ (_) |         |____ |/  | 
| |_  | |_   _  ___ _ __ | |_  | |_/ /_| |_  __   __   / /`| | 
|  _| | | | | |/ _ \ '_ \| __| | ___ \ | __| \ \ / /   \ \ | | 
| |   | | |_| |  __/ | | | |_  | |_/ / | |_   \ V /.___/ /_| |_
\_|   |_|\__,_|\___|_| |_|\__| \____/|_|\__|   \_/ \____(_)___/

[2024/09/30 08:36:39] [ info] Configuration:
[2024/09/30 08:36:39] [ info]  flush time     | 1.000000 seconds
[2024/09/30 08:36:39] [ info]  grace          | 5 seconds
[2024/09/30 08:36:39] [ info]  daemon         | 0
[2024/09/30 08:36:39] [ info] ___________
[2024/09/30 08:36:39] [ info]  inputs:
[2024/09/30 08:36:39] [ info]      dummy
[2024/09/30 08:36:39] [ info]      dummy
[2024/09/30 08:36:39] [ info]      dummy
[2024/09/30 08:36:39] [ info] ___________
[2024/09/30 08:36:39] [ info]  filters:
[2024/09/30 08:36:39] [ info] ___________
[2024/09/30 08:36:39] [ info]  outputs:
[2024/09/30 08:36:39] [ info]      influxdb.0
[2024/09/30 08:36:39] [ info]      influxdb.1
[2024/09/30 08:36:39] [ info] ___________
[.........]
==17905== 
==17905== HEAP SUMMARY:
==17905==     in use at exit: 0 bytes in 0 blocks
==17905==   total heap usage: 5,464 allocs, 5,464 frees, 12,504,833 bytes allocated
==17905== 
==17905== All heap blocks were freed -- no leaks are possible
==17905== 
==17905== For lists of detected and suppressed errors, rerun with: -s
==17905== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

ueli-g avatar Sep 30 '24 08:09 ueli-g

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

github-actions[bot] avatar Jan 01 '25 02:01 github-actions[bot]

This PR is pending review.

ueli-g avatar Jan 06 '25 13:01 ueli-g

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

github-actions[bot] avatar Sep 07 '25 02:09 github-actions[bot]

Walkthrough

Adds per-instance tag prefix stripping to the InfluxDB output plugin: a new strip_prefix config is stored as prefix/prefix_len, used at format time to optionally remove the prefix from record tags, and freed on plugin exit.

Changes

Cohort / File(s) | Change Summary
InfluxDB output: prefix handling (plugins/out_influxdb/influxdb.c, plugins/out_influxdb/influxdb.h) | Add per-instance strip_prefix support: read/store prefix and prefix_len in init, conditionally strip the prefix when building InfluxDB line protocol (adjust header/key emission), add config_map entry, and free prefix in exit; extend struct flb_influxdb with char *prefix and prefix_len.
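
The store-at-init and free-at-exit lifecycle summarized above might look roughly like the sketch below. It is an illustration under stated assumptions rather than the plugin's code: struct prefix_ctx and both function names are hypothetical stand-ins, and plain malloc/free are used in place of Fluent Bit's own memory helpers.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the two fields the summary says were added to
 * struct flb_influxdb; the real struct has many more members. */
struct prefix_ctx {
    char *prefix;     /* owned copy of the strip_prefix value ("" if unset) */
    int prefix_len;   /* cached length of prefix */
};

/* Store a private copy of the configured value; NULL means "not configured"
 * and is treated as an empty prefix so ctx->prefix is always valid. */
static int prefix_ctx_init(struct prefix_ctx *ctx, const char *configured)
{
    const char *src = configured ? configured : "";
    size_t len = strlen(src);

    ctx->prefix = malloc(len + 1);
    if (ctx->prefix == NULL) {
        ctx->prefix_len = 0;
        return -1;
    }
    memcpy(ctx->prefix, src, len + 1);   /* copies the terminating NUL too */
    ctx->prefix_len = (int) len;
    return 0;
}

/* Release the copy, guarding against a NULL pointer. */
static void prefix_ctx_exit(struct prefix_ctx *ctx)
{
    if (ctx->prefix != NULL) {
        free(ctx->prefix);
        ctx->prefix = NULL;
    }
    ctx->prefix_len = 0;
}
```

Caching the length once at init avoids recomputing it for every record at format time.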

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant CFG as Config Loader
  participant OI as InfluxDB Instance
  participant FM as Formatter
  participant EM as Emitter

  CFG->>OI: cb_influxdb_init (read `strip_prefix`)
  Note right of OI: store `prefix` and `prefix_len`
  loop For each record
    FM->>FM: receive record tag
    alt tag startsWith(prefix) and tag.length > prefix_len
      FM->>FM: compute offset, produce tag_without_prefix
      Note right of FM #dff0d8: (new/changed path)
    else
      FM->>FM: use original tag
    end
    FM->>EM: influxdb_bulk_append_header(tag_used, len)
    EM-->>EM: emit line protocol
  end
  OI-->>OI: cb_influxdb_exit
  Note right of OI #f8d7da: free(prefix) if allocated

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

Poem

I nibble prefixes from the start,
A tidy tag is little art.
Hop—trimmed names glide into the stream,
Line protocol neat as a dream.
Rabbit cheers: clean data, hop and part! 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 33.33% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name | Status | Explanation
Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check | ✅ Passed | The pull request title "out_influxdb: allow stripping of tag prefix" is directly related to the main change in the changeset. The title clearly identifies the component (out_influxdb) and the primary functionality being added (ability to strip tag prefix from measurement names). The changeset confirms this purpose: it introduces a new strip_prefix configuration property and implements the logic to conditionally remove this prefix when constructing InfluxDB line protocol data. The title is concise, specific, and avoids generic or vague terminology. A teammate reviewing the history would immediately understand that this PR adds prefix stripping capability to the InfluxDB output plugin.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 79bc4852209dbb2d3307c0d8acf1997aba7a7fc7 and 2d115fa7eb4dbfdfe8947731eb57c89a2598d8d8.

📒 Files selected for processing (2)
  • plugins/out_influxdb/influxdb.c (5 hunks)
  • plugins/out_influxdb/influxdb.h (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
plugins/out_influxdb/influxdb.c (2)
src/flb_output.c (1)
  • flb_output_get_property (1096-1099)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
🔇 Additional comments (4)
plugins/out_influxdb/influxdb.h (1)

59-61: LGTM: Fields added to support prefix stripping.

The new prefix and prefix_len fields are appropriately placed and will store the configured tag prefix for stripping during InfluxDB line protocol formatting.

plugins/out_influxdb/influxdb.c (3)

387-394: LGTM: Initialization handles both configured and default cases.

The code correctly reads the strip_prefix configuration property and defaults to an empty string when not specified, ensuring ctx->prefix is always valid.


622-624: LGTM: Proper cleanup of allocated prefix.

The cleanup correctly frees the allocated ctx->prefix with a NULL check, following the same pattern used for other dynamically allocated fields.


728-732: LGTM: Configuration property properly defined.

The strip_prefix configuration map entry is correctly defined with an appropriate description and follows the same pattern as sequence_tag.


coderabbitai[bot] avatar Sep 29 '25 13:09 coderabbitai[bot]

@ueli-g can you resolve the conflicts shown above please?

eschabell avatar Oct 22 '25 00:10 eschabell

@eschabell thanks for the nudge. I have fixed the valid bug: prefix_offset now gets reset in every iteration.

Could you have another look please and let me know if there is anything else I should address? I deliberately did not switch to memcmp as suggested by the AI (nor adopt some of the other nitpicks), so that the code stays in line with the existing style rather than changing it everywhere.
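
For readers following along, here is a generic sketch of why an offset like prefix_offset has to start from zero in each loop iteration. It is not the plugin's formatting loop: the function compute_offsets, its parameters, and the array-of-tags setup are hypothetical and only illustrate the pattern.

```c
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical illustration: record the offset to skip for each tag.
 * prefix_offset is declared inside the loop body, so every iteration starts
 * from 0. If it were declared once outside the loop and only assigned on a
 * match, a non-matching tag would inherit the stale offset of an earlier one.
 */
static void compute_offsets(const char **tags, size_t n,
                            const char *prefix, size_t *offsets)
{
    size_t prefix_len = strlen(prefix);
    size_t i;

    for (i = 0; i < n; i++) {
        size_t prefix_offset = 0;   /* reset for every iteration */
        size_t tag_len = strlen(tags[i]);

        if (prefix_len > 0 && prefix_len < tag_len &&
            strncmp(tags[i], prefix, prefix_len) == 0) {
            prefix_offset = prefix_len;
        }
        offsets[i] = prefix_offset;
    }
}
```

Given the tags foo.a, other and foo.bc with the prefix foo., the second tag keeps offset 0 even though the first one matched.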

ueli-g avatar Oct 24 '25 16:10 ueli-g