aiokafka icon indicating copy to clipboard operation
aiokafka copied to clipboard

kafka consumer never reconnects

Open cmflynn opened this issue 2 years ago • 3 comments

Describe the bug to the best of my knowledge, the consumer is somehow dropping a connection to the kakfa brokers, and when it tries to reconnect it also seems to completely block the active event loop :(

heres how the logs look on the api:

2023-02-04 22:31:05,694 level=ERROR    [client._get_conn:460] Unable connect to node with id 4: 
2023-02-04 22:31:18,166 level=ERROR    [client._get_conn:460] Unable connect to node with id 1: 
2023-02-04 22:32:55,445 level=ERROR    [client._get_conn:460] Unable connect to node with id 5: 
2023-02-04 22:33:35,451 level=ERROR    [client._get_conn:460] Unable connect to node with id 6: 
2023-02-04 22:34:15,457 level=ERROR    [client._get_conn:460] Unable connect to node with id 4: 
2023-02-04 22:34:55,462 level=ERROR    [client._get_conn:460] Unable connect to node with id 3: 
2023-02-04 22:35:35,468 level=ERROR    [client._get_conn:460] Unable connect to node with id 2: 
2023-02-04 22:36:58,228 level=ERROR    [client._get_conn:460] Unable connect to node with id 2: 
2023-02-04 22:37:38,237 level=ERROR    [client._get_conn:460] Unable connect to node with id 1: 
2023-02-04 22:38:18,242 level=ERROR    [client._get_conn:460] Unable connect to node with id 6: 
2023-02-04 22:38:58,248 level=ERROR    [client._get_conn:460] Unable connect to node with id 4: 
2023-02-04 22:39:38,254 level=ERROR    [client._get_conn:460] Unable connect to node with id 3: 

Expected behaviour

The consumer should be able to recover and reconnect to the brokers, but it does not appear to be doing so.

Environment (please complete the following information):

  • aiokafka version (python -c "import aiokafka; print(aiokafka.__version__)"): aiokafka==0.7.2
  • kafka-python version (python -c "import kafka; print(kafka.__version__)"): kafka-python==2.0.2
  • Kafka Broker version (kafka-topics.sh --version):
  • Other information (Confluent Cloud version, etc.):

cmflynn avatar Feb 06 '23 17:02 cmflynn

I'm having exactly the same problem

Corfucinas avatar Nov 29 '23 14:11 Corfucinas

Do you have a minimal code to reproduce the error ? It could be network/DNS related, as the exchange between a kafka client and brokers are not as straight forward as other classical TCP/HTTP connection

  • a consumer will use the provided "bootstrap servers", in the order they are provided, to discover the cluster
  • discovering the cluster will return metadata, that will contains for each topic/partitions which broker is the primary one
  • then the consumer uses these information to open new TCP connections and consume messages

Some common issue then could be that the DNS/IP returned as part of the cluster metadata response are getting obsolete or are not reachable

vmaurin avatar Nov 29 '23 14:11 vmaurin

Is there's any programatic way for us to check if it got disconnected and reconnect?

Corfucinas avatar Nov 29 '23 18:11 Corfucinas