confluent-kafka-python icon indicating copy to clipboard operation
confluent-kafka-python copied to clipboard

Issue: delivery_future never resolves without explicit flush()

Open doctorSrcipt opened this issue 1 day ago • 0 comments

Hello team!

Problem The official example shows delivery_future should resolve without calling flush():

from confluent_kafka.aio import AIOProducer  # Note: stable module, not experimental

delivery_future = await producer.produce(args.topic, value=serialized_value)
msg = await delivery_future  # ← Works without flush()

However, with experimental.aio.AIOProducer, the delivery_future hangs indefinitely:

from confluent_kafka.experimental.aio import AIOProducer

producer = AIOProducer({"bootstrap.servers": "..."})

delivery_future = await producer.produce("topic", value=b"message")
msg = await delivery_future  # ← HANGS FOREVER 

"""
I've also tried:
`AIOProducer(producer_config, batch_size=1, buffer_timeout=0.0)`
but the result was the same.
"""

Environment

  • Package: confluent-kafka==2.12.2
  • Module: confluent_kafka.experimental.aio.AIOProducer
  • Python: 3.13.7

Current Workaround Explicit flush() is required:

delivery_future = await producer.produce(topic, value=value)
await producer.flush()  # Required to resolve delivery_future
msg = await delivery_future  # Now works

This defeats batching purpose and contradicts the official example.

Questions

  • Is the example for a newer stable confluent_kafka.aio.AIOProducer (not yet released)?
  • Should experimental.aio.AIOProducer require explicit flush() per message?
  • The BufferTimeoutManager has auto-flush background task - why doesn't it resolve delivery_future?

Expected The BufferTimeoutManager background task should auto-flush after buffer_timeout (default 1.0s) and resolve delivery_future automatically, matching the official example behavior.

doctorSrcipt avatar Dec 11 '25 14:12 doctorSrcipt