kafka-python

KafkaConsumer - get the latest N messages

Open joseboretto opened this issue 3 years ago • 0 comments

import json
import time

from kafka import KafkaConsumer, KafkaProducer, TopicPartition


def get_end_offsets(consumer, topic) -> dict:
    """Return the end offset of every partition of ``topic``.

    Args:
        consumer: Object exposing ``partitions_for_topic(topic)`` and
            ``end_offsets(partitions)`` (a ``KafkaConsumer`` in this script).
        topic: Name of the topic to inspect.

    Returns:
        Whatever ``consumer.end_offsets`` returns for the topic's partitions
        (a mapping keyed by ``TopicPartition``), or an empty dict when
        ``partitions_for_topic`` reports no partitions, so callers can always
        iterate the result.
    """
    # Look the partitions up once; the result may be None or empty when the
    # topic is unknown to the consumer.
    partition_ids = consumer.partitions_for_topic(topic)
    if not partition_ids:
        return {}
    partitions = [TopicPartition(topic, partition_id) for partition_id in partition_ids]
    # https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html#kafka.KafkaConsumer.end_offsets
    # The last offset of a partition is the offset of the upcoming message,
    # i.e. the offset of the last available message + 1.
    return consumer.end_offsets(partitions)


if __name__ == '__main__':
    # Demo: position a consumer `last_n_msg` messages before the end of every
    # partition of the topic, publish one more message, then print everything
    # the consumer receives from those positions onward.
    last_n_msg = 2
    kafka_server = "localhost:9092"
    topic = "topic_1"
    # consumer
    # consumer_timeout_ms makes the iteration below stop once no message has
    # arrived for that long, instead of blocking forever.
    consumer = KafkaConsumer(
        bootstrap_servers=kafka_server,
        consumer_timeout_ms=9000)
    try:
        end_offsets = get_end_offsets(consumer, topic)
        if not end_offsets:
            # Nothing to assign or seek: the topic has no known partitions.
            raise SystemExit(f'no partitions found for topic {topic!r}')
        consumer.assign(list(end_offsets))
        for key_partition, value_end_offset in end_offsets.items():
            # Rewind by last_n_msg, but never before offset 0 on short partitions.
            consumer.seek(key_partition, max(value_end_offset - last_n_msg, 0))
        # producer
        producer = KafkaProducer(bootstrap_servers=kafka_server,
                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))
        try:
            future_1 = producer.send(topic=topic,
                                     value={'key': 'value1'})
            # Block until the send completes so offset and partition are known.
            producer_1 = future_1.get(timeout=10)
            print(f'producer_1: offset: {producer_1.offset}, partition: {producer_1.partition}')
            producer.flush()
        finally:
            producer.close()
        # -----------
        time.sleep(1)
        # -----------
        print('consuming...')
        for msg in consumer:
            print(f'consumer: {msg}')
    finally:
        consumer.close()


joseboretto avatar Aug 09 '21 15:08 joseboretto