Kafka consumer stuck after timeout

Open Gonzalo933 opened this issue 4 years ago • 4 comments

We are running several Docker services that connect to a single Kafka (and ZooKeeper) service. After some time, one of the services stops working, and when we inspect its logs we see the following:

2021-05-06T07:48:16.960907113Z WARNING:kafka.coordinator:Heartbeat session expired, marking coordinator dead
2021-05-06T07:48:16.960988770Z WARNING:kafka.coordinator:Marking the coordinator dead (node coordinator-1001) for group main-covid: Heartbeat session expired.
2021-05-06T07:48:18.986239094Z WARNING:kafka.coordinator.consumer:Auto offset commit failed for group main-covid: NodeNotReadyError: coordinator-1001
2021-05-06T07:48:19.494681416Z WARNING:kafka.coordinator.consumer:Auto offset commit failed for group main-covid: NodeNotReadyError: coordinator-1001
2021-05-06T07:48:19.597970941Z WARNING:kafka.client:<BrokerConnection node_id=coordinator-1001 host=kafka:9094 <connected> [IPv4 ('10.0.1.24', 9094)]> timed out after 11000 ms. Closing connection.
2021-05-06T07:48:19.599096759Z WARNING:kafka.client:Node coordinator-1001 connection failed -- refreshing metadata
2021-05-06T07:48:19.600239792Z ERROR:kafka.coordinator:Error sending OffsetCommitRequest_v2 to node coordinator-1001 [[Error 7] RequestTimedOutError: Request timed out after 11000 ms]
2021-05-06T07:48:19.600772241Z WARNING:kafka.coordinator:Marking the coordinator dead (node coordinator-1001) for group main-covid: [Error 7] RequestTimedOutError: Request timed out after 11000 ms.
2021-05-06T07:48:19.601373827Z ERROR:kafka.coordinator:Error sending HeartbeatRequest_v1 to node coordinator-1001 [[Error 7] RequestTimedOutError: Request timed out after 11000 ms]

The problem is that the consumer neither tries to reconnect nor raises an exception (which would let another container take its place), so the service gets stuck. The rest of the services are fine, so I don't think there was any interruption in the service.

The loop that reads kafka messages is really simple:

for msg in consumer:
    do_work()
    consumer.commit()

We are using kafka-python==2.0.1 and the Kafka Docker image wurstmeister/kafka:2.13-2.7.0.

How can I force the consumer to try to reconnect or exit?

Gonzalo933 avatar May 07 '21 07:05 Gonzalo933

What does your KafkaConsumer constructor call look like?

I once had a problem which was caused by the "api_version" parameter.

I just removed it, because my loop stayed open and didn't exit.

MateusMarcuzzo avatar Sep 16 '21 17:09 MateusMarcuzzo

We are getting this issue too, and we don't pass an api_version parameter.

calebickler avatar Jan 06 '22 19:01 calebickler

Do you still have the issue?

MateusMarcuzzo avatar Jul 22 '22 14:07 MateusMarcuzzo

If I remember correctly, upgrading to kafka-python 2.0.2 fixed it for us. It's no longer an issue for us.

calebickler avatar Jul 23 '22 00:07 calebickler
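
For anyone who cannot upgrade right away, here is a rough sketch of one possible workaround for the "reconnect or exit" question: replace the `for msg in consumer` loop with explicit `poll()` calls and give up after a long idle period, so the container can exit and be restarted. It assumes `consumer` behaves like kafka-python's KafkaConsumer (`poll(timeout_ms=...)` returning a dict of record lists, plus `commit()`). The names `consume_with_watchdog`, `handle`, and `max_idle_seconds` are made up for illustration. Nothing in this thread confirms that `poll()` keeps returning in the stuck state, and a quiet topic will also trigger the timeout, so treat it as an idea to test rather than a fix.

```python
import time


def consume_with_watchdog(consumer, handle, max_idle_seconds=300.0,
                          poll_timeout_ms=1000, clock=time.monotonic):
    """Process records until none arrive for max_idle_seconds, then return.

    `consumer` is assumed to offer poll(timeout_ms=...) -> {partition: [records]}
    and commit(), as kafka-python's KafkaConsumer does. `handle` is a
    hypothetical callback standing in for do_work(). Returns the number of
    seconds since the last batch was processed.
    """
    last_activity = clock()
    while True:
        # poll() waits up to poll_timeout_ms and returns an empty dict
        # when no records are available.
        batches = consumer.poll(timeout_ms=poll_timeout_ms)
        if batches:
            for records in batches.values():
                for record in records:
                    handle(record)
            consumer.commit()
            last_activity = clock()
        elif clock() - last_activity > max_idle_seconds:
            # Idle for too long: hand control back so the caller can exit.
            return clock() - last_activity
```

After the function returns, the caller could call `consumer.close()` and then `sys.exit(1)` so that the orchestrator starts a fresh container.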