
[QUESTION] on_partitions_assigned & getone/getmany

Open sachako opened this issue 3 years ago • 3 comments

The ConsumerRebalanceListener.on_partitions_assigned docstring says:

    This method will be called *after* partition re-assignment completes
    and *before* the consumer starts fetching data again.

I assume this follows the original Java API description:

ConsumerRebalanceListener.onPartitionsAssigned

    This method will be called after the partition re-assignment completes and before the consumer starts fetching data, and only as the result of a poll(long) call.

My interpretation of both docs is: "the consumer will not consume from a newly assigned partition before the callback completes execution".
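For reference, the Java client invokes the listener callbacks from within poll() on the calling thread. The caller therefore cannot receive records until the callback returns. The toy asyncio model below sketches that contract. ToyConsumer, pending_assignment and poll are hypothetical names. This is not aiokafka's API or implementation.

```python
import asyncio

# Toy model of the documented contract, NOT aiokafka's (or Java's) real code.
# ToyConsumer, pending_assignment and poll() are hypothetical names.
class ToyConsumer:
    def __init__(self, listener, records):
        self._listener = listener
        self._records = list(records)
        self.pending_assignment = None  # set when a rebalance assigned partitions

    async def poll(self):
        # Like Java's poll(long): the rebalance callback runs inside the poll
        # call itself, so no record is returned until the callback has finished.
        if self.pending_assignment is not None:
            assigned, self.pending_assignment = self.pending_assignment, None
            await self._listener(assigned)
        return self._records.pop(0)


async def main():
    log = []

    async def on_partitions_assigned(assigned):
        log.append("listener: start")
        await asyncio.sleep(0.01)  # yielding here cannot make poll() return early
        log.append("listener: done")

    consumer = ToyConsumer(on_partitions_assigned, ["record-0"])
    consumer.pending_assignment = {("contents", 0)}
    record = await consumer.poll()
    log.append(f"poll: got {record}")
    return log


log = asyncio.run(main())
print(log)
```

Everything here runs in a single task, so the listener always finishes before the record is handed back.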

In my tests, this is not true, at least for aiokafka. Here is the line where the fetcher is effectively unblocked from reading: https://github.com/aio-libs/aiokafka/blob/fff2c021a57291a7484200a5f6b9e82611e59ed9/aiokafka/consumer/group_coordinator.py#L449. The listener is only called 20 lines below.

Can anyone clarify what the intended behavior is?

Thanks, Alexander

sachako, Apr 19 '21 14:04

Make an await call inside your re-assignment handler.

cal97g, Mar 09 '22 12:03

I am running into the same problem.

@cal97g, that won't solve the issue. The handler is called from the group coordinator task, which is a different task from the one performing the poll. As soon as the handler reaches its first await, the event loop is free to run other tasks. The fetch can therefore go ahead before the handler has completed.
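Below is a minimal plain-asyncio sketch of this race. It assumes, as the linked line suggests, that the coordinator unblocks fetching before it awaits the listener. handler, poller and coordinator are hypothetical stand-ins, not aiokafka internals. Adding awaits to the handler only creates more points where the poller can run. The order is decided by one thing: whether the listener is awaited to completion before fetching is unblocked.

```python
import asyncio

# Toy model, not aiokafka internals: two tasks sharing one event loop.
# handler plays on_partitions_assigned, poller plays the task calling getone().

async def handler(log):
    log.append("handler: start")
    await asyncio.sleep(0)  # first await: the loop may now run the poller
    log.append("handler: A")
    await asyncio.sleep(0)
    log.append("handler: B")


async def poller(ready, log):
    await ready.wait()      # blocked until fetching is unblocked
    log.append("poller: got record")


async def coordinator(ready, log, unblock_first):
    if unblock_first:
        ready.set()         # poller becomes runnable before the handler finishes
        await handler(log)
    else:
        await handler(log)  # handler runs to completion first
        ready.set()


async def run(unblock_first):
    log = []
    ready = asyncio.Event()
    await asyncio.gather(poller(ready, log), coordinator(ready, log, unblock_first))
    return log


racy = asyncio.run(run(unblock_first=True))
ordered = asyncio.run(run(unblock_first=False))
print(racy)     # the poller runs at the handler's first await
print(ordered)  # the handler finishes before the poller sees anything
```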

Here is a script to reproduce it:

import asyncio

from aiokafka import AIOKafkaConsumer, ConsumerRebalanceListener


class Listener(ConsumerRebalanceListener):

    async def on_partitions_revoked(self, revoked):
        pass

    async def on_partitions_assigned(self, assigned):
        print("Starting assigned")
        await asyncio.sleep(3)
        print("Do A")
        await asyncio.sleep(3)
        print("Do B")


async def run():
    consumer = AIOKafkaConsumer(
        bootstrap_servers="kafka:9092",
        enable_auto_commit=False,
        auto_offset_reset="earliest",
        group_id="test"
    )
    await consumer.start()

    consumer.subscribe(topics=["contents"], listener=Listener())

    res = await consumer.getone()
    print("I got one")
    print(res)

if __name__ == "__main__":
    asyncio.run(run())

According to the docs, we should see:

Starting assigned
Do A
Do B
I got one
<the record>

What I actually see is:

Starting assigned
I got one
<the record>
Do A
Do B

vmaurin, Jan 31 '23 11:01

My current workaround is to have my listener produce tasks/futures (here an asyncio.Event) that my poll loop can wait on. That way I can wait for the listener to fully complete before I move on to consuming.

Example:

import asyncio
from asyncio import Event

from aiokafka import AIOKafkaConsumer, ConsumerRebalanceListener


class Listener(ConsumerRebalanceListener):

    def __init__(self):
        self._ready = Event()

    async def on_partitions_revoked(self, revoked):
        self._ready.clear()

    async def on_partitions_assigned(self, assigned):
        print("Starting assigned")
        await asyncio.sleep(3)
        print("Do A")
        await asyncio.sleep(3)
        print("Do B")
        self._ready.set()

    async def wait_assignment(self):
        await self._ready.wait()


async def run():
    consumer = AIOKafkaConsumer(
        bootstrap_servers="kafka:9092",
        enable_auto_commit=False,
        auto_offset_reset="earliest",
        group_id="test"
    )
    await consumer.start()

    listener = Listener()
    consumer.subscribe(topics=["contents"], listener=listener)

    res = await consumer.getone()
    print("I got one, but I will wait")
    await listener.wait_assignment()
    print("Ok, good to go")
    print(res)


if __name__ == "__main__":
    asyncio.run(run())
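Here is a variant of the workaround above. It waits on the gate before each fetch rather than after, so the application never handles a record while on_partitions_assigned is still running. AssignmentGate and consume are hypothetical helpers. fake_fetch stands in for consumer.getone() so the sketch runs without a broker.

```python
import asyncio

# Hypothetical helper, not part of aiokafka: an Event that is cleared on
# revoke and set once on_partitions_assigned has fully finished.
class AssignmentGate:
    def __init__(self):
        self._ready = asyncio.Event()

    def close(self):
        self._ready.clear()

    def open(self):
        self._ready.set()

    async def wait(self):
        await self._ready.wait()


async def consume(gate, fetch, handle, max_records):
    """Wait on the gate before every fetch, so handle() never sees a record
    while the assignment callback is still running."""
    for _ in range(max_records):
        await gate.wait()
        record = await fetch()  # consumer.getone() in real code (assumption)
        handle(record)


async def demo():
    log = []
    gate = AssignmentGate()
    records = iter(["r0", "r1"])

    async def fake_fetch():  # stand-in for consumer.getone()
        return next(records)

    async def on_partitions_assigned():
        log.append("assign: start")
        await asyncio.sleep(0.01)
        log.append("assign: done")
        gate.open()  # last step: let consumption proceed

    await asyncio.gather(
        consume(gate, fake_fetch, lambda r: log.append(f"handle {r}"), 2),
        on_partitions_assigned(),
    )
    return log


result = asyncio.run(demo())
print(result)
```

In a real listener, on_partitions_revoked would call gate.close(), and on_partitions_assigned would call gate.open() as its last step. As far as I can tell, this only controls when the application sees records, not when the client fetches them in the background. It also does not cover a rebalance that starts while a fetch is already being awaited.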

vmaurin, Jan 31 '23 12:01