
IOError at chunk.rollback

yteraoka opened this issue 4 years ago

Check the CONTRIBUTING guideline first; here is the list to help us investigate the problem.

Describe the bug

An IOError occurs during file buffer operations after the chunk bytes limit is exceeded.

To Reproduce

Described on https://github.com/yteraoka/fluentd-reproduce-ioerror

Expected behavior

No errors.

Your Environment

Originally this problem happened on Amazon EKS. I reproduced it with Docker on Mac (19.03.8).

  • Fluentd or td-agent version: fluentd --version or td-agent --version
    • fluentd 1.11.1
    • Docker Image: fluent/fluentd-kubernetes-daemonset:v1.11.1-debian-elasticsearch7-1.1
  • Operating system: cat /etc/os-release
    • Amazon Linux 2
  • Kernel version: uname -r
    • 4.14.181-142.260.amzn2.x86_64

If you hit the problem with an older fluentd version, try the latest version first.

Your Configuration

https://github.com/yteraoka/fluentd-reproduce-ioerror/blob/master/fluent.conf

<system>
  root_dir /var/log/
</system>

<label @FLUENT_LOG>
  <match fluent.**>
    @type null
  </match>
</label>

<source>
  @type tail
  @id in_tail_container_logs
  path /var/log/containers/*.log
  pos_file /var/log/fluentd-containers.log.pos
  tag "#{ENV['FLUENT_CONTAINER_TAIL_TAG'] || 'kubernetes.*'}"
  exclude_path "#{ENV['FLUENT_CONTAINER_TAIL_EXCLUDE_PATH'] || use_default}"
  read_from_head true
  refresh_interval 5s
  <parse>
    @type "#{ENV['FLUENT_CONTAINER_TAIL_PARSER_TYPE'] || 'json'}"
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>

<match kubernetes.**>
  @type rewrite_tag_filter
  @id rewrite_tag_filter
  capitalize_regex_backreference no

  <rule>
    key $.log
    pattern /./
    tag example_cluster.namespace_name
  </rule>
</match>

<match **>
   @type elasticsearch
   @id out_es
   @log_level info
   include_tag_key true
   host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
   port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
   path "#{ENV['FLUENT_ELASTICSEARCH_PATH']}"
   scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
   ssl_verify "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERIFY'] || 'true'}"
   ssl_version "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERSION'] || 'TLSv1_2'}"
   user "#{ENV['FLUENT_ELASTICSEARCH_USER'] || use_default}"
   password "#{ENV['FLUENT_ELASTICSEARCH_PASSWORD'] || use_default}"
   reload_connections "#{ENV['FLUENT_ELASTICSEARCH_RELOAD_CONNECTIONS'] || 'false'}"
   reconnect_on_error "#{ENV['FLUENT_ELASTICSEARCH_RECONNECT_ON_ERROR'] || 'true'}"
   reload_on_failure "#{ENV['FLUENT_ELASTICSEARCH_RELOAD_ON_FAILURE'] || 'true'}"
   log_es_400_reason "#{ENV['FLUENT_ELASTICSEARCH_LOG_ES_400_REASON'] || 'false'}"
   logstash_prefix "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_PREFIX'] || 'logstash'}"
   logstash_dateformat "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_DATEFORMAT'] || '%Y.%m.%d'}"
   logstash_format "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_FORMAT'] || 'true'}"
   index_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_INDEX_NAME'] || 'logstash'}"
   type_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_TYPE_NAME'] || 'fluentd'}"
   include_timestamp "#{ENV['FLUENT_ELASTICSEARCH_INCLUDE_TIMESTAMP'] || 'false'}"
   template_name "#{ENV['FLUENT_ELASTICSEARCH_TEMPLATE_NAME'] || use_nil}"
   template_file "#{ENV['FLUENT_ELASTICSEARCH_TEMPLATE_FILE'] || use_nil}"
   template_overwrite "#{ENV['FLUENT_ELASTICSEARCH_TEMPLATE_OVERWRITE'] || use_default}"
   sniffer_class_name "#{ENV['FLUENT_SNIFFER_CLASS_NAME'] || 'Fluent::Plugin::ElasticsearchSimpleSniffer'}"
   request_timeout "#{ENV['FLUENT_ELASTICSEARCH_REQUEST_TIMEOUT'] || '5s'}"
   suppress_type_name "#{ENV['FLUENT_ELASTICSEARCH_SUPPRESS_TYPE_NAME'] || 'true'}"
   <buffer>
     @type file
     flush_thread_count "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_THREAD_COUNT'] || '8'}"
     flush_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_INTERVAL'] || '5s'}"
     chunk_limit_size "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_CHUNK_LIMIT_SIZE'] || '2M'}"
     queue_limit_length "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_QUEUE_LIMIT_LENGTH'] || '32'}"
     retry_max_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_RETRY_MAX_INTERVAL'] || '30'}"
     retry_forever true
   </buffer>
</match>

Environment variables:

FLUENT_ELASTICSEARCH_HOST=es
FLUENT_ELASTICSEARCH_PORT=9200
FLUENT_ELASTICSEARCH_PATH=/
FLUENT_ELASTICSEARCH_BUFFER_FLUSH_INTERVAL=1
FLUENT_ELASTICSEARCH_BUFFER_FLUSH_THREAD_COUNT=1
FLUENT_ELASTICSEARCH_BUFFER_QUEUE_LIMIT_LENGTH=1000

Your Error Log

2020-07-28 14:59:19 +0000 [info]: #0 [in_tail_container_logs] following tail of /var/log/containers/test.log
2020-07-28 14:59:21 +0000 [warn]: #0 [out_es] chunk bytes limit exceeds for an emitted event stream: 8650165bytes
2020-07-28 14:59:21 +0000 [warn]: #0 [out_es] chunk bytes limit exceeds for an emitted event stream: 3295317bytes
...
(chunk bytes limit exceeds repeated)
...
2020-07-28 14:59:25 +0000 [warn]: #0 [out_es] chunk bytes limit exceeds for an emitted event stream: 7573972bytes
2020-07-28 14:59:26 +0000 [warn]: #0 [out_es] chunk bytes limit exceeds for an emitted event stream: 8169588bytes
2020-07-28 14:59:26 +0000 [warn]: #0 emit transaction failed: error_class=IOError error="closed stream" location="/fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/file_chunk.rb:82:in `pos'" tag="cafiscode-eks-cluster.default"
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/file_chunk.rb:82:in `pos'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/file_chunk.rb:82:in `rollback'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer.rb:339:in `rescue in block in write'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer.rb:332:in `block in write'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer.rb:331:in `each'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer.rb:331:in `write'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/output.rb:1004:in `block in handle_stream_simple'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/output.rb:888:in `write_guard'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/output.rb:1003:in `handle_stream_simple'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/output.rb:878:in `execute_chunking'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/output.rb:808:in `emit_buffered'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/event_router.rb:97:in `emit_stream'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-rewrite-tag-filter-2.2.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:85:in `block in process'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-rewrite-tag-filter-2.2.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:84:in `each'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-rewrite-tag-filter-2.2.0/lib/fluent/plugin/out_rewrite_tag_filter.rb:84:in `process'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/output.rb:797:in `emit_sync'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/event_router.rb:97:in `emit_stream'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/in_tail.rb:477:in `receive_lines'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/in_tail.rb:845:in `block in handle_notify'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/in_tail.rb:877:in `with_io'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/in_tail.rb:825:in `handle_notify'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/in_tail.rb:808:in `block in on_notify'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/in_tail.rb:808:in `synchronize'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/in_tail.rb:808:in `on_notify'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/in_tail.rb:653:in `on_notify'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/in_tail.rb:325:in `block in setup_watcher'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/in_tail.rb:596:in `on_timer'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/cool.io-1.6.0/lib/cool.io/loop.rb:88:in `run_once'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/cool.io-1.6.0/lib/cool.io/loop.rb:88:in `run'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin_helper/event_loop.rb:93:in `block in start'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2020-07-28 14:59:26 +0000 [warn]: #0 emit transaction failed: error_class=IOError error="closed stream" location="/fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/file_chunk.rb:82:in `pos'" tag="kubernetes.var.log.containers.test.log"
  2020-07-28 14:59:26 +0000 [warn]: #0 suppressed same stacktrace
2020-07-28 14:59:26 +0000 [warn]: #0 [out_es] chunk bytes limit exceeds for an emitted event stream: 2493080bytes
2020-07-28 14:59:27 +0000 [warn]: #0 [out_es] chunk bytes limit exceeds for an emitted event stream: 5036846bytes
...

Additional context

I added debug print code and noticed that fluentd calls the rollback method on an already-closed chunk file. So the IOError occurred when seek was called (inside the pos method).
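
As a minimal illustration of the failure mode, here is a self-contained Ruby sketch (not fluentd's actual code): calling pos on an already-closed file handle raises exactly this IOError.

require "tempfile"

chunk_file = Tempfile.new("buffer-chunk")
chunk_file.write("event data")
chunk_file.close          # simulates purge closing the chunk's file handle

begin
  chunk_file.pos          # rollback calls pos, which seeks on the handle
rescue IOError => e
  puts e.message          # => "closed stream"
end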

2020-07-28 09:23:53 +0000 [warn]: #0 [out_elasticsearch] chunk bytes limit exceeds for an emitted event stream: 9441885bytes

2020-07-28 09:23:53 +0000 [warn]: #0 [out_elasticsearch] call chunk.rollback (write_once 1)
file_chunk.rollback                   /var/log/worker0/out_elasticsearch/buffer/buffer.b5ab7cff7742942e1dc6558e6eb7f75c3.log
file_chunk.rollback seek and truncate /var/log/worker0/out_elasticsearch/buffer/buffer.b5ab7cff7742942e1dc6558e6eb7f75c3.log

2020-07-28 09:23:53 +0000 [warn]: #0 [out_elasticsearch] call chunk.rollback (write_step_by_step 1)
file_chunk.rollback                   /var/log/worker0/out_elasticsearch/buffer/buffer.b5ab7cff77a721dcc45c6893c0f83116a.log
file_chunk.rollback seek and truncate /var/log/worker0/out_elasticsearch/buffer/buffer.b5ab7cff77a721dcc45c6893c0f83116a.log
file_chunk.purge                      /var/log/worker0/out_elasticsearch/buffer/buffer.b5ab7cff77a721dcc45c6893c0f83116a.log <-- close and unlink in purge

file_chunk.rollback                   /var/log/worker0/out_elasticsearch/buffer/buffer.b5ab7cff7742942e1dc6558e6eb7f75c3.log
file_chunk.rollback seek and truncate /var/log/worker0/out_elasticsearch/buffer/buffer.b5ab7cff7742942e1dc6558e6eb7f75c3.log

file_chunk.rollback                   /var/log/worker0/out_elasticsearch/buffer/buffer.b5ab7cff7794a3340240d7d15976f91ec.log
file_chunk.rollback seek and truncate /var/log/worker0/out_elasticsearch/buffer/buffer.b5ab7cff7794a3340240d7d15976f91ec.log
file_chunk.purge                      /var/log/worker0/out_elasticsearch/buffer/buffer.b5ab7cff7794a3340240d7d15976f91ec.log

file_chunk.rollback                   /var/log/worker0/out_elasticsearch/buffer/buffer.b5ab7cff779de15f1cad6b2c7a455f1c5.log
file_chunk.rollback seek and truncate /var/log/worker0/out_elasticsearch/buffer/buffer.b5ab7cff779de15f1cad6b2c7a455f1c5.log
file_chunk.purge                      /var/log/worker0/out_elasticsearch/buffer/buffer.b5ab7cff779de15f1cad6b2c7a455f1c5.log

file_chunk.rollback                   /var/log/worker0/out_elasticsearch/buffer/buffer.b5ab7cff77a721dcc45c6893c0f83116a.log <-- rollback unlinked file

2020-07-28 09:23:53 +0000 [warn]: #0 [out_elasticsearch] #<IOError: closed stream>

Situation

An application outputs a large (over 1 MB) log entry as a single line. Docker splits it into 16 KB chunks and writes out about 100 lines at a time.
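
For reference, the oversized line itself is trivial to generate; a hedged Ruby sketch (illustrative only, the linked reproduction repo is authoritative):

# Print a single log line larger than 1 MB. Docker's json-file driver
# stores it as ~16 KB partial entries, which in_tail reads back in bulk.
puts "x" * (2 * 1024 * 1024)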

yteraoka avatar Jul 29 '20 00:07 yteraoka

Thanks for the report. We will check it with your settings.

repeatedly avatar Jul 29 '20 04:07 repeatedly

This issue has been automatically marked as stale because it has been open 90 days with no activity. Remove stale label or comment or this issue will be closed in 30 days

github-actions[bot] avatar Dec 18 '20 10:12 github-actions[bot]

We received similar reports from our customer. They use fluentd v1.16.1, and their stack trace seems almost the same as this one, so the issue still exists in the latest version.

Although I can't reproduce it yet, I found a possible race condition in the code. There are some cases where the chunk lock is not taken when calling chunk.close or chunk.purge.

#4336 tries to fix it.

2020-07-28 14:59:26 +0000 [warn]: #0 emit transaction failed: error_class=IOError error="closed stream" location="/fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/file_chunk.rb:82:in `pos'" tag="cafiscode-eks-cluster.default"
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/file_chunk.rb:82:in `pos'

closed stream means the file handle owned by the chunk is already closed. Because it's a local file and is locked by chunk.mon_enter here, this shouldn't normally occur. A possible case is that it's closed unexpectedly by another thread due to a missing lock. AFAIK, only the code paths fixed in #4336 can do that. (I'm not sure whether they can really be called simultaneously by multiple threads, though.)
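
To sketch the suspected pattern (hedged and heavily simplified; ChunkSketch is a hypothetical stand-in, not fluentd's class):

require "monitor"

# A file chunk guarded by MonitorMixin. If any caller closes the handle
# without taking the monitor, another thread holding it can still hit
# "closed stream" inside rollback.
class ChunkSketch
  include MonitorMixin

  def initialize(io)
    super()
    @io = io
  end

  def rollback
    synchronize { @io.pos }   # seeks; raises IOError if @io was closed
  end

  def purge
    @io.close                 # closing WITHOUT synchronize is the race
  end
end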

ashie avatar Oct 31 '23 09:10 ashie

I'm seeing similar errors in one of our environments. Would this be related?

send an error event stream to @ERROR: error_class=IOError error="closed stream" location="/usr/local/lib/ruby/gems/3.1.0/gems/fluentd-1.16.1/lib/fluent/plugin/buffer/file_chunk.rb:82:in `pos'" tag="logs"

larntz avatar Nov 07 '23 18:11 larntz

#3056 seems to be the same issue as this one.

ashie avatar Nov 13 '23 09:11 ashie

Although I can't reproduce it yet, I found a possible race condition in the code. There are some cases where the chunk lock is not taken when calling chunk.close or chunk.purge.

#4336 tries to fix it.

I'm still wondering whether it's really possible. The flush thread probably doesn't call purge_chunk against a chunk that is still being processed, so this seems hard to trigger.

So I was checking other possibilities, and now I've found another scenario.

When a chunk reaches its size limit in the following code: https://github.com/fluent/fluentd/blob/dd045d30783d5ca28e332774d25b170bc552289d/lib/fluent/plugin/buffer.rb#L823-L831

the following block.call is then invoked against the :unstaged chunk, because the code above calls break: https://github.com/fluent/fluentd/blob/dd045d30783d5ca28e332774d25b170bc552289d/lib/fluent/plugin/buffer.rb#L854

In the block, the chunk is pushed onto operated_chunks: https://github.com/fluent/fluentd/blob/dd045d30783d5ca28e332774d25b170bc552289d/lib/fluent/plugin/buffer.rb#L345-L347

On the other hand, the chunk can be purged when ShouldRetry is raised while processing trailing chunks: https://github.com/fluent/fluentd/blob/dd045d30783d5ca28e332774d25b170bc552289d/lib/fluent/plugin/buffer.rb#L858-L867

Although it has already been purged, it still remains in operated_chunks. Thus, the code that raises this issue's error is invoked against the purged chunk: https://github.com/fluent/fluentd/blob/dd045d30783d5ca28e332774d25b170bc552289d/lib/fluent/plugin/buffer.rb#L389-L401
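
The sequence can be sketched like this (hedged and heavily simplified; ShouldRetry mirrors fluentd's exception name, everything else is illustrative):

require "tempfile"

ShouldRetry = Class.new(StandardError)  # stand-in for fluentd's exception

chunk = Tempfile.new("unstaged-chunk")
operated_chunks = [chunk]      # block.call stacked the :unstaged chunk

begin
  raise ShouldRetry            # raised while processing trailing chunks
rescue ShouldRetry
  chunk.close!                 # purge: close and unlink the chunk file
  # ...but the chunk is never removed from operated_chunks
end

begin
  operated_chunks.each(&:pos)  # rollback calls pos on the purged chunk
rescue IOError => e
  puts e.message               # => "closed stream"
end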

Now I think this, rather than the case addressed by #4336, is the true cause of this issue.

ashie avatar Nov 15 '23 05:11 ashie

All reports about this error show that many chunk bytes limit exceeds warnings occurred before it. This is evidence that the true cause is https://github.com/fluent/fluentd/issues/3089#issuecomment-1811839198

ashie avatar Nov 15 '23 05:11 ashie

I've created the patch for this issue: #4342

ashie avatar Nov 15 '23 09:11 ashie

When a chunk reaches its size limit in the following code:

https://github.com/fluent/fluentd/blob/dd045d30783d5ca28e332774d25b170bc552289d/lib/fluent/plugin/buffer.rb#L823-L831

the following block.call is then invoked against the :unstaged chunk, because the code above calls break:

The above code was introduced by aad2feb19, which didn't exist at the time the original comment was posted, so it can't be the cause of the original report. On the other hand, another similar break exists here: https://github.com/fluent/fluentd/blob/7e9eba736ff40ad985341be800ddc46558be75f2/lib/fluent/plugin/buffer.rb#L845-L847 It was introduced on 2016-06-20 by b5f2e9fe4.

Now that I've succeeded in reproducing the bug triggered by this code, I've added a test for it in #4342 (and I've confirmed that the patch in #4342 fixes the bug).

ashie avatar Dec 12 '23 08:12 ashie