aiokafka
kafka consumer never reconnects
Describe the bug
To the best of my knowledge, the consumer is somehow dropping its connection to the Kafka brokers, and when it tries to reconnect it also seems to completely block the active event loop :(
Here's how the logs look on the API:
2023-02-04 22:31:05,694 level=ERROR [client._get_conn:460] Unable connect to node with id 4:
2023-02-04 22:31:18,166 level=ERROR [client._get_conn:460] Unable connect to node with id 1:
2023-02-04 22:32:55,445 level=ERROR [client._get_conn:460] Unable connect to node with id 5:
2023-02-04 22:33:35,451 level=ERROR [client._get_conn:460] Unable connect to node with id 6:
2023-02-04 22:34:15,457 level=ERROR [client._get_conn:460] Unable connect to node with id 4:
2023-02-04 22:34:55,462 level=ERROR [client._get_conn:460] Unable connect to node with id 3:
2023-02-04 22:35:35,468 level=ERROR [client._get_conn:460] Unable connect to node with id 2:
2023-02-04 22:36:58,228 level=ERROR [client._get_conn:460] Unable connect to node with id 2:
2023-02-04 22:37:38,237 level=ERROR [client._get_conn:460] Unable connect to node with id 1:
2023-02-04 22:38:18,242 level=ERROR [client._get_conn:460] Unable connect to node with id 6:
2023-02-04 22:38:58,248 level=ERROR [client._get_conn:460] Unable connect to node with id 4:
2023-02-04 22:39:38,254 level=ERROR [client._get_conn:460] Unable connect to node with id 3:
Expected behaviour
The consumer should be able to recover and reconnect to the brokers, but it does not appear to be doing so.
Environment (please complete the following information):
- aiokafka version (python -c "import aiokafka; print(aiokafka.__version__)"): aiokafka==0.7.2
- kafka-python version (python -c "import kafka; print(kafka.__version__)"): kafka-python==2.0.2
- Kafka Broker version (kafka-topics.sh --version):
- Other information (Confluent Cloud version, etc.):
I'm having exactly the same problem
Do you have minimal code that reproduces the error? It could be network/DNS related, as the exchange between a Kafka client and the brokers is not as straightforward as a classical TCP/HTTP connection:
- a consumer uses the provided "bootstrap servers", in the order they are given, to discover the cluster
- discovering the cluster returns metadata, which contains, for each topic/partition, which broker is the primary one (the leader)
- the consumer then uses this information to open new TCP connections and consume messages
A common issue is then that the DNS names/IPs returned in the cluster metadata response have become obsolete or are not reachable from the client.
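To rule that out, it may help to check from the affected host whether each broker address is reachable at the TCP level. Below is a minimal sketch using only the standard library. The helper names `check_reachable` and `check_brokers` are made up for illustration and are not part of aiokafka, and the broker addresses in the usage note are placeholders you would replace with the host/port pairs your cluster actually advertises.

```python
import asyncio


async def check_reachable(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to (host, port) opens within `timeout` seconds."""
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout=timeout
        )
    except (OSError, asyncio.TimeoutError):
        # OSError covers DNS resolution failures, refused connections and
        # unreachable hosts; TimeoutError covers connections that hang.
        return False
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        pass  # the peer may reset the connection while we are closing it
    return True


async def check_brokers(brokers, timeout: float = 5.0) -> dict:
    """Check several (host, port) pairs concurrently; map "host:port" to reachability."""
    results = await asyncio.gather(
        *(check_reachable(host, port, timeout) for host, port in brokers)
    )
    return {f"{host}:{port}": ok for (host, port), ok in zip(brokers, results)}
```

Usage would look something like `asyncio.run(check_brokers([("broker-1.example.internal", 9092)]))`, where the hostname is a placeholder. A `False` for an address that the cluster advertises would point at the network/DNS side rather than at the client. Note that a successful TCP connect only shows the port is open, not that the Kafka protocol handshake succeeds.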
Is there any programmatic way for us to check whether it got disconnected, and to reconnect?
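Not an authoritative answer, but one generic pattern is to run the consumer inside a supervising coroutine that recreates it when the session fails. The sketch below is library-agnostic: `run_with_restart` and its parameters are hypothetical names, and `session` stands for your own coroutine function that would create an `AIOKafkaConsumer`, `await consumer.start()`, iterate over messages, and `await consumer.stop()` in a `finally` block. It only helps if the failure surfaces as an exception from that coroutine; whether aiokafka raises in the situation described in this issue is not established here.

```python
import asyncio
import logging
from typing import Awaitable, Callable

log = logging.getLogger(__name__)


async def run_with_restart(
    session: Callable[[], Awaitable[None]],
    *,
    max_restarts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> int:
    """Run `session()` until it returns normally, restarting it on exceptions.

    Waits with exponential backoff between attempts and returns the number of
    restarts performed. After `max_restarts` restarts, the next failure is
    re-raised to the caller. Task cancellation is not caught and propagates.
    """
    restarts = 0
    while True:
        try:
            await session()
            return restarts
        except Exception:
            if restarts >= max_restarts:
                raise
            # Delay doubles on each restart, capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** restarts)
            log.exception("consumer session failed; restarting in %.1fs", delay)
            await asyncio.sleep(delay)
            restarts += 1
```

Creating a fresh consumer inside `session` on every attempt, rather than reusing one object, means each restart also goes through bootstrap and metadata discovery again.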