kafka-connect-elasticsearch
kafka-connect-elasticsearch copied to clipboard
Enable `retry` mechanism for Elasticsearch HTTP Errors
Currently, if the Jest request exceeds `connection.timeout.ms` or `read.timeout.ms`, the Elasticsearch Connector throws a `ConnectException` and kills the task. Preferably, the current `max.retries` and `retry.backoff.ms` configuration options would apply to these timeouts, or a new set of options would be offered to handle this.
Sample Stack Trace
org.apache.kafka.connect.errors.ConnectException: java.net.SocketTimeoutException: Read timed out
at io.confluent.connect.elasticsearch.bulk.BulkProcessor.toConnectException(BulkProcessor.java:466)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor.failAndStop(BulkProcessor.java:451)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor.access$100(BulkProcessor.java:45)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:345)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:329)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:155)
at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:284)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:167)
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:271)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
at io.searchbox.client.http.JestHttpClient.executeRequest(JestHttpClient.java:118)
at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:57)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.executeBulk(JestElasticsearchClient.java:355)
at io.confluent.connect.elasticsearch.BulkIndexingClient.execute(BulkIndexingClient.java:40)
at io.confluent.connect.elasticsearch.BulkIndexingClient.execute(BulkIndexingClient.java:25)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute(BulkProcessor.java:372)
at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:343)
... 5 more
Is this still the case?