
Connector is not auto retrying on Indexing Error

Open · spencerdcarlson opened this issue 2 years ago · 1 comment

When a write to Elasticsearch fails with an "Indexing record failed" error, the connector task dies without retrying. We never even see the retry log message.

Kafka Connect image: confluentinc/cp-kafka-connect:7.1.1
Elasticsearch connector: confluentinc/kafka-connect-elasticsearch:13.0.0
Elasticsearch version: 7.8.0
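For reference, this is roughly the retry behavior we expected given max.retries=10 and retry.backoff.ms=1000 — a sketch only, assuming a simple retry-with-backoff loop, not the connector's actual RetryUtil code:

import java.util.concurrent.Callable;

// Sketch of the behavior we expected: a failed Elasticsearch call gets logged and
// retried with backoff before the task is allowed to die. Illustrative only.
public class ExpectedRetrySketch {
    static <T> T callWithRetries(String what, Callable<T> call,
                                 int maxRetries, long backoffMs) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                if (attempt > maxRetries) {
                    throw e; // retries exhausted; only now should the task fail
                }
                // This is the "retry log" we never see in our worker logs.
                System.out.printf("Failed to %s, attempt %d/%d, retrying in %d ms%n",
                        what, attempt, maxRetries, backoffMs);
                Thread.sleep(backoffMs);
            }
        }
    }
}

Instead of anything like this firing, the first indexing failure kills the task outright.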

Connector Config

{
      "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "connection.url" : "<REDACTED>",
      "connection.username" : "<REDACTED>",
      "connection.password" : "<REDACTED>",
      "tasks.max" : "1",
      "consumer.override.bootstrap.servers" : "<REDACTED>",
      "topics" : "confluent-audit-log-events",
      "name" : "erie-confluent-audit-logs",
      "linger.ms" : "5000",
      "connection.timeout.ms" : "15000",
      "read.timeout.ms" : "15000",
      "flush.synchronously" : "true",
      "max.retries" : "10",
      "retry.backoff.ms" : "1000",
      "consumer.override.sasl.jaas.config" : "<REDACTED>",
      "producer.override.sasl.jaas.config" : "<REDACTED>",
      "type.name" : "_doc",
      "behavior.on.malformed.documents" : "WARN",
      "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable" : "false",
      "schema.ignore" : "true",
      "key.ignore" : "true",
      "transforms" : "topicFromJsonPath, topicReformat, topicDateSuffix",
      "transforms.topicFromJsonPath.type" : "io.confluent.connect.transforms.ExtractTopic$Value",
      "transforms.topicFromJsonPath.field" : "$[\"type\"]",
      "transforms.topicFromJsonPath.field.format" : "JSON_PATH",
      "transforms.topicFromJsonPath.skip.missing.or.null" : "false",
      "transforms.topicReformat.type" : "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.topicReformat.regex" : "([^\\/]+)\\/([a-z]+)",
      "transforms.topicReformat.replacement" : "confluent-audit-logs-$1-$2"
      "transforms.topicDateSuffix.type" : "org.apache.kafka.connect.transforms.TimestampRouter",
      "transforms.topicDateSuffix.timestamp.format" : "yyyy-MM-dd",
      "transforms.topicDateSuffix.topic.format" : "$${topic}-$${timestamp}",
      "errors.tolerance" : "all",
      "errors.log.enable" : true,
      "errors.log.include.messages" : true,
      "errors.deadletterqueue.topic.name" : "kafka-connect-cluster.erie.usw2.prod.dlq.erie-confluent-audit-logs.v1",
      "errors.deadletterqueue.context.headers.enable" : "true",
    }

Stacktrace

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Indexing record failed.
    at io.confluent.connect.elasticsearch.ElasticsearchClient.handleResponse(ElasticsearchClient.java:565)
    at io.confluent.connect.elasticsearch.ElasticsearchClient$1.afterBulk(ElasticsearchClient.java:383)
    at org.elasticsearch.action.bulk.BulkRequestHandler$1.onResponse(BulkRequestHandler.java:59)
    at org.elasticsearch.action.bulk.BulkRequestHandler$1.onResponse(BulkRequestHandler.java:56)
    at org.elasticsearch.action.bulk.Retry$RetryHandler.finishHim(Retry.java:168)
    at org.elasticsearch.action.bulk.Retry$RetryHandler.onResponse(Retry.java:112)
    at org.elasticsearch.action.bulk.Retry$RetryHandler.onResponse(Retry.java:71)
    at org.elasticsearch.action.ActionListener$RunAfterActionListener.onResponse(ActionListener.java:341)
    at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:173)
    ... 5 more
Caused by: ElasticsearchException[Elasticsearch exception [type=es_rejected_execution_exception, reason=rejected execution of processing of [443935630][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[confluent-audit-logs-io.confluent.kafka.server-authorization-2022-05-25][0]] containing [27] requests, target allocation id: NJf_aiuQROqIBvKtXQr8Gg, primary term: 1 on EsThreadPoolExecutor[name = prod-usw2-cs-main-es-logging-esdata-10/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@5782c33f[Running, pool size = 8, active threads = 8, queued tasks = 246, completed tasks = 202515737]]]]
    at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:491)
    at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:402)
    at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:128)
    at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:184)
    at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2484)
    at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$8(RestHighLevelClient.java:2105)
    at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2188)
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
    at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:620)
    at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$0(ElasticsearchClient.java:171)
    at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:158)
    at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:119)
    at io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:426)
    at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:169)
    ... 5 more
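Our reading of the trace (we may be wrong): the bulk request itself goes through RetryUtil.callWithRetries, so transport-level failures would be retried, but the es_rejected_execution_exception comes back inside an otherwise successful bulk response and is converted into a fatal ConnectException in ElasticsearchClient.handleResponse, after the retry wrapper has already returned. Below is a self-contained sketch of that suspected flow; every type in it is a hypothetical stand-in, not the real Elasticsearch client or connector classes, and only the shape of the logic is meant to mirror the frames above.

import java.util.List;
import java.util.concurrent.Callable;

public class SuspectedFlowSketch {

    // Hypothetical stand-in for one entry of a bulk response.
    record ItemResult(boolean failed, Exception cause) {}

    // The bulk request itself is executed through a retry wrapper, so
    // transport-level problems (connect/read timeouts) do get retried.
    static List<ItemResult> executeBulk(Callable<List<ItemResult>> bulkCall) throws Exception {
        return withRetries(bulkCall);
    }

    // But a per-record failure such as es_rejected_execution_exception comes back
    // inside an otherwise successful bulk response. It is handled here, after the
    // retry wrapper has already returned, and is converted straight into a fatal
    // exception, which would explain why the task dies without any retry being logged.
    static void handleResponse(List<ItemResult> response) {
        for (ItemResult item : response) {
            if (item.failed()) {
                throw new RuntimeException("Indexing record failed.", item.cause());
            }
        }
    }

    // Minimal stub standing in for the connector's retry utility.
    static <T> T withRetries(Callable<T> call) throws Exception {
        return call.call(); // backoff/retry loop elided; see the sketch earlier in the issue
    }
}

If that is what is happening, a rejected-execution response (Elasticsearch backpressure) never gets another attempt, regardless of max.retries.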

— spencerdcarlson, May 25 '22 15:05