Trying to use a Bitnami Docker image with kafka-python. The messages do not reach the consumer...

Open Felix-neko opened this issue 2 years ago • 3 comments

Hi folks! And thank you for your great work.

I'm trying to set up a developer playground to start using kafka-python. To do that, I tried Bitnami's Docker repo for Kafka (modified for external connections according to their manuals).

I modified docker-compose.yml for external connections, started it, and am now trying to create a producer and a consumer in Python.

# start_consumer.py
from kafka import KafkaConsumer

if __name__ == "__main__":
    consumer = KafkaConsumer('sample', bootstrap_servers=['localhost:9093'])
    for message in consumer:
        print(message)

# start_producer.py
from kafka import KafkaProducer

if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers=['localhost:9093'])
    producer.send('sample', b'Hello, World!')
    producer.send('sample', key=b'message-two', value=b'This is Kafka-Python')

The topic is created on the Kafka server, but my consumer does not receive any messages and prints nothing. How can I fix this?

https://github.com/Felix-neko/kafka_sandbox -- here's my sandbox to reproduce this strange behaviour. Maybe my Kafka setup is wrong?

Felix-neko, Nov 10 '21 21:11

Hey @Felix-neko, have you found a way to overcome this?

Beaglefoot, Feb 01 '22 14:02

OK, I think I understand how this works.

You have 3 options here:

  1. Produce after the consumer has started.
  2. Set group_id and explicitly reset its offsets with the command below (works with Kafka 3.0.0; not sure about other versions):
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group MY_GROUP \
  --topic MY_TOPIC \
  --reset-offsets \
  --to-earliest \
  --execute
  3. Set group_id and use auto_offset_reset='earliest'.
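For illustration, the last option might look roughly like the sketch below. It relies on the fact that kafka-python's KafkaConsumer defaults to auto_offset_reset='latest', so a consumer with no committed offsets starts at the end of the log and never sees messages produced before it joined. The helper names replay_settings and consume and the group id "sandbox-group" are made up for this example; the broker address is the one from the original post.

```python
def replay_settings(group_id, servers=("localhost:9093",)):
    """Build KafkaConsumer keyword arguments that read a topic from the start.

    'earliest' only applies when the group has no committed offset yet;
    once offsets are committed, the consumer resumes from them instead.
    """
    return {
        "bootstrap_servers": list(servers),
        "group_id": group_id,             # hypothetical group name supplied by the caller
        "auto_offset_reset": "earliest",  # kafka-python's default is 'latest'
    }


def consume(topic, settings):
    """Print every message on the topic (blocks until interrupted)."""
    # Imported here so the settings helper can be used without a broker.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, **settings)
    for message in consumer:
        print(message)
```

With the sandbox running, something like consume("sample", replay_settings("sandbox-group")) should then print messages that were produced before the consumer started, assuming that group has not committed offsets for the topic yet.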

And although this makes sense in retrospect, I really wish this were covered by the documentation.

Beaglefoot, Feb 01 '22 17:02

3. Set group_id and use auto_offset_reset='earliest'

Well, I did it this way:

from kafka import KafkaProducer

if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers=['localhost:9093'])
    future = producer.send('sample', b'Hello, World!')
    result = future.get(timeout=60)
    print("Sent 1")

    future = producer.send('sample', key=b'message-two', value=b'This is Kafka-Python')
    result = future.get(timeout=60)
    print("Sent 2")
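A note on why waiting on the future helps: KafkaProducer.send() is asynchronous and only queues the record, so a short script can exit before anything reaches the broker. Blocking on each future, as above, is one way to wait; another is to queue everything and call flush() once. The sketch below shows that variant. The helper names send_and_flush and main are hypothetical, and the topic and broker address are taken from the original post.

```python
def send_and_flush(producer, topic, values, timeout=60):
    """Queue every value, then block until the buffered records are sent."""
    # send() returns a future immediately; nothing is guaranteed delivered yet.
    futures = [producer.send(topic, value) for value in values]
    # flush() blocks until all queued records have been sent (or the timeout expires).
    producer.flush(timeout=timeout)
    return futures


def main():
    # Imported here so the helper above can be exercised without a broker.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=["localhost:9093"])
    send_and_flush(producer, "sample", [b"Hello, World!", b"This is Kafka-Python"])
    producer.close()
```

Calling main() with the sandbox broker running should leave both messages on the topic before the script exits. Unlike future.get(), flush() does not raise on a per-message delivery failure, so the returned futures are still worth checking if that matters.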

Felix-neko, Feb 02 '22 11:02