out_forward plugin sends broken chunks
Describe the bug
After upgrading td-agent from <td-agent 4.3.0 fluentd 1.14.3> to <td-agent 4.3.1 fluentd 1.14.6>, we replaced a custom filter plugin, which dropped lines larger than 512K, with the native in_tail max_line_size parameter and increased the limit to 1 MB. We also changed some buffer parameters that we found improved shipping performance. Sample configurations are shown below.
Eventually we started seeing exceptions on several machines from our custom fluentd receiver (developed in C#). The exceptions showed that td-agent was sending invalid chunks; concretely, the msgpack library complained: "Cannot convert 'array' header from type 'PositiveFixNum'(0x20) in offset 1,156,659". After collecting samples of failing chunks we confirmed that, after unpacking the main chunk array (tag + message array + options), the receiver unpacked messages correctly until it failed at some point. The last correctly unpacked message contained a logLine that was cut short (compared with the original log), and where a new msgpack message should have started (with a MapHeader) there was a plain string that was a fragment (not even the beginning) of a logLine appearing several logLines later in the original log file.
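For anyone who wants to check a captured chunk the same way, here is a minimal sketch of the idea in Python using only the standard library. It is not our C# receiver code. It assumes the input is the entry stream after the outer chunk array has been unwrapped, and that each entry is a two-element `[time, record]` array as in the Forward protocol; the names `skip_object` and `first_bad_entry` are hypothetical helpers made up for this illustration.

```python
# Widths (in bytes) of the payload that follows fixed-size type bytes.
FIXED = {0xCA: 4, 0xCB: 8, 0xCC: 1, 0xCD: 2, 0xCE: 4, 0xCF: 8,
         0xD0: 1, 0xD1: 2, 0xD2: 4, 0xD3: 8,
         0xD4: 2, 0xD5: 3, 0xD6: 5, 0xD7: 9, 0xD8: 17}
# Width of the length field for bin/str, and for ext (which adds a type byte).
SIZED = {0xC4: 1, 0xC5: 2, 0xC6: 4, 0xD9: 1, 0xDA: 2, 0xDB: 4}
EXT = {0xC7: 1, 0xC8: 2, 0xC9: 4}
# (width of count field, objects per item) for array16/32 and map16/32.
CONTAINER = {0xDC: (2, 1), 0xDD: (4, 1), 0xDE: (2, 2), 0xDF: (4, 2)}


def _uint(buf, pos, width):
    """Read a big-endian unsigned integer, failing on a truncated buffer."""
    if pos + width > len(buf):
        raise ValueError("truncated")
    return int.from_bytes(buf[pos:pos + width], "big")


def skip_object(buf, pos):
    """Return the offset just past the msgpack object that starts at pos."""
    if pos >= len(buf):
        raise ValueError("truncated")
    b = buf[pos]
    pos += 1
    count = 0  # number of nested objects still to skip
    if b <= 0x7F or b >= 0xE0 or b in (0xC0, 0xC2, 0xC3):
        pass                              # fixint, nil, false, true
    elif 0xA0 <= b <= 0xBF:
        pos += b & 0x1F                   # fixstr
    elif 0x80 <= b <= 0x8F:
        count = (b & 0x0F) * 2            # fixmap: key + value per pair
    elif 0x90 <= b <= 0x9F:
        count = b & 0x0F                  # fixarray
    elif b in FIXED:
        pos += FIXED[b]
    elif b in SIZED:
        w = SIZED[b]
        pos += w + _uint(buf, pos, w)
    elif b in EXT:
        w = EXT[b]
        pos += w + 1 + _uint(buf, pos, w)
    elif b in CONTAINER:
        w, per_item = CONTAINER[b]
        count = _uint(buf, pos, w) * per_item
        pos += w
    else:
        raise ValueError("reserved byte 0xc1")
    for _ in range(count):
        pos = skip_object(buf, pos)
    if pos > len(buf):
        raise ValueError("truncated")
    return pos


def first_bad_entry(stream):
    """Return (entry_index, byte_offset) of the first malformed entry, or None."""
    pos, index = 0, 0
    while pos < len(stream):
        start = pos
        try:
            if stream[pos] != 0x92:       # assumed: every entry is [time, record]
                raise ValueError("entry is not a 2-element array")
            pos = skip_object(stream, pos + 1)   # time: integer or ext type
            if not (0x80 <= stream[pos] <= 0x8F or stream[pos] in (0xDE, 0xDF)):
                raise ValueError("record is not a map")
            pos = skip_object(stream, pos)       # record
        except (ValueError, IndexError):
            return index, start
        index += 1
    return None
```

A stream whose third entry begins with raw text (for example a space, byte 0x20, which msgpack reads as a positive fixnum) is reported as `(2, offset)`. That is the same kind of failure as the error quoted above, although the exact framing in our receiver may differ.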
After checking some related fluentd issues such as #660, #1743, and #415, we decided to reduce the fluentd max size to 512000, and it seems this may fix the problem, although so far we have only tested it in development. We will check in production soon and see how it goes. However, we would like to go back to allowing a 1 MB maximum line size. Is there a way to achieve this (maybe by reducing the number of threads or upgrading a fluentd library)?
To Reproduce
Generate heavy traffic with many big lines, using the given configuration for the producer: about 15 million messages per hour, about 5 GB in total size. An in_forward on a different server should then report msgpack errors.
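A rough way to produce comparable input is sketched below in Python. It is only an illustration, not the application that wrote our logs: the thread name, level, memory value, the share of big lines (`big_ratio`), and their size are all made-up parameters. The line layout follows the format_firstline and format1 patterns in the configuration, and the default small message size reflects the average of roughly 330 bytes per message implied by 5 GB over 15 million messages (about 4,167 messages per second).

```python
import random
import re
from datetime import datetime, timedelta

# Same pattern as format_firstline in the in_tail configuration.
FIRSTLINE = re.compile(r"^\[2\d\d\d-[^\]]+\]")


def make_line(ts, size, thread="worker-1", level="DEBUG", mem="512"):
    """Build one log line whose message part is `size` characters long."""
    stamp = ts.strftime("%Y-%m-%d %H:%M:%S")
    return f"[{stamp}] [{thread}] [{level}] [MEM={mem}] " + "x" * size


def generate(count, big_ratio=0.01, big_size=900_000, small_size=300, seed=0):
    """Yield `count` lines; a fraction `big_ratio` of them are big lines."""
    rng = random.Random(seed)           # seeded so runs are repeatable
    ts = datetime(2022, 5, 1)           # arbitrary start time
    for i in range(count):
        size = big_size if rng.random() < big_ratio else small_size
        yield make_line(ts + timedelta(milliseconds=i), size)


def write_log(path, count, **kwargs):
    """Append generated lines to the file that in_tail is watching."""
    with open(path, "a", encoding="utf-8") as fh:
        for line in generate(count, **kwargs):
            fh.write(line + "\n")
```

For example, `write_log("/ha/logs/xxx/debug.log", 4000)` called about once per second approximates the message rate; the big lines add volume on top of the 5 GB figure, so `big_ratio` would need tuning to match a real workload.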
Expected behavior
Chunks should be in correct msgpack format
Your Environment
- Fluentd version: 1.14.6
- TD Agent version: 4.3.1
- Operating system: RHEL 7.9
- Kernel version: 3.10.0-1160.62.1.el7.x86_64
Your Configuration
old:
<source>
@type tail
pos_file /var/log/td-agent/xxx.pos
tag xxx
path "/ha/logs/xxx/debug.log"
@label xxx
rotate_wait 10s
format multiline
format_firstline /^\[2\d\d\d-[^\]]+\]/
format1 /^\[(?<logtime>[^\]]+)\] \[(?<thread>.*?)\] \[(?<level>[^\]]*)\] \[MEM=(?<memory>[^\]]*)\] (?<message>.*)$/
read_lines_limit 500
</source>
<label xxx>
<filter **>
@type drop_by_size
max_event_size 512000
</filter>
<match **>
@id xxx
@type forward
buffer_type memory
buffer_chunk_limit 128MB
buffer_queue_limit 8
buffer_queue_full_action block
flush_interval 5s
retry_limit 5
num_threads 2
keepalive true
<server>
host "host1"
port 24225
weight 60
</server>
<server>
host "host2"
port 24225
weight 60
</server>
</match>
</label>
new:
<source>
@id tail_xxx
@type tail
pos_file /var/log/td-agent/xxx.pos
tag xxx
path "/ha/logs/xxx/debug.log"
@label xxx
rotate_wait 10s
format multiline
format_firstline /^\[2\d\d\d-[^\]]+\]/
format1 /^\[(?<logtime>[^\]]+)\] \[(?<thread>.*?)\] \[(?<level>[^\]]*)\] \[MEM=(?<memory>[^\]]*)\] (?<message>(?>.*))$/
read_lines_limit 500
max_line_size 1024000
</source>
<label xxx>
<match **>
@id forward_xxx
@type forward
keepalive true
expire_dns_cache 60
ignore_network_errors_at_startup true
<buffer>
chunk_limit_size 8MB
total_limit_size 256MB
overflow_action block
flush_interval immediate
flush_thread_count 4
</buffer>
<service_discovery>
@type file
path "/ha/fluentd/servers.yml"
</service_discovery>
</match>
</label>
Your Error Log
Cannot convert 'array' header from type 'PositiveFixNum'(0x20) in offset 1,156,659
Additional context
No response