Stage buffer sometimes sticks around and doesn't ever get queued
Describe the bug
I've been trying to track down what looks like a memory leak for the past week, where a stage buffer never gets cleared out even though new data keeps arriving. In my latest attempt to isolate the problem, I noticed a jump to 8 MB in the `fluentd_output_status_buffer_stage_byte_size` Prometheus metric, which measures the total bytes of chunks in the buffer's staging area.
This jump persists until I restart fluentd.
To Reproduce
I'm still working on this.
Expected behavior
No memory growth over time.
Your Environment
- Fluentd version: v1.16.5
- Package version: 5.0.4-1
- Operating system: Ubuntu 20.04.6
- Kernel version: 5.15.0-1051-gcp
Your Configuration
I don't have a clear reproduction step yet. Our config looks something like this:
```
<source>
  @type tail
  tag postgres.postgres
  path /var/log/postgresql/postgresql.log
  pos_file /var/log/fluent/postgres.log.pos
  format /(?<time>[^G]*) GMT \[(?<pg_id>\d+), (?<xid>\d+)\]: .* user=(?<pg_user>[^,]*),db=(?<pg_db>[^,]*),app=(?<pg_application>[^,]*),client=(?<pg_client>[^ ]*) (?<pg_message>.*)/
  time_format %Y-%m-%d %H:%M:%S.%N
</source>

<filter postgres.postgres_csv>
  @type postgresql_slowlog
</filter>

<filter postgres.postgres_csv>
  @type postgresql_redactor
  max_length 200000
</filter>

<match postgres.*>
  @type copy
  <store>
    @type google_cloud
    label_map {
      "tag": "tag"
    }
    buffer_type file
    buffer_path /opt/fluent/buffers/postgres/google_cloud
    buffer_chunk_limit 8MB
    buffer_queue_limit 1000
    flush_interval 30s
    log_level info
  </store>
  <store>
    @type cloud_pubsub
    topic pubsub-postgres-inf-gprd
    project my-project
    buffer_type file
    buffer_path /opt/fluent/buffers/postgres/cloud_pubsub
    buffer_chunk_limit 8MB
    buffer_queue_limit 1000
    flush_interval 30s
  </store>
</match>
```
Your Error Log
The stuck 8MB buffer seems to have coincided with an EOF error:
2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:170:in `open'
2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/event.rb:318:in `each'
2024-10-08 10:40:21 +0000 [error]: #0 /etc/fluent/plugin/out_cloud_pubsub.rb:62:in `write'
2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/compat/output.rb:131:in `write'
2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1225:in `try_flush'
2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1538:in `flush_thread_run'
2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2024-10-08 10:40:21 +0000 [error]: #0 failed to purge buffer chunk chunk_id="623f4c358bd4b7cd7f63a4eb7410b459" error_class=Errno::ENOENT error=#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b623f4c358bd4b7cd7f63a4eb7410b459.log>
2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:161:in `unlink'
2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:161:in `purge'
2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer.rb:601:in `block in purge_chunk'
2024-10-08 10:40:21 +0000 [error]: #0 /opt/fluent/lib/ruby/3.2.0/monitor.rb:202:in `synchronize'
2024-10-08 10:40:21 +0000 [error]: #0 /opt/fluent/lib/ruby/3.2.0/monitor.rb:202:in `mon_synchronize'
2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer.rb:592:in `purge_chunk'
2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1110:in `commit_write'
2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1229:in `try_flush'
2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1538:in `flush_thread_run'
2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
2024-10-08 10:40:21 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2024-10-08 10:40:21.470273004 +0000 fluent.error: {"chunk_id":"623f4c358bd4b7cd7f63a4eb7410b459","error_class":"Errno::ENOENT","error":"#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b623f4c358bd4b7cd7f63a4eb7410b459.log>","message":"failed to purge buffer chunk chunk_id=\"623f4c358bd4b7cd7f63a4eb7410b459\" error_class=Errno::ENOENT error=#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b623f4c358bd4b7cd7f63a4eb7410b459.log>","tag":"fluent.error","environment":"gprd","hostname":"example.com","fqdn":"example.com","stage":"main","shard":"backup","tier":"db","type":"patroni"}
Additional context
Note that previously when log messages were up to 3 MB, I would see more of these "step" jumps in memory usage. I've altered our filters to truncate the log messages to 200K, which seems to have stopped most of these stage buffer leaks. But I'm still wondering if there is a corner case here where the file buffer got cleared but the stage buffer did not.
I saw this error message again today:
2024-10-15 16:50:28 +0000 [error]: #0 unexpected error error="closed stream"
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:170:in `seek'
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:170:in `open'
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/event.rb:318:in `each'
2024-10-15 16:50:28 +0000 [error]: #0 /etc/fluent/plugin/out_cloud_pubsub.rb:62:in `write'
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/compat/output.rb:131:in `write'
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1225:in `try_flush'
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1538:in `flush_thread_run'
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2024-10-15 16:50:28.302744238 +0000 fluent.error: {"error":"closed stream","message":"unexpected error error=\"closed stream\"","tag":"fluent.error","environment":"gprd","hostname":"patroni-main-v14-02-db-gprd","fqdn":"patroni-main-v14-02-db-gprd.c.gitlab-production.internal","stage":"main","shard":"backup","tier":"db","type":"patroni"}
2024-10-15 16:50:28 +0000 [error]: #0 failed to purge buffer chunk chunk_id="62486bfe620a478fb7b97c5b27980db8" error_class=Errno::ENOENT error=#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b62486bfe620a478fb7b97c5b27980db8.log>
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:161:in `unlink'
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer/file_chunk.rb:161:in `purge'
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer.rb:601:in `block in purge_chunk'
2024-10-15 16:50:28 +0000 [error]: #0 /opt/fluent/lib/ruby/3.2.0/monitor.rb:202:in `synchronize'
2024-10-15 16:50:28 +0000 [error]: #0 /opt/fluent/lib/ruby/3.2.0/monitor.rb:202:in `mon_synchronize'
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/buffer.rb:592:in `purge_chunk'
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1110:in `commit_write'
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1229:in `try_flush'
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:1538:in `flush_thread_run'
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin/output.rb:510:in `block (2 levels) in start'
2024-10-15 16:50:28 +0000 [error]: #0 /var/lib/fluent/vendor/bundle/ruby/3.2.0/gems/fluentd-1.16.5/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2024-10-15 16:50:28.791864146 +0000 fluent.error: {"chunk_id":"62486bfe620a478fb7b97c5b27980db8","error_class":"Errno::ENOENT","error":"#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b62486bfe620a478fb7b97c5b27980db8.log>","message":"failed to purge buffer chunk chunk_id=\"62486bfe620a478fb7b97c5b27980db8\" error_class=Errno::ENOENT error=#<Errno::ENOENT: No such file or directory @ apply2files - /opt/fluent/buffers/postgres/cloud_pubsub/buffer.b62486bfe620a478fb7b97c5b27980db8.log>","tag":"fluent.error","environment":"gprd","hostname":"patroni-main-v14-02-db-gprd","fqdn":"patroni-main-v14-02-db-gprd.c.gitlab-production.internal","stage":"main","shard":"backup","tier":"db","type":"patroni"}
In this example, I suspect:
- The file buffer stream closed for some reason, resulting in the `closed stream` error. Even if the file were deleted, I'm not sure why this would result in the `closed stream` error unless this pertained to the upstream PubSub service.
- Fluentd then attempted to purge the buffer chunk, which resulted in the `ENOENT` error.
- As a result, the `ENOENT` prevented the metrics from updating:
https://github.com/fluent/fluentd/blob/403a28ff8d74adfeefb9842c8342292397ba84b7/lib/fluent/plugin/buffer.rb#L602

I don't know why this `closed stream` error happens infrequently, but I wonder:
- Should `ENOENT` be ignored in `Buffer::FileChunk#purge`?
- Should `Buffer#purge_chunk` catch `ENOENT` and ensure that `@queue_size_metrics.sub(bytesize)` runs? (A rough sketch of this idea follows below.)
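To make the second question concrete, here is a rough sketch of what "catch `ENOENT` and still decrement the size metric" could look like. The classes and names below (`ToyChunk`, `ToyBuffer`, `@stage_size`) are invented for illustration only; this is not fluentd's actual `Buffer#purge_chunk`.

```ruby
require 'monitor'

# Hypothetical stand-in for a file-backed buffer chunk (illustration only).
ToyChunk = Struct.new(:chunk_id, :path, :bytesize) do
  def purge
    File.unlink(path) # raises Errno::ENOENT if the file is already gone
  end
end

# Hypothetical stand-in for the buffer; not fluentd's Fluent::Plugin::Buffer.
class ToyBuffer
  include MonitorMixin

  def initialize
    super()
    @dequeued = {}  # chunk_id => chunk currently being flushed
    @stage_size = 0 # stands in for the size metric (@queue_size_metrics)
  end

  def track(chunk)
    synchronize do
      @dequeued[chunk.chunk_id] = chunk
      @stage_size += chunk.bytesize
    end
  end

  def purge_chunk(chunk_id)
    synchronize do
      chunk = @dequeued.delete(chunk_id)
      return unless chunk

      begin
        chunk.purge # may raise Errno::ENOENT if the file was already unlinked
      rescue Errno::ENOENT
        # The backing file is already gone: treat the purge as successful so
        # the size accounting below still runs and no bytes are "leaked".
      end
      @stage_size -= chunk.bytesize # analogous to @queue_size_metrics.sub(bytesize)
    end
  end
end
```

The point is only that an already-missing chunk file would be treated as successfully purged, so the metric decrement is never skipped.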
It seems this intermittent closed stream error was reported a while ago: https://github.com/fluent/fluentd/issues/2391
@ashie I suspect we need https://github.com/fluent/fluentd/pull/4336 after all. I'm seeing what looks to be a race condition on a single worker instance.
It seems that your own plugin is involved in this stack trace:
2024-10-15 16:50:28 +0000 [error]: #0 /etc/fluent/plugin/out_cloud_pubsub.rb:62:in `write'
Since we can't read this code, I'm not sure of the cause yet.
It seems this intermittent closed stream error was reported a while ago: https://github.com/fluent/fluentd/issues/2391
We already found out that #2391 is caused by rollback, but no rollback appears in your error log.
So it doesn't seem related to this issue.
@ashie out_cloud_pubsub.rb is this:
```ruby
# Originally copied from https://github.com/yosssi/fluent-plugin-cloud-pubsub
# License: MIT
require 'google/cloud/pubsub'

module Fluent
  class CloudPubSubOutput < BufferedOutput
    MAX_REQ_SIZE = 10 * 1024 * 1024 # 10 MB
    MAX_MSGS_PER_REQ = 1000

    Plugin.register_output('cloud_pubsub', self)

    config_param :project, :string, :default => nil
    config_param :topic, :string, :default => nil
    config_param :key, :string, :default => nil
    config_param :max_req_size, :integer, :default => MAX_REQ_SIZE
    config_param :max_msgs_per_req, :integer, :default => MAX_MSGS_PER_REQ

    unless method_defined?(:log)
      define_method("log") { $log }
    end

    unless method_defined?(:router)
      define_method("router") { Fluent::Engine }
    end

    def configure(conf)
      super
      raise Fluent::ConfigError, "'project' must be specified." unless @project
      raise Fluent::ConfigError, "'topic' must be specified." unless @topic
    end

    def multi_workers_ready?
      true
    end

    def start
      super
      pubsub = Google::Cloud::PubSub.new(project_id: @project, credentials: @key)
      @client = pubsub.topic @topic
    end

    def format(tag, time, record)
      [tag, time, record].to_msgpack
    end

    def publish(msgs)
      log.debug "publish #{msgs.length} messages"
      @client.publish do |batch|
        msgs.each do |m|
          batch.publish m
        end
      end
    end

    def write(chunk)
      msgs = []
      msgs_size = 0
      chunk.msgpack_each do |tag, time, record|
        size = Yajl.dump(record).bytesize
        if msgs.length > 0 && (msgs_size + size > @max_req_size || msgs.length + 1 > @max_msgs_per_req)
          publish(msgs)
          msgs = []
          msgs_size = 0
        end
        msgs << record.to_json
        msgs_size += size
      end
      if msgs.length > 0
        publish(msgs)
      end
    rescue
      log.error "unexpected error", :error=>$!.to_s
      log.error_backtrace
    end
  end
end
```
The error shows that it's happening during the `seek` in `file_chunk.rb`, so I don't think this plugin is at fault here.
Thanks for sharing the plugin code.
The file buffer stream closed for some reason, resulting in the `closed stream` error.
AFAIK, `closed stream` means the stream was closed unexpectedly by this process, not by another process.
So it seems there is a bug in fluentd's buffer code, as you say.
But I'm not sure of the cause yet.
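For illustration, here is a standalone toy (unrelated to fluentd's code) showing that `closed stream` is the `IOError` Ruby raises when an IO handle is used after this same process has closed it, for example from another thread:

```ruby
require 'tempfile'

# Create a small file to stand in for a buffer chunk.
file = Tempfile.new('chunk')
file.write('x' * 1024)
file.flush

io = File.open(file.path, 'rb')

# Another thread in the same process closes the handle while it is in use.
closer = Thread.new do
  sleep 0.01
  io.close
end

begin
  loop do
    io.seek(0)  # eventually raises IOError: closed stream
    io.read(16)
    sleep 0.005
  end
rescue IOError => e
  puts e.message # => "closed stream"
ensure
  closer.join
  file.close!
end
```

By contrast, deleting the file from outside would not, by itself, raise anything on an already-open handle, which is consistent with the idea that the handle was closed inside the fluentd process.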
@ashie I suspect we need https://github.com/fluent/fluentd/pull/4336 after all. I'm seeing what looks to be a race condition on a single worker instance.
If a chunk could be processed by multiple threads simultaneously, it might be effective.
But usually a queued chunk isn't processed by multiple threads, since a queued chunk is popped inside the synchronized block of the `dequeue_chunk` method before processing:
https://github.com/fluent/fluentd/blob/a2b935ae2bc4b4d43e5adddbec01092ea4228b9e/lib/fluent/plugin/output.rb#L1189
https://github.com/fluent/fluentd/blob/a2b935ae2bc4b4d43e5adddbec01092ea4228b9e/lib/fluent/plugin/buffer.rb#L561-L573
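As a rough illustration of that pattern (simplified, with invented names; not fluentd's actual implementation), the pop and the move into the dequeued set happen inside one synchronized block, so two flush threads cannot pick up the same chunk:

```ruby
require 'monitor'

# Hypothetical, simplified queue to illustrate the synchronized dequeue
# pattern; not fluentd's Fluent::Plugin::Buffer.
class ToyChunkQueue
  include MonitorMixin

  def initialize
    super()
    @queue = []    # chunk ids waiting to be flushed
    @dequeued = {} # chunk ids currently being flushed
  end

  def enqueue(chunk_id)
    synchronize { @queue << chunk_id }
  end

  # Only one thread can take a given chunk: the shift and the move into
  # @dequeued happen inside the same synchronized block.
  def dequeue_chunk
    synchronize do
      chunk_id = @queue.shift
      @dequeued[chunk_id] = true if chunk_id
      chunk_id
    end
  end

  def purge_chunk(chunk_id)
    synchronize { @dequeued.delete(chunk_id) }
  end
end
```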
In addition, you are using only a single flush thread (there is no `flush_thread_count` in your configuration).
So I think #4336 wouldn't solve your issue.