aiokafka
[QUESTION] Which Partition to Use When Performing an Offset Seek Using AIOKafkaConsumer?
When making an AIOKafkaConsumer start reading messages from a specific offset, starting_offset, how do we know which partition to use?
I am trying to use the AIOKafkaConsumer.seek method, but it requires a TopicPartition to be specified.
import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
async def main():
    topic = "test"
    starting_offset = 3
    # Publish some messages
    producer = AIOKafkaProducer(bootstrap_servers="localhost:29092")
    await producer.start()
    for i in range(10):
        await producer.send_and_wait(topic, bytes(f"hello {i}", "utf-8"))
    # Start consuming from a specific offset
    consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
    await consumer.start()
    consumer.seek(None, starting_offset) # NEED HELP HERE :)
    while True:
        message = await consumer.getone()
        print("message:", message.value)
if __name__ == "__main__":
    asyncio.run(main())
Thanks!
Messages are consumed for each topic+partition independently; an offset is an index into the queue for a particular topic+partition. Where are you going to get starting_offset from? Do you store the offset you have reached so far somewhere? Then you have to store it along with the topic and partition; all three are accessible as fields of the ConsumerRecord returned by the getone() method.
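To illustrate the point about storing the offset together with its topic and partition, here is a minimal sketch of a checkpoint file. The names save_checkpoint, load_checkpoint, and the Record stand-in are hypothetical and not part of aiokafka; the sketch only assumes that the record object has topic, partition, and offset attributes, as ConsumerRecord does.

```python
import json
from collections import namedtuple
from pathlib import Path

# Hypothetical stand-in for aiokafka's ConsumerRecord; only the attributes
# used below (topic, partition, offset) are assumed to exist on the real one.
Record = namedtuple("Record", ["topic", "partition", "offset", "value"])


def save_checkpoint(path, record):
    """Persist where the record was read from, not just its offset."""
    checkpoint = {
        "topic": record.topic,
        "partition": record.partition,
        "offset": record.offset,
    }
    Path(path).write_text(json.dumps(checkpoint))


def load_checkpoint(path):
    """Return the (topic, partition, offset) triple written by save_checkpoint."""
    data = json.loads(Path(path).read_text())
    return data["topic"], data["partition"], data["offset"]
```

On restart, the loaded topic and partition can be used to build the TopicPartition, and the loaded offset can be passed to seek.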
@ods In my actual code (not the example posted above), I'm getting starting_offset from a cache file that points to a message acting as a checkpoint/snapshot.
In the above example, I figured that the default partition is 0; however, I am having trouble doing the seek when using partition 0.
consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
await consumer.start()
print(consumer.partitions_for_topic(topic)) # prints `{0}`
print(consumer.assignment()) # prints `frozenset({TopicPartition(topic='test', partition=0)})`
consumer.seek(0, starting_offset)
prints
{0}
frozenset({TopicPartition(topic='test', partition=0)})
telling us it's on partition 0, but consumer.seek(0, starting_offset) gives the error
raise IllegalStateError( kafka.errors.IllegalStateError: IllegalStateError: No current assignment for partition 0
Why is the error saying that the consumer is not assigned to partition 0?
Just realized what you meant by "topic+partition"!
Once I did the following, it worked:
consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
await consumer.start()
for tp in consumer.assignment():
    consumer.seek(tp, starting_offset)
Hope I got this right, thanks!
Your code is right, but it's unreliable: consumer.assignment() may return an empty or incomplete result shortly after start or during rebalancing. A possible solution could be calling await consumer.assign([tp]) explicitly.
@ods Gave your suggestion a try, but it's giving an error
consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
await consumer.start()
for tp in consumer.assignment():
    await consumer.assign([tp]) # giving an error
    consumer.seek(tp, starting_offset)
gives the error
kafka.errors.IllegalStateError: IllegalStateError: Subscription to topics, partitions and pattern are mutually exclusive
Did I miss out on something?
Subscribing to topics implies automatic partition assignment, whereas you are assigning partitions manually. Just pass no topics when initialising AIOKafkaConsumer.
The for loop looks very strange. Why do you get tp from assignment()? You should have saved the topic+partition from the previous run, so it should come from the same source as starting_offset. As written, you assign the subscription for a single topic to a single partition, set the offset, and then discard the result of this work on the next iteration.
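Putting the advice in this thread together, a minimal sketch might look like the following. It assumes the topic name "test", partition 0, offset 3, and the broker address from the question; consume_from_checkpoint is a hypothetical helper, not an aiokafka API. It also assumes that assign() is a plain method rather than a coroutine, so it is called without await here; please check this against the aiokafka version in use.

```python
import asyncio


async def consume_from_checkpoint(consumer, tp, offset, limit):
    """Assign exactly one partition, seek to `offset`, and read `limit` messages."""
    consumer.assign([tp])      # manual assignment, so no waiting on a rebalance
    consumer.seek(tp, offset)  # the next getone() should return the record at `offset`
    values = []
    for _ in range(limit):
        message = await consumer.getone()
        values.append(message.value)
    return values


async def main():
    # Imported here so the helper above can be exercised without a broker.
    from aiokafka import AIOKafkaConsumer, TopicPartition

    # No topics are passed to the constructor: subscribing by topic and
    # assigning partitions manually are mutually exclusive.
    consumer = AIOKafkaConsumer(bootstrap_servers="localhost:29092")
    await consumer.start()
    try:
        # In real code these three values would come from the saved checkpoint.
        tp = TopicPartition("test", 0)
        print(await consume_from_checkpoint(consumer, tp, 3, limit=5))
    finally:
        await consumer.stop()


# To try it against a running broker:
# asyncio.run(main())
```

The TopicPartition is built once from saved values instead of being read back from consumer.assignment(), so the seek does not depend on when the group assignment happens to be complete.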