kafka-connect-elasticsearch icon indicating copy to clipboard operation
kafka-connect-elasticsearch copied to clipboard

Enable `retry` mechanism for Elasticsearch HTTP Errors

Open lukegil opened this issue 5 years ago • 1 comments

Currently, if the Jest request exceeds connection.timeout.ms or read.timeout.ms, the Elasticsearch Connector throws a ConnectionError and kills the task. Preferably, the current max.retries and retry.backoff.ms configuration options would apply to these timeouts, or a new set of options would be offered to handle this.

Sample Stack Trace

org.apache.kafka.connect.errors.ConnectException: java.net.SocketTimeoutException: Read timed out
	at io.confluent.connect.elasticsearch.bulk.BulkProcessor.toConnectException(BulkProcessor.java:466)
	at io.confluent.connect.elasticsearch.bulk.BulkProcessor.failAndStop(BulkProcessor.java:451)
	at io.confluent.connect.elasticsearch.bulk.BulkProcessor.access$100(BulkProcessor.java:45)
	at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:345)
	at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:329)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Read timed out
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:171)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
	at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:155)
	at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:284)
	at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
	at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
	at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
	at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
	at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:167)
	at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
	at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:271)
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
	at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
	at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
	at io.searchbox.client.http.JestHttpClient.executeRequest(JestHttpClient.java:118)
	at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:57)
	at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.executeBulk(JestElasticsearchClient.java:355)
	at io.confluent.connect.elasticsearch.BulkIndexingClient.execute(BulkIndexingClient.java:40)
	at io.confluent.connect.elasticsearch.BulkIndexingClient.execute(BulkIndexingClient.java:25)
	at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute(BulkProcessor.java:372)
	at io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:343)
	... 5 more	

lukegil avatar Apr 11 '19 19:04 lukegil

Does it still same?

toughrogrammer avatar Feb 26 '21 15:02 toughrogrammer