aiida-core
IMPROVE: Garbage collect on process termination
Partially addresses #4603
After completion of aiida-sleep calc -n 1 -t 1 -p 500000 -o 500000 --submit (on https://github.com/chrisjsewell/aiida-integration-tests):
without:
CONTAINER ID NAME CPU % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O PIDS
2969011fe639 aiida-int-core 4.60% 915.9MiB / 1.942GiB 46.05% 2.58GB / 699MB 813MB / 11.8MB 84
with:
CONTAINER ID NAME CPU % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O PIDS
2969011fe639 aiida-int-core 3.50% 816.2MiB / 1.942GiB 41.04% 2.73GB / 738MB 822MB / 11.8MB 84
so it definitely makes a difference, but see below for more debugging
EDIT:
Actually, if you change asyncio.sleep(0) to asyncio.sleep(1):
CONTAINER ID NAME CPU % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O PIDS
2969011fe639 aiida-int-core 3.31% 512.6MiB / 1.942GiB 25.77% 3.05GB / 832MB 1.71GB / 12.3MB 95
and the process is gone
EDIT: if you change asyncio.sleep(0) to asyncio.sleep(1) the SleepCalculation is now gone
To debug, after the gc.collect() call I added:

```python
import pprint

from pympler import muppy, refbrowser, summary

all_objects = muppy.get_objects()
sum1 = summary.summarize(all_objects)
summary.print_(sum1)

dicts = [o for o in all_objects if isinstance(o, dict)]
print("Large dicts:", len([d for d in dicts if len(d) > 1000]))

from aiida_sleep.sleep_job import SleepCalculation

calcs = [o for o in all_objects if isinstance(o, SleepCalculation)]
print("SleepCalculations:", len(calcs))
print(calcs[0])
print()

cb = refbrowser.ConsoleBrowser(calcs[0], maxdepth=14)
tree = cb.get_tree()
cb.print_tree(tree)

print("\nSleepCalculation attributes:")
pprint.pprint(calcs[0].__dict__)
print("\nRmqSubscriber attributes:")
pprint.pprint(calcs[0]._communicator._communicator._communicator._message_subscriber.__dict__)
print("\nRmqTaskSubscriber attributes:")
pprint.pprint(calcs[0]._communicator._communicator._communicator._default_task_queue._subscriber.__dict__)
print("\nRmqTaskPublisher attributes:")
pprint.pprint(calcs[0]._communicator._communicator._communicator._default_task_queue._publisher.__dict__)
```
Then run aiida-sleep calc -n 1 -t 1 -p 100000 -o 100000 --submit on https://github.com/chrisjsewell/aiida-integration-tests
You can see that the Process is still in memory.
The reference cycle SleepCalculation <-> plumpy.process_states.Finished is known (https://github.com/aiidateam/plumpy/issues/198) and would in any case be garbage collected if it were the only reference.
So the SleepCalculation.broadcast_receive is what is keeping it in memory.
This is added as a broadcast_subscriber here: https://github.com/aiidateam/plumpy/blob/b1bde82403be36a76525b0c6359a175a422c0c1c/plumpy/processes.py#L302-L305, but the subscriber is also cleaned up when closing the Process (and you can see below that it is closed).
The functools.partial it is referring to possibly comes from convert_to_comm: https://github.com/aiidateam/plumpy/blob/b1bde82403be36a76525b0c6359a175a422c0c1c/plumpy/communications.py#L58
I'm not sure whether this should still exist at this point.
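To illustrate the suspected mechanism (a minimal sketch with a stand-in class, not plumpy's actual code): a functools.partial that wraps a bound method holds a strong reference to the instance, so the instance cannot be freed until the partial itself is dropped.

```python
import functools
import gc
import weakref

class Process:
    """Stand-in for a plumpy Process; hypothetical, for illustration only."""
    def broadcast_receive(self, msg):
        return msg

proc = Process()
# Wrapping the bound method (as a convert_to_comm-style wrapper would) stores
# a strong reference to `proc` inside the partial.
subscriber = functools.partial(proc.broadcast_receive, "hello")

ref = weakref.ref(proc)
del proc
gc.collect()
assert ref() is not None  # the partial keeps the instance alive

del subscriber
gc.collect()
assert ref() is None  # dropping the partial finally releases it
```

So as long as such a partial is registered anywhere (e.g. still referenced from a pending coroutine frame), the whole process object stays reachable.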
types | # objects | total size
======================================= | =========== | ============
dict | 33565 | 25.66 MB
str | 80138 | 12.14 MB
code | 29055 | 4.95 MB
type | 4503 | 4.32 MB
tuple | 30251 | 1.68 MB
set | 1865 | 810.65 KB
list | 7258 | 700.44 KB
weakref | 7983 | 561.30 KB
abc.ABCMeta | 396 | 407.97 KB
sqlalchemy.sql.visitors.VisitableType | 378 | 393.68 KB
function (__init__) | 2194 | 291.39 KB
builtin_function_or_method | 4007 | 281.74 KB
collections.deque | 411 | 250.45 KB
int | 8583 | 242.69 KB
frozenset | 859 | 232.45 KB
Large dicts: 10
SleepCalculations: 1
<SleepCalculation> (ProcessState.FINISHED)
aiida_sleep.sleep_job.SleepCalculation-+-method-+-dict-+-function (broadcast_receive)-+-functools.partial-+-cell-+-frame (codename: run_task)--coroutine-+-dict-+-asyncio.tasks.Task-+-method-+-asyncio.events.Handle-+-collections.deque-+-dict-+-asyncio.unix_events._UnixSelectorEventLoop
| | | | | | | | | | | | +-list
| | | | | | | | | | | | +-list
| | | | | | | | | | | |
| | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | |
| | | | | | | | | | +-list--frame (codename: _continue)--coroutine
| | | | | | | | | |
| | | | | | | | | +-list--frame (codename: _continue)--coroutine--list
| | | | | | | | |
| | | | | | | | +-cell-+-tuple-+-function (_call_check_cancel)-+-list-+-dict
| | | | | | | | | | | | +-list
| | | | | | | | | | | |
| | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | |
| | | | | | | | | | +-list--frame (codename: _continue)--coroutine
| | | | | | | | | |
| | | | | | | | | +-list--frame (codename: _continue)--coroutine--list
| | | | | | | | |
| | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | | | |
| | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | | | |
| | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list
| | | | | |
| | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | |
| | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list
| | | |
| | | +-functools.partial-+-cell-+-frame (codename: run_task)--coroutine-+-dict-+-asyncio.tasks.Task-+-method-+-asyncio.events.Handle-+-collections.deque-+-dict-+-asyncio.unix_events._UnixSelectorEventLoop
| | | | | | | | | | | | +-list
| | | | | | | | | | | | +-list
| | | | | | | | | | | |
| | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | |
| | | | | | | | | | +-list--frame (codename: _continue)--coroutine
| | | | | | | | | |
| | | | | | | | | +-list--frame (codename: _continue)--coroutine--list
| | | | | | | | |
| | | | | | | | +-cell-+-tuple-+-function (_call_check_cancel)-+-list-+-dict
| | | | | | | | | | | | +-list
| | | | | | | | | | | |
| | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | |
| | | | | | | | | | +-list--frame (codename: _continue)--coroutine
| | | | | | | | | |
| | | | | | | | | +-list--frame (codename: _continue)--coroutine--list
| | | | | | | | |
| | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | | | |
| | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | | | |
| | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list
| | | | | |
| | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | |
| | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list
| | | |
| | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | |
| | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| |
| +-cell-+-tuple-+-function (broadcast_receive)-+-functools.partial-+-cell-+-frame (codename: run_task)--coroutine-+-dict-+-asyncio.tasks.Task-+-method-+-asyncio.events.Handle-+-collections.deque-+-dict
| | | | | | | | | | | | +-list
| | | | | | | | | | | |
| | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | |
| | | | | | | | | | +-list--frame (codename: _continue)--coroutine
| | | | | | | | | |
| | | | | | | | | +-cell-+-tuple-+-function (_call_check_cancel)-+-list
| | | | | | | | | | | | +-list
| | | | | | | | | | | |
| | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | |
| | | | | | | | | | +-list--frame (codename: _continue)--coroutine
| | | | | | | | | |
| | | | | | | | | +-list--frame (codename: _continue)--coroutine--list
| | | | | | | | |
| | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | | | |
| | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | | | |
| | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | |
| | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | |
| | | | +-functools.partial-+-cell-+-frame (codename: run_task)--coroutine-+-dict-+-asyncio.tasks.Task-+-method-+-asyncio.events.Handle-+-collections.deque-+-dict
| | | | | | | | | | | | +-list
| | | | | | | | | | | |
| | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | |
| | | | | | | | | | +-list--frame (codename: _continue)--coroutine
| | | | | | | | | |
| | | | | | | | | +-cell-+-tuple-+-function (_call_check_cancel)-+-list
| | | | | | | | | | | | +-list
| | | | | | | | | | | |
| | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | |
| | | | | | | | | | +-list--frame (codename: _continue)--coroutine
| | | | | | | | | |
| | | | | | | | | +-list--frame (codename: _continue)--coroutine--list
| | | | | | | | |
| | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | | | |
| | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | | | |
| | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | |
| | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | |
| | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list
| | | |
| | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | |
| | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| |
| +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list
|
+-dict-+-plumpy.process_states.Finished-+-dict-+-aiida_sleep.sleep_job.SleepCalculation-+-method-+-dict-+-function (broadcast_receive)-+-functools.partial-+-cell-+-frame (codename: run_task)--coroutine-+-dict-+-asyncio.tasks.Task-+-method
| | | | | | | | | | | | +-cell
| | | | | | | | | | | | +-list
| | | | | | | | | | | |
| | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | |
| | | | | | | | | | +-list--frame (codename: _continue)--coroutine
| | | | | | | | | |
| | | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | | | | |
| | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | | | | |
| | | | | | | +-functools.partial-+-cell-+-frame (codename: run_task)--coroutine-+-dict-+-asyncio.tasks.Task-+-method
| | | | | | | | | | | | +-cell
| | | | | | | | | | | | +-list
| | | | | | | | | | | |
| | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | |
| | | | | | | | | | +-list--frame (codename: _continue)--coroutine
| | | | | | | | | |
| | | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | | | | |
| | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | | | | |
| | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list
| | | | | | |
| | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | |
| | | | | +-cell-+-tuple-+-function (broadcast_receive)-+-functools.partial-+-cell-+-frame (codename: run_task)--coroutine-+-dict-+-asyncio.tasks.Task
| | | | | | | | | | | | +-list
| | | | | | | | | | | | +-list
| | | | | | | | | | | |
| | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | |
| | | | | | | | | | +-list--frame (codename: _continue)--coroutine--list
| | | | | | | | | |
| | | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | | | | |
| | | | | | | | +-functools.partial-+-cell-+-frame (codename: run_task)--coroutine-+-dict-+-asyncio.tasks.Task
| | | | | | | | | | | | +-list
| | | | | | | | | | | | +-list
| | | | | | | | | | | |
| | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | |
| | | | | | | | | | +-list--frame (codename: _continue)--coroutine--list
| | | | | | | | | |
| | | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | | | | |
| | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | | | | |
| | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list
| | | | | | |
| | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | |
| | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | |
| | | | +-dict-+-plumpy.process_states.Finished-+-dict-+-aiida_sleep.sleep_job.SleepCalculation-+-method-+-dict-+-function (broadcast_receive)-+-functools.partial-+-cell-+-frame (codename: run_task)
| | | | | | | | | | | | | +-list
| | | | | | | | | | | | |
| | | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | | |
| | | | | | | | | | | +-functools.partial-+-cell-+-frame (codename: run_task)
| | | | | | | | | | | | | +-list
| | | | | | | | | | | | |
| | | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | | |
| | | | | | | | | | | +-list--frame (codename: _continue)--coroutine
| | | | | | | | | | |
| | | | | | | | | | +-list--frame (codename: _continue)--coroutine--list
| | | | | | | | | | +-list--frame (codename: _continue)--coroutine--list
| | | | | | | | | |
| | | | | | | | | +-cell-+-tuple-+-function (broadcast_receive)-+-functools.partial-+-cell
| | | | | | | | | | | | | +-list
| | | | | | | | | | | | |
| | | | | | | | | | | | +-functools.partial-+-cell
| | | | | | | | | | | | | +-list
| | | | | | | | | | | | |
| | | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | | |
| | | | | | | | | | | +-list--frame (codename: _continue)--coroutine
| | | | | | | | | | |
| | | | | | | | | | +-list--frame (codename: _continue)--coroutine--list
| | | | | | | | | |
| | | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | | | | |
| | | | | | | | +-dict-+-plumpy.process_states.Finished-+-dict-+-aiida_sleep.sleep_job.SleepCalculation-+-method-+-dict
| | | | | | | | | | | | | +-cell
| | | | | | | | | | | | | +-list
| | | | | | | | | | | | |
| | | | | | | | | | | | +-dict-+-plumpy.process_states.Finished
| | | | | | | | | | | | | +-list
| | | | | | | | | | | | | +-list
| | | | | | | | | | | | |
| | | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | | | +-list--frame (codename: _continue)
| | | | | | | | | | | |
| | | | | | | | | | | +-list--frame (codename: _continue)--coroutine
| | | | | | | | | | | +-list--frame (codename: _continue)--coroutine
| | | | | | | | | | |
| | | | | | | | | | +-list--frame (codename: _continue)--coroutine--list
| | | | | | | | | |
| | | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | | | | |
| | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | | | | |
| | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list
| | | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list
| | | | | | |
| | | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | | | |
| | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| | | | |
| | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list
| | | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list
| | | |
| | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
| | |
| | +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine
| |
| +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list
| +-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list
|
+-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
+-list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)--coroutine--list--frame (codename: _continue)
SleepCalculation attributes:
{'_Process__event_helper': <plumpy.utils.EventHelper object at 0x7f5331142df0>,
'_called': 0,
'_cleanups': None,
'_closed': True,
'_communicator': <plumpy.communications.LoopCommunicator object at 0x7f5330e9b6a0>,
'_creation_time': 1613767584.6924987,
'_debug': False,
'_enable_persistence': True,
'_event_callbacks': {},
'_exception_handler': None,
'_future': <SavableFuture finished result={'out_array': <ArrayData: u...cb9 (pk: 394)>, 'out_dict': <Dict: uuid: ...189 (pk: 393)>, 'remote_folder': <RemoteData: ...14b (pk: 390)>, 'result': <Bool: uuid: ...) value: True>, ...}>,
'_interrupt_action': None,
'_logger': <LoggerAdapter aiida.orm.nodes.process.calculation.calcjob.CalcJobNode (REPORT)>,
'_loop': <_UnixSelectorEventLoop running=True closed=False debug=False>,
'_node': <CalcJobNode: uuid: 984e74dd-f66b-4495-b499-fc1608e729d7 (pk: 389) (aiida.calculations:sleep)>,
'_outputs': {'out_array': <ArrayData: uuid: e12d7113-9bb0-4a57-ad4b-cae9aa09acb9 (pk: 394)>,
'out_dict': <Dict: uuid: bd5c406f-cccf-435f-867d-c4ee11250189 (pk: 393)>,
'remote_folder': <RemoteData: uuid: ee29f813-50e7-4acb-a25b-ae467ff4414b (pk: 390)>,
'result': <Bool: uuid: a99f127a-9ea4-4744-8768-262719494436 (pk: 392) value: True>,
'retrieved': <FolderData: uuid: 4e51e58c-c9a7-4b5e-b007-f36e091b65f2 (pk: 391)>},
'_parent_pid': None,
'_parsed_inputs': <AttributesFrozendict {'code': <Code: Remote code 'sleep' on slurm, pk: 2, uuid: 22298c59-af81-4b6e-a2db-c30ebf57a247>, 'metadata': <AttributesFrozendict {'call_link_label': 'CALL', 'dry_run': False, 'options': <AttributesFrozendict {'append_text': '', 'custom_scheduler_commands': '', 'environment_variables': {}, 'fail_calcjob': False, 'import_sys_environment': True, 'input_filename': 'aiida.in', 'mpirun_extra_params': [], 'output_array_size': 100, 'output_dict_size': 100000, 'output_filename': 'aiida.out', 'parser_name': 'sleep', 'payload_filename': 'payload.json', 'prepend_text': '', 'resources': {'num_machines': 1, 'num_mpiprocs_per_machine': 1}, 'scheduler_stderr': '_scheduler-stderr.txt', 'scheduler_stdout': '_scheduler-stdout.txt', 'submit_script_filename': '_aiidasubmit.sh', 'withmpi': False}>, 'store_provenance': True}>, 'payload': <Dict: uuid: bae3851d-ccd7-455a-a3ee-1d695d738376 (pk: 388)>, 'time': <Int: uuid: adf591bd-1f68-4d3d-a1c2-290224fd6f5b (pk: 387) value: 1>}>,
'_paused': None,
'_persist_configured': True,
'_pid': 389,
'_pre_paused_status': None,
'_raw_inputs': <AttributesFrozendict {'code': <Code: Remote code 'sleep' on slurm, pk: 2, uuid: 22298c59-af81-4b6e-a2db-c30ebf57a247>, 'metadata': {'call_link_label': 'CALL', 'dry_run': False, 'options': <AttributesFrozendict {'append_text': '', 'custom_scheduler_commands': '', 'environment_variables': {}, 'fail_calcjob': False, 'import_sys_environment': True, 'input_filename': 'aiida.in', 'mpirun_extra_params': [], 'output_array_size': 100, 'output_dict_size': 100000, 'output_filename': 'aiida.out', 'parser_name': 'sleep', 'payload_filename': 'payload.json', 'prepend_text': '', 'resources': {'num_machines': 1, 'num_mpiprocs_per_machine': 1}, 'scheduler_stderr': '_scheduler-stderr.txt', 'scheduler_stdout': '_scheduler-stdout.txt', 'submit_script_filename': '_aiidasubmit.sh', 'withmpi': False}>, 'store_provenance': True}, 'payload': <Dict: uuid: bae3851d-ccd7-455a-a3ee-1d695d738376 (pk: 388)>, 'time': <Int: uuid: adf591bd-1f68-4d3d-a1c2-290224fd6f5b (pk: 387) value: 1>}>,
'_runner': <aiida.engine.runners.Runner object at 0x7f5330c9d160>,
'_state': <plumpy.process_states.Finished object at 0x7f5328d12be0>,
'_status': None,
'_stepping': False,
'_transition_failing': False,
'_transitioning': False}
RmqSubscriber attributes:
{'_broadcast_consumer_tag': None,
'_broadcast_queue': <Queue(broadcast-n3nNu9KTcvXCAVCcyz2bha): auto_delete=False, durable=None, exclusive=True, arguments={'x-message-ttl': 66000}>,
'_broadcast_queue_arguments': {'x-message-ttl': 66000},
'_broadcast_subscribers': {},
'_channel': <RobustChannel "amqp://guest:******@rmq:5672/?heartbeat=600#3">,
'_connection': <RobustConnection: "amqp://guest:******@rmq:5672/?heartbeat=600" 4 channels>,
'_decode': <function deserialize at 0x7f5330ddbd30>,
'_exchange': <Exchange(aiida-49e364abe4394599af772f4bd4dba24a.messages): auto_delete=False, durable=None, arguments={})>,
'_exchange_name': 'aiida-49e364abe4394599af772f4bd4dba24a.messages',
'_response_encode': functools.partial(<function serialize at 0x7f5330ddbb80>, encoding='utf-8'),
'_rmq_queue_arguments': {'x-expires': 60000, 'x-message-ttl': 66000},
'_rpc_subscribers': {},
'_testing_mode': False}
RmqTaskSubscriber attributes:
{'_channel': <RobustChannel "amqp://guest:******@rmq:5672/?heartbeat=600#1">,
'_connection': <RobustConnection: "amqp://guest:******@rmq:5672/?heartbeat=600" 4 channels>,
'_consumer_tag': 'ctag1.dbbf903a7183fe31a95b2f1eeda45d7e',
'_decode': <function deserialize at 0x7f5330ddbd30>,
'_encode': functools.partial(<function serialize at 0x7f5330ddbb80>, encoding='utf-8'),
'_exchange': <Exchange(aiida-49e364abe4394599af772f4bd4dba24a.tasks): auto_delete=False, durable=None, arguments={})>,
'_exchange_name': 'aiida-49e364abe4394599af772f4bd4dba24a.tasks',
'_exchange_params': {'type': <ExchangeType.TOPIC: 'topic'>},
'_is_closing': False,
'_loop': <_UnixSelectorEventLoop running=True closed=False debug=False>,
'_pending_tasks': [],
'_prefetch_count': 200,
'_prefetch_size': 0,
'_subscribers': {'TfxRzzk9p2jMnEgUDMFzpv': <function convert_to_comm.<locals>.converted at 0x7f5331193e50>},
'_task_queue': <Queue(aiida-49e364abe4394599af772f4bd4dba24a.process.queue): auto_delete=False, durable=True, exclusive=False, arguments={'x-message-ttl': 604800000}>,
'_task_queue_name': 'aiida-49e364abe4394599af772f4bd4dba24a.process.queue',
'_testing_mode': False}
RmqTaskPublisher attributes:
{'_awaiting_response': {},
'_channel': <RobustChannel "amqp://guest:******@rmq:5672/?heartbeat=600#2">,
'_confirm_deliveries': True,
'_connection': <RobustConnection: "amqp://guest:******@rmq:5672/?heartbeat=600" 4 channels>,
'_delivery_info': deque([]),
'_encode': functools.partial(<function serialize at 0x7f5330ddbb80>, encoding='utf-8'),
'_exchange': <Exchange(aiida-49e364abe4394599af772f4bd4dba24a.tasks): auto_delete=False, durable=None, arguments={})>,
'_exchange_name': 'aiida-49e364abe4394599af772f4bd4dba24a.tasks',
'_exchange_params': {'type': <ExchangeType.TOPIC: 'topic'>},
'_is_closing': False,
'_num_published': 0,
'_reply_queue': <Queue(aiida-49e364abe4394599af772f4bd4dba24a.tasks-reply-a5ff4d31-c27c-4f9f-83fa-2fbe540df9b3): auto_delete=False, durable=None, exclusive=True, arguments={'x-expires': 60000}>,
'_response_decode': <function deserialize at 0x7f5330ddbd30>,
'_task_queue_name': 'aiida-49e364abe4394599af772f4bd4dba24a.process.queue',
'_testing_mode': False}
Codecov Report
Merging #4767 (341fe9b) into develop (90a1987) will decrease coverage by 0.02%. The diff coverage is 33.34%.
@@ Coverage Diff @@
## develop #4767 +/- ##
===========================================
- Coverage 79.37% 79.36% -0.01%
===========================================
Files 485 485
Lines 36154 36160 +6
===========================================
+ Hits 28694 28695 +1
- Misses 7460 7465 +5
| Flag | Coverage Δ | |
|---|---|---|
| django | 73.96% <33.34%> (-<0.01%) | :arrow_down: |
| sqlalchemy | 72.86% <33.34%> (-<0.01%) | :arrow_down: |
Flags with carried forward coverage won't be shown.
| Impacted Files | Coverage Δ | |
|---|---|---|
| aiida/manage/external/rmq.py | 44.22% <33.34%> (-0.73%) | :arrow_down: |
| aiida/transports/plugins/local.py | 81.54% <0.00%> (-0.25%) | :arrow_down: |
With asyncio.sleep(1), the summary instead shows:
types | # objects | total size
======================================= | =========== | ============
str | 80061 | 12.13 MB
dict | 33410 | 10.62 MB
code | 29054 | 4.96 MB
type | 4503 | 4.32 MB
tuple | 30238 | 1.68 MB
set | 1864 | 810.44 KB
list | 7236 | 698.88 KB
weakref | 7981 | 561.16 KB
abc.ABCMeta | 396 | 407.97 KB
sqlalchemy.sql.visitors.VisitableType | 378 | 393.68 KB
function (__init__) | 2194 | 291.39 KB
builtin_function_or_method | 3987 | 280.34 KB
collections.deque | 407 | 248.02 KB
int | 8570 | 242.28 KB
frozenset | 859 | 232.45 KB
Large dicts: 7
SleepCalculations: 0
@ltalirz what do you think?
I guess the question is what the value of the asyncio.sleep should be: 0 evidently does not work, but will 1 always be enough? More, less, and/or perhaps a new config option?
Thanks @chrisjsewell for figuring this out!
Do we know which tasks are the culprit here, i.e. which tasks are preventing automatic garbage collection without the sleep?
Depending on the tasks, no value of the sleep time may be safe...
Of course it would be best to figure this out. On the other hand, we could even schedule a periodic garbage collection call using call_later...
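The call_later idea could look something like this (a sketch only; the function name and interval are made up, and the interval would presumably need to be configurable):

```python
import asyncio
import gc

GC_INTERVAL = 60.0  # seconds; hypothetical value, not from the PR

def periodic_gc(loop: asyncio.AbstractEventLoop) -> None:
    """Force a collection, then re-arm the timer on the given loop."""
    gc.collect()
    loop.call_later(GC_INTERVAL, periodic_gc, loop)

async def main() -> None:
    loop = asyncio.get_running_loop()
    # Schedule the first run; each run re-arms itself.
    loop.call_later(GC_INTERVAL, periodic_gc, loop)
    # ... the daemon workload would run here ...
    await asyncio.sleep(0)
```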
we could even schedule a periodic garbage collection call
Hmm, I think that's what Python does anyway. Of course we then don't have control over when it happens.
I haven't followed the context here, but at first glance it seems that if asyncio.sleep and gc.collect manage to get rid of the process, there is no memory leak. The normal garbage collection would get rid of it eventually even without the sleep and collect.
Of course the question then becomes how we test that this remains the case - but maybe the sleeping and collecting should go into the test code, not the production code.
Hmm, I think that's what Python does anyway
The problem here is likely with cyclic references, which aren't garbage-collected automatically
which aren't garbage-collected automatically
They should be. Python memory management has two components:
- Reference counting: as soon as the ref counter hits zero, the object is destroyed
- The garbage collector, which identifies cyclic references that are no longer reachable. This runs "periodically", so you can't rely on when it happens, but it will happen eventually [1][2].
The gc.collect call just forces the second step to run immediately. But since we need to wait with asyncio.sleep it seems there's some other task keeping the process alive that needs to finish before we can deallocate it.
That could be a problem, depending on whether that task can be long-lived in a production scenario. But AFAICT asyncio.sleep and gc.collect shouldn't significantly change the (long-term) memory usage.
[1] Unless someone called gc.disable...
[2] Timing of the garbage collector runs is configurable, see https://docs.python.org/3/library/gc.html#gc.set_threshold
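The two components can be demonstrated in a few lines (behaviour as described here is CPython's):

```python
import gc
import weakref

class Node:
    pass

# 1. Reference counting: without a cycle, the object dies the moment its
#    refcount hits zero (immediately, in CPython).
a = Node()
ref_a = weakref.ref(a)
del a
assert ref_a() is None

# 2. A reference cycle keeps the refcount above zero forever; only the
#    cyclic garbage collector can reclaim it.
b = Node()
b.self_ref = b
ref_b = weakref.ref(b)
del b
assert ref_b() is not None  # refcounting alone cannot free it
gc.collect()                # force the "periodic" collection to run now
assert ref_b() is None
```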
Sorry, you are right.
I even once looked into the default parameters for this...
Of course, it can still be useful to run garbage collection after a process is completed to avoid that the corresponding objects stay in memory until the next process starts (and, ideally, we would just get rid of the cyclic references if possible).
I even once looked into the default parameters for this...
Nice, just looked at these parameters right now
Of course, it can still be useful to run garbage collection after a process is completed
Yeah, if we know when the cyclic references should turn stale it makes sense to run manually. But that wouldn't fix any memory leaks, just improve usage in general.
I think the real question here is what is keeping the objects in memory that makes the asyncio.sleep necessary. That seems to imply there's a different coroutine holding onto the objects - only when it has completed does the gc.collect work (or maybe, even just letting ref-counting do its magic would be enough then).
Oh, we should also check if any of the objects in cycles have __del__ methods: those won't be collected because the collector can't figure out a safe finalization order, see https://docs.python.org/3/library/gc.html#gc.garbage
Found that via another answer on the page you linked.
EDIT: Hmm, since PEP 442 I think this should be less of a problem.
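A quick check of the PEP 442 point (sketch; this behaviour holds on CPython >= 3.4, where cycles with finalizers are collected rather than parked in gc.garbage):

```python
import gc
import weakref

class WithDel:
    def __del__(self):
        pass  # a finalizer no longer blocks cycle collection (PEP 442)

x = WithDel()
x.cycle = x  # reference cycle involving an object with __del__
ref = weakref.ref(x)
del x
gc.collect()
assert ref() is None   # collected despite the finalizer
assert not gc.garbage  # nothing left uncollectable
```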
This runs "periodically"
@greschd are you sure about this? Nothing in https://docs.python.org/3/library/gc.html#gc.set_threshold suggests that it runs on a timer, just that it will trigger more easily with a different threshold. Also I've never seen the memory change over time, after all processes have finished, only when garbage collection is actually called.
Good point, it's not a timer per se, it's a function of how many objects are allocated / deallocated:
In order to decide when to run, the collector keeps track of the number of object allocations and deallocations since the last collection. When the number of allocations minus the number of deallocations exceeds threshold0, collection starts. Initially only generation 0 is examined. If generation 0 has been examined more than threshold1 times since generation 1 has been examined, then generation 1 is examined as well. With the third generation, things are a bit more complicated, see Collecting the oldest generation for more information.
from https://docs.python.org/3/library/gc.html#gc.set_threshold.
So if all we do is sleep, I guess that wouldn't cause a GC run - but "normal" operation should.
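A small demonstration that the trigger really is the allocation counter (readable via gc.get_count), not a timer:

```python
import gc

gc.collect()                      # reset the generation-0 counter
base = gc.get_count()[0]
junk = [[] for _ in range(100)]   # allocate some container objects
grown = gc.get_count()[0]
# The counter grows with allocations; a collection starts once it
# exceeds threshold0, so a sleeping process never triggers one.
assert grown > base
```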
The "allocations - deallocations" count is a measure of how the total number of objects grows -- you would expect that to keep growing if there's a memory leak due to cyclic references, right?
But yeah, I'm sure we can come up with a scenario where the number of objects is large, then GC runs (but they're still alive), then the number of objects drops and the GC doesn't run again for a long time.
Still, I think the main thing to figure out here is who keeps the process objects alive, which makes the sleep necessary.
Ideally, we could get rid of the cyclic references here, because ref-counting is much more well-behaved.
then the number of objects drops and the GC doesn't run again for a long time.
If everything has finished running then I wouldn't expect the GC to ever run again, because there would be nothing to trigger it.
Still, I think the main thing to figure out here is who keeps the process objects alive, which makes the sleep necessary.
see https://github.com/aiidateam/aiida-core/pull/4767#issuecomment-782403943, it's because the broadcast subscriber has not yet been removed
Ideally, we could get rid of the cyclic references here, because ref-counting is much more well-behaved.
See https://github.com/aiidateam/plumpy/pull/205 (which is the last cyclic reference from #4603 not yet addressed), but that's quite an "aggressive" change, which I'm worried could have side-effects, so I don't want to rush it through. That also only covers the actual Process; there may well be other things in memory.
If everything has finished running then I wouldn't expect the GC to ever run again, because there would be nothing to trigger it.
Right, but that is a "testing" scenario. In a production daemon it should eventually run, no? So to test for the memory leak we can just run gc.collect in the test code.
We can also put in a gc.collect just to be on the safe side. As long as it isn't too deep in a loop, because it's quite an expensive operation, scaling with how many objects there are:
On a fresh ipython:
In [1]: import gc
In [2]: %timeit gc.collect()
100 loops, best of 3: 8.76 ms per loop
On a fresh verdi shell:
In [1]: import gc
In [2]: %timeit gc.collect()
47.7 ms Β± 980 Β΅s per loop (mean Β± std. dev. of 7 runs, 10 loops each)
In a production daemon it should eventually run, no?
not until you submit more processes
As long as it isn't too deep in a loop, because it's quite an expensive operation
yeh fair, well it's triggered once a process completes
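A hedged sketch of that pattern (the coroutine name is hypothetical, not the actual aiida-core hook): yield to the event loop once so other tasks can drop their references, then run one full collection per terminated process, outside any hot loop:

```python
import asyncio
import gc

async def collect_after_termination():
    # Yield once so pending tasks/callbacks can release their references
    # (the role the asyncio.sleep plays in the discussion above).
    await asyncio.sleep(0)
    # One full collection per terminated process; gc.collect returns
    # the number of unreachable objects it found.
    return gc.collect()

unreachable = asyncio.run(collect_after_termination())
assert unreachable >= 0
```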
To note, I'm certainly not suggesting that this is the complete solution, but I think it could complement other efforts to try to reduce peaks in memory usage.
yeh fair, well it's triggered once a process completes
Yeah, I think that's sensible.
Looking at https://github.com/aiidateam/plumpy/blob/develop/plumpy/process_comms.py, the _launch coroutine also seems like it can complete a process - does it also need modification? I'm completely unfamiliar with that part of the code though, so could very well be wrong.
To note, I'm certainly not suggesting that this is the complete solution, but I think it could complement other efforts to try to reduce peaks in memory usage.
:+1:
does it also need modification? I'm completely unfamiliar with that part of the code though, so could very well be wrong.
see my lovely new diagram π https://github.com/aiidateam/aiida-core/pull/4766, essentially all processes on daemon runners are re-created and run with _continue (not _launch)
see my lovely new diagram π
Nice π So when is _launch used? When using engine.run instead of submit?
Nice π So when is _launch used? When using engine.run instead of submit?
cheers!
No, unless I'm missing something, it is never used within aiida-core; for run you directly execute the process (Process.execute()), and for submit you locally instantiate the process, checkpoint it (store the node), then send a continue task to RabbitMQ (a daemon runner then continues it).