
out_forward plugin starts sending invalid packets

Open ChristThePhone opened this issue 1 month ago • 22 comments

Describe the bug

fluentd 1.18

I'm using the out_forward plugin to forward incoming logs to my proprietary log server. Every once in a while it starts sending invalid messages consisting of 7 bytes: 93 a5 7a 2e 6c 6f 67

Once fluentd enters this state it keeps sending this message over and over again; incoming logs get buffered in files until they reach their maximum size, after which new logs are no longer accepted and are therefore lost.

I enabled trace-level logging for the out_forward plugin.

To Reproduce

I cannot reproduce this at will.

Expected behavior

The fluentd out_forward plugin should not send invalid packets, and if it does, it should not keep doing so over and over again.

Your Environment

- Fluentd version: 1.18
- Package version:
- Operating system: RHEL 9 Linux
- Kernel version:

Your Configuration

<match z.log>

     @type forward
     @log_level trace

     <server>
        name <myServerName>
        host 127.0.0.1
        port 24225
        ignore_network_errors_at_startup true
        verify_connection_at_startup false
     </server>

     <buffer>
        @type file
        path /<path to buffers>/zlog.*.buf
        flush_mode interval
        flush_interval 1s
        overflow_action throw_exception
        total_limit_size 1GB
        queued_chunks_limit_size 1024
        compress gzip
        retry_forever true
        retry_type periodic
        retry_wait 1s
     </buffer>

</match>

Your Error Log

2025-11-24 15:09:40 +0100 [trace]: dequeueing a chunk instance=2260
2025-11-24 15:09:40 +0100 [trace]: chunk dequeued instance=2260 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="z.log", variables=nil, seq=0>
2025-11-24 15:09:40 +0100 [trace]: trying flush for a chunk chunk="64457b6c5c3bd14110513c43a4d156cd"
2025-11-24 15:09:40 +0100 [trace]: adding write count instance=2240
2025-11-24 15:09:40 +0100 [trace]: executing sync write chunk="64457b6c5c3bd14110513c43a4d156cd"
2025-11-24 15:09:40 +0100 [debug]: connect new socket
2025-11-24 15:09:40 +0100 [trace]: write operation done, committing chunk="64457b6c5c3bd14110513c43a4d156cd"
2025-11-24 15:09:40 +0100 [trace]: committing write operation to a chunk chunk="64457b6c5c3bd14110513c43a4d156cd" delayed=false
2025-11-24 15:09:40 +0100 [trace]: purging a chunk instance=2260 chunk_id="64457b6c5c3bd14110513c43a4d156cd" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="z.log", variables=nil, seq=0>
2025-11-24 15:09:40 +0100 [trace]: chunk purged instance=2260 chunk_id="64457b6c5c3bd14110513c43a4d156cd" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="z.log", variables=nil, seq=0>
2025-11-24 15:09:40 +0100 [trace]: done to commit a chunk chunk="64457b6c5c3bd14110513c43a4d156cd"
2025-11-24 15:09:40.299441051 +0100 fluent.trace: {"instance":2260,"message":"dequeueing a chunk instance=2260"}
2025-11-24 15:09:40.299556774 +0100 fluent.trace: {"instance":2260,"metadata":"#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>","message":"chunk dequeued instance=2260 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>"}
2025-11-24 15:09:40.299602102 +0100 fluent.trace: {"chunk":"64457b6c5c3bd14110513c43a4d156cd","message":"trying flush for a chunk chunk=\"64457b6c5c3bd14110513c43a4d156cd\""}
2025-11-24 15:09:40.299639559 +0100 fluent.trace: {"instance":2240,"message":"adding write count instance=2240"}
2025-11-24 15:09:40.299662757 +0100 fluent.trace: {"chunk":"64457b6c5c3bd14110513c43a4d156cd","message":"executing sync write chunk=\"64457b6c5c3bd14110513c43a4d156cd\""}
2025-11-24 15:09:40.299701023 +0100 fluent.debug: {"message":"connect new socket"}
2025-11-24 15:09:40.302766920 +0100 fluent.trace: {"chunk":"64457b6c5c3bd14110513c43a4d156cd","message":"write operation done, committing chunk=\"64457b6c5c3bd14110513c43a4d156cd\""}
2025-11-24 15:09:40.302835320 +0100 fluent.trace: {"chunk":"64457b6c5c3bd14110513c43a4d156cd","delayed":false,"message":"committing write operation to a chunk chunk=\"64457b6c5c3bd14110513c43a4d156cd\" delayed=false"}
2025-11-24 15:09:40.302873690 +0100 fluent.trace: {"instance":2260,"chunk_id":"64457b6c5c3bd14110513c43a4d156cd","metadata":"#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>","message":"purging a chunk instance=2260 chunk_id=\"64457b6c5c3bd14110513c43a4d156cd\" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>"}
2025-11-24 15:09:40.302983926 +0100 fluent.trace: {"instance":2260,"chunk_id":"64457b6c5c3bd14110513c43a4d156cd","metadata":"#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>","message":"chunk purged instance=2260 chunk_id=\"64457b6c5c3bd14110513c43a4d156cd\" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>"}
2025-11-24 15:09:40.303023355 +0100 fluent.trace: {"chunk":"64457b6c5c3bd14110513c43a4d156cd","message":"done to commit a chunk chunk=\"64457b6c5c3bd14110513c43a4d156cd\""}
2025-11-24 15:09:40 +0100 [trace]: writing events into buffer instance=2260 metadata_size=1
2025-11-24 15:09:40 +0100 [debug]: Created new chunk chunk_id="64457b6d67cb7d7c1d004aa1758ad9b9" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="z.log", variables=nil, seq=0>
2025-11-24 15:09:40 +0100 [trace]: chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 159
2025-11-24 15:09:40.617262760 +0100 fluent.trace: {"instance":2260,"metadata_size":1,"message":"writing events into buffer instance=2260 metadata_size=1"}
2025-11-24 15:09:40.617614515 +0100 fluent.debug: {"chunk_id":"64457b6d67cb7d7c1d004aa1758ad9b9","metadata":"#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>","message":"Created new chunk chunk_id=\"64457b6d67cb7d7c1d004aa1758ad9b9\" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>"}
2025-11-24 15:09:40.617796147 +0100 fluent.trace: {"message":"chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 159"}
2025-11-24 15:09:40 +0100 [trace]: writing events into buffer instance=2260 metadata_size=1
2025-11-24 15:09:40 +0100 [trace]: chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 318
2025-11-24 15:09:40.618903549 +0100 fluent.trace: {"instance":2260,"metadata_size":1,"message":"writing events into buffer instance=2260 metadata_size=1"}
2025-11-24 15:09:40.619042316 +0100 fluent.trace: {"message":"chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 318"}
2025-11-24 15:09:40 +0100 [trace]: sending heartbeat host="127.0.0.1" port=24225 heartbeat_type=:transport
2025-11-24 15:09:40 +0100 [debug]: connect new socket
2025-11-24 15:09:40.983441674 +0100 fluent.trace: {"host":"127.0.0.1","port":24225,"heartbeat_type":"transport","message":"sending heartbeat host=\"127.0.0.1\" port=24225 heartbeat_type=:transport"}
2025-11-24 15:09:40.983567212 +0100 fluent.debug: {"message":"connect new socket"}
2025-11-24 15:09:41 +0100 [trace]: writing events into buffer instance=2260 metadata_size=1
2025-11-24 15:09:41 +0100 [trace]: chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 477
2025-11-24 15:09:41.122298570 +0100 fluent.trace: {"instance":2260,"metadata_size":1,"message":"writing events into buffer instance=2260 metadata_size=1"}
2025-11-24 15:09:41.122606090 +0100 fluent.trace: {"message":"chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 477"}
2025-11-24 15:09:41 +0100 [trace]: writing events into buffer instance=2260 metadata_size=1
2025-11-24 15:09:41 +0100 [trace]: chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 636
2025-11-24 15:09:41.129487541 +0100 fluent.trace: {"instance":2260,"metadata_size":1,"message":"writing events into buffer instance=2260 metadata_size=1"}
2025-11-24 15:09:41.129718161 +0100 fluent.trace: {"message":"chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 636"}
2025-11-24 15:09:41 +0100 [trace]: writing events into buffer instance=2260 metadata_size=1
2025-11-24 15:09:41.132771965 +0100 fluent.trace: {"instance":2260,"metadata_size":1,"message":"writing events into buffer instance=2260 metadata_size=1"}
2025-11-24 15:09:41 +0100 [trace]: chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 7395 new_size: 8031
2025-11-24 15:09:41.133715266 +0100 fluent.trace: {"message":"chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 7395 new_size: 8031"}
2025-11-24 15:09:41 +0100 [trace]: writing events into buffer instance=2260 metadata_size=1
2025-11-24 15:09:41.182099288 +0100 fluent.trace: {"instance":2260,"metadata_size":1,"message":"writing events into buffer instance=2260 metadata_size=1"}
2025-11-24 15:09:41 +0100 [trace]: chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 8190
2025-11-24 15:09:41.182447693 +0100 fluent.trace: {"message":"chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 8190"}
2025-11-24 15:09:41 +0100 [trace]: writing events into buffer instance=2260 metadata_size=1
2025-11-24 15:09:41.184271628 +0100 fluent.trace: {"instance":2260,"metadata_size":1,"message":"writing events into buffer instance=2260 metadata_size=1"}
2025-11-24 15:09:41 +0100 [trace]: chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 8349
2025-11-24 15:09:41.184573445 +0100 fluent.trace: {"message":"chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 8349"}
2025-11-24 15:09:41 +0100 [trace]: enqueueing all chunks in buffer instance=2260
2025-11-24 15:09:41 +0100 [trace]: enqueueing chunk instance=2260 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="z.log", variables=nil, seq=0>
2025-11-24 15:09:41.224905148 +0100 fluent.trace: {"instance":2260,"message":"enqueueing all chunks in buffer instance=2260"}
2025-11-24 15:09:41.225123287 +0100 fluent.trace: {"instance":2260,"metadata":"#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>","message":"enqueueing chunk instance=2260 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>"}

Additional context

No response

ChristThePhone avatar Nov 24 '25 14:11 ChristThePhone

Hmmm

Looking at the byte sequence 93 a5 7a 2e 6c 6f 67 as MessagePack, it looks like the data is cut off right after a 3-element array header (93) followed by the string "z.log" (a5 7a 2e 6c 6f 67). It seems the packet is being sent partially or is truncated.
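
As a quick illustration (a sketch of my own, not fluentd code), feeding exactly those seven bytes to the msgpack gem fails for that reason:

require 'msgpack'

# 93 = fixarray(3), a5 = fixstr(5), 7a 2e 6c 6f 67 = "z.log"
bytes = ["93a57a2e6c6f67"].pack("H*")
MessagePack.unpack(bytes)
# raises an "unexpected end of input"-style error: the header announces a
# 3-element array, but only the first element ("z.log") is present.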

Watson1978 avatar Nov 25 '25 02:11 Watson1978

Could you please clarify the nature of your 'proprietary log server'?

Is it another Fluentd instance running the in_forward plugin, or is it a custom-developed application?

Watson1978 avatar Nov 25 '25 03:11 Watson1978

It is a custom-developed socket server written in C++ that services the forward protocol.

ChristThePhone avatar Nov 25 '25 06:11 ChristThePhone

Thank you for clarifying that your log server is a custom application implemented in C++.

out_forward uses IO.copy_stream to flush a log chunk to the socket as a stream, and the TCP protocol does not guarantee message boundaries.

The 7-byte sequence you reported (93 a5 7a 2e 6c 6f 67) looks like a valid fragment of a MessagePack message. What data are you receiving after this? The remaining data may arrive on a subsequent read...

If your C++ server attempts to decode a MessagePack object immediately upon receiving data, without implementing a proper stream parser or buffering layer, it would treat this fragment as a fatal error (e.g., "invalid message").

in_forward decodes the data with buffering here: https://github.com/fluent/fluentd/blob/b4dc14dcdf9169a0224955252b468a365e5fce87/lib/fluent/plugin/in_forward.rb#L262-L265

Your C++ server needs to do similar decoding.
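
For what it's worth, here is a minimal Ruby sketch of that kind of stream decoding (the port comes from the configuration above; everything else is illustrative and not the actual in_forward code):

require 'socket'
require 'msgpack'

# Feed whatever bytes arrive into a MessagePack::Unpacker and only act on
# complete objects; partial reads such as the 7-byte fragment stay buffered.
server = TCPServer.new('127.0.0.1', 24225)
loop do
  conn = server.accept
  unpacker = MessagePack::Unpacker.new
  begin
    while (data = conn.readpartial(64 * 1024))
      unpacker.feed_each(data) do |msg|  # yields only once a full object has arrived
        p msg                            # e.g. ["z.log", entries, options]
      end
    end
  rescue EOFError
    # peer closed the connection; an incomplete trailing object is never yielded
  ensure
    conn.close
  end
end

A C++ implementation would need the equivalent buffering (for example with the streaming unpacker from msgpack-c).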

Watson1978 avatar Nov 25 '25 08:11 Watson1978

Thanks for the link to the in_forward plugin implementation; I will make sure my code works in a similar way. It currently reads data from the socket until the fluentd side closes it and then decodes the received data via msgpack. Unfortunately, the out_forward plugin keeps repeating this invalid 7-byte sequence (opening the connection, sending these 7 bytes, then closing the socket) over and over again, and the only way out is to stop fluentd, delete all buffered events, and restart fluentd.

ChristThePhone avatar Nov 25 '25 08:11 ChristThePhone

If possible, can you capture and attach a series of TCP packets? Then we may find a way to reproduce your issue.

Watson1978 avatar Nov 25 '25 08:11 Watson1978

If possible, can you capture and attach a series of TCP packets? Then we may find a way to reproduce your issue.

Unfortunately not; I cannot have customer systems run a TCP trace, and I only get to know after the fact.

ChristThePhone avatar Nov 25 '25 08:11 ChristThePhone

out_forward sends the file contents as-is. If a buffer file handled by Fluentd becomes corrupted due to a system failure or similar, corrupted data will be sent.

in_forward performs a simple check to ensure the data is not corrupted. https://github.com/fluent/fluentd/blob/b4dc14dcdf9169a0224955252b468a365e5fce87/lib/fluent/plugin/in_forward.rb#L367-L381
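
Roughly speaking (my own paraphrase, not the linked code), that check makes sure what was unpacked actually looks like a forward-protocol message before processing it:

require 'msgpack'

# Hypothetical sanity-check sketch: a forward message is an array whose
# first element is the tag string, e.g. ["z.log", entries, options].
def looks_like_forward_message?(obj)
  obj.is_a?(Array) &&
    (2..3).cover?(obj.size) &&
    obj[0].is_a?(String)
end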

Watson1978 avatar Nov 25 '25 08:11 Watson1978

Then the out_forward buffering code should check the validity of the files? I have no indication of a problem writing the files, so I would not exclude the possibility that out_forward itself is generating these invalid files.

ChristThePhone avatar Nov 25 '25 08:11 ChristThePhone

I will try to get my hands on a system in the failing state to see what the buffer files look like. If your theory of corrupted files is correct, then grabbing these files and putting them on a test system should reproduce the issue, correct?

ChristThePhone avatar Nov 25 '25 09:11 ChristThePhone

Looking at the out_forward code you linked, it seems it would generate this invalid request if it failed to write the chunk to the socket, for whatever reason. I don't see any exception/error handling there, so this is just guessing. But if it failed to send the full message, it would not send the chunk ID in the options part, my server end could not send a corresponding ack, and that would cause the send_data method to retry the chunk.

ChristThePhone avatar Nov 25 '25 09:11 ChristThePhone

Hmm. If the file were corrupted, out_forward should log something... but the log you attached does not appear to show anything like that.

Watson1978 avatar Nov 25 '25 09:11 Watson1978

I will try to get my hands on a system in the failing state to see what the buffer files look like. If your theory of corrupted files is correct, then grabbing these files and putting them on a test system should reproduce the issue, correct?

Yes. If so, you should be able to reproduce the issue.

Watson1978 avatar Nov 25 '25 09:11 Watson1978

I was able to grab the buffer files from a failing system, put them onto a test system, and reproduce the error. This is the fluentd log:

2025-11-25 12:52:38 +0100 [debug]: connect new socket
2025-11-25 12:52:38.948055588 +0100 fluent.trace: {"host":"127.0.0.1","port":24225,"heartbeat_type":"transport","message":"sending heartbeat host=\"127.0.0.1\" port=24225 heartbeat_type=:transport"}
2025-11-25 12:52:38.948514575 +0100 fluent.debug: {"message":"connect new socket"}
2025-11-25 12:52:38 +0100 [trace]: dequeueing a chunk instance=2260
2025-11-25 12:52:38 +0100 [trace]: chunk dequeued instance=2260 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="z.log", variables=nil, seq=0>
2025-11-25 12:52:38 +0100 [trace]: trying flush for a chunk chunk="643049d6a64142b606da8f72cf9826f5"
2025-11-25 12:52:38.971575502 +0100 fluent.trace: {"instance":2260,"message":"dequeueing a chunk instance=2260"}
2025-11-25 12:52:38 +0100 [trace]: adding write count instance=2240
2025-11-25 12:52:38.971829524 +0100 fluent.trace: {"instance":2260,"metadata":"#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>","message":"chunk dequeued instance=2260 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>"}
2025-11-25 12:52:38 +0100 [trace]: executing sync write chunk="643049d6a64142b606da8f72cf9826f5"
2025-11-25 12:52:38.972038156 +0100 fluent.trace: {"chunk":"643049d6a64142b606da8f72cf9826f5","message":"trying flush for a chunk chunk=\"643049d6a64142b606da8f72cf9826f5\""}
2025-11-25 12:52:38 +0100 [debug]: connect new socket
2025-11-25 12:52:38.972203570 +0100 fluent.trace: {"instance":2240,"message":"adding write count instance=2240"}
2025-11-25 12:52:38.972692958 +0100 fluent.trace: {"chunk":"643049d6a64142b606da8f72cf9826f5","message":"executing sync write chunk=\"643049d6a64142b606da8f72cf9826f5\""}
2025-11-25 12:52:38.972987631 +0100 fluent.debug: {"message":"connect new socket"}
2025-11-25 12:52:39 +0100 [debug]: taking back chunk for errors. chunk="643049d6a64142b606da8f72cf9826f5"
2025-11-25 12:52:39 +0100 [trace]: taking back a chunk instance=2260 chunk_id="643049d6a64142b606da8f72cf9826f5"
2025-11-25 12:52:39 +0100 [trace]: chunk taken back instance=2260 chunk_id="643049d6a64142b606da8f72cf9826f5" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="z.log", variables=nil, seq=0>
2025-11-25 12:52:39.218012431 +0100 fluent.debug: {"chunk":"643049d6a64142b606da8f72cf9826f5","message":"taking back chunk for errors. chunk=\"643049d6a64142b606da8f72cf9826f5\""}
2025-11-25 12:52:39.218586242 +0100 fluent.trace: {"instance":2260,"chunk_id":"643049d6a64142b606da8f72cf9826f5","message":"taking back a chunk instance=2260 chunk_id=\"643049d6a64142b606da8f72cf9826f5\""}
2025-11-25 12:52:39.218763443 +0100 fluent.trace: {"instance":2260,"chunk_id":"643049d6a64142b606da8f72cf9826f5","metadata":"#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>","message":"chunk taken back instance=2260 chunk_id=\"643049d6a64142b606da8f72cf9826f5\" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>"}
2025-11-25 12:52:39 +0100 [warn]: failed to flush the buffer. retry_times=9 next_retry_time=2025-11-25 12:52:40 +0100 chunk="643049d6a64142b606da8f72cf9826f5" error_class=Zlib::GzipFile::Error error="not in gzip format"
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/compressable.rb:79:in `initialize'
2025-11-25 12:52:39.219192598 +0100 fluent.warn: {"retry_times":9,"next_retry_time":"2025-11-25 12:52:40 +0100","chunk":"643049d6a64142b606da8f72cf9826f5","error":"#<Zlib::GzipFile::Error: not in gzip format, input=\"\\x00\\x00\\x00
\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\
x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x0
0\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\">","message":"failed to flush the buffer. retry_times=9 next_retry_time=2025-11-25 12:52:40 +0100 chunk=\"643049d6a64142b606da8f72cf9826f5\" error_class=Zlib::GzipFile::Error error=\"not in gzip format\""}
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/compressable.rb:79:in `new'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/compressable.rb:79:in `block in io_decompress'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/compressable.rb:78:in `loop'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/compressable.rb:78:in `io_decompress'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/compressable.rb:38:in `decompress'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/buffer/chunk.rb:212:in `block in open'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/buffer/file_chunk.rb:171:in `open'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/buffer/chunk.rb:205:in `open'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward.rb:664:in `send_data_actual'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward.rb:678:in `block in send_data'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward/connection_manager.rb:54:in `connect'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward.rb:807:in `connect'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward.rb:676:in `send_data'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward.rb:365:in `block in write'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward/load_balancer.rb:46:in `block in select_healthy_node'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward/load_balancer.rb:37:in `times'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward/load_balancer.rb:37:in `select_healthy_node'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin_helper/service_discovery/manager.rb:108:in `select_service'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin_helper/service_discovery.rb:82:in `service_discovery_select_service'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward.rb:365:in `write'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/output.rb:1225:in `try_flush'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward.rb:353:in `try_flush'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/output.rb:1540:in `flush_thread_run'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
  2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2025-11-25 12:52:39 +0100 [trace]: sending heartbeat host="127.0.0.1" port=24225 heartbeat_type=:transport
2025-11-25 12:52:39 +0100 [debug]: connect new socket

ChristThePhone avatar Nov 25 '25 11:11 ChristThePhone

I was able to identify a buffer file causing the issue.

ChristThePhone avatar Nov 25 '25 12:11 ChristThePhone

Thanks, this is interesting. I will look into it and investigate.

Watson1978 avatar Nov 25 '25 12:11 Watson1978

Successfully ran gzip -d -c on the bad buffer file and fed the output into msgpack2json without problems. The result is a valid log event:

[
    1762536960,
    {
        "header": {
            "version": 1048576,
            "class": 29,
            "type": 6,
            "timestamp": 1762536960188579,
            "severity": "event",
            "name": "SW_EVENT",
            "desc": "[CMD: 0x9D MOD: 0x01] PID: 3831413",
            "components": [
                "swrd"
            ]
        }
    }
]

The buffer file contains some extraneous zero data, though. That might be the reason why the decompression in Ruby fails.
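
In case it helps narrow this down, one way to inspect that trailing data from Ruby (a sketch; the file name is the queued chunk from this thread):

require 'zlib'
require 'stringio'

# Decompress the first gzip member of the bad chunk and look at what follows.
# fluentd's io_decompress appears to loop over concatenated gzip members (see
# the loop in the stack trace above), so trailing zero bytes would trigger
# exactly the "not in gzip format" error seen in the log.
raw = File.binread('zlog.q643049d6a64142b606da8f72cf9826f5.buf')
gz  = Zlib::GzipReader.new(StringIO.new(raw))
gz.read                                  # decompressed event data
leftover = gz.unused.to_s                # bytes after the first gzip member
puts "leftover bytes: #{leftover.bytesize}"
puts leftover.byteslice(0, 16).unpack1('H*') unless leftover.empty?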

ChristThePhone avatar Nov 25 '25 15:11 ChristThePhone

The system has Ruby 3.0.7p220 and the zlib gem (default: 2.0.0) installed. I wrote a little test using the installed zlib version and had no issues decompressing the file:

require 'zlib'
require 'msgpack'

# Decompress the queued buffer chunk and unpack the first MessagePack object.
File.open('zlog.q643049d6a64142b606da8f72cf9826f5.buf', 'rb') do |f|
  gz = Zlib::GzipReader.new(f)
  print MessagePack.unpack(gz.read)
  gz.close
end

the output generated is

[1762536960, {"header"=>{"version"=>1048576, "class"=>29, "type"=>6, "timestamp"=>1762536960188579, "severity"=>"event", "name"=>"SW_EVENT", "desc"=>"[CMD: 0x9D MOD: 0x01] PID: 3831413", "components"=>["swrd"]}}]

ChristThePhone avatar Nov 25 '25 15:11 ChristThePhone

It seems there is insufficient validation to detect that a gzip-compressed buffer file is corrupted. If you can remove the compress gzip configuration, it might become stable... maybe.

Watson1978 avatar Nov 26 '25 01:11 Watson1978

It seems there is insufficient validation to detect that a gzip-compressed buffer file is corrupted. If you can remove the compress gzip configuration, it might become stable... maybe.

I will disable the buffer compression until this has been sorted out. One question: what happens when fluentd is stopped and restarted to disable the buffer compression but compressed buffer files still exist? Will the restart cause failures, or will the buffer file compression be detected and handled accordingly?

ChristThePhone avatar Nov 27 '25 08:11 ChristThePhone

If gzip compression is disabled while compressed buffer files still remain, those files will be sent without being decompressed.

  1. First, add flush_at_shutdown true to the <buffer> section to flush the buffer when Fluentd shuts down.
  2. Restart Fluentd to apply the above configuration.
  3. Then remove compress gzip and restart Fluentd to apply this configuration.

I think you shouldn't encounter any issues.
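
For reference, the intermediate <buffer> section for steps 1 and 2 would look something like this (only flush_at_shutdown is added; everything else stays as in your original configuration, and compress gzip is what gets removed in step 3):

     <buffer>
        @type file
        path /<path to buffers>/zlog.*.buf
        flush_mode interval
        flush_interval 1s
        # flush all remaining chunks when fluentd stops
        flush_at_shutdown true
        overflow_action throw_exception
        total_limit_size 1GB
        queued_chunks_limit_size 1024
        compress gzip
        retry_forever true
        retry_type periodic
        retry_wait 1s
     </buffer>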

Watson1978 avatar Nov 28 '25 06:11 Watson1978