aiokafka

`AIOKafkaProducer`: `send_and_wait()` and `stop()` appear to hang forever if Kafka is down

Open • opened by MrCreosote • 2 comments

Describe the bug

If the Kafka server is down, calling `send()` and awaiting the returned delivery future has two effects:

  1. Error and warning messages are logged every `retry_backoff_ms`
    • This is quite spammy at the default of 100 ms
  2. The await appears to hang forever
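Until the retry logging is rate-limited in the library, the repeated messages can be thinned on the application side with a standard `logging.Filter`. This is a minimal sketch, assuming aiokafka's modules log through loggers under the `aiokafka` namespace (the usual `logging.getLogger(__name__)` convention); `RepeatSuppressFilter` and `install_repeat_filter` are hypothetical helpers, not part of aiokafka. The filter sits on a handler rather than on the logger because logger-level filters are not applied to records propagated up from child loggers.

```python
import logging
import time
from typing import Dict, Tuple


class RepeatSuppressFilter(logging.Filter):
    """Drop a record if an identical one was let through less than `interval` seconds ago."""

    def __init__(self, interval: float = 30.0) -> None:
        super().__init__()
        self.interval = interval
        # (logger name, level, formatted message) -> time it was last let through
        self._last_emitted: Dict[Tuple[str, int, str], float] = {}

    def filter(self, record: logging.LogRecord) -> bool:
        key = (record.name, record.levelno, record.getMessage())
        now = time.monotonic()
        last = self._last_emitted.get(key)
        if last is not None and now - last < self.interval:
            return False  # identical message let through recently: drop it
        self._last_emitted[key] = now
        return True


def install_repeat_filter(interval: float = 30.0) -> logging.Handler:
    """Send aiokafka log output through a handler that drops rapid repeats."""
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
    # Filter on the handler, not the logger: a logger's own filters do not
    # see records propagated up from its child loggers.
    handler.addFilter(RepeatSuppressFilter(interval))
    # Assumption: aiokafka logs via loggers named under the "aiokafka" package.
    logger = logging.getLogger("aiokafka")
    logger.addHandler(handler)
    logger.propagate = False  # avoid printing each record again via root handlers
    return handler
```

Calling `install_repeat_filter(60)` once at startup would let each distinct message (for example, each node id in "Unable connect to node with id ...") through at most once per minute. The key table grows with the number of distinct messages, which is small for this failure mode. Raising `retry_backoff_ms`, as the reproducer below does with 1000, also reduces the rate, at the cost of slower retries.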

If the result is wrapped in `asyncio.wait_for()` with a timeout so that the coroutine can eventually continue, and `stop()` is then called on the producer:

  • The log messages keep appearing
  • `stop()` appears to hang forever
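Until `stop()` is bounded internally, a caller can at least cap how long it waits for shutdown. This is a minimal sketch, assuming only that the producer exposes an awaitable `stop()` as `AIOKafkaProducer` does; `stop_with_deadline` is a hypothetical helper. It uses `asyncio.wait()` instead of `asyncio.wait_for()`, because on timeout `wait_for()` cancels the inner coroutine and then waits for it to finish unwinding, which could itself stall if the shutdown path is slow to react to cancellation.

```python
import asyncio
from typing import Any


async def stop_with_deadline(producer: Any, timeout: float) -> bool:
    """Call producer.stop(), but give up waiting after `timeout` seconds.

    Returns True if stop() finished in time, False if the caller gave up.
    """
    task = asyncio.ensure_future(producer.stop())
    done, _ = await asyncio.wait({task}, timeout=timeout)
    if task in done:
        task.result()  # re-raise anything stop() itself raised
        return True
    # Request cancellation but do not wait for it to take effect, so this
    # returns on time even if the shutdown path ignores cancellation.
    task.cancel()
    # Retrieve any late exception so asyncio does not warn it was never retrieved.
    task.add_done_callback(lambda t: t.cancelled() or t.exception())
    return False
```

In the reproducer, `await kp.stop()` could become `if not await stop_with_deadline(kp, 10): print("stop() did not finish within 10 s")`. This only unblocks the caller; it does not guarantee that the producer's background tasks or connections are released, so the retry logging may continue until the event loop shuts down.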

Expected behaviour

I would expect both awaiting the result of a `send()` call (or awaiting a `send_and_wait()` call) and calling `stop()` to eventually time out.

Environment

  • aiokafka version: 0.12.0
  • Kafka Broker version: Tested with 3.9.0 and 4.0.0
  • Other information: n/a

Reproducible example

"""
Test what happens when Kafka is down and the producer sends a message.
"""

import asyncio
from aiokafka import AIOKafkaProducer
import traceback


async def main():
    ###
    # Kafka is running here
    ###
    kp = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        enable_idempotence=True,
        acks='all',
        request_timeout_ms=5000,
        retry_backoff_ms=1000
    )
    await kp.start()
    print("sending 1st message")
    future = await kp.send("mytopic", b"foo1")
    print("done")
    await future

    ###
    # Kafka is stopped here
    ###
    print("sending 2nd message")
    future = await kp.send("mytopic", b"foo2")
    print("done")
    try:
        await asyncio.wait_for(future, 6)
    except asyncio.TimeoutError as e:  # same as the builtin TimeoutError on 3.11+
        traceback.print_exception(e)
    print("stopping client")
    await kp.stop()
    print("done")


if __name__ == "__main__":
    asyncio.run(main())

Running the script produces the following output:

sending 1st message
Topic mytopic not found in cluster metadata
done
sending 2nd message
done
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Traceback (most recent call last):
  File "/usr/lib/python3.11/asyncio/tasks.py", line 490, in wait_for
    return fut.result()
           ^^^^^^^^^^^^
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/<user>/test_manual/aiokafka_test.py", line 36, in main
    await asyncio.wait_for(future, 6)
  File "/usr/lib/python3.11/asyncio/tasks.py", line 492, in wait_for
    raise exceptions.TimeoutError() from exc
TimeoutError
stopping client
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Unable to update metadata from [1]
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)
Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
Unable connect to node with id 1: [Errno 111] Connect call failed ('127.0.0.1', 54301)

*** continues seemingly forever ***

MrCreosote, Mar 25 '25 22:03

+1 on this. Is there any way to stop or exit after failing to connect for a certain amount of time?

jainal09, Apr 09 '25 17:04

I agree this is a sensible suggestion: calling `.stop()` should cause exceptions to be raised on all outstanding futures, and the `stop()` call itself should return once the pending connection attempts have finished.
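The proposed semantics can be illustrated with a toy model. This is not aiokafka's implementation: `ToyProducer`, `ProducerClosedError`, and the retry loop are hypothetical stand-ins, and the broker is simulated as permanently unreachable. The point is the ordering inside `stop()`: cancel the retry loop, wait until it has actually exited, then fail every outstanding delivery future so that no caller awaits forever.

```python
import asyncio
from typing import List, Optional


class ProducerClosedError(Exception):
    """Hypothetical error set on deliveries still pending when stop() is called."""


class ToyProducer:
    """Toy model of a producer whose broker is unreachable (not aiokafka code)."""

    def __init__(self, retry_backoff: float = 0.1) -> None:
        self._retry_backoff = retry_backoff
        self._pending: List[asyncio.Future] = []
        self._retry_task: Optional[asyncio.Task] = None
        self.connect_attempts = 0

    async def start(self) -> None:
        self._retry_task = asyncio.create_task(self._retry_loop())

    async def _retry_loop(self) -> None:
        # Simulates a sender retrying forever against a broker that is down.
        while True:
            self.connect_attempts += 1
            await asyncio.sleep(self._retry_backoff)

    async def send(self, topic: str, value: bytes) -> asyncio.Future:
        # Like the real send(), return a delivery future; here it is never
        # resolved by delivery because the broker never comes back.
        fut = asyncio.get_running_loop().create_future()
        self._pending.append(fut)
        return fut

    async def stop(self) -> None:
        # 1. Stop the retry loop and wait until it has really exited.
        #    gather(..., return_exceptions=True) absorbs the task's own
        #    CancelledError but still propagates a cancellation of stop().
        if self._retry_task is not None:
            self._retry_task.cancel()
            await asyncio.gather(self._retry_task, return_exceptions=True)
        # 2. Fail every outstanding delivery so its awaiters are released.
        for fut in self._pending:
            if not fut.done():
                fut.set_exception(ProducerClosedError("producer stopped before delivery"))
        self._pending.clear()
```

In this model, a caller still awaiting a delivery future when `stop()` runs gets `ProducerClosedError` instead of waiting forever, and `stop()` returns as soon as the retry loop has been torn down.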

shuckc, Apr 25 '25 09:04