kafka-python
Trying to use a Bitnami Docker image with kafka-python. The messages do not get to the Consumer...
Hi folks! And thank you for your great work.
I'm trying to set up a developer playground to start using kafka-python. To do that, I used Bitnami's Docker repo with Kafka (modified for external connections according to their manuals).
I modified docker-compose.yml for external connections, started it, and am now trying to create a producer and a consumer in Python.
# start_consumer.py
from kafka import KafkaConsumer

if __name__ == "__main__":
    consumer = KafkaConsumer('sample', bootstrap_servers=['localhost:9093'])
    for message in consumer:
        print(message)
# start_producer.py
from kafka import KafkaProducer

if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers=['localhost:9093'])
    producer.send('sample', b'Hello, World!')
    producer.send('sample', key=b'message-two', value=b'This is Kafka-Python')
The topic on Kafka server is created, but my Consumer does not get any messages and prints nothing. How can I fix it?
https://github.com/Felix-neko/kafka_sandbox -- here's my sandbox to reproduce this strange behaviour. Maybe I have some wrong Kafka setup?
Hey @Felix-neko, have you found a way to overcome this?
OK, I think I understand how this works.
You have 3 options here:
1. Produce after the consumer is started.
2. Set group_id and explicitly reset the offset for it (works with Kafka 3.0.0, not sure about other versions):
   kafka-consumer-groups.sh \
     --bootstrap-server localhost:9092 \
     --group MY_GROUP \
     --topic MY_TOPIC \
     --reset-offsets \
     --to-earliest \
     --execute
3. Set group_id and use auto_offset_reset='earliest'.
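For the last option, a minimal sketch of the consumer might look like the following. The group name "sandbox-group" and the helper names consumer_settings and run_consumer are hypothetical, chosen only for illustration; the topic and port come from the scripts above. Note that auto_offset_reset only takes effect when the group has no committed offset yet, so a group that has already consumed will resume from where it stopped.

```python
def consumer_settings(group_id="sandbox-group", servers=("localhost:9093",)):
    """Build keyword arguments for KafkaConsumer (group name is hypothetical)."""
    return {
        "bootstrap_servers": list(servers),
        "group_id": group_id,
        # Used only when the group has no committed offset yet:
        # start from the oldest message still retained in the topic.
        "auto_offset_reset": "earliest",
    }


def run_consumer(topic="sample"):
    """Print every message from the topic, including ones produced earlier."""
    # Imported here so the settings helper works without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, **consumer_settings())
    for message in consumer:
        print(message)
```

Calling run_consumer() once the broker is up should then print messages that were produced before the consumer started, provided the group has not committed offsets before.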
And although this makes sense in retrospect, I really wish this was covered by the documentation.
3. Set group_id and use auto_offset_reset='earliest'
Well, I did it this way:
from kafka import KafkaProducer

if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers=['localhost:9093'])
    future = producer.send('sample', b'Hello, World!')
    result = future.get(timeout=60)
    print("Sent 1")
    future = producer.send('sample', key=b'message-two', value=b'This is Kafka-Python')
    result = future.get(timeout=60)
    print("Sent 2")
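Blocking on each future works because send() is asynchronous, so a script that exits right after calling it may never transmit the buffered messages. A possible variant, sketched here under that assumption, queues everything first and then calls flush() once. The helper names send_all and main are hypothetical; send(), flush() and close() are kafka-python's KafkaProducer methods.

```python
def send_all(producer, topic, payloads, timeout=60):
    """Queue every payload, then block until the producer's buffer is drained."""
    # send() only enqueues the record and returns a future immediately.
    futures = [producer.send(topic, value=payload) for payload in payloads]
    # flush() waits until all queued records have been sent (or the timeout hits).
    producer.flush(timeout=timeout)
    return futures


def main():
    # Imported here so send_all can be exercised without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=['localhost:9093'])
    send_all(producer, 'sample', [b'Hello, World!', b'This is Kafka-Python'])
    producer.close()
```

As far as I can tell, flush() does not surface per-message delivery errors the way future.get() does, so the returned futures are worth inspecting if that matters.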