confluent-kafka-python
flush leaves messages remaining
Description
I call flush() in my setup and expect the message to be sent with no warning produced, but a simple test that produces one message and then flushes leaves the following output:
>>> print(KafkaProducer().flush())
%4|1726771494.331|TERMINATE|rdkafka#producer-8| [thrd:app]: Producer terminating with 1 message (3328 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
None
None is returned, suggesting the messages were sent, yet I still get the "Producer terminating" warning. I'm not sure how best to proceed.
How to reproduce
Produce a message with a payload, don't call poll(), and then call flush():
>>> print(KafkaProducer().flush())
%4|1726771494.331|TERMINATE|rdkafka#producer-8| [thrd:app]: Producer terminating with 1 message (3328 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
None
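For comparison, here is a minimal repro sketch using the library's `Producer` class; the broker address and topic name are placeholders, and the delivery callback is only there to make delivery (or failure) visible:

```python
from confluent_kafka import Producer

# Placeholder broker/topic for this sketch.
p = Producer({"bootstrap.servers": "localhost:9092"})

def on_delivery(err, msg):
    # Invoked from poll()/flush() once the broker acks or rejects the message.
    if err is not None:
        print(f"delivery failed: {err}")
    else:
        print(f"delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

p.produce("test-topic", b"payload", on_delivery=on_delivery)

# flush() returns the number of messages still in queue or transit;
# 0 means everything was delivered (or failed) before the producer is torn down.
remaining = p.flush(10)
print(f"{remaining} message(s) still undelivered")
```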
Checklist
Please provide the following information:
- [X] confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`): `('2.5.0', 33882112)`, `('2.5.0', 33882367)`
- [X] Apache Kafka broker version: 3.5.1 through AWS MSK
- [X] Client configuration: `{ "api.version.request": False, "bootstrap.servers": 'msk_server_1,msk_server_2', "security.protocol": "SSL", "ssl.ca.location": 'path/to/local/ssl/cert/' }`
- [X] Operating system: Debian
- [ ] Provide client logs (with `'debug': '..'` as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue
Workaround for you
while len(producer): producer.flush()
Yes, it's confusing: flush() pushes messages into the internal queue but doesn't guarantee that they are sent out. I've proposed adding a flag to flush() so it can block until the internal produce queue is emptied.
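For reference, a minimal sketch of that workaround with a bounded retry, since such a flag doesn't exist yet; `flush_outstanding` and its `max_attempts` parameter are hypothetical names used only for illustration:

```python
from confluent_kafka import Producer

def flush_outstanding(producer: Producer, timeout: float = 5.0, max_attempts: int = 10) -> int:
    """Repeatedly call flush() until the internal queue drains or we give up.

    Returns the number of messages still undelivered (0 on success).
    """
    for _ in range(max_attempts):
        # flush(timeout) returns the number of messages still in queue or transit.
        if producer.flush(timeout) == 0:
            return 0
    # len(producer) also reports the number of messages awaiting delivery.
    return len(producer)
```

Bounding the loop with a timeout keeps shutdown from hanging indefinitely if the broker is unreachable, which the bare `while len(producer): producer.flush()` loop can do.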