Kafka consumer stuck after timeout
We are running some docker services that connect to a single kafka (and zookeeper) service. After some time, one of the services stops working and inspecting the logs we see the following:
2021-05-06T07:48:16.960907113Z WARNING:kafka.coordinator:Heartbeat session expired, marking coordinator dead
2021-05-06T07:48:16.960988770Z WARNING:kafka.coordinator:Marking the coordinator dead (node coordinator-1001) for group main-covid: Heartbeat session expired.
2021-05-06T07:48:18.986239094Z WARNING:kafka.coordinator.consumer:Auto offset commit failed for group main-covid: NodeNotReadyError: coordinator-1001
2021-05-06T07:48:19.494681416Z WARNING:kafka.coordinator.consumer:Auto offset commit failed for group main-covid: NodeNotReadyError: coordinator-1001
2021-05-06T07:48:19.597970941Z WARNING:kafka.client:<BrokerConnection node_id=coordinator-1001 host=kafka:9094 <connected> [IPv4 ('10.0.1.24', 9094)]> timed out after 11000 ms. Closing connection.
2021-05-06T07:48:19.599096759Z WARNING:kafka.client:Node coordinator-1001 connection failed -- refreshing metadata
2021-05-06T07:48:19.600239792Z ERROR:kafka.coordinator:Error sending OffsetCommitRequest_v2 to node coordinator-1001 [[Error 7] RequestTimedOutError: Request timed out after 11000 ms]
2021-05-06T07:48:19.600772241Z WARNING:kafka.coordinator:Marking the coordinator dead (node coordinator-1001) for group main-covid: [Error 7] RequestTimedOutError: Request timed out after 11000 ms.
2021-05-06T07:48:19.601373827Z ERROR:kafka.coordinator:Error sending HeartbeatRequest_v1 to node coordinator-1001 [[Error 7] RequestTimedOutError: Request timed out after 11000 ms]
the problem is that the consumer doesn't try to reconnect again (or raises an exception so another container can take its place) and the service gets stuck. The rest of the services are fine so I don't think there was any interruption in the service.
The loop that reads kafka messages is really simple:
for msg in consumer:
do_work()
consumer.commit()
and we are using kafka-python==2.0.1 and kafka docker image: wurstmeister/kafka:2.13-2.7.0
How can I force the consumer to try to reconnect or exit?
We are running some docker services that connect to a single kafka (and zookeeper) service. After some time, one of the services stops working and inspecting the logs we see the following:
2021-05-06T07:48:16.960907113Z WARNING:kafka.coordinator:Heartbeat session expired, marking coordinator dead 2021-05-06T07:48:16.960988770Z WARNING:kafka.coordinator:Marking the coordinator dead (node coordinator-1001) for group main-covid: Heartbeat session expired. 2021-05-06T07:48:18.986239094Z WARNING:kafka.coordinator.consumer:Auto offset commit failed for group main-covid: NodeNotReadyError: coordinator-1001 2021-05-06T07:48:19.494681416Z WARNING:kafka.coordinator.consumer:Auto offset commit failed for group main-covid: NodeNotReadyError: coordinator-1001 2021-05-06T07:48:19.597970941Z WARNING:kafka.client:<BrokerConnection node_id=coordinator-1001 host=kafka:9094 <connected> [IPv4 ('10.0.1.24', 9094)]> timed out after 11000 ms. Closing connection. 2021-05-06T07:48:19.599096759Z WARNING:kafka.client:Node coordinator-1001 connection failed -- refreshing metadata 2021-05-06T07:48:19.600239792Z ERROR:kafka.coordinator:Error sending OffsetCommitRequest_v2 to node coordinator-1001 [[Error 7] RequestTimedOutError: Request timed out after 11000 ms] 2021-05-06T07:48:19.600772241Z WARNING:kafka.coordinator:Marking the coordinator dead (node coordinator-1001) for group main-covid: [Error 7] RequestTimedOutError: Request timed out after 11000 ms. 2021-05-06T07:48:19.601373827Z ERROR:kafka.coordinator:Error sending HeartbeatRequest_v1 to node coordinator-1001 [[Error 7] RequestTimedOutError: Request timed out after 11000 ms]the problem is that the consumer doesn't try to reconnect again (or raises an exception so another container can take its place) and the service gets stuck. The rest of the services are fine so I don't think there was any interruption in the service.
The loop that reads kafka messages is really simple:
for msg in consumer: do_work() consumer.commit()and we are using
kafka-python==2.0.1and kafka docker image:wurstmeister/kafka:2.13-2.7.0How can I force the consumer to try to reconnect or exit?
How does your constructor of KafkaConsumer looks like?
I once had a problem which was caused by the "api_version" parameter.
I just removed it, because my loop kept open and didn't exit.
We are getting this issue and don't pass a api_version parameter
Do you still have the issue?
If I remember correctly upgrading to kafka-python 2.0.2 fixed it for us. Its no longer an issue for us.