
test_calc_job.py::test_restart_after_daemon_reset failed with kiwipy.UnroutableError

Open danielhollas opened this issue 1 week ago • 2 comments

Just saw this potentially significant problem in a test run here:

https://github.com/aiidateam/aiida-core/actions/runs/20044237634/job/57485873542?pr=7142

Here's the full pytest stack trace:

_______________________ test_restart_after_daemon_reset ________________________
[gw0] linux -- Python 3.13.9 /home/runner/work/aiida-core/aiida-core/.venv/bin/python3

self = <kiwipy.rmq.communicator.RmqCommunicator object at 0x7f90dbb58940>
task = {'args': {'nowait': False, 'pid': 1634, 'tag': None}, 'task': 'continue'}
no_reply = True

    async def task_send(self, task, no_reply=False):
        try:
            task_queue = await self.get_default_task_queue()
>           result = await task_queue.task_send(task, no_reply)

.venv/lib/python3.13/site-packages/kiwipy/rmq/communicator.py:526: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.13/site-packages/kiwipy/rmq/tasks.py:437: in task_send
    return await self._publisher.task_send(task, no_reply)
.venv/lib/python3.13/site-packages/kiwipy/rmq/tasks.py:381: in task_send
    published = await self.publish(task_msg, routing_key=self._task_queue_name, mandatory=True)
.venv/lib/python3.13/site-packages/kiwipy/rmq/messages.py:208: in publish
    result = await self._exchange.publish(message, routing_key=routing_key, mandatory=mandatory)
.venv/lib/python3.13/site-packages/aio_pika/exchange.py:199: in publish
    return await channel.basic_publish(
.venv/lib/python3.13/site-packages/aiormq/channel.py:699: in basic_publish
    return await countdown(confirmation)
.venv/lib/python3.13/site-packages/aiormq/tools.py:95: in __call__
    return await coro
/opt/hostedtoolcache/Python/3.13.9/x64/lib/python3.13/asyncio/futures.py:286: in __await__
    yield self  # This tells Task to wait for completion.
/opt/hostedtoolcache/Python/3.13.9/x64/lib/python3.13/asyncio/tasks.py:375: in __wakeup
    future.result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future finished exception=<PublishError: 'NO_ROUTE' for routing key 'aiida-e2d8e4262c4145d9b8cea5eed6b9017f.process.queue'> created at /opt/hostedtoolcache/Python/3.13.9/x64/lib/python3.13/asyncio/base_events.py:459>

    def result(self):
        """Return the result this future represents.
    
        If the future has been cancelled, raises CancelledError.  If the
        future's result isn't yet available, raises InvalidStateError.  If
        the future is done and has an exception set, this exception is raised.
        """
        if self._state == _CANCELLED:
            raise self._make_cancelled_error()
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Result is not ready.')
        self.__log_traceback = False
        if self._exception is not None:
>           raise self._exception.with_traceback(self._exception_tb)
E           aiormq.exceptions.PublishError: ('NO_ROUTE', 'aiida-e2d8e4262c4145d9b8cea5eed6b9017f.process.queue')

/opt/hostedtoolcache/Python/3.13.9/x64/lib/python3.13/asyncio/futures.py:199: PublishError

During handling of the above exception, another exception occurred:

get_calcjob_builder = <function get_calcjob_builder.<locals>._factory at 0x7f90db909800>
daemon_client = <aiida.engine.daemon.client.DaemonClient object at 0x7f90f3900590>
submit_and_await = <function submit_and_await.<locals>.factory at 0x7f90db7e8cc0>

    @pytest.mark.requires_rmq
    def test_restart_after_daemon_reset(get_calcjob_builder, daemon_client, submit_and_await):
        """Test that a job can be restarted when it is launched and the daemon is restarted.
    
        This is a regression test for https://github.com/aiidateam/aiida-core/issues/5882.
        """
        import time
    
        import plumpy
    
        daemon_client.start_daemon()
    
        # Launch a job with a one second sleep to ensure it doesn't finish before we get the chance to restart the daemon.
        # A monitor is added to ensure that those are properly reinitialized in the ``Waiting`` state of the process.
        builder = get_calcjob_builder()
        builder.metadata.options.sleep = 1
        builder.monitors = {'monitor': orm.Dict({'entry_point': 'core.always_kill', 'disabled': True})}
>       node = submit_and_await(builder, plumpy.ProcessState.WAITING)

tests/engine/processes/calcjobs/test_calc_job.py:1362: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
src/aiida/tools/pytest_fixtures/daemon.py:138: in factory
    node = submit(submittable)
src/aiida/engine/launch.py:144: in submit
    runner.controller.continue_process(process_inited.pid, nowait=False, no_reply=True)
.venv/lib/python3.13/site-packages/plumpy/process_comms.py:431: in continue_process
    return self.task_send(message, no_reply=no_reply)
.venv/lib/python3.13/site-packages/plumpy/process_comms.py:503: in task_send
    return self._communicator.task_send(message, no_reply=no_reply)
.venv/lib/python3.13/site-packages/kiwipy/rmq/threadcomms.py:217: in task_send
    return self._loop_scheduler.await_(self._communicator.task_send(task, no_reply))
.venv/lib/python3.13/site-packages/pytray/aiothreads.py:164: in await_
    return self.await_submit(awaitable).result(timeout=self.task_timeout)
/opt/hostedtoolcache/Python/3.13.9/x64/lib/python3.13/concurrent/futures/_base.py:456: in result
    return self.__get_result()
/opt/hostedtoolcache/Python/3.13.9/x64/lib/python3.13/concurrent/futures/_base.py:401: in __get_result
    raise self._exception
/opt/hostedtoolcache/Python/3.13.9/x64/lib/python3.13/asyncio/tasks.py:306: in __step_run_and_handle_result
    result = coro.throw(exc)
.venv/lib/python3.13/site-packages/pytray/aiothreads.py:178: in coro
    res = await awaitable
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <kiwipy.rmq.communicator.RmqCommunicator object at 0x7f90dbb58940>
task = {'args': {'nowait': False, 'pid': 1634, 'tag': None}, 'task': 'continue'}
no_reply = True

    async def task_send(self, task, no_reply=False):
        try:
            task_queue = await self.get_default_task_queue()
            result = await task_queue.task_send(task, no_reply)
            return result
        except aio_pika.exceptions.DeliveryError as exception:
>           raise kiwipy.UnroutableError(str(exception))
E           kiwipy.exceptions.UnroutableError: ('NO_ROUTE', 'aiida-e2d8e4262c4145d9b8cea5eed6b9017f.process.queue')

.venv/lib/python3.13/site-packages/kiwipy/rmq/communicator.py:529: UnroutableError
=============================== warnings summary ===============================

danielhollas avatar Dec 12 '25 13:12 danielhollas

@danielhollas I would like to help investigate this issue. From the stack trace, it seems the failure is caused by a NO_ROUTE error when publishing to the process queue after the daemon restart.

aman-coder03 avatar Dec 12 '25 14:12 aman-coder03

Hi @aman-coder03, thanks for your interest! However, I don't think this issue is well suited to first-time contributors, as it originates deep in the dependencies that handle communication via RabbitMQ. The bug might not even be in aiida-core itself.

danielhollas avatar Dec 15 '25 12:12 danielhollas
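
For readers unfamiliar with the `mandatory=True` flag visible in the trace (the `publish(...)` call in `kiwipy/rmq/tasks.py`), here is a toy in-memory model of what that flag means: a message published with `mandatory` set is rejected when no queue is bound to the routing key, whereas without the flag it is silently dropped. `ToyExchange`, its methods, and the local `UnroutableError` class are hypothetical stand-ins written for this illustration. They are not the kiwipy, aio_pika, or RabbitMQ API, and the sketch does not explain why the queue was missing in this CI run.

```python
class UnroutableError(Exception):
    """Hypothetical stand-in for kiwipy.UnroutableError (toy model only)."""


class ToyExchange:
    """In-memory stand-in for a direct exchange; NOT the aio_pika API."""

    def __init__(self):
        # Maps a routing key to the list of messages delivered to its queue.
        self._queues = {}

    def declare_queue(self, routing_key):
        """Create the queue for ``routing_key`` if it does not exist yet."""
        self._queues.setdefault(routing_key, [])

    def publish(self, message, routing_key, mandatory=False):
        """Deliver ``message``; return True if routed, False if dropped.

        With ``mandatory=True`` an unroutable message raises instead of
        being dropped, mirroring the NO_ROUTE failure in the trace.
        """
        queue = self._queues.get(routing_key)
        if queue is None:
            if mandatory:
                raise UnroutableError(('NO_ROUTE', routing_key))
            return False  # no queue bound: message is silently discarded
        queue.append(message)
        return True

    def messages(self, routing_key):
        """Return a copy of the messages held for ``routing_key``."""
        return list(self._queues.get(routing_key, []))


if __name__ == '__main__':
    exchange = ToyExchange()
    key = 'example.process.queue'  # placeholder routing key
    task = {'task': 'continue', 'args': {'pid': 1634}}

    try:
        exchange.publish(task, key, mandatory=True)
    except UnroutableError as exc:
        print('unroutable:', exc)

    # Once a queue is bound, the same publish succeeds.
    exchange.declare_queue(key)
    exchange.publish(task, key, mandatory=True)
    print('delivered:', exchange.messages(key))
```

In this model the order of `declare_queue` and `publish` decides the outcome, which is why a `NO_ROUTE` error generally points at the state of the queue at publish time rather than at the message itself.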