# kafka-python
# KafkaConsumer - get the latest N messages
import json
import time
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
def get_end_offsets(consumer, topic) -> dict:
    """Return the end offset of every partition of *topic*.

    Args:
        consumer: a ``KafkaConsumer`` (anything exposing
            ``partitions_for_topic`` and ``end_offsets``).
        topic: name of the topic to inspect.

    Returns:
        dict mapping ``TopicPartition`` -> end offset. The end offset is the
        offset of the *upcoming* message, i.e. the offset of the last
        available message + 1. An empty dict is returned when no partitions
        are known for the topic, so callers can always treat the result as
        a dict.
    """
    # Query the metadata once and reuse the result.
    partition_ids = consumer.partitions_for_topic(topic)
    if not partition_ids:
        # NOTE(review): partitions_for_topic may yield None (unknown topic) or
        # an empty set; both mean "nothing to report".
        return {}
    partitions = [TopicPartition(topic, partition) for partition in partition_ids]
    # https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html#kafka.KafkaConsumer.end_offsets
    # Get the last offset for the given partitions. The last offset of a partition
    # is the offset of the upcoming message, i.e. the offset of the last available message + 1.
    return consumer.end_offsets(partitions)
if __name__ == '__main__':
    # Demo: position a consumer on the last `last_n_msg` messages of every
    # partition of `topic`, publish one new message, then print everything the
    # consumer receives until it has been idle for `consumer_timeout_ms`.
    last_n_msg = 2
    kafka_server = "localhost:9092"
    topic = "topic_1"

    # consumer
    # consumer_timeout_ms makes the `for msg in consumer` loop below stop after
    # 9 s without new messages instead of blocking forever.
    consumer = KafkaConsumer(
        bootstrap_servers=kafka_server,
        consumer_timeout_ms=9000)
    try:
        end_offsets = get_end_offsets(consumer, topic)
        if not end_offsets:
            # Nothing to assign; iterating an unassigned consumer would fail.
            raise SystemExit(f'no partitions found for topic {topic!r}')
        partitions = list(end_offsets)
        # Manual assignment (no consumer-group rebalancing) so we can seek freely.
        consumer.assign(partitions)
        # Earliest offset still retained by the broker. Seeking below it would
        # trigger auto_offset_reset (kafka-python default: 'latest') and skip
        # exactly the messages we want, so clamp to it rather than to 0.
        begin_offsets = consumer.beginning_offsets(partitions)
        for partition, end_offset in end_offsets.items():
            # "Last N" is per partition: the total read can be N * partition count.
            new_offset = max(end_offset - last_n_msg, begin_offsets[partition])
            consumer.seek(partition, new_offset)

        # producer
        producer = KafkaProducer(bootstrap_servers=kafka_server,
                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))
        try:
            future_1 = producer.send(topic=topic,
                                     value={'key': 'value1'})
            # Block until the broker acknowledges the write (or 10 s elapse).
            producer_1 = future_1.get(timeout=10)
            print(f'producer_1: offset: {producer_1.offset}, partition: {producer_1.partition}')
            producer.flush()
        finally:
            producer.close()
        # -----------
        # Give the freshly produced message a moment to become visible.
        time.sleep(1)
        # -----------
        print('consuming...')
        for msg in consumer:
            print(f'consumer: {msg}')
    finally:
        consumer.close()