fast job memory accumulation
This issue is a spin-off of a PR review test here: https://github.com/cylc/cylc-flow/pull/6727
Description
In particular these two comments: https://github.com/cylc/cylc-flow/pull/6727#issuecomment-2804388207 https://github.com/cylc/cylc-flow/pull/6727#issuecomment-2841164577
where it was shown that memory accumulates with fast jobs, i.e. those whose scripting takes little or no time:
script = sleep 0
[[[simulation]]]
default run length = PT0S
Recent fixes in the data-store on 8.4.x related to fast jobs: https://github.com/cylc/cylc-flow/pull/6656 https://github.com/cylc/cylc-flow/pull/6589
However, it was shown to be unlikely that this is a data-store issue; see: https://github.com/cylc/cylc-flow/pull/6727#issuecomment-2861714854
Both the upb and pure-Python implementations of protobuf (i.e. export PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python) reproduce the problem... The changes I was using to test:
https://github.com/dwsutherland/cylc-flow/tree/mem-testing
https://github.com/dwsutherland/cylc-flow/compare/protobuf-upb-mem-accumulation-fix...dwsutherland:cylc-flow:mem-testing
Reproducible Example
[scheduler]
UTC mode = True
allow implicit tasks = True
[scheduling]
initial cycle point = 20200101T00
final cycle point = 20210101 # run for one year
[[graph]]
R1 = prep => foo
PT12H = foo[-PT12H] => foo => bar
[runtime]
[[root]]
script = sleep 0
[[[simulation]]]
default run length = PT0S
$ cylc vip -N
or
$ cylc vip -N --mode=simulation
$ mprof attach <pid>
$ mprof plot <file>
Running this results in a memory increase:
Setting sleep 5 instead results in no growth:
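For reference, roughly the same measurement can be taken without mprof by polling the scheduler PID's RSS directly. This is just a sketch (it assumes psutil is available, which memory_profiler itself depends on), not part of the reproduction:

import sys
import time

import psutil

# Poll the resident set size (RSS) of the given PID once per second and
# print a timestamped value, similar to what "mprof attach" records.
pid = int(sys.argv[1])
proc = psutil.Process(pid)
while True:
    rss_mb = proc.memory_info().rss / 1024 ** 2
    print(f"{time.time():.0f} {rss_mb:.1f} MB", flush=True)
    time.sleep(1)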
Expected Behaviour
No long-lived memory growth.
Just to put the "nail in the coffin" on whether it's the data-store, I bypassed all window creation, add/remove, deltas, and updates (essentially everything but the initial store creation):
diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py
index eb368a37a..b2e0b840d 100644
--- a/cylc/flow/data_store_mgr.py
+++ b/cylc/flow/data_store_mgr.py
@@ -761,7 +761,7 @@ class DataStoreMgr:
itask:
Active/Other task proxy, passed in with pool invocation.
"""
-
+ return
# common refrences
active_id = source_tokens.id
all_walks = self.n_window_node_walks
@@ -1132,6 +1132,7 @@ class DataStoreMgr:
def remove_pool_node(self, name, point):
"""Remove ID reference and flag isolate node/branch for pruning."""
+ return
tp_id = self.id_.duplicate(
cycle=str(point),
task=name,
@@ -1160,6 +1161,7 @@ class DataStoreMgr:
def add_pool_node(self, name, point):
"""Add external ID reference for internal task pool node."""
+ return
tp_id = self.id_.duplicate(
cycle=str(point),
task=name,
@@ -2249,6 +2249,7 @@ class DataStoreMgr:
def delta_broadcast(self):
"""Collects broadcasts on change event."""
+ return
w_delta = self.updated[WORKFLOW]
w_delta.id = self.workflow_id
w_delta.last_updated = time()
@@ -2309,6 +2310,7 @@ class DataStoreMgr:
objects from the workflow task pool.
"""
+ return
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
@@ -2366,6 +2368,7 @@ class DataStoreMgr:
(name, cycle, is_held).
"""
+ return
tokens = self.id_.duplicate(
task=name,
cycle=str(cycle),
@@ -2387,6 +2390,7 @@ class DataStoreMgr:
Args:
itask: TaskProxy with updated flow numbers.
"""
+ return
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
@@ -2403,6 +2407,7 @@ class DataStoreMgr:
removed: Flow numbers to remove from the task proxy in the
data store.
"""
+ return
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(
Tokens(task, relative=True).duplicate(**self.id_)
@@ -2433,6 +2438,7 @@ class DataStoreMgr:
objects from the workflow task pool.
"""
+ return
# TODO: Restore incremental update when we have a protocol to do so
# https://github.com/cylc/cylc-flow/issues/6307
return self.delta_task_outputs(itask)
@@ -2464,6 +2470,7 @@ class DataStoreMgr:
objects from the workflow task pool.
"""
+ return
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
@@ -2490,6 +2497,7 @@ class DataStoreMgr:
objects from the workflow task pool.
"""
+ return
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
@@ -2526,6 +2534,7 @@ class DataStoreMgr:
message: Trigger message.
"""
+ return
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
@@ -2553,6 +2562,7 @@ class DataStoreMgr:
satisfied (bool): Trigger message.
"""
+ return
update_time = time()
for tp_id, label in self.xtrigger_tasks.get(sig, set()):
# update task instance
@@ -2575,6 +2585,7 @@ class DataStoreMgr:
objects from the workflow task pool.
"""
+ return
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
@@ -2591,6 +2602,7 @@ class DataStoreMgr:
# -----------
def delta_job_msg(self, tokens: Tokens, msg: str) -> None:
"""Add message to job."""
+ return
j_id, job = self.store_node_fetcher(tokens)
if not job:
return
@@ -2614,6 +2626,7 @@ class DataStoreMgr:
attr_val: Any,
) -> None:
"""Set job attribute."""
+ return
j_id, job = self.store_node_fetcher(tokens)
if not job:
return
@@ -2631,6 +2644,7 @@ class DataStoreMgr:
status: str,
) -> None:
"""Set job state."""
+ return
j_id, job = self.store_node_fetcher(tokens)
if not job or status not in JOB_STATUS_SET:
return
@@ -2654,6 +2668,7 @@ class DataStoreMgr:
Set values of both event_key + '_time' and event_key + '_time_string'.
"""
+ return
j_id, job = self.store_node_fetcher(tokens)
if not job:
return
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index 46266a9d4..1fce1e189 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -1986,8 +1986,8 @@ class Scheduler:
# Publish any existing before potentially creating more
- self._publish_deltas()
+ #self._publish_deltas()
# Collect/apply data store updates/deltas
- self.data_store_mgr.update_data_structure()
- self._publish_deltas()
+ #self.data_store_mgr.update_data_structure()
+ #self._publish_deltas()
# Database update
self.workflow_db_mgr.put_task_pool(self.pool)
and the graph still goes up:
So, once again... not the data-store.
(I deleted several comments, redoing my tests with the datastore "turned off").
OK, I turned the datastore off by returning from most of its methods immediately.
16k tasks in sim mode:
[scheduling]
initial cycle point = 20200101T00
final cycle point = 20210101 # run for one year
[[graph]]
PT12H = "foo<m> => bar<m>" # m = 0..10
With default run length = PT0S:
With default run length = PT5S I don't get a flat profile:
🤯
It stays flat until a good way through the workflow run, and then suddenly goes up. That suggests, I think, that we don't have a memory leak in the scheduling process - otherwise it should go up more gradually (this is a pretty fine-grained workflow).
So maybe this is just memory fragmentation - Python allocates a chunk of memory and recycles that (for many objects) until fragmentation eventually clogs up the chunk with useless small holes, at which point it has to allocate another chunk.
Fragmentation can cause increasing memory footprint, even if there are no memory leaks.
Cylc is probably a prime candidate for this - a large long-running application that is constantly allocating and deallocating memory for thousands of small objects (task proxies etc.).
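To illustrate (a minimal sketch, not Cylc code; it assumes psutil): allocate a large number of small objects, free every other one, and the process RSS typically stays high because the surviving objects pin their pymalloc arenas, so the freed memory can't be handed back to the OS:

import os

import psutil


def rss_mb() -> float:
    """Current resident set size of this process, in MB."""
    return psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2


print(f"start:         {rss_mb():.1f} MB")
objs = [{"name": f"task{i}", "cycle": i} for i in range(500_000)]
print(f"after alloc:   {rss_mb():.1f} MB")

# Free every other object; the survivors are spread across nearly all of
# the allocator's arenas, so those arenas cannot be released.
del objs[1::2]
print(f"after freeing: {rss_mb():.1f} MB")  # typically stays close to the peak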
A blast from the past: https://github.com/cylc/cylc-flow/issues/1222#issuecomment-73823085
I tried the same again with live mode sleep 5 instead of simulation sleep 5, and got a more gradual increase over time:
This is a revealing result. I'm assuming the live plot is the lower one?
I'm guessing this ramping up doesn't appear in cylc vip --main-loop='log memory' results? (Note: the profiler's own results will accumulate with this plugin, but that should be the only thing that does.)
I think we should look into the job submission process and subprocess management for the source of the difference.
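For reference, the log memory plugin mentioned above can also be enabled via the global config instead of the command line. A sketch from memory (check the exact section and plugin names against your Cylc version):

# global.cylc
[scheduler]
    [[main loop]]
        plugins = health check, log memory
        [[[log memory]]]
            interval = PT1M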
Actually they're both the same plot, the lower one just expands the y-axis to see more detail (should have mentioned that, sorry).
Here are two confusing plots generated using the above example.
Pympler identified slow memory accumulation in the config, so I took a peek and it turned out that schd.config.edges is gradually accumulating memory:
But on closer inspection I found that these edges were not being modified. So I commented them out of the code, only to find that the memory growth switched to the taskdefs:
Notice the exact same notched pattern in both graphs.
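For context, the Pympler check was along these lines (a sketch, not the exact code used; report_config_sizes and the attribute list are illustrative): periodically log the deep size of the suspect WorkflowConfig attributes:

from pympler import asizeof


def report_config_sizes(schd) -> None:
    """Illustrative helper: log deep sizes of suspect config attributes."""
    for attr in ("edges", "taskdefs"):
        obj = getattr(schd.config, attr, None)
        print(f"schd.config.{attr}: {asizeof.asizeof(obj) / 1024:.1f} KiB")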
It looks like Pympler has identified some form of memory accumulation, but can't work out what to associate it with?
Wow yeah, promising though... do they share the same references in one direction or another?
And that's not pympler accumulation? (I assume not)
I think lru_cache is a large part of this. I get a different-looking memory graph when I turn it off.
If you test using this profiler with [scheduler][main loop][log tracemalloc]interval = PT1M, the lru_caches show up as the biggest users.
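The core of that check is just the standard library's tracemalloc; a minimal sketch of the same idea, independent of the plugin:

import tracemalloc

tracemalloc.start(10)  # keep up to 10 frames of traceback per allocation

# ... run the code under test here ...

snapshot = tracemalloc.take_snapshot()
for stat in snapshot.statistics("lineno")[:10]:
    print(stat)  # biggest allocators; objects held by lru_cache show up here too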
It's possible that the memory will stabilise over a longer run.
Note: the cache in lru_cache is held internally by the wrapper; it isn't assigned to the function itself or to any other object within the application, so it will not show up in most memory traces. We should probably have an easy switch to turn lru_cache off for profiling...
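A sketch of the kind of switch meant here (CYLC_NO_LRU_CACHE is a hypothetical name, not an existing setting): a drop-in decorator that skips caching when the variable is set, so profiling runs see the uncached memory behaviour:

import functools
import os


def lru_cache(maxsize=128, typed=False):
    """Drop-in for functools.lru_cache that can be disabled for profiling."""
    if callable(maxsize):  # decorator used bare, as @lru_cache
        return lru_cache()(maxsize)

    def decorator(func):
        if os.environ.get("CYLC_NO_LRU_CACHE"):  # hypothetical switch
            return func  # no cache, so nothing is hidden from memory traces
        return functools.lru_cache(maxsize=maxsize, typed=typed)(func)

    return decorator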