aiokafka
[QUESTION] on_partitions_assigned & getone/getmany
The ConsumerRebalanceListener.on_partitions_assigned docstring says:
This method will be called *after* partition re-assignment completes
and *before* the consumer starts fetching data again.
I assume this follows the original Java API description:
ConsumerRebalanceListener.onPartitionsAssigned
This method will be called after the partition re-assignment completes and before the consumer starts fetching data, and only as the result of a poll(long) call.
My interpretation of both docs is: "the consumer will not consume from a newly assigned partition before the callback completes execution".
As I observe in my tests, this is not true, at least for aiokafka. Here is the line where the fetcher is essentially unblocked from reading (the listener is only called 20 lines below): https://github.com/aio-libs/aiokafka/blob/fff2c021a57291a7484200a5f6b9e82611e59ed9/aiokafka/consumer/group_coordinator.py#L449
Can anyone clarify what the desired behavior is?
Thanks, Alexander
Make an await call inside your re-assignment handler.
I am stumbling on the same problem.
It won't solve the issue, @cal97g. The handler is called from the group coordinator task, which is a different task from the one performing the poll. As soon as the handler reaches its first await, the event loop is free to run other tasks, so the fetch can be performed while the handler has not yet completed.
Here is a reproduction:
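To illustrate why an extra await in the handler doesn't help, here is a minimal pure-asyncio sketch, with no Kafka involved, of the two orderings discussed in this issue. The names `fetch_allowed`, `coordinator` and `consumer` are hypothetical stand-ins for "the fetcher is unblocked", the group coordinator task and the task calling getone(); they are not aiokafka APIs. With unblock_before_listener=True the toy model produces the out-of-order interleaving reported here; with False it produces the order the docstring describes.

```python
import asyncio


async def simulate(unblock_before_listener: bool) -> list[str]:
    """Toy model of the rebalance path (hypothetical names, no Kafka)."""
    log: list[str] = []
    fetch_allowed = asyncio.Event()  # stands in for "fetcher may read new partitions"

    async def on_partitions_assigned() -> None:
        log.append("Starting assigned")
        await asyncio.sleep(0.01)  # first await: the event loop may switch tasks here
        log.append("Do A")
        await asyncio.sleep(0.01)
        log.append("Do B")

    async def coordinator() -> None:
        if unblock_before_listener:
            fetch_allowed.set()             # fetcher unblocked first...
            await on_partitions_assigned()  # ...listener runs afterwards
        else:
            await on_partitions_assigned()  # listener completes first...
            fetch_allowed.set()             # ...then fetching resumes

    async def consumer() -> None:
        await fetch_allowed.wait()  # stands in for getone() returning a record
        log.append("I got one")

    # Run the "coordinator" and the "consumer" as two separate tasks
    await asyncio.gather(coordinator(), consumer())
    return log


if __name__ == "__main__":
    print(asyncio.run(simulate(True)))   # ['Starting assigned', 'I got one', 'Do A', 'Do B']
    print(asyncio.run(simulate(False)))  # ['Starting assigned', 'Do A', 'Do B', 'I got one']
```

The point the sketch makes is that `await` only yields control to other tasks; it cannot make a different task (the one sitting in getone()) wait for the listener. Getting the documented order requires either unblocking the fetcher after the listener returns, or having the caller wait explicitly.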
import asyncio

from aiokafka import AIOKafkaConsumer, ConsumerRebalanceListener


class Listener(ConsumerRebalanceListener):
    async def on_partitions_revoked(self, revoked):
        pass

    async def on_partitions_assigned(self, assigned):
        # Simulate slow setup work that must finish before consuming
        print("Starting assigned")
        await asyncio.sleep(3)
        print("Do A")
        await asyncio.sleep(3)
        print("Do B")


async def run():
    consumer = AIOKafkaConsumer(
        bootstrap_servers="kafka:9092",
        enable_auto_commit=False,
        auto_offset_reset="earliest",
        group_id="test",
    )
    await consumer.start()
    try:
        consumer.subscribe(topics=["contents"], listener=Listener())
        # Per the docs, this should only return after on_partitions_assigned finished
        res = await consumer.getone()
        print("I got one")
        print(res)
    finally:
        await consumer.stop()


if __name__ == "__main__":
    asyncio.run(run())
According to the docs, we should see:
Starting assigned
Do A
Do B
I got one
<the record>
What I am seeing instead is:
Starting assigned
I got one
<the record>
Do A
Do B
My current workaround is to have my listener produce tasks/futures that my consumer loop can wait on, so I can wait for the listener to fully complete before I move on to consuming.
Example:
import asyncio
from asyncio import Event

from aiokafka import AIOKafkaConsumer, ConsumerRebalanceListener


class Listener(ConsumerRebalanceListener):
    def __init__(self):
        # Set only once on_partitions_assigned has fully completed
        self._ready = Event()

    async def on_partitions_revoked(self, revoked):
        # A rebalance is starting: block processing until the next assignment is done
        self._ready.clear()

    async def on_partitions_assigned(self, assigned):
        print("Starting assigned")
        await asyncio.sleep(3)
        print("Do A")
        await asyncio.sleep(3)
        print("Do B")
        self._ready.set()

    async def wait_assignment(self):
        await self._ready.wait()


async def run():
    consumer = AIOKafkaConsumer(
        bootstrap_servers="kafka:9092",
        enable_auto_commit=False,
        auto_offset_reset="earliest",
        group_id="test",
    )
    await consumer.start()
    try:
        listener = Listener()
        consumer.subscribe(topics=["contents"], listener=listener)
        res = await consumer.getone()
        print("I got one, but I will wait")
        # The record can arrive before the listener finishes, so block until it has
        await listener.wait_assignment()
        print("Ok, good to go")
        print(res)
    finally:
        await consumer.stop()


if __name__ == "__main__":
    asyncio.run(run())
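The Event-based workaround can be checked without a broker. Below is a minimal pure-asyncio sketch, assuming the fetcher is unblocked before the listener runs (the behavior reported in this issue). `AssignmentGate`, `fetch_allowed`, `coordinator` and `consumer` are hypothetical names for illustration, not aiokafka APIs; in real code the gate's `close()`/`open()` calls would live in a ConsumerRebalanceListener subclass, as in the example above.

```python
import asyncio


class AssignmentGate:
    """Hypothetical helper: open only after on_partitions_assigned has finished."""

    def __init__(self) -> None:
        self._ready = asyncio.Event()

    def close(self) -> None:
        # Call at the start of on_partitions_revoked
        self._ready.clear()

    def open(self) -> None:
        # Call as the very last step of on_partitions_assigned
        self._ready.set()

    async def wait(self) -> None:
        await self._ready.wait()


async def simulate_with_gate() -> list[str]:
    """Toy model: fetching resumes before the listener ends; processing waits."""
    log: list[str] = []
    gate = AssignmentGate()
    fetch_allowed = asyncio.Event()  # stands in for "fetcher may read new partitions"

    async def on_partitions_assigned() -> None:
        log.append("Starting assigned")
        await asyncio.sleep(0.01)
        log.append("Do A")
        await asyncio.sleep(0.01)
        log.append("Do B")
        gate.open()  # only now is it safe to process records

    async def coordinator() -> None:
        gate.close()                    # revocation closes the gate
        fetch_allowed.set()             # fetcher unblocked before the listener runs
        await on_partitions_assigned()

    async def consumer() -> None:
        await fetch_allowed.wait()  # stands in for getone() returning a record
        log.append("I got one, but I will wait")
        await gate.wait()           # block processing until the listener is done
        log.append("Ok, good to go")

    await asyncio.gather(coordinator(), consumer())
    return log


if __name__ == "__main__":
    print(asyncio.run(simulate_with_gate()))
```

Note the limitation this shares with the workaround above: the gate delays processing, not fetching, so the record in hand was already fetched before the listener finished. Since every revocation closes the gate again, a long-running consumer loop would presumably wait on it before processing each record or batch, not just once after startup.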