parser: support subsecond resolution with comma (%S,%L)
Is your feature request related to a problem? Please describe.
I am sending Kafka logs through Fluentd to OpenSearch.
Sample Logs:
[2022-08-01 04:12:02,278] INFO [UnifiedLog partition=fluentd-0, dir=/bitnami/kafka/data] Incremented log start offset to 813432079 due to leader offset increment (kafka.log.UnifiedLog)
[2022-08-01 04:12:38,398] INFO [UnifiedLog partition=fluentd-0, dir=/bitnami/kafka/data] Deleting segments due to log start offset 813432079 breach: LogSegment(baseOffset=812370806, size=1073523310, lastModifiedTime=1659242508257, largestRecordTimestamp=Some(1659157721930)) (kafka.log.UnifiedLog)
[2022-08-01 04:13:38,401] INFO [LocalLog partition=fluentd-0, dir=/bitnami/kafka/data] Deleting segment files LogSegment(baseOffset=812370806, size=1073523310, lastModifiedTime=1659242508257, largestRecordTimestamp=Some(1659157721930)) (kafka.log.LocalLog$)
[2022-08-01 04:13:38,410] INFO Deleted log /bitnami/kafka/data/fluentd-0/00000000000812370806.log.deleted. (kafka.log.LogSegment)
[2022-08-01 04:13:38,411] INFO Deleted offset index /bitnami/kafka/data/fluentd-0/00000000000812370806.index.deleted. (kafka.log.LogSegment)
[2022-08-01 04:13:38,415] INFO Deleted time index /bitnami/kafka/data/fluentd-0/00000000000812370806.timeindex.deleted. (kafka.log.LogSegment)
Relevant Filter Config:
<filter docker.kafka1.**>
  @type parser
  key_name log
  remove_key_name_field true
  <parse>
    @type regexp
    expression /\[(?<time>[^\]]*)\]\s(?<log_level>[A-Z]{4,5})\s(?<log_message>.*)/m
    time_format %Y-%m-%d %H:%M:%S,%L
  </parse>
</filter>
Fluentd Errors:
2022-08-01 04:12:12 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::Parser::ParserError error="invalid time format: value = 2022-08-01 04:12:02,278, error_class = ArgumentError, error = invalid date or strptime format - `2022-08-01 04:12:02,278' `%Y-%m-%d %H:%M:%S,%3N'" location="/opt/bitnami/fluentd/gems/fluentd-1.15.0/lib/fluent/plugin/parser.rb:195:in `rescue in parse_time'" tag="docker.kafka1.67a65cdefe69.harbor.homelab.bjzy.me/dh-cache/bitnami/kafka:latest" time=2022-08-01 04:12:12.969909299 +0000 record={"container_name"=>"/kafka1", "source"=>"stdout", "log"=>"[2022-08-01 04:12:02,278] INFO [UnifiedLog partition=fluentd-0, dir=/bitnami/kafka/data] Incremented log start offset to 813432079 due to leader offset increment (kafka.log.UnifiedLog)", "container_id"=>"67a65cdefe697634d3a4d45ed202910e84a1b07907925f1eb7052d3bf131df5f", "hostname"=>"Kafka1.homelab.bjzy.me", "fluentd_tag"=>"docker.kafka1.67a65cdefe69.harbor.homelab.bjzy.me/dh-cache/bitnami/kafka:latest"}
2022-08-01 04:12:48 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::Parser::ParserError error="invalid time format: value = 2022-08-01 04:12:38,398, error_class = ArgumentError, error = invalid date or strptime format - `2022-08-01 04:12:38,398' `%Y-%m-%d %H:%M:%S,%3N'" location="/opt/bitnami/fluentd/gems/fluentd-1.15.0/lib/fluent/plugin/parser.rb:195:in `rescue in parse_time'" tag="docker.kafka1.67a65cdefe69.harbor.homelab.bjzy.me/dh-cache/bitnami/kafka:latest" time=2022-08-01 04:12:48.730864061 +0000 record={"container_id"=>"67a65cdefe697634d3a4d45ed202910e84a1b07907925f1eb7052d3bf131df5f", "container_name"=>"/kafka1", "source"=>"stdout", "log"=>"[2022-08-01 04:12:38,398] INFO [UnifiedLog partition=fluentd-0, dir=/bitnami/kafka/data] Deleting segments due to log start offset 813432079 breach: LogSegment(baseOffset=812370806, size=1073523310, lastModifiedTime=1659242508257, largestRecordTimestamp=Some(1659157721930)) (kafka.log.UnifiedLog)", "hostname"=>"Kafka1.homelab.bjzy.me", "fluentd_tag"=>"docker.kafka1.67a65cdefe69.harbor.homelab.bjzy.me/dh-cache/bitnami/kafka:latest"}
2022-08-01 04:13:48 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::Parser::ParserError error="invalid time format: value = 2022-08-01 04:13:38,401, error_class = ArgumentError, error = invalid date or strptime format - `2022-08-01 04:13:38,401' `%Y-%m-%d %H:%M:%S,%3N'" location="/opt/bitnami/fluentd/gems/fluentd-1.15.0/lib/fluent/plugin/parser.rb:195:in `rescue in parse_time'" tag="docker.kafka1.67a65cdefe69.harbor.homelab.bjzy.me/dh-cache/bitnami/kafka:latest" time=2022-08-01 04:13:48.748512392 +0000 record={"source"=>"stdout", "log"=>"[2022-08-01 04:13:38,401] INFO [LocalLog partition=fluentd-0, dir=/bitnami/kafka/data] Deleting segment files LogSegment(baseOffset=812370806, size=1073523310, lastModifiedTime=1659242508257, largestRecordTimestamp=Some(1659157721930)) (kafka.log.LocalLog$)", "container_id"=>"67a65cdefe697634d3a4d45ed202910e84a1b07907925f1eb7052d3bf131df5f", "container_name"=>"/kafka1", "hostname"=>"Kafka1.homelab.bjzy.me", "fluentd_tag"=>"docker.kafka1.67a65cdefe69.harbor.homelab.bjzy.me/dh-cache/bitnami/kafka:latest"}
2022-08-01 04:13:48 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::Parser::ParserError error="invalid time format: value = 2022-08-01 04:13:38,410, error_class = ArgumentError, error = invalid date or strptime format - `2022-08-01 04:13:38,410' `%Y-%m-%d %H:%M:%S,%3N'" location="/opt/bitnami/fluentd/gems/fluentd-1.15.0/lib/fluent/plugin/parser.rb:195:in `rescue in parse_time'" tag="docker.kafka1.67a65cdefe69.harbor.homelab.bjzy.me/dh-cache/bitnami/kafka:latest" time=2022-08-01 04:13:48.748578645 +0000 record={"container_name"=>"/kafka1", "source"=>"stdout", "log"=>"[2022-08-01 04:13:38,410] INFO Deleted log /bitnami/kafka/data/fluentd-0/00000000000812370806.log.deleted. (kafka.log.LogSegment)", "container_id"=>"67a65cdefe697634d3a4d45ed202910e84a1b07907925f1eb7052d3bf131df5f", "hostname"=>"Kafka1.homelab.bjzy.me", "fluentd_tag"=>"docker.kafka1.67a65cdefe69.harbor.homelab.bjzy.me/dh-cache/bitnami/kafka:latest"}
2022-08-01 04:13:48 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::Parser::ParserError error="invalid time format: value = 2022-08-01 04:13:38,411, error_class = ArgumentError, error = invalid date or strptime format - `2022-08-01 04:13:38,411' `%Y-%m-%d %H:%M:%S,%3N'" location="/opt/bitnami/fluentd/gems/fluentd-1.15.0/lib/fluent/plugin/parser.rb:195:in `rescue in parse_time'" tag="docker.kafka1.67a65cdefe69.harbor.homelab.bjzy.me/dh-cache/bitnami/kafka:latest" time=2022-08-01 04:13:48.748638338 +0000 record={"container_id"=>"67a65cdefe697634d3a4d45ed202910e84a1b07907925f1eb7052d3bf131df5f", "container_name"=>"/kafka1", "source"=>"stdout", "log"=>"[2022-08-01 04:13:38,411] INFO Deleted offset index /bitnami/kafka/data/fluentd-0/00000000000812370806.index.deleted. (kafka.log.LogSegment)", "hostname"=>"Kafka1.homelab.bjzy.me", "fluentd_tag"=>"docker.kafka1.67a65cdefe69.harbor.homelab.bjzy.me/dh-cache/bitnami/kafka:latest"}
2022-08-01 04:13:48 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::Parser::ParserError error="invalid time format: value = 2022-08-01 04:13:38,415, error_class = ArgumentError, error = invalid date or strptime format - `2022-08-01 04:13:38,415' `%Y-%m-%d %H:%M:%S,%3N'" location="/opt/bitnami/fluentd/gems/fluentd-1.15.0/lib/fluent/plugin/parser.rb:195:in `rescue in parse_time'" tag="docker.kafka1.67a65cdefe69.harbor.homelab.bjzy.me/dh-cache/bitnami/kafka:latest" time=2022-08-01 04:13:48.748698341 +0000 record={"container_id"=>"67a65cdefe697634d3a4d45ed202910e84a1b07907925f1eb7052d3bf131df5f", "container_name"=>"/kafka1", "source"=>"stdout", "log"=>"[2022-08-01 04:13:38,415] INFO Deleted time index /bitnami/kafka/data/fluentd-0/00000000000812370806.timeindex.deleted. (kafka.log.LogSegment)", "hostname"=>"Kafka1.homelab.bjzy.me", "fluentd_tag"=>"docker.kafka1.67a65cdefe69.harbor.homelab.bjzy.me/dh-cache/bitnami/kafka:latest"}
Describe the solution you'd like
I would like sub-second timestamps separated by a comma to be parseable with %S,%L, matching the current implementation in Fluent Bit.
See: Fluent-bit code
Describe alternatives you've considered
I have not tried this, but is there a workaround to first rewrite the comma to a dot? I imagine a pre-processing filter could be leveraged for this, but I expect it would be very inefficient. A sketch of what I mean follows below.
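For illustration, a rough sketch of that kind of pre-processing with the built-in record_transformer filter, placed ahead of the parser filter above (untested; the tag pattern and the log key are carried over from my config):

<filter docker.kafka1.**>
  @type record_transformer
  enable_ruby true
  <record>
    # String#sub replaces only the first comma, which in these Kafka
    # lines is the sub-second separator in the leading [timestamp].
    log ${record["log"].sub(",", ".")}
  </record>
</filter>

The parser's time_format would then change to %Y-%m-%d %H:%M:%S.%L. Running a Ruby substitution on every record is exactly the inefficiency I am worried about.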
I've also just started investigating customizing the Kafka/ZooKeeper log format itself as a workaround. However, given that they only recently moved from Log4j to Logback, I've had some trouble finding the right information.
Additional context
I believe this timestamp format is what Log4j formally calls ISO8601DateFormat; ISO 8601 itself permits either a comma or a dot as the decimal separator for fractional seconds, and in fact names the comma as the preferred sign.
We use https://github.com/nurse/strptime for fast time-format parsing. Could you raise your issue there?
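For reference, a minimal reproduction sketch that could be attached to such an upstream issue (untested; it assumes the strptime gem's Strptime.new / #exec API, and uses the %3N format that Fluentd derives from %L, as seen in the warnings above):

require "strptime"
require "time"

value = "2022-08-01 04:12:02,278"

# Ruby's stdlib parses the literal comma plus %L without trouble:
Time.strptime(value, "%Y-%m-%d %H:%M:%S,%L")

# The strptime gem, which Fluentd delegates to, is expected to raise
# the ArgumentError shown in the warnings above on the same input:
Strptime.new("%Y-%m-%d %H:%M:%S,%3N").exec(value)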
This issue has been automatically marked as stale because it has been open for 30 days with no activity. Remove the stale label or add a comment, or this issue will be closed in 7 days.
This issue was automatically closed because it remained stale for 7 days.