taskiq-aio-pika

bug: Prefetched messages don't fail gracefully on RabbitMQ Connection errors

Open • dave-hellopatient opened this issue 1 week ago • 0 comments

I'm not sure whether this issue belongs to taskiq-aio-pika, aio-pika, or a combination of the two.

Due to a temporary network issue, we recently got an aiormq.exceptions.ChannelInvalidStateError from our TaskIQ worker. The full stack trace is below:

2025-11-19 10:50:27.991 Task event:FinishedCallEventData:save_telnyx_recording_for_call completed successfully.
[2025-11-19 15:50:28,105][asyncio][ERROR  ][worker-0] Task exception was never retrieved
future: <Task finished name='None' coro=<_timed_coro.<locals>.wrapper() done, defined at /packages/hp-o11y/hp_o11y/otel/asyncio/_instrumentation.py:103> exception=ChannelInvalidStateError() created at /packages/hp-o11y/hp_o11y/otel/asyncio/_instrumentation.py:131>
source_traceback: Object created at (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/api/.venv/lib/python3.13/site-packages/taskiq/__main__.py", line 79, in <module>
    main()
  File "/api/.venv/lib/python3.13/site-packages/taskiq/__main__.py", line 73, in main
    status = command.exec(sys.argv[1:])
  File "/api/.venv/lib/python3.13/site-packages/taskiq/cli/worker/cmd.py", line 27, in exec
    return run_worker(wargs)
  File "/api/.venv/lib/python3.13/site-packages/taskiq/cli/worker/run.py", line 212, in run_worker
    status = manager.start()
  File "/api/.venv/lib/python3.13/site-packages/taskiq/cli/worker/process_manager.py", line 266, in start
    action.handle(self.workers, self.args, self.worker_function)
  File "/api/.venv/lib/python3.13/site-packages/taskiq/cli/worker/process_manager.py", line 88, in handle
    new_process.start()
  File "/usr/local/lib/python3.13/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.13/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/lib/python3.13/multiprocessing/context.py", line 282, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.13/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.13/multiprocessing/popen_fork.py", line 74, in _launch
    code = process_obj._bootstrap(parent_sentinel=child_r)
  File "/usr/local/lib/python3.13/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.13/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/api/.venv/lib/python3.13/site-packages/taskiq/cli/worker/run.py", line 167, in start_listen
    loop.run_until_complete(receiver.listen(shutdown_event))
  File "/usr/local/lib/python3.13/asyncio/base_events.py", line 712, in run_until_complete
    self.run_forever()
  File "/usr/local/lib/python3.13/asyncio/base_events.py", line 683, in run_forever
    self._run_once()
  File "/usr/local/lib/python3.13/asyncio/base_events.py", line 2042, in _run_once
    handle._run()
  File "/usr/local/lib/python3.13/asyncio/events.py", line 89, in _run
    self._context.run(self._callback, *self._args)
  File "/packages/hp-o11y/hp_o11y/otel/asyncio/_instrumentation.py", line 108, in wrapper
    return await coro
  File "/api/.venv/lib/python3.13/site-packages/taskiq/receiver/receiver.py", line 438, in runner
    task = asyncio.create_task(
  File "/usr/local/lib/python3.13/asyncio/tasks.py", line 410, in create_task
    task = loop.create_task(coro, name=name)
  File "/usr/local/lib/python3.13/asyncio/base_events.py", line 471, in create_task
    task = self._task_factory(self, coro, **kwargs)
  File "/packages/hp-o11y/hp_o11y/otel/asyncio/_instrumentation.py", line 131, in _task_factory
    return asyncio.Task(wrapped, loop=loop, context=context)
Traceback (most recent call last):
  File "/packages/hp-o11y/hp_o11y/otel/asyncio/_instrumentation.py", line 108, in wrapper
    return await coro
           ^^^^^^^^^^
  File "/api/.venv/lib/python3.13/site-packages/taskiq/receiver/receiver.py", line 184, in callback
    await maybe_awaitable(message.ack())
  File "/api/.venv/lib/python3.13/site-packages/taskiq/utils.py", line 29, in maybe_awaitable
    return await possible_coroutine
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/api/.venv/lib/python3.13/site-packages/aio_pika/message.py", line 453, in ack
    await self.channel.basic_ack(
          ^^^^^^^^^^^^
  File "/api/.venv/lib/python3.13/site-packages/aio_pika/message.py", line 391, in channel
    raise ChannelInvalidStateError
aiormq.exceptions.ChannelInvalidStateError

Expected behavior (see the sketch after this list):

  • Execute the on_error callback in middleware
  • Throw away the connection
  • Either discard the task or place it back into RabbitMQ to be picked up by another worker (perhaps configurable? The ideal behavior likely varies from one application to another)
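
To make that concrete, here is a minimal sketch of the kind of ack handling we have in mind. This is a hypothetical illustration only, not the actual taskiq receiver code; `ack_or_release` is a made-up helper name:

```python
# Hypothetical sketch, NOT taskiq's real receiver code.
import logging

from aio_pika import IncomingMessage
from aiormq.exceptions import ChannelInvalidStateError

logger = logging.getLogger(__name__)


async def ack_or_release(message: IncomingMessage) -> None:
    """Ack a delivery, treating a dead channel as "redeliver elsewhere"."""
    try:
        await message.ack()
    except ChannelInvalidStateError:
        # The channel is gone, so this ack can never succeed on it.
        # RabbitMQ re-queues unacked deliveries once the channel closes,
        # so rather than retrying forever we log, drop our local copy,
        # and let another worker (or a fresh channel) pick the task up.
        logger.warning("Channel died before ack; task will be redelivered")
```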

Observed behavior:

  • The worker process stopped accepting new tasks altogether and kept retrying the task(s) that had errored.
  • On each retry, the basic_ack() call failed with the same exception.
  • We were able to clear the issue by deleting the worker process and restarting it.

This may be related to https://github.com/mosquito/aio-pika/issues/312. We set --max-async-tasks, --max-prefetch, and qos (from the AioPikaBroker constructor) to the same value (an illustrative configuration follows the list below). What we think happened is:

  • All our worker processes had prefetched the full number of tasks and had them in memory
  • The connection error prevented the ack() from going through
  • Rather than retrying on a new connection, the aio-pika/taskiq-aio-pika stack kept retrying the ack on the same dead connection/channel.
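
For reference, our setup looked roughly like the sketch below. The URL, module path, and the value 10 are illustrative placeholders; the relevant detail is that all three limits are equal:

```python
# Illustrative only: URL, module path, and the value 10 are placeholders.
from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker(
    url="amqp://guest:guest@rabbitmq/",
    qos=10,  # per-channel prefetch limit on the consume channel
)

# Worker started with matching CLI limits, e.g.:
#   taskiq worker myapp.tasks:broker --max-async-tasks 10 --max-prefetch 10
```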

Does this sound like a bug? If so, is it in this library, or should I file it against aio-pika instead?

Please also let me know if there are any config options we can set or change to work around this in the meantime.

Library versions:

  • aio-pika: 9.5.4
  • taskiq-aio-pika: 0.4.1

dave-hellopatient • Nov 19 '25 20:11