confluent-kafka-python
Issue: delivery_future never resolves without explicit flush()
Hello team!
Problem
The official example shows that delivery_future resolves without calling flush():
```python
from confluent_kafka.aio import AIOProducer  # Note: stable module, not experimental

delivery_future = await producer.produce(args.topic, value=serialized_value)
msg = await delivery_future  # ← Works without flush()
```
However, with experimental.aio.AIOProducer, the delivery_future hangs indefinitely:
```python
from confluent_kafka.experimental.aio import AIOProducer

producer = AIOProducer({"bootstrap.servers": "..."})
delivery_future = await producer.produce("topic", value=b"message")
msg = await delivery_future  # ← HANGS FOREVER
```
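For completeness, here is a self-contained reproduction sketch (broker address and topic name are placeholders); the asyncio.wait_for timeout is only there so the script exits instead of blocking forever:

```python
import asyncio

from confluent_kafka.experimental.aio import AIOProducer


async def main():
    # Placeholder broker address; substitute a reachable cluster.
    producer = AIOProducer({"bootstrap.servers": "localhost:9092"})
    delivery_future = await producer.produce("test-topic", value=b"message")
    try:
        # Timeout only so the script terminates instead of hanging forever.
        msg = await asyncio.wait_for(delivery_future, timeout=10)
        print("delivered:", msg)
    except asyncio.TimeoutError:
        print("delivery_future did not resolve within 10s")


asyncio.run(main())
```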
"""
I've also tried:
`AIOProducer(producer_config, batch_size=1, buffer_timeout=0.0)`
but the result was the same.
"""
Environment
- Package: confluent-kafka==2.12.2
- Module: confluent_kafka.experimental.aio.AIOProducer
- Python: 3.13.7
Current Workaround
Explicit flush() is required:
```python
delivery_future = await producer.produce(topic, value=value)
await producer.flush()  # Required to resolve delivery_future
msg = await delivery_future  # Now works
```
This defeats the purpose of batching and contradicts the official example.
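For context, this is the batching pattern I'm aiming for (producer, topic, and values are whatever the caller supplies); today it only completes if a flush() is added before the gather:

```python
import asyncio


async def produce_batch(producer, topic, values):
    # Queue a batch of messages and collect their delivery futures.
    futures = [await producer.produce(topic, value=v) for v in values]
    # Expected: the futures resolve on their own once the batch is sent
    # (or buffer_timeout fires). Actual: this hangs unless
    # `await producer.flush()` is inserted before the gather.
    return await asyncio.gather(*futures)
```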
Questions
- Is the example for a newer stable confluent_kafka.aio.AIOProducer (not yet released)?
- Should experimental.aio.AIOProducer require explicit flush() per message?
- The BufferTimeoutManager runs an auto-flush background task; why doesn't it resolve delivery_future?
Expected
The BufferTimeoutManager background task should auto-flush after buffer_timeout (default 1.0s) and resolve delivery_future automatically, matching the behavior of the official example.
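Concretely, I would expect a check like the following to report success without any explicit flush() (broker address and topic are placeholders, and I'm assuming the default buffer_timeout of 1.0s):

```python
import asyncio

from confluent_kafka.experimental.aio import AIOProducer


async def check_auto_flush():
    # Placeholder broker; default buffer_timeout (1.0s) left unchanged on purpose.
    producer = AIOProducer({"bootstrap.servers": "localhost:9092"})
    delivery_future = await producer.produce("test-topic", value=b"message")
    # Wait well past buffer_timeout with no flush() call anywhere.
    task = asyncio.ensure_future(delivery_future)
    done, _pending = await asyncio.wait({task}, timeout=3.0)
    # Expected: the background flush has already resolved the future by now.
    print("auto-flushed:", task in done)


asyncio.run(check_auto_flush())
```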