confluent-kafka-python
Consumer doesn't detect that it cannot consume from the topic
Description
When a topic is deleted but recreated quickly enough, confluent-kafka-python appears to (a) believe the topic is healthy, yet (b) be unable to consume any messages from it, without surfacing any errors.
Additionally, nothing in the topic metadata or the assignment indicates that the consumer's partitions were lost, the lost-assignment callback is never invoked, and the offset on the TopicPartition object doesn't indicate an error.
How to reproduce
TL;DR: start a consumer on a topic; recreate the topic.
To make this easier, I've created a setup with Docker Compose and Python that replicates the issue:
- Spin up the associated compose file, which will:
  - spin up kafka/zookeeper/kafka-ui
  - create `test_topic`
  - publish `date` to `test_topic` every 15 seconds
- Run the attached python file with `python check_topics.py <niter>` (a sketch of it follows this list), which will:
  - drain any redundant data
  - poll the kafka consumer every 5 seconds, up to `niter` times (default 5)
  - print out the results (so roughly every third or fourth poll we should see a date)
- Recreate `test_topic` (rerunning the init-topics container should do it)
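
For reference, here is a minimal sketch of what `check_topics.py` could look like, built from the configuration reported below; the actual attached file may differ, and the drain loop, `str(uuid.uuid4())` conversion, and print format are assumptions:

```python
import sys
import uuid

from confluent_kafka import Consumer

conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": str(uuid.uuid4()),  # fresh group each run (str() is an assumption)
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "max.poll.interval.ms": 6000,
    "session.timeout.ms": 6000,
    "connections.max.idle.ms": 6000,
    "metadata.max.age.ms": 6000,
    "debug": "consumer",
}

niter = int(sys.argv[1]) if len(sys.argv) > 1 else 5

consumer = Consumer(conf)
consumer.subscribe(["test_topic"])

# Drain any redundant data already sitting in the topic.
while consumer.poll(1.0) is not None:
    pass

# Poll every 5 seconds, up to niter times, and print whatever comes back.
for i in range(niter):
    msg = consumer.poll(5.0)
    if msg is None:
        print(f"poll {i}: no message, assignment={consumer.assignment()}")
    elif msg.error():
        print(f"poll {i}: error={msg.error()}")
    else:
        print(f"poll {i}: value={msg.value()}")

consumer.close()
```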
Checklist
Please provide the following information:
- [ ] confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`): `('2.5.0', 33882112)`
- [ ] Apache Kafka broker version: 3.7.1
- [ ] Client configuration:
  ```python
  {
      "bootstrap.servers": "localhost:9092",
      "group.id": uuid.uuid4(),
      "auto.offset.reset": "earliest",
      "enable.auto.commit": False,
      "max.poll.interval.ms": 6000,
      "session.timeout.ms": 6000,
      "connections.max.idle.ms": 6000,
      "metadata.max.age.ms": 6000,
      "debug": "consumer",
  }
  ```
- [ ] Operating system: Mac
- [ ] Provide client logs (with `'debug': '..'` as necessary):
  ```
  %5|1721753630.136|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic test_topic partition count changed from 1 to 0
  ```
- [ ] Provide broker log excerpts:
  ```
  2024-07-23 12:53:51 [2024-07-23 16:53:51,137] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(test_topic-0) (kafka.server.ReplicaFetcherManager)
  ```
- [ ] Critical issue
Any update on this? I have come across it as well.
The library should handle `partition count changed from 1 to 0` more gracefully. That being said, I'm not sure what the expected behavior is for a topic deletion and recreation without a client restart. I'll mark this as an enhancement so that the internal state can be reset, but it's also close to undefined-behavior territory, as topics aren't usually deleted and recreated within an application's lifetime.
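
Until something like that lands, one client-side workaround could be to compare the partition count from fresh metadata on each poll and re-subscribe when it changes. This is an untested sketch against the reproduction above, not a pattern endorsed by the maintainers; the `TOPIC` constant and helper names are invented for illustration:

```python
from confluent_kafka import Consumer

TOPIC = "test_topic"  # topic name from the reproduction above


def partition_count(consumer: Consumer, topic: str) -> int:
    """Return the topic's current partition count from fresh broker metadata."""
    md = consumer.list_topics(topic=topic, timeout=10)
    tmeta = md.topics.get(topic)
    if tmeta is None or tmeta.error is not None:
        return 0
    return len(tmeta.partitions)


def poll_resilient(consumer: Consumer, timeout: float, last_count: int):
    """Poll once; if the partition count changed (e.g. 1 -> 0 -> 1 after a
    delete/recreate), drop and renew the subscription to reset fetch state."""
    msg = consumer.poll(timeout)
    count = partition_count(consumer, TOPIC)
    if count != last_count:
        consumer.unsubscribe()
        consumer.subscribe([TOPIC])
    return msg, count
```

The re-subscribe forces a rebalance, so offsets for the recreated topic start over from `auto.offset.reset`; whether that is acceptable depends on the application.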