out_forward plugin starts sending invalid packets
Describe the bug
fluentd 1.18
I'm using the out_forward plugin to forward incoming logs to my proprietary log server. Every once in a while it starts sending invalid messages consisting of 7 bytes: 93 a5 7a 2e 6c 6f 67
Once fluentd enters this state it keeps sending this message over and over again. Incoming logs get buffered in files until they reach their maximum size; after that, new logs are no longer accepted and are therefore lost.
I enabled trace level logging for the out_forward plugin
To Reproduce
I cannot reproduce at will
Expected behavior
The fluentd out_forward plugin should not send invalid packets, and if it does, it should not keep doing so over and over again.
Your Environment
- Fluentd version: 1.18
- Package version:
- Operating system: RHEL 9 Linux
- Kernel version:
Your Configuration
<match z.log>
  @type forward
  @log_level trace
  <server>
    name <myServerName>
    host 127.0.0.1
    port 24225
    ignore_network_errors_at_startup true
    verify_connection_at_startup false
  </server>
  <buffer>
    @type file
    path /<path to buffers>/zlog.*.buf
    flush_mode interval
    flush_interval 1s
    overflow_action throw_exception
    total_limit_size 1GB
    queued_chunks_limit_size 1024
    compress gzip
    retry_forever true
    retry_type periodic
    retry_wait 1s
  </buffer>
</match>
Your Error Log
2025-11-24 15:09:40 +0100 [trace]: dequeueing a chunk instance=2260
2025-11-24 15:09:40 +0100 [trace]: chunk dequeued instance=2260 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="z.log", variables=nil, seq=0>
2025-11-24 15:09:40 +0100 [trace]: trying flush for a chunk chunk="64457b6c5c3bd14110513c43a4d156cd"
2025-11-24 15:09:40 +0100 [trace]: adding write count instance=2240
2025-11-24 15:09:40 +0100 [trace]: executing sync write chunk="64457b6c5c3bd14110513c43a4d156cd"
2025-11-24 15:09:40 +0100 [debug]: connect new socket
2025-11-24 15:09:40 +0100 [trace]: write operation done, committing chunk="64457b6c5c3bd14110513c43a4d156cd"
2025-11-24 15:09:40 +0100 [trace]: committing write operation to a chunk chunk="64457b6c5c3bd14110513c43a4d156cd" delayed=false
2025-11-24 15:09:40 +0100 [trace]: purging a chunk instance=2260 chunk_id="64457b6c5c3bd14110513c43a4d156cd" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="z.log", variables=nil, seq=0>
2025-11-24 15:09:40 +0100 [trace]: chunk purged instance=2260 chunk_id="64457b6c5c3bd14110513c43a4d156cd" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="z.log", variables=nil, seq=0>
2025-11-24 15:09:40 +0100 [trace]: done to commit a chunk chunk="64457b6c5c3bd14110513c43a4d156cd"
2025-11-24 15:09:40.299441051 +0100 fluent.trace: {"instance":2260,"message":"dequeueing a chunk instance=2260"}
2025-11-24 15:09:40.299556774 +0100 fluent.trace: {"instance":2260,"metadata":"#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>","message":"chunk dequeued instance=2260 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>"}
2025-11-24 15:09:40.299602102 +0100 fluent.trace: {"chunk":"64457b6c5c3bd14110513c43a4d156cd","message":"trying flush for a chunk chunk=\"64457b6c5c3bd14110513c43a4d156cd\""}
2025-11-24 15:09:40.299639559 +0100 fluent.trace: {"instance":2240,"message":"adding write count instance=2240"}
2025-11-24 15:09:40.299662757 +0100 fluent.trace: {"chunk":"64457b6c5c3bd14110513c43a4d156cd","message":"executing sync write chunk=\"64457b6c5c3bd14110513c43a4d156cd\""}
2025-11-24 15:09:40.299701023 +0100 fluent.debug: {"message":"connect new socket"}
2025-11-24 15:09:40.302766920 +0100 fluent.trace: {"chunk":"64457b6c5c3bd14110513c43a4d156cd","message":"write operation done, committing chunk=\"64457b6c5c3bd14110513c43a4d156cd\""}
2025-11-24 15:09:40.302835320 +0100 fluent.trace: {"chunk":"64457b6c5c3bd14110513c43a4d156cd","delayed":false,"message":"committing write operation to a chunk chunk=\"64457b6c5c3bd14110513c43a4d156cd\" delayed=false"}
2025-11-24 15:09:40.302873690 +0100 fluent.trace: {"instance":2260,"chunk_id":"64457b6c5c3bd14110513c43a4d156cd","metadata":"#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>","message":"purging a chunk instance=2260 chunk_id=\"64457b6c5c3bd14110513c43a4d156cd\" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>"}
2025-11-24 15:09:40.302983926 +0100 fluent.trace: {"instance":2260,"chunk_id":"64457b6c5c3bd14110513c43a4d156cd","metadata":"#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>","message":"chunk purged instance=2260 chunk_id=\"64457b6c5c3bd14110513c43a4d156cd\" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>"}
2025-11-24 15:09:40.303023355 +0100 fluent.trace: {"chunk":"64457b6c5c3bd14110513c43a4d156cd","message":"done to commit a chunk chunk=\"64457b6c5c3bd14110513c43a4d156cd\""}
2025-11-24 15:09:40 +0100 [trace]: writing events into buffer instance=2260 metadata_size=1
2025-11-24 15:09:40 +0100 [debug]: Created new chunk chunk_id="64457b6d67cb7d7c1d004aa1758ad9b9" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="z.log", variables=nil, seq=0>
2025-11-24 15:09:40 +0100 [trace]: chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 159
2025-11-24 15:09:40.617262760 +0100 fluent.trace: {"instance":2260,"metadata_size":1,"message":"writing events into buffer instance=2260 metadata_size=1"}
2025-11-24 15:09:40.617614515 +0100 fluent.debug: {"chunk_id":"64457b6d67cb7d7c1d004aa1758ad9b9","metadata":"#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>","message":"Created new chunk chunk_id=\"64457b6d67cb7d7c1d004aa1758ad9b9\" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>"}
2025-11-24 15:09:40.617796147 +0100 fluent.trace: {"message":"chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 159"}
2025-11-24 15:09:40 +0100 [trace]: writing events into buffer instance=2260 metadata_size=1
2025-11-24 15:09:40 +0100 [trace]: chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 318
2025-11-24 15:09:40.618903549 +0100 fluent.trace: {"instance":2260,"metadata_size":1,"message":"writing events into buffer instance=2260 metadata_size=1"}
2025-11-24 15:09:40.619042316 +0100 fluent.trace: {"message":"chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 318"}
2025-11-24 15:09:40 +0100 [trace]: sending heartbeat host="127.0.0.1" port=24225 heartbeat_type=:transport
2025-11-24 15:09:40 +0100 [debug]: connect new socket
2025-11-24 15:09:40.983441674 +0100 fluent.trace: {"host":"127.0.0.1","port":24225,"heartbeat_type":"transport","message":"sending heartbeat host=\"127.0.0.1\" port=24225 heartbeat_type=:transport"}
2025-11-24 15:09:40.983567212 +0100 fluent.debug: {"message":"connect new socket"}
2025-11-24 15:09:41 +0100 [trace]: writing events into buffer instance=2260 metadata_size=1
2025-11-24 15:09:41 +0100 [trace]: chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 477
2025-11-24 15:09:41.122298570 +0100 fluent.trace: {"instance":2260,"metadata_size":1,"message":"writing events into buffer instance=2260 metadata_size=1"}
2025-11-24 15:09:41.122606090 +0100 fluent.trace: {"message":"chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 477"}
2025-11-24 15:09:41 +0100 [trace]: writing events into buffer instance=2260 metadata_size=1
2025-11-24 15:09:41 +0100 [trace]: chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 636
2025-11-24 15:09:41.129487541 +0100 fluent.trace: {"instance":2260,"metadata_size":1,"message":"writing events into buffer instance=2260 metadata_size=1"}
2025-11-24 15:09:41.129718161 +0100 fluent.trace: {"message":"chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 636"}
2025-11-24 15:09:41 +0100 [trace]: writing events into buffer instance=2260 metadata_size=1
2025-11-24 15:09:41.132771965 +0100 fluent.trace: {"instance":2260,"metadata_size":1,"message":"writing events into buffer instance=2260 metadata_size=1"}
2025-11-24 15:09:41 +0100 [trace]: chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 7395 new_size: 8031
2025-11-24 15:09:41.133715266 +0100 fluent.trace: {"message":"chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 7395 new_size: 8031"}
2025-11-24 15:09:41 +0100 [trace]: writing events into buffer instance=2260 metadata_size=1
2025-11-24 15:09:41.182099288 +0100 fluent.trace: {"instance":2260,"metadata_size":1,"message":"writing events into buffer instance=2260 metadata_size=1"}
2025-11-24 15:09:41 +0100 [trace]: chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 8190
2025-11-24 15:09:41.182447693 +0100 fluent.trace: {"message":"chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 8190"}
2025-11-24 15:09:41 +0100 [trace]: writing events into buffer instance=2260 metadata_size=1
2025-11-24 15:09:41.184271628 +0100 fluent.trace: {"instance":2260,"metadata_size":1,"message":"writing events into buffer instance=2260 metadata_size=1"}
2025-11-24 15:09:41 +0100 [trace]: chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 8349
2025-11-24 15:09:41.184573445 +0100 fluent.trace: {"message":"chunk /ffdc/fluent/buffers/zlog.b64457b6d67cb7d7c1d004aa1758ad9b9.buf size_added: 159 new_size: 8349"}
2025-11-24 15:09:41 +0100 [trace]: enqueueing all chunks in buffer instance=2260
2025-11-24 15:09:41 +0100 [trace]: enqueueing chunk instance=2260 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="z.log", variables=nil, seq=0>
2025-11-24 15:09:41.224905148 +0100 fluent.trace: {"instance":2260,"message":"enqueueing all chunks in buffer instance=2260"}
2025-11-24 15:09:41.225123287 +0100 fluent.trace: {"instance":2260,"metadata":"#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>","message":"enqueueing chunk instance=2260 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>"}
Additional context
No response
Hmmm
Looking at the byte sequence 93 a5 7a 2e 6c 6f 67 as MessagePack, it decodes to a 3-element array header (93) followed by the 5-byte string "z.log" (a5 7a 2e 6c 6f 67); that is, the data is cut off right after ["z.log", .... It seems the packet is being sent partially or is truncated.
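For reference, a minimal Ruby check (assuming the msgpack gem) shows the fragment is the truncated start of an object rather than garbage:

require 'msgpack'

# The 7 reported bytes: 93 = fixarray(3) header, a5 = fixstr(5) header,
# 7a 2e 6c 6f 67 = "z.log".
fragment = "\x93\xA5\x7A\x2E\x6C\x6F\x67".b

unpacker = MessagePack::Unpacker.new
unpacker.feed(fragment)
begin
  unpacker.read
rescue EOFError
  # The fixarray header promises 3 elements but only the tag string
  # follows, so the unpacker runs out of input mid-object.
  puts 'truncated: only ["z.log" arrived'
end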
Could you please clarify the nature of your 'proprietary log server'?
Is it another Fluentd instance running the in_forward plugin, or is it a custom-developed application?
It is a custom-developed socket server written in C++ that services the forward protocol.
Thank you for clarifying that your log server is a custom application implemented in C++.
out_forward uses IO.copy_stream to flush a log chunk into the socket as a stream, and the TCP protocol does not guarantee message boundaries.
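Conceptually, the sending side looks like the following sketch (an illustration only, not fluentd's actual code; the chunk file name is a placeholder):

require 'socket'

# The chunk file is streamed straight into the TCP socket, so the kernel
# may cut the bytes into segments at any point. A receiver that treats
# one read() as one message will see fragments such as the 7-byte prefix
# reported above.
chunk_path = 'zlog.q643049d6a64142b606da8f72cf9826f5.buf' # placeholder
File.open(chunk_path, 'rb') do |chunk|
  TCPSocket.open('127.0.0.1', 24225) do |sock|
    IO.copy_stream(chunk, sock)
  end
end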
The 7-byte sequence you reported (93 a5 7a 2e 6c 6f 67) looks like a valid fragment of a MessagePack message.
What data are you receiving after this?
I think the remaining data may arrive with the next read...
If your C++ server attempts to decode a MessagePack object immediately upon receiving data, without implementing a proper stream parser or buffering layer, it would treat this fragment as a fatal error (e.g., "invalid message").
in_forward decodes the data with buffering here:
https://github.com/fluent/fluentd/blob/b4dc14dcdf9169a0224955252b468a365e5fce87/lib/fluent/plugin/in_forward.rb#L262-L265
Your C++ server needs to do similar buffered decoding.
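The same buffered pattern, sketched in Ruby for illustration (a C++ server would use the msgpack-c stream unpacker analogously; the port is taken from the configuration above):

require 'msgpack'
require 'socket'

# feed_each keeps incomplete objects in an internal buffer and yields
# only complete MessagePack objects, so arbitrary TCP segmentation is
# harmless to the decoder.
unpacker = MessagePack::Unpacker.new
server = TCPServer.new(24225)
sock = server.accept
begin
  loop do
    data = sock.readpartial(32 * 1024)
    unpacker.feed_each(data) { |msg| p msg }
  end
rescue EOFError
  # peer closed the connection; all complete objects were already yielded
end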
Thanks for the link to the in_forward plugin implementation; I will make sure my code works in a similar way. It currently reads data from the socket until the fluentd side closes it and then decodes the received data via msgpack. Unfortunately, the out_forward plugin keeps repeating this invalid 7-byte sequence (opening the connection, sending these 7 bytes, then closing the socket) over and over again, and the only way out is to stop fluentd, delete all buffered events, and restart fluentd.
If possible, can you capture and attach a series of TCP packets? Then, we may find the way to reproduce your issue.
Unfortunately not. I cannot have customer systems run a TCP trace, and I only learn about the problem after the fact.
out_forward sends the file contents as-is.
If a buffer file handled by Fluentd becomes corrupted (due to a system failure, etc.), corrupted data will be sent.
in_forward performs a simple check to ensure the data is not corrupted.
https://github.com/fluent/fluentd/blob/b4dc14dcdf9169a0224955252b468a365e5fce87/lib/fluent/plugin/in_forward.rb#L367-L381
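In spirit, that check boils down to something like this (a simplified illustration, not fluentd's actual code):

# A forward-protocol message must decode to a MessagePack array whose
# first element is the tag string; anything else is rejected as invalid.
def forward_message_valid?(msg)
  msg.is_a?(Array) && !msg.empty? && msg.first.is_a?(String)
end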
Then shouldn't the out_forward buffering code check the validity of the files? I have no indication of a problem writing files, so I would not exclude the possibility that out_forward itself is generating these invalid files.
I will try to get my hands on a system in the failing state to see what the buffer files look like. If your theory of corrupted files is correct, then grabbing these files and putting them on a test system should reproduce the issue, correct?
Looking at the out_forward code you linked to, it seems it would generate this invalid request if it failed to write the chunk to the socket, for whatever reason. I don't see any exception/error handling there, so this is just guessing. But if it failed to send the full message, it would not send the chunk ID in the options part, my server end could not send a corresponding ack, and that would cause the send_data method to retry the chunk.
Hmm.
If the file is corrupted, out_forward should log something...
But the log you attached does not appear to show anything like that...
Yes. If the corrupted-file theory is correct, grabbing those buffer files and putting them on a test system should reproduce the issue.
I was able to grab the buffer files from a failing system, put them onto a test system, and reproduce the error. This is the fluentd log:
2025-11-25 12:52:38 +0100 [debug]: connect new socket
2025-11-25 12:52:38.948055588 +0100 fluent.trace: {"host":"127.0.0.1","port":24225,"heartbeat_type":"transport","message":"sending heartbeat host=\"127.0.0.1\" port=24225 heartbeat_type=:transport"}
2025-11-25 12:52:38.948514575 +0100 fluent.debug: {"message":"connect new socket"}
2025-11-25 12:52:38 +0100 [trace]: dequeueing a chunk instance=2260
2025-11-25 12:52:38 +0100 [trace]: chunk dequeued instance=2260 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="z.log", variables=nil, seq=0>
2025-11-25 12:52:38 +0100 [trace]: trying flush for a chunk chunk="643049d6a64142b606da8f72cf9826f5"
2025-11-25 12:52:38.971575502 +0100 fluent.trace: {"instance":2260,"message":"dequeueing a chunk instance=2260"}
2025-11-25 12:52:38 +0100 [trace]: adding write count instance=2240
2025-11-25 12:52:38.971829524 +0100 fluent.trace: {"instance":2260,"metadata":"#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>","message":"chunk dequeued instance=2260 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>"}
2025-11-25 12:52:38 +0100 [trace]: executing sync write chunk="643049d6a64142b606da8f72cf9826f5"
2025-11-25 12:52:38.972038156 +0100 fluent.trace: {"chunk":"643049d6a64142b606da8f72cf9826f5","message":"trying flush for a chunk chunk=\"643049d6a64142b606da8f72cf9826f5\""}
2025-11-25 12:52:38 +0100 [debug]: connect new socket
2025-11-25 12:52:38.972203570 +0100 fluent.trace: {"instance":2240,"message":"adding write count instance=2240"}
2025-11-25 12:52:38.972692958 +0100 fluent.trace: {"chunk":"643049d6a64142b606da8f72cf9826f5","message":"executing sync write chunk=\"643049d6a64142b606da8f72cf9826f5\""}
2025-11-25 12:52:38.972987631 +0100 fluent.debug: {"message":"connect new socket"}
2025-11-25 12:52:39 +0100 [debug]: taking back chunk for errors. chunk="643049d6a64142b606da8f72cf9826f5"
2025-11-25 12:52:39 +0100 [trace]: taking back a chunk instance=2260 chunk_id="643049d6a64142b606da8f72cf9826f5"
2025-11-25 12:52:39 +0100 [trace]: chunk taken back instance=2260 chunk_id="643049d6a64142b606da8f72cf9826f5" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag="z.log", variables=nil, seq=0>
2025-11-25 12:52:39.218012431 +0100 fluent.debug: {"chunk":"643049d6a64142b606da8f72cf9826f5","message":"taking back chunk for errors. chunk=\"643049d6a64142b606da8f72cf9826f5\""}
2025-11-25 12:52:39.218586242 +0100 fluent.trace: {"instance":2260,"chunk_id":"643049d6a64142b606da8f72cf9826f5","message":"taking back a chunk instance=2260 chunk_id=\"643049d6a64142b606da8f72cf9826f5\""}
2025-11-25 12:52:39.218763443 +0100 fluent.trace: {"instance":2260,"chunk_id":"643049d6a64142b606da8f72cf9826f5","metadata":"#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>","message":"chunk taken back instance=2260 chunk_id=\"643049d6a64142b606da8f72cf9826f5\" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=\"z.log\", variables=nil, seq=0>"}
2025-11-25 12:52:39 +0100 [warn]: failed to flush the buffer. retry_times=9 next_retry_time=2025-11-25 12:52:40 +0100 chunk="643049d6a64142b606da8f72cf9826f5" error_class=Zlib::GzipFile::Error error="not in gzip format"
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/compressable.rb:79:in `initialize'
2025-11-25 12:52:39.219192598 +0100 fluent.warn: {"retry_times":9,"next_retry_time":"2025-11-25 12:52:40 +0100","chunk":"643049d6a64142b606da8f72cf9826f5","error":"#<Zlib::GzipFile::Error: not in gzip format, input=\"\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\">","message":"failed to flush the buffer. retry_times=9 next_retry_time=2025-11-25 12:52:40 +0100 chunk=\"643049d6a64142b606da8f72cf9826f5\" error_class=Zlib::GzipFile::Error error=\"not in gzip format\""}
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/compressable.rb:79:in `new'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/compressable.rb:79:in `block in io_decompress'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/compressable.rb:78:in `loop'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/compressable.rb:78:in `io_decompress'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/compressable.rb:38:in `decompress'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/buffer/chunk.rb:212:in `block in open'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/buffer/file_chunk.rb:171:in `open'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/buffer/chunk.rb:205:in `open'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward.rb:664:in `send_data_actual'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward.rb:678:in `block in send_data'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward/connection_manager.rb:54:in `connect'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward.rb:807:in `connect'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward.rb:676:in `send_data'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward.rb:365:in `block in write'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward/load_balancer.rb:46:in `block in select_healthy_node'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward/load_balancer.rb:37:in `times'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward/load_balancer.rb:37:in `select_healthy_node'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin_helper/service_discovery/manager.rb:108:in `select_service'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin_helper/service_discovery.rb:82:in `service_discovery_select_service'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward.rb:365:in `write'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/output.rb:1225:in `try_flush'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/out_forward.rb:353:in `try_flush'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/output.rb:1540:in `flush_thread_run'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
2025-11-25 12:52:39 +0100 [warn]: /console/ruby/gems/fluentd-1.18.0/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2025-11-25 12:52:39 +0100 [trace]: sending heartbeat host="127.0.0.1" port=24225 heartbeat_type=:transport
2025-11-25 12:52:39 +0100 [debug]: connect new socket
I was able to identify a buffer file causing the issue.
Thanks, that is interesting. I will look into it and investigate.
Successfully ran gzip -d -c on the bad buffer file and fed the output into msgpack2json without problems. The result is a valid log event:
[
  1762536960,
  {
    "header": {
      "version": 1048576,
      "class": 29,
      "type": 6,
      "timestamp": 1762536960188579,
      "severity": "event",
      "name": "SW_EVENT",
      "desc": "[CMD: 0x9D MOD: 0x01] PID: 3831413",
      "components": [
        "swrd"
      ]
    }
  }
]
The buffer file contains some extraneous zero data, though. That might be the reason the decompression in Ruby fails.
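That would fit the stack trace above: compressable.rb's io_decompress appears to loop over concatenated gzip members, so zero padding after the last member would make the next Zlib::GzipReader.new fail with exactly "not in gzip format". A minimal Ruby sketch of that suspected failure mode (an assumption based on the stack trace, not a confirmed diagnosis):

require 'zlib'
require 'stringio'

# One valid gzip member followed by zero padding, read the way the
# stack trace suggests compressable.rb reads chunks (looping over
# concatenated gzip members via GzipReader#unused).
data = Zlib.gzip('event') + ("\x00" * 32)
io = StringIO.new(data)

gz = Zlib::GzipReader.new(io)
print gz.read                    # => "event"; the first member is fine
unused = gz.unused               # the zero padding after the gzip trailer
gz.finish
io.pos -= unused.bytesize unless unused.nil?

begin
  Zlib::GzipReader.new(io)       # next loop iteration hits the zeros
rescue Zlib::GzipFile::Error => e
  puts e.message                 # => "not in gzip format", as in the log
end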
The system has Ruby 3.0.7p220 and the zlib gem (default: 2.0.0) installed. I wrote a little test using the installed zlib version and had no issues decompressing the file:
require 'zlib'
require 'msgpack'

# Decompress the suspect buffer file and decode its payload as MessagePack.
File.open('zlog.q643049d6a64142b606da8f72cf9826f5.buf') do |f|
  Zlib::GzipReader.wrap(f) do |gz|
    print MessagePack.unpack(gz.read)
  end
end
The output generated is:
[1762536960, {"header"=>{"version"=>1048576, "class"=>29, "type"=>6, "timestamp"=>1762536960188579, "severity"=>"event", "name"=>"SW_EVENT", "desc"=>"[CMD: 0x9D MOD: 0x01] PID: 3831413", "components"=>["swrd"]}}]
There seems to be insufficient validation: a corrupted gzip-compressed buffer file is not detected before sending.
If you can remove the compress gzip configuration, it might become stable... maybe.
I will disable the buffer compression until this has been sorted out. One question: what happens when fluentd is stopped and restarted to disable the buffer compression while compressed buffer files still exist? Will the restart cause failures, or will the compressed buffer files be detected and handled accordingly?
If gzip compression is disabled while compressed buffer files remain, those files will be sent as-is, without being decompressed.
- First, add flush_at_shutdown true to the <buffer> section to flush the buffer when fluentd shuts down.
- Restart Fluentd to apply the above configuration.
- Then, remove compress gzip and restart Fluentd again to apply this configuration.
I think you shouldn't encounter any issues.
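For illustration, the intermediate <buffer> section for the first restart would look like this (all other keys kept from the original configuration above):

<buffer>
  @type file
  path /<path to buffers>/zlog.*.buf
  flush_mode interval
  flush_interval 1s
  # added for the first restart so pending chunks are flushed at shutdown
  flush_at_shutdown true
  overflow_action throw_exception
  total_limit_size 1GB
  queued_chunks_limit_size 1024
  # still present for this restart; removed before the second restart
  compress gzip
  retry_forever true
  retry_type periodic
  retry_wait 1s
</buffer>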