aiokafka
Unclosed AIOKafkaConnection that does not cause the consumer to crash
Describe the bug
My consumer group consists of the pods of my service, and pods are added and removed depending on the load. At one point a new pod started, and at startup it logged the error Unclosed AIOKafkaConnection. After that the service started successfully, but the consumer did not receive a single message. At the same time, when I checked the consumer's connections to the broker, they were established. I have a method that is called every 10 seconds to check the connection:
async def ping(self) -> bool:
    # True if a connection to a randomly chosen broker is (or can be) established
    return await self._client.ready(self._client.get_random_node())
As a result, I ended up with a consumer that neither failed nor actually worked, while Kafka still considered it a member of the group.
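The ping above only shows that the client can reach some broker; it does not show whether the consumer holds an assignment or is still polling. A progress-based liveness check could look like the minimal sketch below. `ConsumerWatchdog`, `record_poll` and `max_idle_s` are hypothetical names, not aiokafka APIs, and the sketch assumes the consume loop calls `record_poll(len(consumer.assignment()))` each time a `getmany(timeout_ms=...)` call returns (including empty results).

```python
import time
from typing import Callable, Optional


class ConsumerWatchdog:
    """Hypothetical liveness tracker for a consume loop (not an aiokafka API).

    Healthy means: within the last `max_idle_s` seconds the loop completed a
    poll while holding at least one assigned partition (measured from
    construction until the first such poll).
    """

    def __init__(self, max_idle_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._max_idle_s = max_idle_s
        self._clock = clock
        self._started_at = clock()
        self._last_progress_at: Optional[float] = None

    def record_poll(self, assigned_partitions: int) -> None:
        """Call after every poll returns, e.g. with len(consumer.assignment())."""
        if assigned_partitions > 0:
            self._last_progress_at = self._clock()

    def is_healthy(self) -> bool:
        # Fall back to the start time until the first poll with an assignment.
        reference = (self._last_progress_at
                     if self._last_progress_at is not None
                     else self._started_at)
        return self._clock() - reference <= self._max_idle_s
```

The periodic check could then report `ping()` and `is_healthy()` together. Note that a consumer that legitimately has no partitions (more group members than partitions) would be flagged by this rule, so it would need loosening in that case.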
Here is the full timeline of events, including the Kafka broker log:
- 12:12:01.736: new pod my-pod-8f45d9497-xvtnx appeared
- 12:12:02.319: Unclosed AIOKafkaConnection error in pod my-pod-8f45d9497-xvtnx
- 12:17:45: pod my-pod-8f45d9497-xvtnx died
Kafka logs:
[12:12:01,799] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group device-shadow-prod in Stable state. Created a new member id aiokafka-0.7.2-3b825121-fe0f-45fd-b92e-996ffc08d6ba for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[12:12:01,799] INFO [GroupCoordinator 1]: Preparing to rebalance group device-shadow-prod in state PreparingRebalance with old generation 3128 (__consumer_offsets-5) (reason: Adding new member aiokafka-0.7.2-3b825121-fe0f-45fd-b92e-996ffc08d6ba with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator)
[12:12:02,273] INFO [GroupCoordinator 1]: Stabilized group device-shadow-prod generation 3129 (__consumer_offsets-5) with 2 members (kafka.coordinator.group.GroupCoordinator)
[12:12:02,274] INFO [GroupCoordinator 1]: Assignment received from leader aiokafka-0.7.2-46313310-5ad0-463a-adc4-fa5cdeac7d7a for group device-shadow-prod for generation 3129. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[12:17:49,359] INFO [GroupCoordinator 1]: Member aiokafka-0.7.2-3b825121-fe0f-45fd-b92e-996ffc08d6ba in group device-shadow-prod has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[12:17:49,359] INFO [GroupCoordinator 1]: Preparing to rebalance group device-shadow-prod in state PreparingRebalance with old generation 3129 (__consumer_offsets-5) (reason: removing member aiokafka-0.7.2-3b825121-fe0f-45fd-b92e-996ffc08d6ba on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[12:17:50,453] INFO [GroupCoordinator 1]: Stabilized group device-shadow-prod generation 3130 (__consumer_offsets-5) with 1 members (kafka.coordinator.group.GroupCoordinator)
[12:17:50,454] INFO [GroupCoordinator 1]: Assignment received from leader aiokafka-0.7.2-46313310-5ad0-463a-adc4-fa5cdeac7d7a for group device-shadow-prod for generation 3130. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
I'm using the default consumer settings. It looks as though the group rebalanced again after max_poll_interval_ms expired; however, the rebalance happened after ~345 seconds rather than 300. Either way, this does not explain why the consumer was not actually working.
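The ~345 s figure can be checked against the broker log timestamps: from the assignment for generation 3129 (12:12:02,274) to the removal of the member (12:17:49,359). The 300 s reference assumes aiokafka's default `max_poll_interval_ms=300000`; note that the broker log itself gives "heartbeat expiration" as the removal reason.

```python
from datetime import datetime

# Timestamps copied from the broker log above (format HH:MM:SS,mmm).
LOG_FORMAT = "%H:%M:%S,%f"
assigned_at = datetime.strptime("12:12:02,274", LOG_FORMAT)  # generation 3129 assignment
removed_at = datetime.strptime("12:17:49,359", LOG_FORMAT)   # member removed from group

elapsed_s = (removed_at - assigned_at).total_seconds()
# Assumed default: max_poll_interval_ms = 300000 ms, i.e. 300 s.
DEFAULT_MAX_POLL_INTERVAL_S = 300_000 / 1000
overshoot_s = elapsed_s - DEFAULT_MAX_POLL_INTERVAL_S

print(f"elapsed: {elapsed_s:.3f} s, beyond 300 s: {overshoot_s:.3f} s")
# prints "elapsed: 347.085 s, beyond 300 s: 47.085 s"
```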
Expected behaviour
I expected either an exception to be raised after the Unclosed AIOKafkaConnection error or, if the error was not critical, the consumer to keep working normally.
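As far as I can tell from aiokafka's source (worth confirming for 0.7.2), the Unclosed AIOKafkaConnection message comes from `AIOKafkaConnection.__del__` when a still-open connection object is garbage-collected: it emits a `ResourceWarning` and passes a context with that message to the event loop's exception handler instead of raising into the consumer, which would explain why nothing crashed. Assuming that holds, an application that wants fail-fast behaviour could install its own loop exception handler, as in the sketch below. `UnclosedConnectionWatcher` is a hypothetical helper, not part of aiokafka, and the demo simulates the report with `loop.call_exception_handler` instead of running a real consumer.

```python
import asyncio
from typing import Any, Callable, Dict, List, Optional

MARKER = "Unclosed AIOKafkaConnection"


class UnclosedConnectionWatcher:
    """Hypothetical helper (not part of aiokafka): turns loop exception-handler
    reports mentioning an unclosed AIOKafkaConnection into an awaitable signal."""

    def __init__(self) -> None:
        self.tripped = asyncio.Event()  # construct inside a running loop
        self.contexts: List[Dict[str, Any]] = []
        self._previous: Optional[Callable[..., Any]] = None

    def install(self, loop: asyncio.AbstractEventLoop) -> None:
        self._previous = loop.get_exception_handler()
        loop.set_exception_handler(self._handle)

    def _handle(self, loop: asyncio.AbstractEventLoop,
                context: Dict[str, Any]) -> None:
        if MARKER in str(context.get("message", "")):
            self.contexts.append(context)
            self.tripped.set()
        # Keep the usual reporting (logging) behaviour.
        if self._previous is not None:
            self._previous(loop, context)
        else:
            loop.default_exception_handler(context)


async def demo() -> bool:
    loop = asyncio.get_running_loop()
    watcher = UnclosedConnectionWatcher()
    watcher.install(loop)
    # Simulated report; a real service would start its consumer here and run
    # a supervisor task that exits the process once `tripped` is set.
    loop.call_exception_handler({"message": MARKER})
    await asyncio.wait_for(watcher.tripped.wait(), timeout=1.0)
    return watcher.tripped.is_set()


if __name__ == "__main__":
    print(asyncio.run(demo()))  # True
```

Exiting the process when the watcher trips would let Kubernetes restart the pod, but whether a leaked connection always means the consumer is stuck is not established by this issue.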
Environment
- aiokafka version: 0.7.2
- Kafka broker version: 3.6.0
Reproducible example
I tried for a long time to reproduce the Unclosed AIOKafkaConnection error in a way that did not end with an exception, but I could not. Looking at the consumer's code, I have no idea how exactly this could have happened, so I would be glad to hear any suggestions.