kafka-connect-elasticsearch
Connector shows up as "RUNNING" after it stopped working and entered unending rebalance
Hello,
When using the elasticsearch connector, I keep running into a problem that does not occur with other connectors. The connector sometimes fails due to poll timeout:
[2021-02-23 21:06:07,652] INFO [Consumer clientId=connector-consumer-elasticsearch_site1_sink-0, groupId=connect-elasticsearch_site1_sink] Member connector-consumer-elasticsearch_site1_sink-0-fa420937-fcde-4094-9a07-25782aea24d5 sending LeaveGroup request to coordinator 172.22.68.100:9092 (id: 2147482644 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1029)
On the Kafka broker, the connector leaving the group is registered:
[2021-02-23 21:06:07,653] INFO [GroupCoordinator 1003]: Member[group.instance.id None, member.id connector-consumer-elasticsearch_site1_sink-0-fa420937-fcde-4094-9a07-25782aea24d5] in group connect-elasticsearch_site1_sink has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-02-23 21:06:07,653] INFO [GroupCoordinator 1003]: Preparing to rebalance group connect-elasticsearch_site1_sink in state PreparingRebalance with old generation 11 (__consumer_offsets-48) (reason: removing member connector-consumer-elasticsearch_site1_sink-0-fa420937-fcde-4094-9a07-25782aea24d5 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2021-02-23 21:06:07,653] INFO [GroupCoordinator 1003]: Group connect-elasticsearch_site1_sink with generation 12 is now empty (__consumer_offsets-48) (kafka.coordinator.group.GroupCoordinator)
The rebalance never finishes. No new data is written to elasticsearch, but the task shows up as RUNNING:
{
"connector": {
"state": "RUNNING",
"worker_id": "172.23.95.82:8083"
},
"name": "elasticsearch_site1_sink",
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "172.23.95.82:8083"
}
],
"type": "sink"
}
The group gets transitioned to state Dead after one week; the connector stays RUNNING.
I'm using Kafka 2.7.0 with two Kafka Connect instances running in distributed mode. Both the Kafka brokers and the Kafka Connect clients run inside docker containers. The error occurs with plugin versions 11.0.0 and 11.0.3. The connector resumes operation after either
- restarting the task; or
- turning off the kafka connect instance that was running the task.
The issue appeared on two geographically distinct environments.
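The first workaround (restarting the task) could be automated with a small watchdog. The following is only a minimal sketch under stated assumptions: `CONNECT_URL` and the helper names are hypothetical, and the consumer group member count has to be supplied by the caller (for example from the output of `kafka-consumer-groups --describe`), since the Connect REST API does not report it. It uses the `GET /connectors/<name>/status` and `POST /connectors/<name>/tasks/<id>/restart` endpoints of the Kafka Connect REST API.

```python
import json
import urllib.request

# Assumption: REST endpoint of one Kafka Connect worker.
CONNECT_URL = "http://localhost:8083"


def fetch_status(connector: str) -> dict:
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    url = f"{CONNECT_URL}/connectors/{connector}/status"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)


def stuck_task_ids(status: dict, group_member_count: int) -> list:
    """Return ids of tasks reported RUNNING while the consumer group is empty.

    group_member_count is the number of members in the sink's consumer
    group (connect-<connector name>), obtained separately by the caller.
    """
    if group_member_count > 0:
        return []
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "RUNNING"]


def restart_task(connector: str, task_id: int) -> None:
    """POST /connectors/<name>/tasks/<id>/restart."""
    url = f"{CONNECT_URL}/connectors/{connector}/tasks/{task_id}/restart"
    req = urllib.request.Request(url, method="POST")
    urllib.request.urlopen(req).close()
```

A periodic job could call `fetch_status`, pass the result to `stuck_task_ids` together with the current member count, and call `restart_task` for each id returned. This only hides the symptom; it does not explain why the task never rejoins the group.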
@IridiumOxide can you please post your configs and your max.poll.timeout.ms config for the worker
I also encountered the same problem. My connector is deployed with docker. Do I need to set the configuration through a docker environment variable?
CONNECT_MAX_POLL_INTERVAL_MS=3600000
@ReasonDuan can you post your connector and consumer configs? It will be a lot easier for us to understand what's going wrong if we have the whole picture
Hi, I am facing exactly the same problem, connector and task are in "RUNNING" state but no data is sent to ElasticSearch.
I described my problem here https://forum.confluent.io/t/elasticsearch-sink-connector-unrecoverable-exception/1927/4 (v11.0.4). (There is no difference if my script, mentioned in the external link, runs or not).
http://localhost:8083/connectors/elasticsearch-sink/status
{
"name": "elasticsearch-sink",
"connector": {
"state": "RUNNING",
"worker_id": "xxx.xxx.xxx.xxx:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "xxx.xxx.xxx.xxx:8083"
}
],
"type": "sink"
}
http://localhost:8083/connectors/elasticsearch-sink/tasks/0/status
{
"id": 0,
"state": "RUNNING",
"worker_id": "xxx.xxx.xxx.xxx:8083"
}
I tried upgrading to v11.0.6, but it made no difference.
Logs: server.log, controller.log, connect.log
Properties: xxx-elasticsearch.properties.txt, connect-standalone.properties.txt, server.properties.txt, zookeeper.properties.txt
The same problem is described here https://github.com/confluentinc/kafka-connect-elasticsearch/issues/534
In order for me to override max.poll.interval.ms, I had to add connector.client.config.override.policy=All to connect-standalone.properties and consumer.override.max.poll.interval.ms to the plugin properties file.
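As a config sketch, the two settings described above might look like this. The value 600000 is purely illustrative (the thread does not state which value was used), and the file names follow the standalone setup mentioned in this comment.

```properties
# connect-standalone.properties (worker config):
# allow connectors to override client settings
connector.client.config.override.policy=All

# Elasticsearch sink connector properties file:
# 600000 ms is an assumed example value, not a recommendation
consumer.override.max.poll.interval.ms=600000
```

Without the override policy on the worker, a `consumer.override.*` setting in the connector config is rejected, which matches the need for both changes described here.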
Hi, we had exactly the same problem last night. We updated to cp-kafka-connect-base:6.1.1 and kafka-connect-elasticsearch:11.0.6 one week ago. Before, we used cp-kafka-connect-base:5.5.1 with kafka-connect-elasticsearch:5.4.0 and we never had this kind of issue. The configuration of connectors remained the same between the updates.
Do you have any idea? Thanks.
Same, or at least a similar, problem here. I use a local elasticsearch-7.13.2, the confluentinc docker-compose v6.2.0 without Kafka Connect, and ElasticSearch Sink connector 11.0.6. I bootstrap Kafka Connect 'manually' from within the confluent-6.2.0 binaries with the following command:
bin/connect-standalone etc/kafka/connect-standalone.properties ......./connect-es-sink.properties
connect-es-sink.properties being:
name=es-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=test-topic
connection.url=localhost:9200
type.name=_doc
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
schema.ignore=true
I then send some JSON records through test-topic, observe that they are written successfully, and tear down ElasticSearch.
The connector hangs in the RUNNING state, and the following logs are printed:
[2021-07-14 10:50:15,919] INFO [Consumer clientId=connector-consumer-es-sink-0, groupId=connect-es-sink] Setting offset for partition test-topic-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:820)
[2021-07-14 10:50:55,765] INFO Creating index test-topic. (io.confluent.connect.elasticsearch.ElasticsearchSinkTask:162)
[2021-07-14 10:50:55,779] WARN request [HEAD http://localhost:9200/test-topic?ignore_unavailable=false&expand_wildcards=open%2Cclosed&allow_no_indices=false] returned 1 warnings: [299 Elasticsearch-7.13.2-4d960a0733be83dd2543ca018aa4ddc42e956800 "Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.13/security-minimal-setup.html to enable security."] (org.elasticsearch.client.RestClient:65)
[2021-07-14 10:50:55,897] WARN request [POST http://localhost:9200/_bulk?timeout=1m] returned 1 warnings: [299 Elasticsearch-7.13.2-4d960a0733be83dd2543ca018aa4ddc42e956800 "Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.13/security-minimal-setup.html to enable security."] (org.elasticsearch.client.RestClient:65)
[2021-07-14 10:51:10,847] WARN Bulk request 2 failed. Retrying request. (io.confluent.connect.elasticsearch.ElasticsearchClient:335)
java.net.ConnectException: Timeout connecting to [localhost/127.0.0.1:9200]
at org.apache.http.nio.pool.RouteSpecificPool.timeout(RouteSpecificPool.java:169)
at org.apache.http.nio.pool.AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:628)
at org.apache.http.nio.pool.AbstractNIOConnPool$InternalSessionRequestCallback.timeout(AbstractNIOConnPool.java:894)
at org.apache.http.impl.nio.reactor.SessionRequestImpl.timeout(SessionRequestImpl.java:183)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processTimeouts(DefaultConnectingIOReactor.java:210)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:155)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
at java.base/java.lang.Thread.run(Thread.java:834)
I'd expect (and hope) the ElasticSearch sink task to fail. Is that a correct assumption?
Version 5.2.5 of the connector behaves as expected. Here are some logs:
[2021-07-14 11:35:18,899] WARN Failed to execute batch 2 of 1 records with attempt 1/6, will attempt retry after 0 ms. Failure reason: Connect to localhost:9200 [localhost/127.0.0.1] failed: connect timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor:466)
[2021-07-14 11:35:19,906] WARN Failed to execute batch 2 of 1 records with attempt 2/6, will attempt retry after 124 ms. Failure reason: Connect to localhost:9200 [localhost/127.0.0.1] failed: connect timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor:466)
[2021-07-14 11:35:21,034] WARN Failed to execute batch 2 of 1 records with attempt 3/6, will attempt retry after 237 ms. Failure reason: Connect to localhost:9200 [localhost/127.0.0.1] failed: connect timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor:466)
[2021-07-14 11:35:22,276] WARN Failed to execute batch 2 of 1 records with attempt 4/6, will attempt retry after 624 ms. Failure reason: Connect to localhost:9200 [localhost/127.0.0.1] failed: connect timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor:466)
[2021-07-14 11:35:23,915] WARN Failed to execute batch 2 of 1 records with attempt 5/6, will attempt retry after 575 ms. Failure reason: Connect to localhost:9200 [localhost/127.0.0.1] failed: connect timed out (io.confluent.connect.elasticsearch.bulk.BulkProcessor:466)
[2021-07-14 11:35:25,498] ERROR Failed to execute batch 2 of 1 records after total of 6 attempt(s) (io.confluent.connect.elasticsearch.bulk.BulkProcessor:477)
org.apache.http.conn.ConnectTimeoutException: Connect to localhost:9200 [localhost/127.0.0.1] failed: connect timed out
at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:151)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376)
at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
Hi @MPeli @levzem @ilanjiR, do you have any advice on how to avoid this problem? I have the default config for my tasks:
max.poll.interval.ms=300000
max.poll.records=500
We had this trouble twice this week: once because of a rebalance, and once due to a connect exception with an elasticsearch node this morning. We had to restart the whole kconnect instance. (It contains 4 Elastic connectors and 2 jdbc connectors.)
Thanks.