taskiq

max-prefetch argument doesn't work as expected

Open · bashirmindee opened this issue 1 month ago · 1 comment

Thank you for this amazing project.

The `max-prefetch` argument doesn't work as expected. When there are tasks waiting to be consumed, the number of prefetched messages exceeds `max-prefetch` by one.

Look at this part of the code in `receiver.prefetcher` here, and assume `max-prefetch` is set to 0:

        while True:
            if finish_event.is_set():
                break
            try:
                await self.sem_prefetch.acquire()
                if (
                    self.max_tasks_to_execute
                    and fetched_tasks >= self.max_tasks_to_execute
                ):
                    logger.info("Max number of tasks executed.")
                    break
                # Here we wait for the message to be fetched,
                # but we make it with timeout so it can be interrupted
                done, _ = await asyncio.wait({current_message}, timeout=0.3)
                # If the message is not fetched, we release the semaphore
                # and continue the loop. So it will check if finished event was set.
                if not done:
                    self.sem_prefetch.release()
                    continue
                # We're done, so now we need to check
                # whether task has returned an error.
                message = current_message.result()
                ## << consider the case where we already consumed a message in the previous iteration,
                ## << so `message` is an AckableMessage instance
                current_message = asyncio.create_task(iterator.__anext__())
                ## << when the interpreter arrives here, the task that fetches the next message is created but not awaited,
                ## << so we don't control when it starts consuming from the broker.
                ## << Usually the broker has enough time to receive a message before the next
                ## << `done, _ = await asyncio.wait({current_message}, timeout=0.3)` line, even while this loop is
                ## << blocked on `sem_prefetch.acquire()`, so one message is pulled before a permit is acquired for it.
                fetched_tasks += 1
                await queue.put(message)
            except (asyncio.CancelledError, StopAsyncIteration):
                break
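To make the off-by-one concrete, here is a minimal self-contained simulation of the loop above. It is a sketch under simplifying assumptions, not taskiq code: `FakeBroker`, `eager_prefetcher` and `simulate` are hypothetical names; the prefetch semaphore is assumed to start with `max_prefetch` permits; no worker ever returns a permit (modelling the case where every worker slot is busy); and the `max_tasks_to_execute` and `finish_event` checks are left out.

```python
import asyncio
from typing import AsyncIterator, Tuple


class FakeBroker:
    """Hypothetical broker stand-in: yields messages forever and counts
    how many it has handed out (i.e. pulled from the remote queue)."""

    def __init__(self) -> None:
        self.pulled = 0

    async def listen(self) -> AsyncIterator[bytes]:
        while True:
            await asyncio.sleep(0)  # pretend to wait on network I/O
            self.pulled += 1
            yield f"msg-{self.pulled}".encode()


async def eager_prefetcher(
    broker: FakeBroker, sem: asyncio.Semaphore, queue: "asyncio.Queue[bytes]"
) -> None:
    """Mirrors the current ordering: the next fetch is scheduled as soon as
    a message arrives, before a permit has been acquired for it."""
    iterator = broker.listen()
    current_message = asyncio.create_task(iterator.__anext__())
    while True:
        await sem.acquire()
        done, _ = await asyncio.wait({current_message}, timeout=0.3)
        if not done:
            sem.release()
            continue
        message = current_message.result()
        # This fetch runs at the next suspension point, even if the
        # following sem.acquire() blocks because no permits are left.
        current_message = asyncio.create_task(iterator.__anext__())
        await queue.put(message)


async def simulate(max_prefetch: int) -> Tuple[int, int]:
    """Returns (messages pulled from the broker, messages in the queue)."""
    broker = FakeBroker()
    # Assumption: one permit per allowed prefetched message, and no worker
    # releases any of them (all worker slots are busy).
    sem = asyncio.Semaphore(max_prefetch)
    queue: "asyncio.Queue[bytes]" = asyncio.Queue()
    task = asyncio.create_task(eager_prefetcher(broker, sem, queue))
    await asyncio.sleep(0.1)  # let the prefetcher run until it blocks
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return broker.pulled, queue.qsize()


print(asyncio.run(simulate(0)))  # (1, 0)
print(asyncio.run(simulate(2)))  # (3, 2)
```

With `max_prefetch=0`, one message has already been taken from the broker although none may be prefetched; with `max_prefetch=2`, three messages are pulled while only two reach the queue.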

I think we shouldn't create the fetch task in one iteration and wait for it in the next, as the current code does. Instead, we should start the fetch only after acquiring the semaphore, something like:

        current_message = None  ## << no fetch is in flight yet
        while True:
            if finish_event.is_set():
                break
            try:
                await self.sem_prefetch.acquire()
                if (
                    self.max_tasks_to_execute
                    and fetched_tasks >= self.max_tasks_to_execute
                ):
                    logger.info("Max number of tasks executed.")
                    break
                ## << start fetching only after a permit has been acquired.
                ## << If a previous fetch timed out, keep waiting on it instead of calling
                ## << __anext__ again: an async generator cannot run two __anext__ calls at once.
                if current_message is None:
                    current_message = asyncio.create_task(iterator.__anext__())  # type: ignore
                # Here we wait for the message to be fetched,
                # but we make it with timeout so it can be interrupted
                done, _ = await asyncio.wait({current_message}, timeout=0.3)
                # If the message is not fetched, we release the semaphore
                # and continue the loop. So it will check if finished event was set.
                if not done:
                    self.sem_prefetch.release()
                    continue
                # We're done, so now we need to check
                # whether task has returned an error.
                message = current_message.result()
                current_message = None  ## << the next iteration starts a new fetch
                fetched_tasks += 1
                await queue.put(message)
            except (asyncio.CancelledError, StopAsyncIteration):
                break
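A minimal self-contained simulation of this ordering, as a sketch rather than a definitive implementation: `FakeBroker`, `lazy_prefetcher` and `simulate` are hypothetical names, the prefetch semaphore is assumed to start with `max_prefetch` permits, no worker ever returns a permit (all worker slots busy), and the `max_tasks_to_execute` and `finish_event` checks are left out. Because a fetch only starts once a permit is held, the number of messages pulled from the broker should match the number queued.

```python
import asyncio
from typing import AsyncIterator, Optional, Tuple


class FakeBroker:
    """Hypothetical broker stand-in: yields messages forever and counts
    how many it has handed out (i.e. pulled from the remote queue)."""

    def __init__(self) -> None:
        self.pulled = 0

    async def listen(self) -> AsyncIterator[bytes]:
        while True:
            await asyncio.sleep(0)  # pretend to wait on network I/O
            self.pulled += 1
            yield f"msg-{self.pulled}".encode()


async def lazy_prefetcher(
    broker: FakeBroker, sem: asyncio.Semaphore, queue: "asyncio.Queue[bytes]"
) -> None:
    """Starts a fetch only after a permit is held; a fetch that timed out
    is awaited again instead of being replaced by a second __anext__ call."""
    iterator = broker.listen()
    current_message: "Optional[asyncio.Task[bytes]]" = None
    while True:
        await sem.acquire()
        if current_message is None:
            current_message = asyncio.create_task(iterator.__anext__())
        done, _ = await asyncio.wait({current_message}, timeout=0.3)
        if not done:
            sem.release()
            continue
        message = current_message.result()
        current_message = None  # the next iteration starts a fresh fetch
        await queue.put(message)


async def simulate(max_prefetch: int) -> Tuple[int, int]:
    """Returns (messages pulled from the broker, messages in the queue)."""
    broker = FakeBroker()
    # Assumption: one permit per allowed prefetched message, and no worker
    # releases any of them (all worker slots are busy).
    sem = asyncio.Semaphore(max_prefetch)
    queue: "asyncio.Queue[bytes]" = asyncio.Queue()
    task = asyncio.create_task(lazy_prefetcher(broker, sem, queue))
    await asyncio.sleep(0.1)  # let the prefetcher run until it blocks
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return broker.pulled, queue.qsize()


print(asyncio.run(simulate(0)))  # (0, 0)
print(asyncio.run(simulate(2)))  # (2, 2)
```

Keeping the timed-out task rather than creating a new one matters because calling `__anext__` on an async generator while a previous call is still running raises `RuntimeError`.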

I can open a PR with a proper fix if you agree.

bashirmindee, Oct 29 '25 15:10