kafka-python
Can I use poll and iterate in kafka-python together?
I am working with kafka-python. I would like to fetch a large amount of data from a Kafka topic until it is empty, then perform some action, and then start consuming new events from that topic as they arrive. My kafka-python implementation:
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['my-bootstrap-server'],  # placeholder broker address
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

# Drain the backlog: stop once a poll returns nothing within 5 s
while True:
    x = consumer.poll(timeout_ms=5 * 1000, max_records=1000)
    if not x:
        break
    # some action with x

# consume new events
for y in consumer:
    pass  # some action with y
New events arrive infrequently, so the 5 s poll timeout is acceptable. Is this design sound, or are there drawbacks? The documentation for poll says that it is incompatible with the iterator interface.
You can collect the consumer's records into your own list object, but you may want to check the topic's high watermark before you start, rather than relying on the consumer running dry to detect "emptiness".
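A rough sketch of that idea, assuming the consumer already has its partitions assigned and that kafka-python's end_offsets(), position() and poll() behave as documented. The names drain_until_caught_up and handle are made up for illustration; they are not part of the library.

```python
def drain_until_caught_up(consumer, partitions, handle,
                          timeout_ms=5000, max_records=1000):
    """Poll until every partition reaches the end offset seen at start.

    `consumer` is assumed to behave like kafka-python's KafkaConsumer with
    `partitions` (TopicPartition objects) already assigned.
    `handle` is a hypothetical per-record callback supplied by the caller.
    Returns the number of records processed.
    """
    # Snapshot the end offsets once, so "empty" means "caught up to where
    # the topic ended when we started", not "a poll happened to time out".
    end_offsets = consumer.end_offsets(partitions)

    def caught_up():
        # position() is the offset of the next record to be fetched.
        return all(consumer.position(tp) >= end_offsets[tp]
                   for tp in partitions)

    processed = 0
    while not caught_up():
        batch = consumer.poll(timeout_ms=timeout_ms, max_records=max_records)
        # poll() returns a dict mapping each partition to a list of records.
        for records in batch.values():
            for record in records:
                handle(record)
                processed += 1
    return processed
```

After this returns you could run the intermediate action and then keep calling poll() in a loop for new events, which sidesteps mixing poll() with the iterator interface that the question asks about. With a real consumer the partitions might be built from consumer.partitions_for_topic('my-topic') and passed to consumer.assign() first; treat that wiring as an assumption to check against your setup.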