Dana Powers
fwiw, there is an old (unsupported) context manager in kafka.context that was designed to manage consumer offsets w/ SimpleConsumer.
Another alternative would be:
```
import contextlib
from kafka import KafkaProducer

with contextlib.closing(KafkaProducer(bootstrap_servers='localhost:1234')) as producer:
    for _ in range(100):
        producer.send('foobar', b'some_message_bytes')
```
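For anyone who wants to see what `contextlib.closing` does here without a broker running, a rough sketch follows. `FakeProducer` and `send_batch` are hypothetical names made up for illustration and are not part of kafka-python; the only thing assumed about the real producer is that it has `send()` and `close()` methods. The point is that `close()` runs when the `with` block exits, including when the loop raises.

```python
import contextlib


class FakeProducer:
    """Hypothetical stand-in for a producer; records calls instead of contacting a broker."""

    def __init__(self):
        self.sent = []
        self.closed = False

    def send(self, topic, value):
        if self.closed:
            raise RuntimeError("producer is closed")
        self.sent.append((topic, value))

    def close(self):
        self.closed = True


def send_batch(producer, count, fail_at=None):
    """Send `count` messages; contextlib.closing calls producer.close() on exit."""
    with contextlib.closing(producer) as p:
        for i in range(count):
            if i == fail_at:
                # Simulate an error partway through the loop.
                raise ValueError("simulated failure")
            p.send('foobar', b'some_message_bytes')


# Normal path: all messages are sent, then close() is called.
ok = FakeProducer()
send_batch(ok, 100)

# Error path: the exception propagates, but close() has still been called.
failed = FakeProducer()
try:
    send_batch(failed, 100, fail_at=3)
except ValueError:
    pass
```

`contextlib.closing` only calls `close()` with no arguments, so if you need to pass something like a timeout you would have to use an explicit `try`/`finally` instead.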
perhaps. but omar was a close friend who passed away and I have a hard time closing this for personal reasons.
Peace brother - merged #2560 w/ console interfaces.
Link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol
Do you have logs with more details? The expected behavior is that there is no IO blocking within poll() that lasts longer than the timeout value. If timeout is 0...
https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
Currently requires a `NewPartitions` object instance. This should work:
```
from kafka.admin.new_partitions import NewPartitions

# Assumes admin_client is an already-constructed KafkaAdminClient.
# Increase the partition count for 'foo_topic' to 6.
admin_client.create_partitions({'foo_topic': NewPartitions(total_count=6)})
```