confluent-kafka-python

Consumer from unknown topic stuck

Open HenriqueMAlmeida opened this issue 1 year ago • 0 comments

Description

I am trying to create a consumer subscribed to more than one topic.

Some of the topics may not exist, which appears to cause a problem for the remaining topics. In the example below, I subscribe to an existing topic (which already has messages in the queue) and to another topic that does not exist.

I have enabled the "allow.auto.create.topics" option, which I expected would allow the consumer to create the topic.

However, the nonexistent topic does not appear to be created, and the consumer reports an error saying it found an unknown topic. The consumer also stays stuck in polling, without reading the messages from the other topic.
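To confirm whether the topic was really not created, one option is to compare the topics the broker reports with the ones the consumer subscribes to. The following is a minimal sketch, not a confirmed diagnosis: `missing_topics` and `list_broker_topics` are hypothetical helper names, and the second helper assumes a reachable broker and uses `AdminClient.list_topics()` from `confluent_kafka.admin`.

```python
from typing import Iterable, List


def missing_topics(existing: Iterable[str], wanted: Iterable[str]) -> List[str]:
    """Return the wanted topic names that are absent from `existing`, in order."""
    known = set(existing)
    return [topic for topic in wanted if topic not in known]


def list_broker_topics(bootstrap_servers: str, timeout: float = 10.0) -> List[str]:
    """Ask the broker for its topic names (requires a reachable broker)."""
    # Imported lazily so missing_topics() stays usable without the client installed.
    from confluent_kafka.admin import AdminClient

    admin = AdminClient({"bootstrap.servers": bootstrap_servers})
    # list_topics() returns cluster metadata; .topics is a dict keyed by topic name.
    return list(admin.list_topics(timeout=timeout).topics)
```

For example, `missing_topics(list_broker_topics("servers"), ["existing-topic", "inexistent-topic"])` would list the subscribed topics the broker does not know about, where `"servers"` is the placeholder address from the reproduction.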

Changing the order of subscription, subscribing to the existing topic last, appears to change the behavior of the consumer, and all the messages from the existing topic are read.

Is this expected behavior? Should I change any other settings to allow the consumer to create the topic itself?

Thank you!

How to reproduce

import uuid

from confluent_kafka.cimpl import KafkaError, Consumer

kafka_config = {
    "allow.auto.create.topics": True,
    "auto.offset.reset": "earliest",
    "bootstrap.servers": "servers",
    "enable.auto.commit": False,
    "group.id": str(uuid.uuid4()),
}

consumer = Consumer({**kafka_config})

consumer.subscribe(["existing-topic"])
consumer.subscribe(["inexistent-topic"])

while True:
    msg = consumer.poll(timeout=1.0)
    print(msg)

    if msg is None:
        continue

    if msg.error():
        print(msg.error())

    print(f"Received message: {msg.value()}")
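
For comparison, here is a minimal sketch of the same loop with both topics passed in a single `subscribe()` call. As far as the client documentation describes it, `subscribe()` replaces any previous subscription, so the two separate calls above may leave only the last topic subscribed; that would be consistent with the order dependence described earlier, but it is an assumption rather than a confirmed cause. The sketch also skips error events instead of falling through to `msg.value()`. `drain` is a hypothetical helper name, and `max_polls` is added only so the loop terminates.

```python
from typing import Any, Iterable, List, Tuple


def drain(consumer: Any, topics: Iterable[str],
          max_polls: int = 10, timeout: float = 1.0) -> Tuple[List[Any], List[Any]]:
    """Subscribe once to all topics, then collect message values and errors."""
    # One call with the full list; a later subscribe() would replace this list.
    consumer.subscribe(list(topics))

    values: List[Any] = []
    errors: List[Any] = []
    for _ in range(max_polls):
        msg = consumer.poll(timeout)
        if msg is None:
            # Nothing arrived within the timeout; poll again.
            continue
        if msg.error():
            # Record the error (for example an unknown-topic event) and keep polling.
            errors.append(msg.error())
            continue
        values.append(msg.value())
    return values, errors
```

With a real client this could be called as `drain(Consumer(kafka_config), ["existing-topic", "inexistent-topic"])`, using the configuration from the reproduction.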

python: 3.10
version: ('2.4.0', 33816576)
libversion: ('2.4.0', 33816831)
Broker version: 3.0.1

Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • [x] Apache Kafka broker version:
  • [ ] Client configuration: {...}
  • [ ] Operating system:
  • [ ] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

HenriqueMAlmeida, May 16 '24 11:05