Fluent-Bit does not handle 201-Created with 409-Conflicts errors from ElasticSearch correctly
Bug Report
Describe the bug
Fluent-Bit parses each response from the ElasticSearch server to check whether there were any issues with the data it just transmitted. Right now, Fluent-Bit handles one "error" case specially: when ElasticSearch reports "conflicts", Fluent-Bit silently treats the transmission as successful. (https://github.com/fluent/fluent-bit/blob/master/plugins/out_es/es.c#L773-L776) This check fails, however, if the last bulk contained both conflicts and created entries, because the status code for a created entry is 201, which Fluent-Bit considers an error.
I have included an example of a chunk being transmitted 5 times, together with the ElasticSearch responses, in the Screenshots section.
- 1st attempt: errors=true, an overload reject, but 2 entries from the bulk were created.
- 2nd attempt: errors=true, an overload reject again, with 2 conflicts from the 1st attempt.
- 3rd attempt: same as the 2nd attempt.
- 4th attempt: errors=true, the last entry was created, with the same 2 conflicts as before.
- 5th attempt: errors=true, 3 conflicts, but Fluent-Bit handles this as a success.
The 4th attempt should have been a success: even though ElasticSearch indicates an error (errors=true), Fluent-Bit handles conflicts as a non-issue. However, a 201 (created) status code was mixed in with those items, and Fluent-Bit treated it as an actual error.
This causes Fluent-Bit to resend the same bulk more times than necessary, which puts additional stress on the ElasticSearch server. I have seen other cases where the following attempt was rejected with 429 even though the bulk had already been sent correctly.
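The difference between the 4th and 5th attempts can be illustrated with a simplified model of the check. This is only a sketch: the function name `current_check_reports_error` and the plain array of status codes are assumptions made for illustration, whereas the real plugin walks the decoded response and only does so when errors=true.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Simplified, hypothetical model of the per-item check (not the actual
 * Fluent-Bit code): any item status other than 409 marks the whole bulk
 * as failed, so a 201 (created) item also triggers a retry.
 */
int current_check_reports_error(const int *statuses, size_t count)
{
    size_t i;

    assert(statuses != NULL || count == 0);

    for (i = 0; i < count; i++) {
        if (statuses[i] != 409) {
            return 1;   /* treated as an error: the chunk is retried */
        }
    }
    return 0;           /* only conflicts: treated as a success */
}
```

With the item statuses of the 4th attempt (201, 409, 409) this model returns 1, so the chunk is retried, while the statuses of the 5th attempt (409, 409, 409) return 0.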
To Reproduce
- Steps to reproduce the problem: Use ElasticSearch output with any kind of data where the server is under heavy load or a simulated overloaded server.
Expected behavior
Fluent-Bit should also exclude code 201 in this check: https://github.com/fluent/fluent-bit/blob/master/plugins/out_es/es.c#L773-L776 For example:
    if (item_val.via.i64 != 409 && item_val.via.i64 != 201) {
        check = FLB_TRUE;
        goto done;
    }
Possibly other codes or any 2xx status codes should be excluded too.
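If the check were generalized to all 2xx codes, it might look roughly like the sketch below. The helper names `item_status_is_acceptable` and `bulk_needs_retry` are hypothetical and do not exist in Fluent-Bit. The sketch assumes the per-item status codes have already been extracted into an array of 64-bit integers.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical helper: a 2xx status means the item was written, and 409
 * means the document already exists (for example from an earlier attempt).
 * Neither case requires resending the bulk.
 */
int item_status_is_acceptable(int64_t status)
{
    return (status >= 200 && status <= 299) || status == 409;
}

/* Returns 1 if at least one item carries a status that warrants a retry. */
int bulk_needs_retry(const int64_t *statuses, size_t count)
{
    size_t i;

    assert(statuses != NULL || count == 0);

    for (i = 0; i < count; i++) {
        if (!item_status_is_acceptable(statuses[i])) {
            return 1;
        }
    }
    return 0;
}
```

Under these assumptions, the 1st attempt in the logs (429, 201, 201) would still be retried because of the 429, while the 4th attempt (201, 409, 409) would be accepted.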
Screenshots
[2022/10/26 15:46:36] [error] [output:es:es.1] error: Output
{
"took": 15022,
"errors": true,
"items": [
{
"create": {
"_index": "platform-filebeat-2022.10.26-000671",
"_type": "_doc",
"_id": "f52b988e-ee72-79b4-7854-863f517883d4",
"status": 429,
"error": {
"type": "es_rejected_execution_exception",
"reason": "rejected execution of org.elasticsearch.action.support.replication.TransportWriteAction$1/WrappedActionListener{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6662/0x0000000801b00b08@52f3939d}{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6666/0x0000000801b013a8@1a3c5f59} on EsThreadPoolExecutor[name = elasticsearch-es-default-resized-0/write, queue capacity = 10000, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@7675026a[Running, pool size = 4, active threads = 4, queued tasks = 10134, completed tasks = 10934096]]"
}
}
},
{
"create": {
"_index": "platform-filebeat-2022.10.26-000671",
"_type": "_doc",
"_id": "5cb12418-c53d-bcc0-4d8e-25ca508c238c",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 2,
"failed": 0
},
"_seq_no": 801595,
"_primary_term": 1,
"status": 201
}
},
{
"create": {
"_index": "platform-filebeat-2022.10.26-000671",
"_type": "_doc",
"_id": "196d923e-0e07-10f0-b091-ae5cf142a7a7",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 2,
"failed": 0
},
"_seq_no": 806215,
"_primary_term": 1,
"status": 201
}
}
]
}
[2022/10/26 15:46:36] [ warn] [engine] failed to flush chunk '444271-1666813576.646583913.flb', retry in 11 seconds: task_id=9, input=tail.0 > output=es.1 (out_id=1)
...
[2022/10/26 15:47:11] [error] [output:es:es.1] error: Output
{
"took": 24131,
"errors": true,
"items": [
{
"create": {
"_index": "platform-filebeat-2022.10.26-000671",
"_type": "_doc",
"_id": "f52b988e-ee72-79b4-7854-863f517883d4",
"status": 429,
"error": {
"type": "es_rejected_execution_exception",
"reason": "rejected execution of org.elasticsearch.action.support.replication.TransportWriteAction$1/WrappedActionListener{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6662/0x0000000801b00b08@7a1057e7}{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6666/0x0000000801b013a8@757b6fa9} on EsThreadPoolExecutor[name = elasticsearch-es-default-resized-0/write, queue capacity = 10000, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@7675026a[Running, pool size = 4, active threads = 4, queued tasks = 10489, completed tasks = 10947558]]"
}
}
},
{
"create": {
"_index": "platform-filebeat-2022.10.26-000671",
"_type": "_doc",
"_id": "5cb12418-c53d-bcc0-4d8e-25ca508c238c",
"status": 409,
"error": {
"type": "version_conflict_engine_exception",
"reason": "[5cb12418-c53d-bcc0-4d8e-25ca508c238c]: version conflict, document already exists (current version [1])",
"index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
"shard": "4",
"index": "platform-filebeat-2022.10.26-000671"
}
}
},
{
"create": {
"_index": "platform-filebeat-2022.10.26-000671",
"_type": "_doc",
"_id": "196d923e-0e07-10f0-b091-ae5cf142a7a7",
"status": 409,
"error": {
"type": "version_conflict_engine_exception",
"reason": "[196d923e-0e07-10f0-b091-ae5cf142a7a7]: version conflict, document already exists (current version [1])",
"index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
"shard": "1",
"index": "platform-filebeat-2022.10.26-000671"
}
}
}
]
}
[2022/10/26 15:47:11] [ warn] [engine] failed to flush chunk '444271-1666813576.646583913.flb', retry in 16 seconds: task_id=9, input=tail.0 > output=es.1 (out_id=1)
...
[2022/10/26 15:47:40] [error] [output:es:es.1] error: Output
{
"took": 13507,
"errors": true,
"items": [
{
"create": {
"_index": "platform-filebeat-2022.10.26-000671",
"_type": "_doc",
"_id": "f52b988e-ee72-79b4-7854-863f517883d4",
"status": 429,
"error": {
"type": "es_rejected_execution_exception",
"reason": "rejected execution of org.elasticsearch.action.support.replication.TransportWriteAction$1/WrappedActionListener{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6662/0x0000000801b00b08@7dc88a53}{org.elasticsearch.action.support.replication.ReplicationOperation$$Lambda$6666/0x0000000801b013a8@4b6357aa} on EsThreadPoolExecutor[name = elasticsearch-es-default-resized-0/write, queue capacity = 10000, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@7675026a[Running, pool size = 4, active threads = 4, queued tasks = 10512, completed tasks = 10964755]]"
}
}
},
{
"create": {
"_index": "platform-filebeat-2022.10.26-000671",
"_type": "_doc",
"_id": "5cb12418-c53d-bcc0-4d8e-25ca508c238c",
"status": 409,
"error": {
"type": "version_conflict_engine_exception",
"reason": "[5cb12418-c53d-bcc0-4d8e-25ca508c238c]: version conflict, document already exists (current version [1])",
"index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
"shard": "4",
"index": "platform-filebeat-2022.10.26-000671"
}
}
},
{
"create": {
"_index": "platform-filebeat-2022.10.26-000671",
"_type": "_doc",
"_id": "196d923e-0e07-10f0-b091-ae5cf142a7a7",
"status": 409,
"error": {
"type": "version_conflict_engine_exception",
"reason": "[196d923e-0e07-10f0-b091-ae5cf142a7a7]: version conflict, document already exists (current version [1])",
"index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
"shard": "1",
"index": "platform-filebeat-2022.10.26-000671"
}
}
}
]
}
[2022/10/26 15:47:40] [ warn] [engine] failed to flush chunk '444271-1666813576.646583913.flb', retry in 16 seconds: task_id=9, input=tail.0 > output=es.1 (out_id=1)
...
[2022/10/26 15:48:10] [error] [output:es:es.1] error: Output
{
"took": 13759,
"errors": true,
"items": [
{
"create": {
"_index": "platform-filebeat-2022.10.26-000671",
"_type": "_doc",
"_id": "f52b988e-ee72-79b4-7854-863f517883d4",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 2,
"failed": 0
},
"_seq_no": 892046,
"_primary_term": 1,
"status": 201
}
},
{
"create": {
"_index": "platform-filebeat-2022.10.26-000671",
"_type": "_doc",
"_id": "5cb12418-c53d-bcc0-4d8e-25ca508c238c",
"status": 409,
"error": {
"type": "version_conflict_engine_exception",
"reason": "[5cb12418-c53d-bcc0-4d8e-25ca508c238c]: version conflict, document already exists (current version [1])",
"index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
"shard": "4",
"index": "platform-filebeat-2022.10.26-000671"
}
}
},
{
"create": {
"_index": "platform-filebeat-2022.10.26-000671",
"_type": "_doc",
"_id": "196d923e-0e07-10f0-b091-ae5cf142a7a7",
"status": 409,
"error": {
"type": "version_conflict_engine_exception",
"reason": "[196d923e-0e07-10f0-b091-ae5cf142a7a7]: version conflict, document already exists (current version [1])",
"index_uuid": "x9HHbHGpRdiwMjDFgvdETQ",
"shard": "1",
"index": "platform-filebeat-2022.10.26-000671"
}
}
}
]
}
[2022/10/26 15:48:10] [ warn] [engine] failed to flush chunk '444271-1666813576.646583913.flb', retry in 74 seconds: task_id=9, input=tail.0 > output=es.1 (out_id=1)
...
[2022/10/26 15:49:43] [ info] [engine] flush chunk '444271-1666813576.646583913.flb' succeeded at retry 4: task_id=9, input=tail.0 > output=es.1 (out_id=1)
Your Environment
- Version used: td-agent-bit 1.9.3
- Configuration:
- Environment name and version (e.g. Kubernetes? What version?): RHEL 8 VM
- Server type and version:
- Operating System and version:
- Filters and plugins:
Additional context
Wrongly detecting a problem when an ElasticSearch bulk response contains a mix of conflicts and created entries only causes more stress on the ElasticSearch server, because the whole bulk is resent.
This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 5 days. Maintainers can add the exempt-stale label.
This is a one-liner fix that reduces unnecessary requests to Elasticsearch server when it is overloaded. The issue needs to remain open.
This issue was closed because it has been stalled for 5 days with no activity.
@simbou2000 We currently have exactly this problem. But I wonder why you didn't create it as a pull request in the first place?
Hi, I wrote another candidate fix for this issue: https://github.com/fluent/fluent-bit/pull/9236 The previously proposed solution does not cover the (failure, success, success, failure) case. We do not need to eliminate success information during the error checking.