
Issues with new asyncio daemon (stress-test)

Open giovannipizzi opened this issue 3 years ago • 51 comments

I have tried a stress-test of the new AiiDA daemon after the replacement of tornado with asyncio.

I have updated to the most recent develop (716a1d8f6), also updated aiida-qe to develop (commit 1a9713aefbcd235c20ecfb65d0df226b5544bf7d of that repo), pip installed both, ran reentry scan, and stopped+started the daemon.

Let me roughly describe what I've been doing. I prepared a script to submit on the order of ~2000 relax workflows. While the submission was happening, I quickly reached the number of slots (warning message at the end of verdi process list indicating a % > 100%), so I ran verdi daemon incr 7 to work with 8 workers. After having submitted more than half of the workflows I stopped, because 8 workers weren't enough anyway and I didn't want to overload the supercomputer with too many connections from too many workers. I left it running overnight; the next morning I was in a stalled situation where all slots were taken, so I increased the number of workers a bit more, after a while submitted the rest of the workflows, and let them finish. Since I realised that most were excepting (see below), I also stopped the daemon (that took a bit), made sure it was stopped, and started it again with just one worker to finish the work.

Unfortunately I have seen a number of issues ( :-( ), and most calculations had some kind of problem. Pinging @sphuber @unkcpz @muhrin, as they have been working on this and should be able to help debug/fix the bugs.

I am going to report below, as separate comments, some of the issues that I'm seeing. I'm not sure how to debug further, so if you need specific logs please let me know what to run (or @sphuber, I can give you temporary access to the machine if it's easier). While I write, the last few (~30) jobs are finishing, but I can already start reporting the issues I see.

giovannipizzi avatar Nov 28 '20 13:11 giovannipizzi

  • a few of the relax work chains are left in the Created state, as well as some of the internal PwBaseWorkChains, and even some PwCalculations:
27034  14h ago    PwRelaxWorkChain  ⏹ Created
33506  7h ago     PwCalculation     ⏹ Created
33986  7h ago     PwCalculation     ⏹ Created
33983  7h ago     PwBaseWorkChain   ⏹ Created
33989  7h ago     PwCalculation     ⏹ Created
33992  6h ago     PwBaseWorkChain   ⏹ Created
34025  6h ago     PwBaseWorkChain   ⏹ Created
34028  6h ago     PwBaseWorkChain   ⏹ Created
34090  6h ago     PwCalculation     ⏹ Created
34147  6h ago     PwBaseWorkChain   ⏹ Created
34153  6h ago     PwBaseWorkChain   ⏹ Created
34156  6h ago     PwBaseWorkChain   ⏹ Created
34162  6h ago     PwCalculation     ⏹ Created
34174  6h ago     PwBaseWorkChain   ⏹ Created
34189  6h ago     PwBaseWorkChain   ⏹ Created
34207  6h ago     PwBaseWorkChain   ⏹ Created
34391  6h ago     PwCalculation     ⏹ Created
34394  6h ago     PwBaseWorkChain   ⏹ Created
35252  6h ago     PwCalculation     ⏹ Created
35467  5h ago     PwCalculation     ⏹ Created
35482  5h ago     PwCalculation     ⏹ Created
35570  5h ago     PwCalculation     ⏹ Created
35869  5h ago     PwCalculation     ⏹ Created
39375  3h ago     PwCalculation     ⏹ Created

(Since I hit CTRL+C only once last night, and restarted the daemon once less than 1 h ago, these shouldn't be connected to any action on my side.)

giovannipizzi avatar Nov 28 '20 13:11 giovannipizzi

Many (most?) of the calculations and workflows have excepted. Running

from collections import Counter
from aiida.orm import Group, Node, QueryBuilder

# count the exit statuses of all nodes in the group
qb = QueryBuilder()
qb.append(Group, filters={'label': 'MY_GROUP_NAME'}, tag='g')
qb.append(Node, with_group='g', project=['id', 'attributes.exit_status'])
res = dict(qb.all())
print(Counter(res.values()))

I get

Counter({None: 1103, 401: 984, 0: 27, 402: 18})

As you can see most of them failed, many without even an exit status (None). Here are some of the process reports:

Example of a 401

$ verdi process report 16518

2020-11-28 01:14:49 [3203 | REPORT]: [16518|PwRelaxWorkChain|run_relax]: launching PwBaseWorkChain<29477>
2020-11-28 06:39:43 [5230 | REPORT]:   [29477|PwBaseWorkChain|run_process]: launching PwCalculation<32972> iteration #1
2020-11-28 07:07:45 [7357 | REPORT]:   [29477|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned
2020-11-28 07:18:17 [9388 | REPORT]: [16518|PwRelaxWorkChain|inspect_relax]: relax PwBaseWorkChain failed with exit status 301
2020-11-28 07:18:17 [9389 | REPORT]: [16518|PwRelaxWorkChain|on_terminated]: remote folders will not be cleaned

and the corresponding failed (excepted) calculation has:

!verdi process report 32972
*** 32972: None
*** Scheduler output: N/A
*** Scheduler errors: N/A
*** 1 LOG MESSAGES:
+-> ERROR at 2020-11-28 06:58:59.147179+00:00
 | Traceback (most recent call last):
 |   File "/home/pizzi/git/aiida-core/aiida/manage/external/rmq.py", line 206, in _continue
 |     result = await super()._continue(communicator, pid, nowait, tag)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/process_comms.py", line 538, in _continue
 |     proc = saved_state.unbundle(self._load_context)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 52, in unbundle
 |     return Savable.load(self, load_context)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 443, in load
 |     return load_cls.recreate_from(saved_state, load_context)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 223, in recreate_from
 |     base.call_with_super_check(process.init)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 27, in call_with_super_check
 |     wrapped(*args, **kwargs)
 |   File "/home/pizzi/git/aiida-core/aiida/engine/processes/process.py", line 126, in init
 |     super().init()
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 14, in wrapper
 |     wrapped(self, *args, **kwargs)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 275, in init
 |     identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/communications.py", line 108, in add_rpc_subscriber
 |     return self._communicator.add_rpc_subscriber(converted, identifier)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/threadcomms.py", line 184, in add_rpc_subscriber
 |     self._communicator.add_rpc_subscriber(self._wrap_subscriber(subscriber), identifier)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 155, in await_
 |     return self.await_submit(awaitable).result(timeout=self.task_timeout)
 |   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 435, in result
 |     return self.__get_result()
 |   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
 |     raise self._exception
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 35, in done
 |     result = done_future.result()
 |   File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
 |     raise self._exception
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 251, in __step
 |     result = coro.throw(exc)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 169, in proxy
 |     return await awaitable
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 435, in add_rpc_subscriber
 |     identifier = await msg_subscriber.add_rpc_subscriber(subscriber, identifier)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 110, in add_rpc_subscriber
 |     rpc_queue = await self._channel.declare_queue(exclusive=True, arguments=self._rmq_queue_arguments)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/robust_channel.py", line 180, in declare_queue
 |     timeout=timeout,
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/channel.py", line 310, in declare_queue
 |     await queue.declare(timeout=timeout)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/queue.py", line 101, in declare
 |     timeout=timeout,
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 414, in wait_for
 |     return await fut
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 713, in queue_declare
 |     timeout=timeout,
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 168, in wrap
 |     return await self.create_task(func(self, *args, **kwargs))
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 25, in __inner
 |     return await self.task
 |   File "/usr/lib/python3.7/asyncio/futures.py", line 263, in __await__
 |     yield self  # This tells Task to wait for completion.
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
 |     future.result()
 |   File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
 |     raise self._exception
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 249, in __step
 |     result = coro.send(None)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 121, in rpc
 |     raise ChannelInvalidStateError("writer is None")
 | aiormq.exceptions.ChannelInvalidStateError: writer is None

and

$ verdi process status 16518
PwRelaxWorkChain<16518> Finished [401] [1:while_(should_run_relax)(1:inspect_relax)]
    └── PwBaseWorkChain<29477> Finished [301] [6:while_(should_run_process)(2:inspect_process)]
        └── PwCalculation<32972> Excepted

So it failed because an internal step excepted.

Example of a None

$ verdi process report 16663

2020-11-28 01:18:21 [3319 | REPORT]: [16663|PwRelaxWorkChain|run_relax]: launching PwBaseWorkChain<29652>
2020-11-28 06:40:24 [5336 | REPORT]:   [29652|PwBaseWorkChain|run_process]: launching PwCalculation<33293> iteration #1
2020-11-28 07:08:13 [7465 |  ERROR]: Traceback (most recent call last):
  File "/home/pizzi/git/aiida-core/aiida/manage/external/rmq.py", line 206, in _continue
    result = await super()._continue(communicator, pid, nowait, tag)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/process_comms.py", line 538, in _continue
    proc = saved_state.unbundle(self._load_context)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 52, in unbundle
    return Savable.load(self, load_context)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 443, in load
    return load_cls.recreate_from(saved_state, load_context)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 223, in recreate_from
    base.call_with_super_check(process.init)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 27, in call_with_super_check
    wrapped(*args, **kwargs)
  File "/home/pizzi/git/aiida-core/aiida/engine/processes/process.py", line 126, in init
    super().init()
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 14, in wrapper
    wrapped(self, *args, **kwargs)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 275, in init
    identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/communications.py", line 108, in add_rpc_subscriber
    return self._communicator.add_rpc_subscriber(converted, identifier)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/threadcomms.py", line 184, in add_rpc_subscriber
    self._communicator.add_rpc_subscriber(self._wrap_subscriber(subscriber), identifier)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 155, in await_
    return self.await_submit(awaitable).result(timeout=self.task_timeout)
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 435, in result
    return self.__get_result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 35, in done
    result = done_future.result()
  File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
    raise self._exception
  File "/usr/lib/python3.7/asyncio/tasks.py", line 251, in __step
    result = coro.throw(exc)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 169, in proxy
    return await awaitable
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 435, in add_rpc_subscriber
    identifier = await msg_subscriber.add_rpc_subscriber(subscriber, identifier)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 110, in add_rpc_subscriber
    rpc_queue = await self._channel.declare_queue(exclusive=True, arguments=self._rmq_queue_arguments)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/robust_channel.py", line 180, in declare_queue
    timeout=timeout,
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/channel.py", line 310, in declare_queue
    await queue.declare(timeout=timeout)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/queue.py", line 101, in declare
    timeout=timeout,
  File "/usr/lib/python3.7/asyncio/tasks.py", line 414, in wait_for
    return await fut
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 713, in queue_declare
    timeout=timeout,
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
  File "/usr/lib/python3.7/asyncio/futures.py", line 263, in __await__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
    future.result()
  File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
    raise self._exception
  File "/usr/lib/python3.7/asyncio/tasks.py", line 249, in __step
    result = coro.send(None)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 121, in rpc
    raise ChannelInvalidStateError("writer is None")
aiormq.exceptions.ChannelInvalidStateError: writer is None

2020-11-28 07:08:24 [7597 | REPORT]:   [29652|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned

and

$ verdi process status 16663

PwRelaxWorkChain<16663> Excepted [1:while_(should_run_relax)]
    └── PwBaseWorkChain<29652> Finished [301] [6:while_(should_run_process)(2:inspect_process)]
        └── PwCalculation<33293> Excepted

So also in this case an internal step excepted; I'm not sure what makes the difference between the two cases (401 vs None).

The 402s (I checked a couple) only happened at the very beginning of the submission, plus a couple after a while; here are the PKs:

[1680, 1762, 1886, 1951, 2044, 2061, 2247, 2295, 2357, 2436, 2580, 2594, 2611, 2656, 2876, 2890, 26166, 26572]

Also in this case the problem is probably similar:

$ verdi process report 26166

2020-11-28 07:03:10 [6908  | REPORT]: [26166|PwRelaxWorkChain|run_relax]: launching PwBaseWorkChain<34816>
2020-11-28 07:09:09 [8037  | REPORT]:   [34816|PwBaseWorkChain|run_process]: launching PwCalculation<35225> iteration #1
2020-11-28 11:40:36 [11802 | REPORT]:   [34816|PwBaseWorkChain|sanity_check_insufficient_bands]: PwCalculation<35225> run with smearing and highest band is occupied
2020-11-28 11:40:36 [11803 | REPORT]:   [34816|PwBaseWorkChain|sanity_check_insufficient_bands]: BandsData<40529> has invalid occupations: Occupation of 2.0 at last band lkn<0,0,200>
2020-11-28 11:40:36 [11804 | REPORT]:   [34816|PwBaseWorkChain|sanity_check_insufficient_bands]: PwCalculation<35225> had insufficient bands
2020-11-28 11:40:36 [11805 | REPORT]:   [34816|PwBaseWorkChain|sanity_check_insufficient_bands]: Action taken: increased number of bands to 210 and restarting from scratch
2020-11-28 11:40:36 [11806 | REPORT]:   [34816|PwBaseWorkChain|inspect_process]: PwCalculation<35225> finished successfully but a handler was triggered, restarting
2020-11-28 11:40:36 [11807 | REPORT]:   [34816|PwBaseWorkChain|run_process]: launching PwCalculation<40535> iteration #2
2020-11-28 13:02:41 [11886 | REPORT]:   [34816|PwBaseWorkChain|results]: work chain completed after 2 iterations
2020-11-28 13:02:42 [11887 | REPORT]:   [34816|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned
2020-11-28 13:02:42 [11888 | REPORT]: [26166|PwRelaxWorkChain|inspect_relax]: after iteration 1 cell volume of relaxed structure is 519.7184571850364
2020-11-28 13:02:42 [11889 | REPORT]: [26166|PwRelaxWorkChain|run_relax]: launching PwBaseWorkChain<40724>
2020-11-28 13:02:43 [11890 | REPORT]:   [40724|PwBaseWorkChain|run_process]: launching PwCalculation<40727> iteration #1
2020-11-28 13:10:43 [11891 | REPORT]:   [40724|PwBaseWorkChain|results]: work chain completed after 1 iterations
2020-11-28 13:10:43 [11892 | REPORT]:   [40724|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned
2020-11-28 13:10:43 [11893 | REPORT]: [26166|PwRelaxWorkChain|inspect_relax]: after iteration 2 cell volume of relaxed structure is 519.7184503782762
2020-11-28 13:10:43 [11894 | REPORT]: [26166|PwRelaxWorkChain|inspect_relax]: relative cell volume difference 1.3097014580438056e-08 smaller than convergence threshold 0.01
2020-11-28 13:10:43 [11895 | REPORT]: [26166|PwRelaxWorkChain|run_final_scf]: launching PwBaseWorkChain<40736> for final scf
2020-11-28 13:10:45 [11896 | REPORT]:   [40736|PwBaseWorkChain|run_process]: launching PwCalculation<40739> iteration #1
2020-11-28 13:11:45 [11898 | REPORT]:   [40736|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned
2020-11-28 13:11:45 [11899 | REPORT]: [26166|PwRelaxWorkChain|inspect_final_scf]: final scf PwBaseWorkChain failed with exit status 301
2020-11-28 13:11:45 [11900 | REPORT]: [26166|PwRelaxWorkChain|on_terminated]: remote folders will not be cleaned

and

$ verdi process status 26166

PwRelaxWorkChain<26166> Finished [402] [2:if_(should_run_final_scf)(1:inspect_final_scf)]
    ├── PwBaseWorkChain<34816> Finished [0] [7:results]
    │   ├── PwCalculation<35225> Finished [0]
    │   └── PwCalculation<40535> Finished [0]
    ├── PwBaseWorkChain<40724> Finished [0] [7:results]
    │   └── PwCalculation<40727> Finished [0]
    └── PwBaseWorkChain<40736> Finished [301] [6:while_(should_run_process)(2:inspect_process)]
        └── PwCalculation<40739> Excepted

and finally:

$ verdi process report 40739

*** 40739: None
*** Scheduler output: N/A
*** Scheduler errors: N/A
*** 1 LOG MESSAGES:
+-> ERROR at 2020-11-28 13:10:45.225990+00:00
 | Traceback (most recent call last):
 |   File "/home/pizzi/git/aiida-core/aiida/manage/external/rmq.py", line 206, in _continue
 |     result = await super()._continue(communicator, pid, nowait, tag)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/process_comms.py", line 538, in _continue
 |     proc = saved_state.unbundle(self._load_context)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 52, in unbundle
 |     return Savable.load(self, load_context)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/persistence.py", line 443, in load
 |     return load_cls.recreate_from(saved_state, load_context)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 223, in recreate_from
 |     base.call_with_super_check(process.init)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 27, in call_with_super_check
 |     wrapped(*args, **kwargs)
 |   File "/home/pizzi/git/aiida-core/aiida/engine/processes/process.py", line 126, in init
 |     super().init()
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/base/utils.py", line 14, in wrapper
 |     wrapped(self, *args, **kwargs)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 275, in init
 |     identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/communications.py", line 108, in add_rpc_subscriber
 |     return self._communicator.add_rpc_subscriber(converted, identifier)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/threadcomms.py", line 184, in add_rpc_subscriber
 |     self._communicator.add_rpc_subscriber(self._wrap_subscriber(subscriber), identifier)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 155, in await_
 |     return self.await_submit(awaitable).result(timeout=self.task_timeout)
 |   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 435, in result
 |     return self.__get_result()
 |   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
 |     raise self._exception
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 35, in done
 |     result = done_future.result()
 |   File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
 |     raise self._exception
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 251, in __step
 |     result = coro.throw(exc)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/pytray/aiothreads.py", line 169, in proxy
 |     return await awaitable
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 435, in add_rpc_subscriber
 |     identifier = await msg_subscriber.add_rpc_subscriber(subscriber, identifier)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/kiwipy/rmq/communicator.py", line 110, in add_rpc_subscriber
 |     rpc_queue = await self._channel.declare_queue(exclusive=True, arguments=self._rmq_queue_arguments)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/robust_channel.py", line 180, in declare_queue
 |     timeout=timeout,
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/channel.py", line 310, in declare_queue
 |     await queue.declare(timeout=timeout)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aio_pika/queue.py", line 101, in declare
 |     timeout=timeout,
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 414, in wait_for
 |     return await fut
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 713, in queue_declare
 |     timeout=timeout,
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 168, in wrap
 |     return await self.create_task(func(self, *args, **kwargs))
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/base.py", line 25, in __inner
 |     return await self.task
 |   File "/usr/lib/python3.7/asyncio/futures.py", line 263, in __await__
 |     yield self  # This tells Task to wait for completion.
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
 |     future.result()
 |   File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
 |     raise self._exception
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 249, in __step
 |     result = coro.send(None)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/aiormq/channel.py", line 121, in rpc
 |     raise ChannelInvalidStateError("writer is None")
 | aiormq.exceptions.ChannelInvalidStateError: writer is None

giovannipizzi avatar Nov 28 '20 14:11 giovannipizzi

While this seems to be quite an important bug that happens often and has serious consequences for users, the "good news" is that from this simple analysis most failures seem to be generated by the same aiormq.exceptions.ChannelInvalidStateError: writer is None error, so maybe fixing this would fix the majority of the problems (except maybe for the few calculations and workflows that were left in the Created state?).

giovannipizzi avatar Nov 28 '20 14:11 giovannipizzi

A final comment: there are a few more failures with a different error, e.g. this one:

$  verdi process report 45613
2020-11-28 13:16:06 [12490 | REPORT]: [45613|PwRelaxWorkChain|run_relax]: launching PwBaseWorkChain<46239>
2020-11-28 13:16:17 [12531 | REPORT]:   [46239|PwBaseWorkChain|run_process]: launching PwCalculation<46565> iteration #1
2020-11-28 13:25:50 [13890 | REPORT]:   [46239|PwBaseWorkChain|on_except]: Traceback (most recent call last):
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 1072, in step
    next_state = await self._run_task(self._state.execute)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 498, in _run_task
    result = await coro(*args, **kwargs)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/process_states.py", line 306, in execute
    result = await self._waiting_future
  File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
    future.result()
concurrent.futures._base.CancelledError

2020-11-28 13:25:50 [13893 | REPORT]:   [46239|PwBaseWorkChain|on_terminated]: remote folders will not be cleaned
2020-11-28 13:25:59 [14414 | REPORT]: [45613|PwRelaxWorkChain|on_except]: Traceback (most recent call last):
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 1072, in step
    next_state = await self._run_task(self._state.execute)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 498, in _run_task
    result = await coro(*args, **kwargs)
  File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/process_states.py", line 306, in execute
    result = await self._waiting_future
  File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
    future.result()
concurrent.futures._base.CancelledError

2020-11-28 13:25:59 [14419 | REPORT]: [45613|PwRelaxWorkChain|on_terminated]: remote folders will not be cleaned

The error is different, and while the work chain failed, there are some leftover orphan calculations still queued or running on the supercomputer:

$ verdi process status 45613
PwRelaxWorkChain<45613> Excepted [1:while_(should_run_relax)]
    └── PwBaseWorkChain<46239> Excepted [6:while_(should_run_process)(1:run_process)]
        └── PwCalculation<46565> Waiting

and

$ verdi process list | grep 46565
46565  53m ago    PwCalculation     ⏵ Waiting        Monitoring scheduler: job state RUNNING

giovannipizzi avatar Nov 28 '20 14:11 giovannipizzi

@giovannipizzi thanks for such an elaborate bug report!

As for the 401 and None cases: after reading the traceback, I'm not sure what's going on, but it reminds me of a piece of code that keeps haunting me. In the traceback, the program complains when adding the RPC subscriber to the communicator (a LoopCommunicator); this involves calling plumpy's plum_to_kiwi_future. In that function there are lines that never get exercised by the unit tests, and the same code once caused problems elsewhere: the if isinstance(result, futures.Future): check that gets the result from the future and unwraps it if it is itself another future. But when the result is a SavableFuture it is a _asyncio.Future (for Python > 3.6), which is not an instance of plumpy.Future (an asyncio.Future). That's why in aiida-core I use asyncio.isfuture(result) to identify it.
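For reference, a tiny standalone illustration of the duck-typed check mentioned above (this is just plain asyncio behaviour, not the plumpy code itself):

import asyncio

async def demo():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # On CPython this is typically the C-accelerated _asyncio.Future
    print(type(fut))
    # asyncio.isfuture() duck-types any asyncio-compatible future,
    # which is the safer check when deciding whether to unwrap a result
    print(asyncio.isfuture(fut))  # True
    fut.cancel()

asyncio.run(demo())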

I'm not sure this relates to this issue, but it might be an inspiration for how to debug it. @muhrin is more of an expert on this code; looking forward to his input.

unkcpz avatar Nov 29 '20 07:11 unkcpz

Thanks @unkcpz ! Unfortunately I'm also not an expert on that code. One important thing to mention (see #4598) is that I had been running with a lot of jobs, so (maybe?) there was just a system overload? I'm not sure, the errors are different from those in #4598, but just to keep in mind. If needed, I could try to run again in a few days, but without "exaggerating" (e.g. only ~200 at a time) to see if the problem also appears with low computer load. Otherwise, any help in how to debug this further would be appreciated (it's relatively simple to reproduce, I'm just running a lot of PwRelaxWorkChains).

giovannipizzi avatar Nov 30 '20 09:11 giovannipizzi

Thanks for the report @giovannipizzi. The exit codes here are not really useful: all those exit codes of the PwBaseWorkChain and PwRelaxWorkChain simply reflect that a sub-process failed, so they don't provide any useful information. Note that exit_status == None also applies to processes that are either not yet terminated or that were excepted/killed, so finding those is not in and of itself problematic, especially if you know that you have a lot of excepted processes. The most interesting query we should do here is simply to find all the excepted processes and retrieve the corresponding exception. As you have noticed, there are probably a few types of exceptions that then cause all sorts of knock-on effects.
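For completeness, a sketch of such a query, assuming the standard process_state and exception attributes on process nodes and the same placeholder group label as above:

from aiida.orm import QueryBuilder, Group, ProcessNode

qb = QueryBuilder()
qb.append(Group, filters={'label': 'MY_GROUP_NAME'}, tag='g')
qb.append(
    ProcessNode,
    with_group='g',
    filters={'attributes.process_state': 'excepted'},
    project=['id', 'attributes.exception'],
)
for pk, exception in qb.all():
    # print only the last line of the stored traceback for a quick overview
    print(pk, exception.splitlines()[-1] if exception else None)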

The most critical one seems to be:

aiormq.exceptions.ChannelInvalidStateError: writer is None

Here there seems to be something wrong with the connection to RabbitMQ and so any operation that involves it will raise an exception that will cause the process to fall over. The other one:

concurrent.futures._base.CancelledError

is likely just a result of the former. A process was waiting on a sub-process, which excepted due to the first error, causing the associated future to be cancelled; here I think we might not yet be catching the correct exceptions, either because the exception type has changed, or because multiple ones can be thrown and we didn't expect this one.

sphuber avatar Nov 30 '20 10:11 sphuber

OK - I'll try to get more information about the actual errors/exceptions.

Anyway, would it be possible (at least for now, or maybe even in the mid term) to 'catch' these specific exceptions and just consider them as connection failures, so that AiiDA's retry mechanism is triggered and eventually the process is paused if the problem persists?

I guess these are errors that can occur, e.g. if the RMQ goes down (or if it's on a different machine and the network goes down) so we should be able to allow operations to restart once the connection is re-established instead of just excepting the whole process?

giovannipizzi avatar Nov 30 '20 12:11 giovannipizzi

I guess these are errors that can occur, e.g. if the RMQ goes down (or if it's on a different machine and the network goes down) so we should be able to allow operations to restart once the connection is re-established instead of just excepting the whole process?

That would be ideal, but I think this is not going to be straightforward to implement, because we cannot put this into plumpy where we catch all exceptions and retry. Some exceptions are real and need to fail the process. It won't be possible to have plumpy automatically determine which failures are transient and need to be "retried", where the last is in quotes because even that is not straightforward. The exponential backoff mechanism is something we implemented completely on top of the Process state machine, specifically and only for the CalcJob subclass. It does not exist as a basic piece of the machinery.

sphuber avatar Nov 30 '20 14:11 sphuber

Hi Gio,

If there's any way you can still reproduce this then it might be worth looking at whether there could be a connection between the CancelledError and the ChannelInvalidStateError.

In aiormq there are only a few ways that the writer can be set to None and one of them is if a CancelledError [1] is caught in an RPC call.

So, if you can reproduce it, would you be able to enable logging for aiormq, and possibly asyncio? We might see the signature "Closing channel .. because RPC call ... cancelled" warning log, but I would have thought that this would appear by default (as it is at WARNING level).

The only other ways I can see, at the moment, that the writer could be None are if the connector had a None writer when the Channel was created, or if the Connection itself was closed, which I think is the most likely case, probably due to missed heartbeats.

[1] This CancelledError is indeed a concurrent.futures._base.CancelledError if you follow the alias from asyncio

muhrin avatar Nov 30 '20 20:11 muhrin

Hi Martin, I could manage to run again in a few days maybe. Just to be sure, how should I increase the logging level exactly? Is it something I can do from verdi config? (Just so that we have all the information.) Also, it seems AiiDA is rotating the logs very quickly; maybe I should increase some config value to avoid losing too much information?

giovannipizzi avatar Dec 01 '20 12:12 giovannipizzi

how should I increase the logging level exactly? Is it something I can do from verdi config?

Hi @giovannipizzi, the logger setting for asyncio (it used to configure the tornado logger, but I removed that since it is less used) was moved in this new commit. The easiest way (I guess; @sphuber correct me if I'm wrong) to enable the aiormq and asyncio loggers would be to add them to that logger setup. Then you can configure it through verdi config logging.asyncio_loglevel DEBUG.
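If it helps outside the daemon (e.g. in a small standalone reproduction script), the same effect can be had with just the standard library; the logger names below are the obvious candidates, not an AiiDA-specific API:

import logging

# make the RabbitMQ client libraries and asyncio chatty enough to see
# channel/connection events; adjust the levels as needed
logging.basicConfig(level=logging.INFO)
for name in ('aiormq', 'aio_pika', 'asyncio'):
    logging.getLogger(name).setLevel(logging.DEBUG)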

unkcpz avatar Dec 03 '20 03:12 unkcpz

Just a short message to confirm that, with a "low throughput" (100 WorkChains / ~300 processes running at the same time), everything works fine (I submitted a total of ~1000 work chains). So I guess these errors start occurring only when the machine gets slow because of too many submissions - still to investigate, but maybe to be done together with #4603 and #4598.

giovannipizzi avatar Dec 22 '20 11:12 giovannipizzi

First to note: there is now a logging.aiopika_loglevel config value, and kiwipy also has debug logging for what messages are sent (since the aiopika logging does not give the content of the messages).

Then, for the aiormq.exceptions.ChannelInvalidStateError: as already mentioned, it is some issue with the connection to RabbitMQ (the communicator has been closed?) that is possibly beyond our control.

In Process.init we already catch a connection error, in the form of a futures.TimeoutError, and ignore it: https://github.com/aiidateam/plumpy/blob/db0bf6033aa3a9b69e0e5b30206df05135538fd7/plumpy

            try:
                identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
                self.add_cleanup(functools.partial(self._communicator.remove_rpc_subscriber, identifier))
            except kiwipy.TimeoutError:
                self.logger.exception('Process<%s>: failed to register as an RPC subscriber', self.pid)

This means that the process does not except, but the trade-off is that it will be unreachable when trying to use verdi kill. So we could also catch aiormq.exceptions.ChannelInvalidStateError and accept this trade-off.

Alternatively, at a "higher level", perhaps it would be better not to ignore these exceptions but rather to catch them in plumpy.ProcessLauncher._continue and convert them to a TaskRejected exception (and also make sure this does not trigger aiida.ProcessLauncher.handle_continue_exception).

https://github.com/aiidateam/plumpy/blob/db0bf6033aa3a9b69e0e5b30206df05135538fd7/plumpy/process_comms.py#L586 https://github.com/aiidateam/aiida-core/blob/c07e3ef0b9e9fff64888115aaf30479bbd76392e/aiida/manage/external/rmq.py#L206

If I understand correctly, this would then mean that the process would not be started at all and the continue task would be punted back to RabbitMQ, to be re-sent to the daemon workers. Then you should not end up with unreachable processes.
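To make that concrete, a rough (untested) sketch of what such a launcher override could look like; which exceptions to catch is exactly the open question:

import aiormq.exceptions
import kiwipy
import plumpy


class RejectingProcessLauncher(plumpy.ProcessLauncher):
    """Sketch: requeue the continue task instead of excepting the process."""

    async def _continue(self, communicator, pid, nowait, tag=None):
        try:
            return await super()._continue(communicator, pid, nowait, tag)
        except aiormq.exceptions.ChannelInvalidStateError as exception:
            # The communicator looks broken: reject the task so RabbitMQ
            # requeues it for another (hopefully healthy) daemon worker.
            raise kiwipy.TaskRejected(f'communicator unusable, requeueing task for process<{pid}>') from exception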

How's this sound @muhrin @sphuber?

chrisjsewell avatar Feb 25 '21 19:02 chrisjsewell

From #4598, kiwipy.communications.DuplicateSubscriberIdentifier should also be added to the exceptions that are handled and result in a TaskRejected exception.

With respect to:

           try:
                identifier = self._communicator.add_rpc_subscriber(self.message_receive, identifier=str(self.pid))
                self.add_cleanup(functools.partial(self._communicator.remove_rpc_subscriber, identifier))
            except kiwipy.TimeoutError:
                self.logger.exception('Process<%s>: failed to register as an RPC subscriber', self.pid)

            try:
                identifier = self._communicator.add_broadcast_subscriber(
                    self.broadcast_receive, identifier=str(self.pid)
                )
                self.add_cleanup(functools.partial(self._communicator.remove_broadcast_subscriber, identifier))
            except kiwipy.TimeoutError:
                self.logger.exception('Process<%s>: failed to register as a broadcast subscriber', self.pid)

I guess there should be some thought on what happens if add_rpc_subscriber succeeds and add_broadcast_subscriber fails; i.e., should we then try to remove_rpc_subscriber (catching exceptions)?
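A hedged sketch of that rollback idea, written as a free function just to illustrate the flow; the names mirror the plumpy snippet above, but this is not the actual plumpy implementation:

import functools

import kiwipy


def register_subscribers(communicator, process):
    """Register RPC and broadcast subscribers; roll the RPC one back if the second fails."""
    try:
        rpc_id = communicator.add_rpc_subscriber(process.message_receive, identifier=str(process.pid))
    except kiwipy.TimeoutError:
        process.logger.exception('Process<%s>: failed to register as an RPC subscriber', process.pid)
        return

    try:
        broadcast_id = communicator.add_broadcast_subscriber(process.broadcast_receive, identifier=str(process.pid))
    except kiwipy.TimeoutError:
        process.logger.exception('Process<%s>: failed to register as a broadcast subscriber', process.pid)
        try:
            # roll back the RPC registration so we are not left half-subscribed
            communicator.remove_rpc_subscriber(rpc_id)
        except Exception:
            # best effort only: the communicator itself may already be broken
            process.logger.exception('Process<%s>: failed to roll back RPC subscriber', process.pid)
        return

    # only register the cleanups once both subscriptions have succeeded
    process.add_cleanup(functools.partial(communicator.remove_rpc_subscriber, rpc_id))
    process.add_cleanup(functools.partial(communicator.remove_broadcast_subscriber, broadcast_id))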

chrisjsewell avatar Feb 25 '21 20:02 chrisjsewell

If I understand correctly, this would then mean that the process would not be started at all and the continue task would be punted back to RabbitMQ, to be re-sent to the daemon workers. Then you should not end up with unreachable processes.

Yes, if in ProcessLauncher._continue we simply reject the task if we cannot successfully deserialize and restart it, the task will be requeued with RabbitMQ. However, this might not actually be a solution. If the failure really is due to the communicator being broken, and it doesn't fix itself, then this daemon worker will be broken forever. Once all daemon workers have the same problem, they will all just fail to restart processes and send the task back, causing the tasks to start ping-ponging. So the proposed fix would only be one if we also fix the underlying communicator problem, or determine that it is transient and not permanent for a daemon worker.

sphuber avatar Feb 26 '21 08:02 sphuber

From #4598, kiwipy.communications.DuplicateSubscriberIdentifier should also be added to the exceptions that are handled and result in a TaskRejected exception.

Same story here, it would definitely be good to handle this more gracefully and reject the task instead of excepting the process. However, I feel also here it would be very important to find the underlying cause of this problem because it indicates a bigger bug elsewhere.

sphuber avatar Feb 26 '21 08:02 sphuber

thanks for the reply @sphuber

Yes, if in ProcessLauncher._continue we simply reject the task if we cannot successfully deserialize and restart it, the task will be requeued with RabbitMQ.

note this is not currently the case for aiida-core, because the node is set as excepted and sealed for every exception: https://github.com/aiidateam/aiida-core/blob/c07e3ef0b9e9fff64888115aaf30479bbd76392e/aiida/manage/external/rmq.py#L211-L214

i.e. here we should at least catch TaskRejected exceptions separately and not call handle_continue_exception before re-raising

If the failure really is due to the communicator being broken, and it doesn't fix itself, then this daemon worker will be broken forever.

Indeed, if it is a "permanently broken" communicator then I guess we should just kill the daemon worker. Within aiida.ProcessLauncher._continue you could certainly implement an (exponential backoff) retry mechanism, whereby if the Process initiation fails, you simply sleep for some time then retry. Then perhaps if these retries fail you kill the daemon worker entirely.
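A very rough sketch of that retry idea (attempt count and delays are made up; whether to re-raise or kill the worker after the retries is the open question above):

import asyncio

import aiormq.exceptions
import plumpy


class RetryingProcessLauncher(plumpy.ProcessLauncher):
    """Sketch: retry a failed process continuation with exponential backoff."""

    async def _continue(self, communicator, pid, nowait, tag=None):
        max_attempts = 5
        for attempt in range(max_attempts):
            try:
                return await super()._continue(communicator, pid, nowait, tag)
            except aiormq.exceptions.ChannelInvalidStateError:
                if attempt == max_attempts - 1:
                    raise  # or shut the worker down so circus spawns a fresh one
                await asyncio.sleep(2 ** attempt)  # wait 1 s, 2 s, 4 s, ...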

However, I feel also here it would be very important to find the underlying cause of this problem because it indicates a bigger bug elsewhere.

Indeed, but it's just so hard to debug, because it is so hard to consistently replicate. Perhaps there is some way to mock RabbitMQ heartbeats being missed 🤷

chrisjsewell avatar Feb 26 '21 09:02 chrisjsewell

note this is not currently the case for aiida-core, because the node is set as excepted and sealed for every exception:

I know, I said if we were to start doing that, which to me seems like a good idea indeed, provided that we fix the underlying problem of a potentially "permanently" broken communicator. Here we should consult with @muhrin because recent versions of kiwipy should have a robust communicator that reconnects automatically, in which case either rejecting the task or having an automatic retry would maybe work. The reject would definitely be the simplest.

sphuber avatar Feb 26 '21 09:02 sphuber

So just to consolidate the known failure modes when adding the process RPC/broadcast receiver:

  • kiwipy.DuplicateSubscriberIdentifier: it seems this should always be rejected, because we don't want multiple instances of the same process running. (see also in #4598 and #3973)
  • aiormq.exceptions.ChannelInvalidStateError: writer is None
  • pika.exceptions.ChannelWrongStateError: Channel is closed: noted in slack
  • RuntimeError: The channel is closed: from topika.common, noted in slack, but topika is no longer used I guess?
  • futures.TimeoutError: this one is perhaps a bit tricky, because a timeout does not always mean that the subscriber has not been added, just that we did not receive a confirmation quickly enough. In fact, thinking about it now, perhaps this is related to kiwipy.DuplicateSubscriberIdentifier, since if this exception is encountered then a broadcast remove task will not be added to the process cleanup. Perhaps @giovannipizzi could check whether, when he sees this exception, he also sees "failed to register as a broadcast subscriber" in the daemon log? (EDIT: it's probably more likely related to a missed heartbeat, but good to check.)

chrisjsewell avatar Feb 26 '21 10:02 chrisjsewell

see https://github.com/aiidateam/plumpy/pull/212 for something I think will help

chrisjsewell avatar Feb 27 '21 23:02 chrisjsewell

Hey @muhrin, related to this, perhaps you could summarise your understanding of what happens when an RMQ heartbeat is missed; on the server (RMQ) and client (daemon worker) sides.

e.g. will RMQ discard the original daemon worker connection and resend any pending tasks (i.e. process continues) it was working on? How does aiormq/kiwipy handle this; will it notice that the connection has been lost and create a new one? Will the daemon worker just continue to work on tasks until it potentially excepts?

chrisjsewell avatar Mar 02 '21 07:03 chrisjsewell

RabbitMQ will actually only consider a connection dead when two subsequent heartbeats are missed. In that case, all tasks that were associated with that worker are immediately requeued and so will eventually be resent. If the heartbeats were missed because the daemon worker actually died, all is working as intended. However, if the daemon worker was actually still working on the tasks and it missed the heartbeats for another reason (failure to respond because it was too busy or whatever) then we run into undefined behaviour because another worker will potentially start to run the same processes and now there will be all sorts of problems where one of the two workers will perform an action that the other already did and an exception will most likely be raised.

Note that a long time ago, @muhrin changed the kiwipy communication significantly by moving all communication with RabbitMQ to a separate thread from the main thread that does the actual business logic in running the processes. Without this separation we were indeed seeing cases where a daemon worker, when it was under heavy load, would fail to respond to the heartbeats in time and so the tasks would be requeued. After the rework, this should no longer be an issue, but there is no guarantee.

Then there is the question of the daemon worker actually losing its connection but otherwise still being functional. If it could not automatically re-establish the connection, then at some point the tasks would be requeued. I think @muhrin also worked on this, trying to create a robust connection that would automatically reconnect if broken, but I am not sure whether this actually works.

sphuber avatar Mar 02 '21 08:03 sphuber

Thanks @sphuber, just for some extra clarification: when we set the heartbeat to 600 s in the broker parameters, this creates a connection URL with ...?heartbeat=600, which tells RMQ "expect a heartbeat within a 600 s interval". In kiwipy/aiormq a task is then set up on the thread to send out this heartbeat every 600 x 0.5 = 300 s (see https://github.com/mosquito/aiormq/blob/a0920bb65da2f0fc40bf589e5fb6cc1183f0aa40/aiormq/connection.py#L294); if RMQ does not receive two of these or any other traffic, it closes the connection (https://www.rabbitmq.com/heartbeats.html#heartbeats-interval). Similarly, if aiormq does not receive a heartbeat within 600 x 3 = 1800 s, it will close the connection on the client side.
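Spelling out that arithmetic in a tiny sketch (values taken from the description above; the server-side grace period of roughly two missed heartbeats is RabbitMQ's own behaviour):

heartbeat = 600  # seconds, as passed via ...?heartbeat=600 in the broker URL

send_interval = heartbeat * 0.5   # aiormq sends a heartbeat every 300 s
client_timeout = heartbeat * 3    # aiormq closes its side after 1800 s without a heartbeat
# RabbitMQ closes the connection if it sees no heartbeat (or other traffic)
# for roughly two heartbeat intervals.
print(send_interval, client_timeout)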

chrisjsewell avatar Mar 02 '21 08:03 chrisjsewell

Hey lads. Sorry I don't have much time at the moment - we did the trip down to Switzerland last weekend and are currently on holiday/getting set up in CH.

To give some input on the discussions:

  • kiwipy uses robust connections which do reconnect when they can. Of course, if the connection has been dropped then all messages that were being processed by that client will have been requeued.
  • Communication is running on a separate thread and so heartbeats should get through even under heavy Python load, with the exception of things that hold the GIL.
  • Clients can request a particular heartbeat but IIRC RMQ will always use min(server heartbeat, client heartbeat) and I think the server is set to 60s by default.
  • My suspicion for some of the duplicate subscriber errors (for example when a laptop goes to sleep) is that the heartbeats are missed and the same subscriber tries to subscribe itself again (not knowing that it has already been 'kicked').
  • Currently there is no mechanism in kiwipy to find out when a connection has dropped and so there is no clearing of the subscribers mapping (this is a bug and should be fixed but it's not clear what the downstream consequences would be. In any case I'm happy to investigate this in the coming weeks)

muhrin avatar Mar 02 '21 12:03 muhrin

we did the trip down to Switzerland last weekend and are currently on holiday/getting set up in CH

welcome back 😄

Clients can request a particular heartbeat but IIRC RMQ will always use min(server heartbeat, client heartbeat) and I think the server is set to 60s by default.

so does this mean that with ...?heartbeat=600 RMQ will still send out a heartbeat every 60s, but will only expect one back every 600s?

chrisjsewell avatar Mar 02 '21 13:03 chrisjsewell

Currently there is no mechanism in kiwipy to find out when a connection has dropped and so there is no clearing of the subscribers mapping (this is a bug and should be fixed but it's not clear what the downstream consequences would be. In any case I'm happy to investigate this in the coming weeks)

I think this is a very important point. This could definitely explain the behaviour. If a connection is dropped and RabbitMQ therefore reschedules the tasks, and the worker then re-establishes its connection and gets one of the tasks it was already running, that would definitely cause this problem. The real problem here is of course that if a connection is dropped the worker should automatically and a.s.a.p stop running the tasks it had, because technically it no longer has the "right" to run them as they have been put back in the queue.

sphuber avatar Mar 02 '21 13:03 sphuber

The real problem here is of course that if a connection is dropped the worker should automatically and a.s.a.p stop running the tasks it had, because technically it no longer has the "right" to run them as they have been put back in the queue.

Yep, that's my thinking. In a way kiwipy's reconnect mechanism is "too elegant" lol, in that rather than reconnecting it should just raise an exception and crash that daemon worker (instantly killing all its tasks). (Then circus will just spawn a fresh worker.)

chrisjsewell avatar Mar 02 '21 13:03 chrisjsewell

I think the main challenge here would be detecting the broken connection early enough. If the worker is not using the connection, it won't notice that it is broken unless we put in some check that runs explicitly every so often.

sphuber avatar Mar 02 '21 13:03 sphuber

Could we add an option to the communicator in kiwipy to register a callback via RobustConnection.add_reconnect_callback that crashes the daemon worker (i.e. raises an exception)?
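Something along these lines, purely as a sketch of the idea (it assumes we can get hold of the underlying aio_pika RobustConnection; how kiwipy would expose that cleanly is exactly the open question):

import os
import signal


def crash_worker_on_reconnect(connection):
    """Sketch: terminate this daemon worker whenever the connection had to reconnect."""

    def _on_reconnect(*args, **kwargs):
        # The old connection was lost, so our tasks have been requeued by
        # RabbitMQ; bail out and let circus spawn a fresh worker.
        os.kill(os.getpid(), signal.SIGTERM)

    connection.add_reconnect_callback(_on_reconnect)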

chrisjsewell avatar Mar 02 '21 13:03 chrisjsewell