aiida-core
Issues with new asyncio daemon (stress-test)
I have tried a stress-test of the new AiiDA daemon after the replacement of tornado with asyncio.
I updated to the most recent develop (716a1d8f6), updated aiida-qe to develop as well (commit 1a9713aefbcd235c20ecfb65d0df226b5544bf7d of that repo), pip installed both, ran reentry scan, and stopped and restarted the daemon.
Let me roughly describe what I've been doing.
I prepared a script to submit on the order of ~2000 relax workflows.
While the submission was happening, I quickly reached the number of available slots (warning message at the end of verdi process list indicating a percentage > 100%), so I ran verdi daemon incr 7 to work with 8 workers.
After having submitted more than half of the workflows, I stopped, because 8 workers weren't enough anyway and I didn't want to overload the supercomputer with too many connections from too many workers.
I left it running overnight; the next morning I was in a stalled situation where all slots were taken, so I increased the number of workers a bit more, after a while submitted the rest of the workflows, and let them finish.
Since I realised that most were excepting (see below), I also stopped the daemon (that took a bit), made sure it was stopped, and started it again with just one worker to finish the work.
Unfortunately I have seen a number of issues :-( where most calculations had some kind of problem. Pinging @sphuber @unkcpz @muhrin, as they have been working on this and should be able to help debug/fix the bugs.
I am going to report below, as separate comments, some of the issues I'm seeing, but I'm not sure how to debug further, so if you need specific logs please let me know what to run (or @sphuber I can give you temporary access to the machine if that's easier). While I write, the last few (~30) jobs are finishing, but I can already start reporting the issues I see.
- A few of the relax work chains are left in the Created state, as well as some of the internal PwBaseWorkChains, and even some PwCalculations:
27034 14h ago PwRelaxWorkChain ⏹ Created
33506 7h ago PwCalculation ⏹ Created
33986 7h ago PwCalculation ⏹ Created
33983 7h ago PwBaseWorkChain ⏹ Created
33989 7h ago PwCalculation ⏹ Created
33992 6h ago PwBaseWorkChain ⏹ Created
34025 6h ago PwBaseWorkChain ⏹ Created
34028 6h ago PwBaseWorkChain ⏹ Created
34090 6h ago PwCalculation ⏹ Created
34147 6h ago PwBaseWorkChain ⏹ Created
34153 6h ago PwBaseWorkChain ⏹ Created
34156 6h ago PwBaseWorkChain ⏹ Created
34162 6h ago PwCalculation ⏹ Created
34174 6h ago PwBaseWorkChain ⏹ Created
34189 6h ago PwBaseWorkChain ⏹ Created
34207 6h ago PwBaseWorkChain ⏹ Created
34391 6h ago PwCalculation ⏹ Created
34394 6h ago PwBaseWorkChain ⏹ Created
35252 6h ago PwCalculation ⏹ Created
35467 5h ago PwCalculation ⏹ Created
35482 5h ago PwCalculation ⏹ Created
35570 5h ago PwCalculation ⏹ Created
35869 5h ago PwCalculation ⏹ Created
39375 3h ago PwCalculation ⏹ Created
(Since I hit CTRL+C only once yesterday night, and restarted the daemon less than 1h ago, these shouldn't be connected to an action on my side.)
- Many (most?) of the calculations and workflows have excepted. Running:
from collections import Counter
from aiida.orm import Group, Node, QueryBuilder  # already preloaded in a `verdi shell`

qb = QueryBuilder()
qb.append(Group, filters={'label': 'MY_GROUP_NAME'}, tag='g')
qb.append(Node, with_group='g', project=['id', 'attributes.exit_status'])
res = dict(qb.all())  # maps pk -> exit_status
print(Counter(res.values()))
I get
Counter({None: 1103, 401: 984, 0: 27, 402: 18})
As you can see, most of them failed, many without even an exit status (None). I report some of the reports here:
Example of a 401
$ verdi process report 16518
2020-11-28 01:14:49 [3203 | REPORT]: [16518|PwRelaxWorkChain|run_relax]: launching PwBaseWorkChain<29477>
2020-11-28 06:39:43 [5230 | REPORT]: [29477|PwBaseWorkChain|run_process]: launching PwCalculation<32972> iteration #1
2020-11-28 07:07:45 [7357 | REPORT]: [29477|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned
2020-11-28 07:18:17 [9388 | REPORT]: [16518|PwRelaxWorkChain|inspect_relax]: relax PwBaseWorkChain failed with exit status 301
2020-11-28 07:18:17 [9389 | REPORT]: [16518|PwRelaxWorkChain|on_terminated]: remote folders will not be cleaned
and the corresponding failed (excepted) calculation has:
!verdi process report 32972
*** 32972: None
*** Scheduler output: N/A
*** Scheduler errors: N/A
*** 1 LOG MESSAGES:
+-> ERROR at 2020-11-28 06:58:59.147179+00:00
| Traceback (most recent call last):
| File "/home/pizzi/git/aiida-core/aiida/manage/external/rmq.py", line 206, in _continue
| result = await super()._continue(communicator, pid, nowait, tag)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/process_comms.py", line 538, in _continue
| proc = saved_state.unbundle(self._load_context)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 52, in unbundle
| return Savable.load(self, load_context)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 443, in load
| return load_cls.recreate_from(saved_state, load_context)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 223, in recreate_from
| base.call_with_super_check(process.init)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 27, in call_with_super_check
| wrapped(*args, **kwargs)
| File "/home/pizzi/git/aiida-core/aiida/engine/processes/process.py", line 126, in init
| super().init()
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 14, in wrapper
| wrapped(self, *args, **kwargs)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 275, in init
| identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/communications.py", line 108, in add_rpc_subscriber
| return self._communicator.add_rpc_subscriber(converted, identifier)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/threadcomms.py", line 184, in add_rpc_subscriber
| self._communicator.add_rpc_subscriber(self._wrap_subscriber(subscriber), identifier)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 155, in await_
| return self.await_submit(awaitable).result(timeout=self.task_timeout)
| File "/usr/lib/python3.7/concurrent/futures/_base.py", line 435, in result
| return self.__get_result()
| File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
| raise self._exception
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 35, in done
| result = done_future.result()
| File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
| raise self._exception
| File "/usr/lib/python3.7/asyncio/tasks.py", line 251, in __step
| result = coro.throw(exc)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 169, in proxy
| return await awaitable
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 435, in add_rpc_subscriber
| identifier = await msg_subscriber.add_rpc_subscriber(subscriber, identifier)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 110, in add_rpc_subscriber
| rpc_queue = await self._channel.declare_queue(exclusive=True, arguments=self._rmq_queue_arguments)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/robust_channel.py", line 180, in declare_queue
| timeout=timeout,
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/channel.py", line 310, in declare_queue
| await queue.declare(timeout=timeout)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/queue.py", line 101, in declare
| timeout=timeout,
| File "/usr/lib/python3.7/asyncio/tasks.py", line 414, in wait_for
| return await fut
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 713, in queue_declare
| timeout=timeout,
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 168, in wrap
| return await self.create_task(func(self, *args, **kwargs))
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 25, in __inner
| return await self.task
| File "/usr/lib/python3.7/asyncio/futures.py", line 263, in __await__
| yield self # This tells Task to wait for completion.
| File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
| future.result()
| File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
| raise self._exception
| File "/usr/lib/python3.7/asyncio/tasks.py", line 249, in __step
| result = coro.send(None)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 121, in rpc
| raise ChannelInvalidStateError("writer is None")
| aiormq.exceptions.ChannelInvalidStateError: writer is None
and
$ verdi process status 16518
PwRelaxWorkChain<16518> Finished [401] [1:while_(should_run_relax)(1:inspect_relax)]
└── PwBaseWorkChain<29477> Finished [301] [6:while_(should_run_process)(2:inspect_process)]
└── PwCalculation<32972> Excepted
So it failed because an internal step excepted.
Example of a None
$ verdi process report 16663
2020-11-28 01:18:21 [3319 | REPORT]: [16663|PwRelaxWorkChain|run_relax]: launching PwBaseWorkChain<29652>
2020-11-28 06:40:24 [5336 | REPORT]: [29652|PwBaseWorkChain|run_process]: launching PwCalculation<33293> iteration #1
2020-11-28 07:08:13 [7465 | ERROR]: Traceback (most recent call last):
File "/home/pizzi/git/aiida-core/aiida/manage/external/rmq.py", line 206, in _continue
result = await super()._continue(communicator, pid, nowait, tag)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/process_comms.py", line 538, in _continue
proc = saved_state.unbundle(self._load_context)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 52, in unbundle
return Savable.load(self, load_context)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 443, in load
return load_cls.recreate_from(saved_state, load_context)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 223, in recreate_from
base.call_with_super_check(process.init)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 27, in call_with_super_check
wrapped(*args, **kwargs)
File "/home/pizzi/git/aiida-core/aiida/engine/processes/process.py", line 126, in init
super().init()
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 14, in wrapper
wrapped(self, *args, **kwargs)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 275, in init
identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/communications.py", line 108, in add_rpc_subscriber
return self._communicator.add_rpc_subscriber(converted, identifier)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/threadcomms.py", line 184, in add_rpc_subscriber
self._communicator.add_rpc_subscriber(self._wrap_subscriber(subscriber), identifier)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 155, in await_
return self.await_submit(awaitable).result(timeout=self.task_timeout)
File "/usr/lib/python3.7/concurrent/futures/_base.py", line 435, in result
return self.__get_result()
File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 35, in done
result = done_future.result()
File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
raise self._exception
File "/usr/lib/python3.7/asyncio/tasks.py", line 251, in __step
result = coro.throw(exc)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 169, in proxy
return await awaitable
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 435, in add_rpc_subscriber
identifier = await msg_subscriber.add_rpc_subscriber(subscriber, identifier)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 110, in add_rpc_subscriber
rpc_queue = await self._channel.declare_queue(exclusive=True, arguments=self._rmq_queue_arguments)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/robust_channel.py", line 180, in declare_queue
timeout=timeout,
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/channel.py", line 310, in declare_queue
await queue.declare(timeout=timeout)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/queue.py", line 101, in declare
timeout=timeout,
File "/usr/lib/python3.7/asyncio/tasks.py", line 414, in wait_for
return await fut
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 713, in queue_declare
timeout=timeout,
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 168, in wrap
return await self.create_task(func(self, *args, **kwargs))
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 25, in __inner
return await self.task
File "/usr/lib/python3.7/asyncio/futures.py", line 263, in __await__
yield self # This tells Task to wait for completion.
File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
future.result()
File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
raise self._exception
File "/usr/lib/python3.7/asyncio/tasks.py", line 249, in __step
result = coro.send(None)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 121, in rpc
raise ChannelInvalidStateError("writer is None")
aiormq.exceptions.ChannelInvalidStateError: writer is None
2020-11-28 07:08:24 [7597 | REPORT]: [29652|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned
and
$ verdi process status 16663
PwRelaxWorkChain<16663> Excepted [1:while_(should_run_relax)]
└── PwBaseWorkChain<29652> Finished [301] [6:while_(should_run_process)(2:inspect_process)]
└── PwCalculation<33293> Excepted
So also in this case it's an internal step that excepted; I'm not sure what the difference between the two cases is.
The 402s (I checked a couple) only happened at the very beginning of the submission, plus a couple after a while; here are the PKs:
[1680, 1762, 1886, 1951, 2044, 2061, 2247, 2295, 2357, 2436, 2580, 2594, 2611, 2656, 2876, 2890, 26166, 26572]
Also in this case the problem is probably similar:
$ verdi process report 26166
2020-11-28 07:03:10 [6908 | REPORT]: [26166|PwRelaxWorkChain|run_relax]: launching PwBaseWorkChain<34816>
2020-11-28 07:09:09 [8037 | REPORT]: [34816|PwBaseWorkChain|run_process]: launching PwCalculation<35225> iteration #1
2020-11-28 11:40:36 [11802 | REPORT]: [34816|PwBaseWorkChain|sanity_check_insufficient_bands]: PwCalculation<35225> run with smearing and highest band is occupied
2020-11-28 11:40:36 [11803 | REPORT]: [34816|PwBaseWorkChain|sanity_check_insufficient_bands]: BandsData<40529> has invalid occupations: Occupation of 2.0 at last band lkn<0,0,200>
2020-11-28 11:40:36 [11804 | REPORT]: [34816|PwBaseWorkChain|sanity_check_insufficient_bands]: PwCalculation<35225> had insufficient bands
2020-11-28 11:40:36 [11805 | REPORT]: [34816|PwBaseWorkChain|sanity_check_insufficient_bands]: Action taken: increased number of bands to 210 and restarting from scratch
2020-11-28 11:40:36 [11806 | REPORT]: [34816|PwBaseWorkChain|inspect_process]: PwCalculation<35225> finished successfully but a handler was triggered, restarting
2020-11-28 11:40:36 [11807 | REPORT]: [34816|PwBaseWorkChain|run_process]: launching PwCalculation<40535> iteration #2
2020-11-28 13:02:41 [11886 | REPORT]: [34816|PwBaseWorkChain|results]: work chain completed after 2 iterations
2020-11-28 13:02:42 [11887 | REPORT]: [34816|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned
2020-11-28 13:02:42 [11888 | REPORT]: [26166|PwRelaxWorkChain|inspect_relax]: after iteration 1 cell volume of relaxed structure is 519.7184571850364
2020-11-28 13:02:42 [11889 | REPORT]: [26166|PwRelaxWorkChain|run_relax]: launching PwBaseWorkChain<40724>
2020-11-28 13:02:43 [11890 | REPORT]: [40724|PwBaseWorkChain|run_process]: launching PwCalculation<40727> iteration #1
2020-11-28 13:10:43 [11891 | REPORT]: [40724|PwBaseWorkChain|results]: work chain completed after 1 iterations
2020-11-28 13:10:43 [11892 | REPORT]: [40724|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned
2020-11-28 13:10:43 [11893 | REPORT]: [26166|PwRelaxWorkChain|inspect_relax]: after iteration 2 cell volume of relaxed structure is 519.7184503782762
2020-11-28 13:10:43 [11894 | REPORT]: [26166|PwRelaxWorkChain|inspect_relax]: relative cell volume difference 1.3097014580438056e-08 smaller than convergence threshold 0.01
2020-11-28 13:10:43 [11895 | REPORT]: [26166|PwRelaxWorkChain|run_final_scf]: launching PwBaseWorkChain<40736> for final scf
2020-11-28 13:10:45 [11896 | REPORT]: [40736|PwBaseWorkChain|run_process]: launching PwCalculation<40739> iteration #1
2020-11-28 13:11:45 [11898 | REPORT]: [40736|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned
2020-11-28 13:11:45 [11899 | REPORT]: [26166|PwRelaxWorkChain|inspect_final_scf]: final scf PwBaseWorkChain failed with exit status 301
2020-11-28 13:11:45 [11900 | REPORT]: [26166|PwRelaxWorkChain|on_terminated]: remote folders will not be cleaned
and
$ verdi process status 26166
PwRelaxWorkChain<26166> Finished [402] [2:if_(should_run_final_scf)(1:inspect_final_scf)]
├── PwBaseWorkChain<34816> Finished [0] [7:results]
│ ├── PwCalculation<35225> Finished [0]
│ └── PwCalculation<40535> Finished [0]
├── PwBaseWorkChain<40724> Finished [0] [7:results]
│ └── PwCalculation<40727> Finished [0]
└── PwBaseWorkChain<40736> Finished [301] [6:while_(should_run_process)(2:inspect_process)]
└── PwCalculation<40739> Excepted
and finally:
$ verdi process report 40739
*** 40739: None
*** Scheduler output: N/A
*** Scheduler errors: N/A
*** 1 LOG MESSAGES:
+-> ERROR at 2020-11-28 13:10:45.225990+00:00
| Traceback (most recent call last):
| File "/home/pizzi/git/aiida-core/aiida/manage/external/rmq.py", line 206, in _continue
| result = await super()._continue(communicator, pid, nowait, tag)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/process_comms.py", line 538, in _continue
| proc = saved_state.unbundle(self._load_context)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 52, in unbundle
| return Savable.load(self, load_context)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 443, in load
| return load_cls.recreate_from(saved_state, load_context)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 223, in recreate_from
| base.call_with_super_check(process.init)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 27, in call_with_super_check
| wrapped(*args, **kwargs)
| File "/home/pizzi/git/aiida-core/aiida/engine/processes/process.py", line 126, in init
| super().init()
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 14, in wrapper
| wrapped(self, *args, **kwargs)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 275, in init
| identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/communications.py", line 108, in add_rpc_subscriber
| return self._communicator.add_rpc_subscriber(converted, identifier)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/threadcomms.py", line 184, in add_rpc_subscriber
| self._communicator.add_rpc_subscriber(self._wrap_subscriber(subscriber), identifier)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 155, in await_
| return self.await_submit(awaitable).result(timeout=self.task_timeout)
| File "/usr/lib/python3.7/concurrent/futures/_base.py", line 435, in result
| return self.__get_result()
| File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
| raise self._exception
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 35, in done
| result = done_future.result()
| File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
| raise self._exception
| File "/usr/lib/python3.7/asyncio/tasks.py", line 251, in __step
| result = coro.throw(exc)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 169, in proxy
| return await awaitable
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 435, in add_rpc_subscriber
| identifier = await msg_subscriber.add_rpc_subscriber(subscriber, identifier)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 110, in add_rpc_subscriber
| rpc_queue = await self._channel.declare_queue(exclusive=True, arguments=self._rmq_queue_arguments)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/robust_channel.py", line 180, in declare_queue
| timeout=timeout,
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/channel.py", line 310, in declare_queue
| await queue.declare(timeout=timeout)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/queue.py", line 101, in declare
| timeout=timeout,
| File "/usr/lib/python3.7/asyncio/tasks.py", line 414, in wait_for
| return await fut
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 713, in queue_declare
| timeout=timeout,
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 168, in wrap
| return await self.create_task(func(self, *args, **kwargs))
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 25, in __inner
| return await self.task
| File "/usr/lib/python3.7/asyncio/futures.py", line 263, in __await__
| yield self # This tells Task to wait for completion.
| File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
| future.result()
| File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
| raise self._exception
| File "/usr/lib/python3.7/asyncio/tasks.py", line 249, in __step
| result = coro.send(None)
| File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 121, in rpc
| raise ChannelInvalidStateError("writer is None")
| aiormq.exceptions.ChannelInvalidStateError: writer is None
While this seems to be quite an important bug that happens often and has serious consequences for users, the "good news" is that, from this simple analysis, it seems to be mostly generated by the same aiormq.exceptions.ChannelInvalidStateError: writer is None error, so maybe fixing that would fix the majority of the problems (except perhaps for the few calculations and workflows that were left in the Created state?).
A final comment: there are a few more errors, e.g. this one:
$ verdi process report 45613
2020-11-28 13:16:06 [12490 | REPORT]: [45613|PwRelaxWorkChain|run_relax]: launching PwBaseWorkChain<46239>
2020-11-28 13:16:17 [12531 | REPORT]: [46239|PwBaseWorkChain|run_process]: launching PwCalculation<46565> iteration #1
2020-11-28 13:25:50 [13890 | REPORT]: [46239|PwBaseWorkChain|on_except]: Traceback (most recent call last):
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 1072, in step
next_state = await self._run_task(self._state.execute)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 498, in _run_task
result = await coro(*args, **kwargs)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/process_states.py", line 306, in execute
result = await self._waiting_future
File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
future.result()
concurrent.futures._base.CancelledError
2020-11-28 13:25:50 [13893 | REPORT]: [46239|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned
2020-11-28 13:25:59 [14414 | REPORT]: [45613|PwRelaxWorkChain|on_except]: Traceback (most recent call last):
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 1072, in step
next_state = await self._run_task(self._state.execute)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 498, in _run_task
result = await coro(*args, **kwargs)
File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/process_states.py", line 306, in execute
result = await self._waiting_future
File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
future.result()
concurrent.futures._base.CancelledError
2020-11-28 13:25:59 [14419 | REPORT]: [45613|PwRelaxWorkChain|on_terminated]: remote folders will not be cleaned
The error here is different, and while the work chain failed, there are some leftover orphan calculations still queued or running on the supercomputer:
$ verdi process status 45613
PwRelaxWorkChain<45613> Excepted [1:while_(should_run_relax)]
└── PwBaseWorkChain<46239> Excepted [6:while_(should_run_process)(1:run_process)]
└── PwCalculation<46565> Waiting
and
$ verdi process list | grep 46565
46565 53m ago PwCalculation ⏵ Waiting Monitoring scheduler: job state RUNNING
@giovannipizzi thanks for such an elaborate bug report!
As for the 401 and None cases: after reading the traceback, though I'm not sure what's going on here, it reminds me of a piece of code that haunts me. In the traceback, the program complains when adding the RPC subscriber to the communicator (a LoopCommunicator); this involves calling plum_to_kiwi_future of plumpy. In this function there are lines that are never exercised by the unit tests, but the same code once caused problems elsewhere. That's the isinstance(result, futures.Future) check used to get the result from the future and unwrap it if it's another future. But when it is a SavableFuture it is an _asyncio.Future (PY>3.6), which is not an instance of plumpy.Future (an asyncio.Future). So in aiida-core I use asyncio.isfuture(result) to identify it.
I'm not sure whether this relates to this issue, but it might be an inspiration for how to debug it. @muhrin is more of an expert on this; looking forward to his input.
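For illustration, here is a minimal, self-contained sketch of the distinction described above: asyncio.isfuture() duck-types on the _asyncio_future_blocking marker, so it recognizes future-like objects that an isinstance check against a particular Future class would miss (DuckFuture below is a made-up class for the demonstration, not a plumpy or kiwipy type):

```python
import asyncio
import concurrent.futures

# A made-up future-like object: it carries the `_asyncio_future_blocking`
# marker that `asyncio.isfuture()` looks for, but it is not a subclass of
# `asyncio.Future`.
class DuckFuture:
    _asyncio_future_blocking = False

duck = DuckFuture()

# The duck-typed check succeeds where the isinstance check fails:
print(asyncio.isfuture(duck))            # True
print(isinstance(duck, asyncio.Future))  # False

# A real asyncio future passes both checks; a concurrent.futures.Future
# passes neither, since it is a different (thread-based) future type.
loop = asyncio.new_event_loop()
real = loop.create_future()
print(asyncio.isfuture(real))                         # True
print(asyncio.isfuture(concurrent.futures.Future()))  # False
loop.close()
```

This is why a check pinned to one Future class can silently reject a perfectly valid (e.g. C-accelerated) future object.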
Thanks @unkcpz! Unfortunately I'm also not an expert on that code. One important thing to mention (see #4598) is that I had been running with a lot of jobs, so (maybe?) there was just a system overload? I'm not sure, since the errors are different from those in #4598, but it's something to keep in mind. I could try to run again in a few days, but without "exaggerating" (e.g. only ~200 at a time), to see if the problem also appears with low computer load. Otherwise, any help in how to debug this further would be appreciated (it's relatively simple to reproduce; I'm just running a lot of PwRelaxWorkChains).
Thanks for the report @giovannipizzi. The exit codes here are not really useful: all those exit codes of the PwBaseWorkChain and PwRelaxWorkChain simply reflect that a sub process failed, so they don't provide any useful information. Note that exit_status == None also applies to processes that are either not yet terminated, or were excepted/killed. Finding those in and of themselves is then not problematic, especially if you know that you have a lot of excepted processes. The most interesting query to do here is simply to find all the excepted processes and retrieve the corresponding exceptions. As you have noticed, there are probably a few types of exceptions that then cause all sorts of knock-on effects.
The most critical one seems to be:
aiormq.exceptions.ChannelInvalidStateError: writer is None
Here there seems to be something wrong with the connection to RabbitMQ and so any operation that involves it will raise an exception that will cause the process to fall over. The other one:
concurrent.futures._base.CancelledError
is likely just a result of the former. A process was waiting on a sub process, which excepted due to the first error, causing the associated future to be cancelled; here I think we might not yet be catching the correct exceptions, either because it has been changed, or because multiple ones can be thrown and we didn't expect this one.
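As a sketch of the kind of query suggested above, plus a pure-Python tally one could run on its result (the attribute keys in the commented query are my assumption about how AiiDA stores the process state and exception; the helper function and the fabricated rows are mine):

```python
from collections import Counter

# The query itself would look something like this in a `verdi shell`
# (attribute keys assumed, not verified against the schema):
#
#   qb = QueryBuilder()
#   qb.append(ProcessNode,
#             filters={'attributes.process_state': 'excepted'},
#             project=['id', 'attributes.exception'])
#   rows = qb.all()

def tally_exceptions(rows):
    """Tally excepted processes by the last traceback line (type + message)."""
    counter = Counter()
    for _pk, exception in rows:
        if exception:
            counter[exception.strip().splitlines()[-1]] += 1
        else:
            counter['<no exception stored>'] += 1
    return counter

# Example with fabricated rows mimicking the tracebacks in this report:
rows = [
    (1, 'Traceback ...\naiormq.exceptions.ChannelInvalidStateError: writer is None'),
    (2, 'Traceback ...\naiormq.exceptions.ChannelInvalidStateError: writer is None'),
    (3, 'Traceback ...\nconcurrent.futures._base.CancelledError'),
]
print(tally_exceptions(rows))
```

Grouping by the final traceback line quickly shows how many distinct root causes there really are.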
OK - I'll try to get more information about the actual errors/exceptions.
Anyway, would it be possible (at least for now, or maybe even in the mid term) to catch these specific exceptions and just consider them as connection failures, so that AiiDA's retry mechanism is triggered and the process is eventually paused if the problem persists?
I guess these are errors that can occur, e.g. if the RMQ goes down (or if it's on a different machine and the network goes down) so we should be able to allow operations to restart once the connection is re-established instead of just excepting the whole process?
I guess these are errors that can occur, e.g. if the RMQ goes down (or if it's on a different machine and the network goes down) so we should be able to allow operations to restart once the connection is re-established instead of just excepting the whole process?
That would be ideal, but I think this is not going to be straightforward to implement, because we cannot put this into plumpy where we catch all exceptions and retry. Some exceptions are real and need to fail the process. It won't be possible to have plumpy automatically determine which failures are transient and need to be "retried" (where even "retrying" itself is not straightforward). The exponential backoff mechanism is something we implemented completely on top of the Process state machine, specifically and only for the CalcJob subclass; it does not exist as a basic piece of the machinery.
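For context, the exponential-backoff idea referred to can be sketched roughly as follows. This is a hand-rolled illustration, not AiiDA's actual implementation; the function name with_backoff, the intervals, and the choice of ConnectionError as the "transient" exception are all mine:

```python
import asyncio
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger('backoff-sketch')

async def with_backoff(coro_factory, max_attempts=5, initial=0.01):
    """Retry a coroutine on ConnectionError, doubling the wait each time."""
    interval = initial
    for attempt in range(1, max_attempts + 1):
        try:
            return await coro_factory()
        except ConnectionError:
            if attempt == max_attempts:
                # Out of retries: at this point AiiDA would pause the process
                # rather than let it except; here we simply re-raise.
                raise
            logger.warning('attempt %d failed, retrying in %gs', attempt, interval)
            await asyncio.sleep(interval)
            interval *= 2

# Demo: a task that fails twice with a "transient" error, then succeeds.
calls = {'count': 0}

async def flaky_upload():
    calls['count'] += 1
    if calls['count'] < 3:
        raise ConnectionError('writer is None')
    return 'uploaded'

print(asyncio.run(with_backoff(flaky_upload)))  # uploaded
```

The hard part, as noted above, is not the retry loop itself but deciding which exception types belong in that except clause.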
Hi Gio,
If there's any way you can still reproduce this, then it might be worth looking at whether there could be a connection between the CancelledError and the ChannelInvalidStateError.
In aiormq there are only a few ways that the writer can be set to None, and one of them is if a CancelledError [1] is caught in an RPC call.
So, if you can reproduce it, would you be able to enable logging for aiormq, and possibly asyncio? We might see the signature "Closing channel .. because RPC call ... cancelled" warning log, though I would have thought that this would appear by default (as it is at WARNING level).
The only other ways I can see, at the moment, that the writer could be None are if the connector had a None writer when the Channel was created, or if the Connection itself was closed, which I think is probably the most likely case, presumably due to missed heartbeats.
[1] This CancelledError is indeed a concurrent.futures._base.CancelledError if you follow the alias from asyncio.
Hi Martin,
I could manage to run again in a few days, maybe. Just to be sure, how should I increase the logging level exactly? Is it something I can do from verdi config? (Just to make sure we capture all the information.) Also, it seems AiiDA is rotating the logs very quickly; maybe I should increase some config value to avoid losing too much information?
how should I increase the logging level exactly? Is it something I can do from verdi config?
Hi @giovannipizzi, the logger setter of `asyncio` (it is there to configure the `tornado` logger setting, but I removed it since it is less used) was moved in this new commit. The easiest way (I guess so; @sphuber correct me if I'm wrong) to enable the loggers of `aiormq` and `asyncio` would be to add them to the logger setter. Then you can configure it through `verdi config logging.asyncio_loglevel DEBUG`.
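Until that lands, the same loggers can also be bumped by hand with the standard library, e.g. at the top of a runner script (a minimal sketch; `aiormq` and `asyncio` are just the logger names discussed above):

```python
import logging

# Quiet root logger, but full DEBUG output for the two loggers of interest,
# so that channel/connection diagnostics from aiormq actually show up.
logging.basicConfig(level=logging.WARNING)
for name in ('aiormq', 'asyncio'):
    logging.getLogger(name).setLevel(logging.DEBUG)
```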
Just a short message to confirm that, with a "low throughput" (100 WorkChains/~300 processes running at the same time), everything works fine (I submitted a total of ~1000 work chains). So I guess these errors start occurring only when the machine gets slow because of too many submissions - still to investigate, but maybe to do together with #4603 and #4598
First to note, there is now a `logging.aiopika_loglevel` config value, and kiwipy also has debugging for which messages are sent (since the aiopika logging does not give the content of the messages).
Then, for the `aiormq.exceptions.ChannelInvalidStateError`: as already mentioned, it is some issue with connecting to RabbitMQ (the communicator has been closed?) that is possibly beyond our control.
In `Process.init` we already catch a connection error, in the form of `futures.TimeoutError`, and ignore it: https://github.com/aiidateam/plumpy/blob/db0bf6033aa3a9b69e0e5b30206df05135538fd7/plumpy
```python
try:
    identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
    self.add_cleanup(functools.partial(self._communicator.remove_rpc_subscriber, identifier))
except kiwipy.TimeoutError:
    self.logger.exception('Process<%s>: failed to register as an RPC subscriber', self.pid)
```
This means that the process does not except, but the trade-off is that it will be unreachable if trying to use `verdi kill`. So we could also catch `aiormq.exceptions.ChannelInvalidStateError` and accept this trade-off.
Alternatively, at a "higher level", perhaps it would be better not to ignore these exceptions but rather, in `plumpy.ProcessLauncher._continue`, catch them and convert them to a `TaskRejected` exception (and also make sure this does not trigger `aiida.ProcessLauncher.handle_continue_exception`).
https://github.com/aiidateam/plumpy/blob/db0bf6033aa3a9b69e0e5b30206df05135538fd7/plumpy/process_comms.py#L586 https://github.com/aiidateam/aiida-core/blob/c07e3ef0b9e9fff64888115aaf30479bbd76392e/aiida/manage/external/rmq.py#L206
If I understand correctly, this would then mean that the process will not be started running and the continue task would be punted back to rabbitmq, to re-broadcast again to the daemon workers. Then you should not end up with unreachable processes.
How's this sound @muhrin @sphuber?
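To make the proposal concrete, here is a self-contained sketch of the conversion; the exception classes are stand-ins for `plumpy.TaskRejected` and `aiormq.exceptions.ChannelInvalidStateError`, and the real `_continue` signature differs:

```python
class TaskRejected(Exception):
    """Stand-in for plumpy's TaskRejected: tells RabbitMQ to requeue the task."""

class ChannelInvalidStateError(RuntimeError):
    """Stand-in for aiormq.exceptions.ChannelInvalidStateError."""

# Exceptions that indicate a (possibly transient) communicator problem,
# not a genuine failure of the process itself.
REJECTABLE = (ChannelInvalidStateError,)

def continue_task(restart_process):
    """Sketch of the _continue wrapper: reject instead of excepting the node."""
    try:
        return restart_process()
    except REJECTABLE as exc:
        # Do NOT call handle_continue_exception here (which would seal the
        # node as excepted); just hand the task back to RabbitMQ.
        raise TaskRejected(f'communicator failure: {exc}') from exc

# Example: a restart that fails because the channel is gone
def broken_restart():
    raise ChannelInvalidStateError('writer is None')

try:
    continue_task(broken_restart)
except TaskRejected:
    print('task rejected, requeued by RabbitMQ')
```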
From #4598: `kiwipy.communications.DuplicateSubscriberIdentifier` should also be added to the exceptions that are handled and result in a `TaskRejected` exception.
With respect to:

```python
try:
    identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
    self.add_cleanup(functools.partial(self._communicator.remove_rpc_subscriber, identifier))
except kiwipy.TimeoutError:
    self.logger.exception('Process<%s>: failed to register as an RPC subscriber', self.pid)

try:
    identifier = self._communicator.add_broadcast_subscriber(
        self.broadcast_receive, identifier=str(self.pid)
    )
    self.add_cleanup(functools.partial(self._communicator.remove_broadcast_subscriber, identifier))
except kiwipy.TimeoutError:
    self.logger.exception('Process<%s>: failed to register as a broadcast subscriber', self.pid)
```
I guess there should be some thought on what happens if `add_rpc_subscriber` is successful and `add_broadcast_subscriber` fails, i.e. we should try to `remove_rpc_subscriber` (catching exceptions)?
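The rollback could look roughly like this; a sketch with a dummy communicator standing in for kiwipy's, using the method names from the snippet above:

```python
class DummyCommunicator:
    """Minimal stand-in that records subscriber registrations."""

    def __init__(self, broadcast_fails=False):
        self.rpc = set()
        self.broadcast_fails = broadcast_fails

    def add_rpc_subscriber(self, callback, identifier):
        self.rpc.add(identifier)
        return identifier

    def remove_rpc_subscriber(self, identifier):
        self.rpc.discard(identifier)

    def add_broadcast_subscriber(self, callback, identifier):
        if self.broadcast_fails:
            raise TimeoutError('no confirmation from RabbitMQ')
        return identifier

def register(communicator, pid):
    """Register both subscribers; roll back the RPC one if broadcast fails."""
    rpc_id = communicator.add_rpc_subscriber(None, identifier=str(pid))
    try:
        communicator.add_broadcast_subscriber(None, identifier=str(pid))
    except TimeoutError:
        # Roll back so we do not leave a half-registered process behind;
        # swallow errors here since the communicator may already be broken.
        try:
            communicator.remove_rpc_subscriber(rpc_id)
        except Exception:
            pass
        raise

comm = DummyCommunicator(broadcast_fails=True)
try:
    register(comm, 123)
except TimeoutError:
    pass
assert comm.rpc == set()  # the RPC subscription was rolled back
```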
If I understand correctly, this would then mean that the process will not be started running and the continue task would be punted back to rabbitmq, to re-broadcast again to the daemon workers. Then you should not end up with unreachable processes.
Yes, if in `ProcessLauncher._continue` we simply reject the task if we cannot successfully deserialize and restart it, the task will be requeued with RabbitMQ. However, this might not actually be a solution. If the failure really is due to the communicator being broken, and it doesn't fix itself, then this daemon worker will be broken forever. Once all daemon workers have the same problem, they will all just fail to restart processes and send the task back, causing the tasks to start ping-ponging. So the proposed fix would be one if and only if we also fix the underlying communicator problem, or determine that it is transient and not permanent for a daemon worker.
from #4598: `kiwipy.communications.DuplicateSubscriberIdentifier` should also be added to the exceptions that are handled and result in a `TaskRejected` exception
Same story here, it would definitely be good to handle this more gracefully and reject the task instead of excepting the process. However, I feel also here it would be very important to find the underlying cause of this problem because it indicates a bigger bug elsewhere.
thanks for the reply @sphuber
Yes, if in `ProcessLauncher._continue` we simply reject the task if we cannot successfully deserialize and restart it, the task will be requeued with RabbitMQ.
note this is not currently the case for aiida-core, because the node is set as excepted and sealed for every exception: https://github.com/aiidateam/aiida-core/blob/c07e3ef0b9e9fff64888115aaf30479bbd76392e/aiida/manage/external/rmq.py#L211-L214
i.e. here we should at least catch `TaskRejected` exceptions separately and not call `handle_continue_exception` before re-raising
If the failure really is due to the communicator being broken, and it doesn't fix itself, then this daemon worker will be broken forever.
Indeed, if it is a "permanently broken" communicator then I guess we should just kill the daemon worker.
Within `aiida.ProcessLauncher._continue` you could certainly implement an (exponential backoff) retry mechanism, whereby if the process initiation fails, you simply sleep for some time and then retry. Then, perhaps, if these retries fail you kill the daemon worker entirely.
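Such a retry loop could be sketched as follows (generic Python; the interval and attempt numbers are made up, and "killing the worker" is represented by re-raising to the caller):

```python
import time

def retry_with_backoff(operation, max_attempts=5, initial_interval=0.1):
    """Retry `operation`, doubling the sleep between attempts.

    If all attempts fail, re-raise the last exception so the caller
    (e.g. the daemon worker) can decide to shut down entirely.
    """
    interval = initial_interval
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts:
                raise
            time.sleep(interval)
            interval *= 2  # exponential backoff

# Example: an operation that fails twice before succeeding
calls = {'count': 0}

def flaky():
    calls['count'] += 1
    if calls['count'] < 3:
        raise ConnectionError('communicator not ready')
    return 'continued'

print(retry_with_backoff(flaky, initial_interval=0.01))  # prints "continued"
```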
However, I feel also here it would be very important to find the underlying cause of this problem because it indicates a bigger bug elsewhere.
Indeed, but it's just so hard to debug, because it is so hard to consistently replicate. Perhaps there is some way to mock RabbitMQ heartbeats being missed 🤷
note this is not currently the case for aiida-core, because the node is set as excepted and sealed for every exception:
I know, I said if we were to start doing that, which to me seems like a good idea indeed, provided that we fix the underlying problem of a potentially "permanently" broken communicator. Here we should consult with @muhrin because recent versions of `kiwipy` should have a robust communicator that reconnects automatically, in which case either rejecting the task or having an automatic retry would maybe work. The reject would definitely be the simplest.
So, just to consolidate the known failure modes when adding the process rpc/broadcast receiver:

- `kiwipy.DuplicateSubscriberIdentifier`: it seems this should always be rejected, because we don't want multiple instances of the same process running (see also #4598 and #3973).
- `aiormq.exceptions.ChannelInvalidStateError: writer is None`
- `pika.exceptions.ChannelWrongStateError: Channel is closed`: noted in Slack.
- `RuntimeError: The channel is closed`: from `topika.common`, noted in Slack, but topika is no longer used I guess?
- `futures.TimeoutError`: this one is perhaps a bit tricky, because a timeout does not always mean that the subscriber has not been added, just that we do not receive a confirmation quickly enough. In fact, thinking on it now, perhaps this is related to `kiwipy.DuplicateSubscriberIdentifier`, since if this exception is encountered then a broadcast remove task will not be added to the process cleanup. Perhaps @giovannipizzi could check if, when he sees this exception, he also sees "failed to register as a broadcast subscriber" in the daemon log? (EDIT: it's probably more likely related to a missed heartbeat, but good to check.)
see https://github.com/aiidateam/plumpy/pull/212 for something I think will help
Hey @muhrin, related to this, perhaps you could summarise your understanding of what happens when an RMQ heartbeat is missed; on the server (RMQ) and client (daemon worker) sides.
e.g. will RMQ discard the original daemon worker connection and resend any pending tasks (i.e. process continues) it was working on? How do aiormq/kiwipy handle this; will they notice that the connection has been lost and create a new one? Or will the daemon worker just continue to work on tasks until it potentially excepts?
RabbitMQ will actually only consider a connection dead when two subsequent heartbeats are missed. In that case, all tasks that were associated with that worker are immediately requeued and so will eventually be resent. If the heartbeats were missed because the daemon worker actually died, all is working as intended. However, if the daemon worker was actually still working on the tasks and it missed the heartbeats for another reason (failure to respond because it was too busy or whatever) then we run into undefined behaviour because another worker will potentially start to run the same processes and now there will be all sorts of problems where one of the two workers will perform an action that the other already did and an exception will most likely be raised.
Note that a long time ago, @muhrin changed the `kiwipy` communication significantly by moving all communication with RabbitMQ to a separate thread from the main thread that does the actual business logic in running the processes. Without this separation we were indeed seeing cases where a daemon worker, when it was under heavy load, would fail to respond to the heartbeats in time and so the tasks would be requeued. After the rework, this should no longer be an issue, but there is no guarantee.
Then there is the question of the daemon worker actually losing its connection but otherwise still being functional. At some point it might not be able to automatically reestablish the connection, and so eventually the tasks would be requeued. I think @muhrin also worked on this, trying to create a robust connection that would automatically reconnect if broken, but I am not sure if this actually works.
Thanks @sphuber, just for some extra clarification: when we set the heartbeat to 600s in the broker parameters, this will create a connection URL with `...?heartbeat=600`, which says to RMQ "expect a heartbeat within a 600s window". Then in kiwipy/aiormq a task is set up on the thread to send out this heartbeat every 600x0.5=300s (see https://github.com/mosquito/aiormq/blob/a0920bb65da2f0fc40bf589e5fb6cc1183f0aa40/aiormq/connection.py#L294); if RMQ does not receive two of these, or any other traffic, it closes the connection (https://www.rabbitmq.com/heartbeats.html#heartbeats-interval). Similarly, if aiormq does not receive a heartbeat within 600x3=1800s, it will close the connection on the client side.
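Spelling out the arithmetic from the paragraph above (the 0.5 and 3 multipliers are the ones read off the aiormq source and the RabbitMQ docs linked there; treat the exact server-side numbers as an assumption):

```python
heartbeat = 600                    # ?heartbeat=600 in the connection URL (seconds)

send_interval = heartbeat * 0.5    # aiormq sends a heartbeat frame every 300 s
server_missed_beats = 2            # RMQ considers the peer dead after two missed beats,
server_close_after = server_missed_beats * send_interval  # ...i.e. ~600 s of silence
client_close_after = heartbeat * 3  # aiormq gives up after 1800 s without a beat

print(send_interval, server_close_after, client_close_after)  # 300.0 600.0 1800
```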
Hey lads. Sorry I don't have much time at the moment - we did the trip down to Switzerland last weekend and are currently on holiday/getting set up in CH.
To give some input on the discussions:
- `kiwipy` uses robust connections which do reconnect when they can. Of course, if the connection has been dropped then all messages that were being processed by that client will have been requeued.
- Communication is running on a separate thread and so heartbeats should get through even under heavy Python load, with the exception of things that hold the GIL.
- Clients can request a particular heartbeat but IIRC RMQ will always use `min(server heartbeat, client heartbeat)`, and I think the server is set to 60s by default.
- My suspicion for some of the duplicate subscriber errors (for example when a laptop goes to sleep) is that the heartbeats are missed and the same subscriber tries to subscribe themselves (not knowing that they have already been 'kicked').
- Currently there is no mechanism in `kiwipy` to find out when a connection has dropped and so there is no clearing of the subscribers mapping (this is a bug and should be fixed, but it's not clear what the downstream consequences would be; in any case I'm happy to investigate this in the coming weeks).
we did the trip down to Switzerland last weekend and are currently on holiday/getting set up in CH
welcome back 😄
Clients can request a particular heartbeat but IIRC RMQ will always use min(server heartbeat, client heartbeat) and I think the server is set to 60s by default.
so does this mean that with `..?heartbeat=600` RMQ will still send out a heartbeat every 60s, but will only expect one back every 600s?
Currently there is no mechanism in `kiwipy` to find out when a connection has dropped and so there is no clearing of the subscribers mapping (this is a bug and should be fixed, but it's not clear what the downstream consequences would be; in any case I'm happy to investigate this in the coming weeks).
I think this is a very important point. This could definitely explain the behaviour. If a connection is dropped and so RabbitMQ reschedules the tasks, if the worker re-establishes its connection and gets one of the tasks that it was already running, that would definitely cause that problem. The real problem here is of course that if a connection is dropped the worker should automatically and a.s.a.p stop running the tasks it had, because technically it no longer has the "right" to run them as they have been put back in the queue.
The real problem here is of course that if a connection is dropped the worker should automatically and a.s.a.p stop running the tasks it had, because technically it no longer has the "right" to run them as they have been put back in the queue.
Yep, that's my thinking. In a way kiwipy's reconnect mechanism is "too elegant" lol, in that rather than reconnecting it should just raise an exception and crash that daemon worker (instantly killing all its tasks). (Then circus will just spawn a fresh worker.)
I think the main challenge here would be detecting the broken connection early enough. If it is not using it, it won't notice it unless we put some check in that explicitly checks every period of time.
Could we add an option to the communicator in kiwipy, to add a callback to `RobustConnection.add_reconnect_callback` that does this daemon crash (i.e. raises an exception)?
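A sketch of what such a callback might look like; whether `add_reconnect_callback` accepts exactly this signature is an assumption, the idea is just that a reconnect implies our tasks were already requeued, so the worker should die and let circus respawn it:

```python
import sys

def crash_on_reconnect(connection=None):
    """Callback for a robust-connection reconnect event.

    A reconnect implies the old connection was dropped and RabbitMQ has
    already requeued our tasks, so this worker no longer "owns" them:
    exit so the daemonizer (circus) can spawn a fresh worker.
    """
    print('connection was re-established: tasks were requeued, crashing worker',
          file=sys.stderr)
    raise SystemExit(1)

# Hypothetical wiring (not runnable here without a real connection):
# connection.add_reconnect_callback(crash_on_reconnect)
```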