confluent-kafka-python icon indicating copy to clipboard operation
confluent-kafka-python copied to clipboard

[Question] Reacting to deletion of a topic being read through a consumer

Open shubhanshu02 opened this issue 10 months ago • 0 comments

I encountered a case where while I was reading through a topic, the topic was deleted and then created from the beginning. My consumer using confluent_kafka looks somewhat like this:

from confluent_kafka import Consumer

consumer = Consumer(config)
consumer.assign([TopicPartition(topic, partition, offset)])
while True:
    msgs = self._consumer.consume(1000, 0.1)
    for msg in msgs:
        ...

Case happening here:

  1. The topic is single-partitioned.
  2. When the topic is deleted, it is recreated within a few seconds and then ingestion starts.
  3. The consumer does not crash on consume() and continues to return empty msgs list.

Currently, I only receive the following line on the STDERR when the topic is deleted. Is it possible from the consumer side to know if such event occured, so that the program could react to this event?

%5|1737553425.511|PARTCNT|rdkafka#consumer-3| [thrd:main]: Topic <topic> partition count changed from 1 to 0

shubhanshu02 avatar Jan 22 '25 14:01 shubhanshu02