
Fluent-Bit does not correctly handle ElasticSearch bulk responses that mix 201 Created and 409 Conflict items

Open • simbou2000 opened this issue 3 years ago • 5 comments

Bug Report

Describe the bug

After each transmission, Fluent-Bit parses the ElasticSearch server's response to check whether there were any issues with the data it just sent. Right now, Fluent-Bit already handles one kind of "error": when ElasticSearch reports a conflict (status 409), Fluent-Bit silently ignores it and treats the transmission as successful (https://github.com/fluent/fluent-bit/blob/master/plugins/out_es/es.c#L773-L776). However, this check fails when the last bulk contained both conflicts and created entries, because the status code for a created entry is 201, which Fluent-Bit considers an error.

The Screenshots section includes an example of a chunk being transmitted 5 times, along with the ElasticSearch response to each attempt:

  • 1st attempt: errors=true; one item was rejected due to overload (429), but 2 entries from the bulk were created (201).
  • 2nd attempt: errors=true; the same item was rejected again (429), plus 2 conflicts (409) for the entries created in the 1st attempt.
  • 3rd attempt: same as the 2nd attempt.
  • 4th attempt: errors=true; the last entry was created (201), with the same 2 conflicts as before.
  • 5th attempt: errors=true; 3 conflicts, which Fluent-Bit handles as a success.

In this case, the 4th attempt should have been treated as a success. Although ElasticSearch reported an error (errors=true), the only errors were conflicts, which Fluent-Bit already treats as a non-issue. However, a 201 (created) status code was mixed in with those items, and Fluent-Bit treated it as an actual error.
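To make the sequence concrete, here is a minimal C sketch that replays the per-item `status` values from the five attempts through a rule that, like the check linked above, treats every status other than 409 as an error. The helper `current_rule_bulk_failed` and the array `attempt_statuses` are hypothetical names used only for illustration; they are not Fluent-Bit code, and the real check in es.c walks a msgpack-decoded response rather than a plain array.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Per-item "status" values for each attempt. Attempts 1-4 are copied from
 * the logged bulk responses; attempt 5 (three conflicts) comes from the
 * description of the 5th attempt. */
const int64_t attempt_statuses[5][3] = {
    {429, 201, 201},  /* 1st: one rejected, two created           */
    {429, 409, 409},  /* 2nd: one rejected, two conflicts         */
    {429, 409, 409},  /* 3rd: same as the 2nd                     */
    {201, 409, 409},  /* 4th: last entry created, two conflicts   */
    {409, 409, 409},  /* 5th: three conflicts                     */
};

/* Hypothetical helper mirroring the current per-item rule: any status
 * other than 409 marks the whole bulk as failed, so a 201 does too. */
bool current_rule_bulk_failed(const int64_t *statuses, size_t n)
{
    for (size_t i = 0; i < n; i++) {
        if (statuses[i] != 409) {
            return true;  /* 201 Created also lands here */
        }
    }
    return false;
}
```

Under this rule, attempts 1 through 4 are all reported as failures and only the 5th passes. Attempt 4 differs from attempt 5 only by the 201 in its first item, which is exactly the case this issue describes.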

This causes Fluent-Bit to resend the same bulk more times than necessary, putting extra stress on the ElasticSearch server. I have also seen cases where the following attempt was rejected with 429 even though the bulk had already been sent successfully.

To Reproduce

  • Steps to reproduce the problem: use the ElasticSearch output with any kind of data against a server that is under heavy load (or a simulated overloaded server).

Expected behavior

Fluent-Bit should also exclude status code 201 from this check: https://github.com/fluent/fluent-bit/blob/master/plugins/out_es/es.c#L773-L776 For example:

if (item_val.via.i64 != 409 && item_val.via.i64 != 201) {
    check = FLB_TRUE;
    goto done;
}

Possibly other codes or any 2xx status codes should be excluded too.
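One way to generalize the check along these lines is to accept any 2xx status as well as 409. The sketch below assumes the item statuses have already been extracted into an array; `bulk_item_ok` and `bulk_needs_retry` are hypothetical helper names, not existing Fluent-Bit functions, and this is not necessarily how a fix should be structured inside es.c.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical per-item classifier: an item counts as delivered when
 * ElasticSearch returned any 2xx status (e.g. 201 Created) or
 * 409 Conflict (the document already exists). */
bool bulk_item_ok(int64_t status)
{
    return (status >= 200 && status < 300) || status == 409;
}

/* The bulk needs a retry only if at least one item failed for some other
 * reason, such as 429 (rejected execution under load). */
bool bulk_needs_retry(const int64_t *statuses, size_t n)
{
    for (size_t i = 0; i < n; i++) {
        if (!bulk_item_ok(statuses[i])) {
            return true;
        }
    }
    return false;
}
```

With this rule, the 1st to 3rd attempts from the report (each containing a 429) still trigger a retry, while the 4th attempt ({201, 409, 409}) would be accepted, avoiding the 5th request. Checking the 2xx range instead of listing 201 alone follows the suggestion that any 2xx status should be excluded.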

Screenshots

[2022/10/26 15:46:36] [error] [output:es:es.1] error: Output
{
 "took": 15022,
 "errors": true,
 "items": [
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "f52b988e-ee72-79b4-7854-863f517883d4",
    "status": 429,
    "error": {
     "type": "es_rejected_execution_exception",
     "reason": "rejected execution of org.elasticsearch.action.support.replication.TransportWriteAction$1/WrappedActionListener{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6662/0x0000000801b00b08@52f3939d}{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6666/0x0000000801b013a8@1a3c5f59} on EsThreadPoolExecutor[name = elasticsearch-es-default-resized-0/write, queue capacity = 10000, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@7675026a[Running, pool size = 4, active threads = 4, queued tasks = 10134, completed tasks = 10934096]]"
    }
   }
  },
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "5cb12418-c53d-bcc0-4d8e-25ca508c238c",
    "_version": 1,
    "result": "created",
    "_shards": {
     "total": 2,
     "successful": 2,
     "failed": 0
    },
    "_seq_no": 801595,
    "_primary_term": 1,
    "status": 201
   }
  },
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "196d923e-0e07-10f0-b091-ae5cf142a7a7",
    "_version": 1,
    "result": "created",
    "_shards": {
     "total": 2,
     "successful": 2,
     "failed": 0
    },
    "_seq_no": 806215,
    "_primary_term": 1,
    "status": 201
   }
  }
 ]
}
[2022/10/26 15:46:36] [ warn] [engine] failed to flush chunk '444271-1666813576.646583913.flb', retry in 11 seconds: task_id=9, input=tail.0 > output=es.1 (out_id=1)

...


[2022/10/26 15:47:11] [error] [output:es:es.1] error: Output
{
 "took": 24131,
 "errors": true,
 "items": [
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "f52b988e-ee72-79b4-7854-863f517883d4",
    "status": 429,
    "error": {
     "type": "es_rejected_execution_exception",
     "reason": "rejected execution of org.elasticsearch.action.support.replication.TransportWriteAction$1/WrappedActionListener{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6662/0x0000000801b00b08@7a1057e7}{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6666/0x0000000801b013a8@757b6fa9} on EsThreadPoolExecutor[name = elasticsearch-es-default-resized-0/write, queue capacity = 10000, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@7675026a[Running, pool size = 4, active threads = 4, queued tasks = 10489, completed tasks = 10947558]]"
    }
   }
  },
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "5cb12418-c53d-bcc0-4d8e-25ca508c238c",
    "status": 409,
    "error": {
     "type": "version_conflict_engine_exception",
     "reason": "[5cb12418-c53d-bcc0-4d8e-25ca508c238c]: version conflict, document already exists (current version [1])",
     "index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
     "shard": "4",
     "index": "platform-filebeat-2022.10.26-000671"
    }
   }
  },
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "196d923e-0e07-10f0-b091-ae5cf142a7a7",
    "status": 409,
    "error": {
     "type": "version_conflict_engine_exception",
     "reason": "[196d923e-0e07-10f0-b091-ae5cf142a7a7]: version conflict, document already exists (current version [1])",
     "index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
     "shard": "1",
     "index": "platform-filebeat-2022.10.26-000671"
    }
   }
  }
 ]
}
[2022/10/26 15:47:11] [ warn] [engine] failed to flush chunk '444271-1666813576.646583913.flb', retry in 16 seconds: task_id=9, input=tail.0 > output=es.1 (out_id=1)

...


[2022/10/26 15:47:40] [error] [output:es:es.1] error: Output
{
 "took": 13507,
 "errors": true,
 "items": [
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "f52b988e-ee72-79b4-7854-863f517883d4",
    "status": 429,
    "error": {
     "type": "es_rejected_execution_exception",
     "reason": "rejected execution of org.elasticsearch.action.support.replication.TransportWriteAction$1/WrappedActionListener{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6662/0x0000000801b00b08@7dc88a53}{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6666/0x0000000801b013a8@4b6357aa} on EsThreadPoolExecutor[name = elasticsearch-es-default-resized-0/write, queue capacity = 10000, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@7675026a[Running, pool size = 4, active threads = 4, queued tasks = 10512, completed tasks = 10964755]]"
    }
   }
  },
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "5cb12418-c53d-bcc0-4d8e-25ca508c238c",
    "status": 409,
    "error": {
     "type": "version_conflict_engine_exception",
     "reason": "[5cb12418-c53d-bcc0-4d8e-25ca508c238c]: version conflict, document already exists (current version [1])",
     "index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
     "shard": "4",
     "index": "platform-filebeat-2022.10.26-000671"
    }
   }
  },
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "196d923e-0e07-10f0-b091-ae5cf142a7a7",
    "status": 409,
    "error": {
     "type": "version_conflict_engine_exception",
     "reason": "[196d923e-0e07-10f0-b091-ae5cf142a7a7]: version conflict, document already exists (current version [1])",
     "index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
     "shard": "1",
     "index": "platform-filebeat-2022.10.26-000671"
    }
   }
  }
 ]
}
[2022/10/26 15:47:40] [ warn] [engine] failed to flush chunk '444271-1666813576.646583913.flb', retry in 16 seconds: task_id=9, input=tail.0 > output=es.1 (out_id=1)

...

[2022/10/26 15:48:10] [error] [output:es:es.1] error: Output
{
 "took": 13759,
 "errors": true,
 "items": [
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "f52b988e-ee72-79b4-7854-863f517883d4",
    "_version": 1,
    "result": "created",
    "_shards": {
     "total": 2,
     "successful": 2,
     "failed": 0
    },
    "_seq_no": 892046,
    "_primary_term": 1,
    "status": 201
   }
  },
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "5cb12418-c53d-bcc0-4d8e-25ca508c238c",
    "status": 409,
    "error": {
     "type": "version_conflict_engine_exception",
     "reason": "[5cb12418-c53d-bcc0-4d8e-25ca508c238c]: version conflict, document already exists (current version [1])",
     "index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
     "shard": "4",
     "index": "platform-filebeat-2022.10.26-000671"
    }
   }
  },
  {
   "create": {
    "_index": "platform-filebeat-2022.10.26-000671",
    "_type": "_doc",
    "_id": "196d923e-0e07-10f0-b091-ae5cf142a7a7",
    "status": 409,
    "error": {
     "type": "version_conflict_engine_exception",
     "reason": "[196d923e-0e07-10f0-b091-ae5cf142a7a7]: version conflict, document already exists (current version [1])",
     "index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
     "shard": "1",
     "index": "platform-filebeat-2022.10.26-000671"
    }
   }
  }
 ]
}
[2022/10/26 15:48:10] [ warn] [engine] failed to flush chunk '444271-1666813576.646583913.flb', retry in 74 seconds: task_id=9, input=tail.0 > output=es.1 (out_id=1)

...


[2022/10/26 15:49:43] [ info] [engine] flush chunk '444271-1666813576.646583913.flb' succeeded at retry 4: task_id=9, input=tail.0 > output=es.1 (out_id=1)

Your Environment

  • Version used: td-agent-bit 1.9.3
  • Configuration:
  • Environment name and version (e.g. Kubernetes? What version?): RHEL 8 VM
  • Server type and version:
  • Operating System and version:
  • Filters and plugins:

Additional context

Wrongly detecting a problem when an ElasticSearch bulk response mixes conflicts and created entries only causes more stress on the ElasticSearch server, because the whole bulk is resent.

simbou2000 • Nov 03 '22

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 5 days. Maintainers can add the exempt-stale label.

github-actions[bot] • Feb 02 '23

This is a one-liner fix that reduces unnecessary requests to the Elasticsearch server when it is overloaded. The issue needs to remain open.

acastong • Feb 02 '23

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 5 days. Maintainers can add the exempt-stale label.

github-actions[bot] • May 05 '23

This issue was closed because it has been stalled for 5 days with no activity.

github-actions[bot] • May 10 '23

@simbou2000 We currently have exactly this problem. But I wonder why you didn't create it as a pull request in the first place?

dsteininger86 • Jun 18 '24

Hi, I tried to write another candidate fix for this issue: https://github.com/fluent/fluent-bit/pull/9236

The previously proposed solution does not cover the (failure, success, success, failure) case. We need not eliminate the success information during the error checking.

cosmo0920 • Aug 15 '24