[QUESTION] Which Partition to use when Performing an Offset Seek using AIOKafkaConsumer?

Open athenawisdoms opened this issue 4 years ago • 6 comments

When trying to make an AIOKafkaConsumer start reading messages from a specific offset, starting_offset, how do we know which partition to use?

I am trying to use the AIOKafkaConsumer.seek method, but it requires a TopicPartition to be specified.

import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer


async def main():
    topic = "test"
    starting_offset = 3
    
    # Publish some messages
    producer = AIOKafkaProducer(bootstrap_servers="localhost:29092")
    await producer.start()
    for i in range(10):
        await producer.send_and_wait(topic, bytes(f"hello {i}", "utf-8"))
    await producer.stop()  # flush and close the producer before consuming

    # Start consuming from a specific offset
    consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
    await consumer.start()
    consumer.seek(None, starting_offset) # NEED HELP HERE :)

    while True:
        message = await consumer.getone()
        print("message:", message.value)


if __name__ == "__main__":
    asyncio.run(main())

Thanks!

athenawisdoms · Aug 05 '21 14:08

Messages are consumed for each topic+partition independently, and an offset is an index into the queue of one particular topic+partition. Where are you going to get starting_offset from? Do you store the offset you have reached so far somewhere? Then you have to store it along with the topic and partition; all of them are accessible as fields of the ConsumerRecord returned by the getone() method.
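A minimal sketch of storing the three values together, assuming a JSON file as the cache. The helper names save_checkpoint and load_checkpoint are hypothetical. The sketch only relies on the topic, partition and offset fields of the record. Storing offset + 1 (the next message to read) is a choice made here; it mirrors how Kafka's committed offsets point at the next message to consume.

```python
import json
from pathlib import Path


def save_checkpoint(path, record):
    """Persist where to resume: topic, partition and the NEXT offset to read.

    `record` is anything with .topic, .partition and .offset attributes,
    such as the ConsumerRecord returned by AIOKafkaConsumer.getone().
    """
    checkpoint = {
        "topic": record.topic,
        "partition": record.partition,
        # offset + 1, so that seeking to it skips the already-processed record
        "offset": record.offset + 1,
    }
    Path(path).write_text(json.dumps(checkpoint))


def load_checkpoint(path):
    """Return (topic, partition, offset) as written by save_checkpoint()."""
    data = json.loads(Path(path).read_text())
    return data["topic"], data["partition"], data["offset"]
```

After loading, topic and partition identify the TopicPartition to seek on, and offset is the value to seek to.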

ods · Aug 05 '21 15:08

@ods In my actual code (not the example posted above), I'm getting starting_offset from a cache file that points to a message acting as a checkpoint/snapshot.

In the example above, I figured that the default partition is 0; however, I am having trouble doing the seek when using partition 0.

    consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
    await consumer.start()
    print(consumer.partitions_for_topic(topic))    # prints `{0}`
    print(consumer.assignment())                   # prints `frozenset({TopicPartition(topic='test', partition=0)})`
    consumer.seek(0, starting_offset)

prints

{0}
frozenset({TopicPartition(topic='test', partition=0)})

telling us it's on partition 0, but consumer.seek(0, starting_offset) gives the error

    raise IllegalStateError(
kafka.errors.IllegalStateError: IllegalStateError: No current assignment for partition 0

Why does the error say that the consumer is not assigned to partition 0?
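The printed assignment hints at the cause. It holds TopicPartition objects, not bare partition numbers, and the error message suggests that seek() looks its first argument up among the assigned partitions. The snippet below uses a stand-in named tuple with the same two fields as aiokafka's TopicPartition, so it runs without Kafka. It shows that the integer 0 is not a member of such an assignment.

```python
from collections import namedtuple

# Stand-in with the same shape as aiokafka's TopicPartition (topic, partition).
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])

assignment = frozenset({TopicPartition(topic="test", partition=0)})

print(0 in assignment)                          # False: a bare int is not a TopicPartition
print(TopicPartition("test", 0) in assignment)  # True
# So the call would be consumer.seek(TopicPartition(topic, 0), starting_offset)
```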

athenawisdoms · Aug 06 '21 04:08

Just realized what you meant by "topic+partition"!

After I did the following, it works now:

    consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
    await consumer.start()
    for tp in consumer.assignment():
        consumer.seek(tp, starting_offset)

Hope I got this right, thanks!

athenawisdoms · Aug 06 '21 04:08

Your code is right, but it's unreliable: consumer.assignment() may return an empty or incomplete result shortly after start() or during a rebalance. A possible solution is to call consumer.assign([tp]) explicitly (in aiokafka, assign() is a regular method, not a coroutine, so it is not awaited).
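A sketch of the explicit-assignment approach follows. consume_from and main are hypothetical names, and the broker address is the one from the question. The consumer is passed in, so the helper works with any object that has assign/start/seek/getone/stop. assign() and seek() are called without await because they are regular methods in aiokafka; start(), getone() and stop() are coroutines. Assigning before start() and seeking after it is an assumption that seek() needs state set up by start(). Check this against your aiokafka version.

```python
async def consume_from(consumer, tp, offset, limit):
    """Read up to `limit` records from one partition, starting at `offset`.

    `consumer` must be created WITHOUT topics: manual assignment and topic
    subscription are mutually exclusive. `tp` is a TopicPartition.
    """
    consumer.assign([tp])  # plain method call, not awaited
    await consumer.start()
    try:
        consumer.seek(tp, offset)  # also a plain method call
        records = []
        for _ in range(limit):
            records.append(await consumer.getone())
        return records
    finally:
        await consumer.stop()


async def main():
    # aiokafka is only needed when talking to a real broker.
    from aiokafka import AIOKafkaConsumer, TopicPartition

    consumer = AIOKafkaConsumer(bootstrap_servers="localhost:29092")  # no topics
    for record in await consume_from(consumer, TopicPartition("test", 0), 3, limit=5):
        print("message:", record.partition, record.offset, record.value)
```

With a broker running, asyncio.run(main()) should print five records starting at offset 3. Note that getone() waits indefinitely if the partition holds fewer messages than `limit`.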

ods · Aug 06 '21 07:08

@ods I gave your suggestion a try, but it's giving an error:

    consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
    await consumer.start()
    for tp in consumer.assignment():
        consumer.assign([tp])     # giving an error
        consumer.seek(tp, starting_offset)

gives the error

kafka.errors.IllegalStateError: IllegalStateError: Subscription to topics, partitions and pattern are mutually exclusive

Did I miss out on something?

athenawisdoms · Aug 07 '21 19:08

Subscribing to topics implies automatic partition assignment, while you are assigning partitions manually. Just don't pass any topics when initialising AIOKafkaConsumer.

The for loop looks very strange. Why do you get tp from assignment()? You should have saved the topic+partition from the previous run, so it should come from the same source as starting_offset. Also, on each iteration you assign the consumer to a single topic-partition and set its offset, only to discard that work on the next iteration, because each assign() call replaces the previous assignment.
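The following sketch covers both points. It assumes the checkpoint holds one offset per partition; the dict shape and the helper name assign_and_seek_all are hypothetical. The TopicPartition keys come from the stored data rather than from assignment(). All of them are assigned in one call, and then each one is sought. It is meant to be called after await consumer.start() on a consumer created without topics.

```python
def assign_and_seek_all(consumer, checkpoints):
    """Assign every stored partition at once, then seek each to its offset.

    `checkpoints` maps TopicPartition -> next offset to read, for example
    {TopicPartition("test", 0): 3}. Call after `await consumer.start()`.
    """
    # A single assign() call: each call replaces the previous assignment,
    # so calling it once per partition would keep only the last one.
    consumer.assign(list(checkpoints))
    for tp, offset in checkpoints.items():
        consumer.seek(tp, offset)
```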

ods · Aug 10 '21 19:08