
fast job memory accumulation

dwsutherland opened this issue 7 months ago • 9 comments

This issue is a spin-off of a PR review test here: https://github.com/cylc/cylc-flow/pull/6727

Description

In particular, see these two comments: https://github.com/cylc/cylc-flow/pull/6727#issuecomment-2804388207 https://github.com/cylc/cylc-flow/pull/6727#issuecomment-2841164577

There it was shown that memory accumulates with fast jobs, i.e. those whose scripting takes little or no time:

        script = sleep 0
        [[[simulation]]]
            default run length = PT0S

Recent fixes in the data-store on 8.4.x related to fast jobs: https://github.com/cylc/cylc-flow/pull/6656 https://github.com/cylc/cylc-flow/pull/6589

However, it was shown that this is unlikely to be a data-store issue; see: https://github.com/cylc/cylc-flow/pull/6727#issuecomment-2861714854

Both the upb and pure-Python implementations of protobuf (i.e. export PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python) reproduce the problem. The changes I was using to test: https://github.com/dwsutherland/cylc-flow/tree/mem-testing https://github.com/dwsutherland/cylc-flow/compare/protobuf-upb-mem-accumulation-fix...dwsutherland:cylc-flow:mem-testing
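(For anyone repeating the protobuf comparison: a quick way to confirm which backend is actually loaded is protobuf's api_implementation module. It lives under google.protobuf.internal, so it is not public API and may change between releases, but it works as a sanity check.)

import os

# google.protobuf.internal is not public API, so treat this as a
# best-effort sanity check only.
from google.protobuf.internal import api_implementation

print("PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION =",
      os.environ.get("PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"))
# Typically one of 'upb', 'cpp' or 'python'.
print("active protobuf backend:", api_implementation.Type())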

Reproducible Example

[scheduler]
    UTC mode = True
    allow implicit tasks = True

[scheduling]
    initial cycle point = 20200101T00
    final cycle point = 20210101  # run for one year
    [[graph]]
        R1 = prep => foo
        PT12H = foo[-PT12H] => foo => bar

[runtime]
    [[root]]
        script = sleep 0
        [[[simulation]]]
            default run length = PT0S
$ cylc vip -N

or

$ cylc vip -N --mode=simulation
$ mprof attach <pid>
$ mprof plot <file>

results in a memory increase:

[mprof plot: memory grows over the run]

Setting sleep 5 instead results in no growth:

[mprof plot: memory stays flat]

Expected Behaviour

No long-lived memory growth.

dwsutherland commented May 08 '25

Just to put the "nail in the coffin" on whether it's the data-store, I bypassed all window creation, add/remove, deltas, and updates (all but the initial creation, essentially):

diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py
index eb368a37a..b2e0b840d 100644
--- a/cylc/flow/data_store_mgr.py
+++ b/cylc/flow/data_store_mgr.py
@@ -761,7 +761,7 @@ class DataStoreMgr:
             itask:
                 Active/Other task proxy, passed in with pool invocation.
         """
-
+        return
         # common refrences
         active_id = source_tokens.id
         all_walks = self.n_window_node_walks
@@ -1132,6 +1132,7 @@ class DataStoreMgr:
 
     def remove_pool_node(self, name, point):
         """Remove ID reference and flag isolate node/branch for pruning."""
+        return
         tp_id = self.id_.duplicate(
             cycle=str(point),
             task=name,
@@ -1160,6 +1161,7 @@ class DataStoreMgr:
 
     def add_pool_node(self, name, point):
         """Add external ID reference for internal task pool node."""
+        return
         tp_id = self.id_.duplicate(
             cycle=str(point),
             task=name,
@@ -2249,6 +2249,7 @@ class DataStoreMgr:
 
     def delta_broadcast(self):
         """Collects broadcasts on change event."""
+        return
         w_delta = self.updated[WORKFLOW]
         w_delta.id = self.workflow_id
         w_delta.last_updated = time()
@@ -2309,6 +2310,7 @@ class DataStoreMgr:
                 objects from the workflow task pool.
 
         """
+        return
         tproxy: Optional[PbTaskProxy]
         tp_id, tproxy = self.store_node_fetcher(itask.tokens)
         if not tproxy:
@@ -2366,6 +2368,7 @@ class DataStoreMgr:
                 (name, cycle, is_held).
 
         """
+        return
         tokens = self.id_.duplicate(
             task=name,
             cycle=str(cycle),
@@ -2387,6 +2390,7 @@ class DataStoreMgr:
         Args:
             itask: TaskProxy with updated flow numbers.
         """
+        return
         tproxy: Optional[PbTaskProxy]
         tp_id, tproxy = self.store_node_fetcher(itask.tokens)
         if not tproxy:
@@ -2403,6 +2407,7 @@ class DataStoreMgr:
             removed: Flow numbers to remove from the task proxy in the
                 data store.
         """
+        return
         tproxy: Optional[PbTaskProxy]
         tp_id, tproxy = self.store_node_fetcher(
             Tokens(task, relative=True).duplicate(**self.id_)
@@ -2433,6 +2438,7 @@ class DataStoreMgr:
                 objects from the workflow task pool.
 
         """
+        return
         # TODO: Restore incremental update when we have a protocol to do so
         # https://github.com/cylc/cylc-flow/issues/6307
         return self.delta_task_outputs(itask)
@@ -2464,6 +2470,7 @@ class DataStoreMgr:
                 objects from the workflow task pool.
 
         """
+        return
         tproxy: Optional[PbTaskProxy]
         tp_id, tproxy = self.store_node_fetcher(itask.tokens)
         if not tproxy:
@@ -2490,6 +2497,7 @@ class DataStoreMgr:
                 objects from the workflow task pool.
 
         """
+        return
         tproxy: Optional[PbTaskProxy]
         tp_id, tproxy = self.store_node_fetcher(itask.tokens)
         if not tproxy:
@@ -2526,6 +2534,7 @@ class DataStoreMgr:
             message: Trigger message.
 
         """
+        return
         tproxy: Optional[PbTaskProxy]
         tp_id, tproxy = self.store_node_fetcher(itask.tokens)
         if not tproxy:
@@ -2553,6 +2562,7 @@ class DataStoreMgr:
             satisfied (bool): Trigger message.
 
         """
+        return
         update_time = time()
         for tp_id, label in self.xtrigger_tasks.get(sig, set()):
             # update task instance
@@ -2575,6 +2585,7 @@ class DataStoreMgr:
                 objects from the workflow task pool.
 
         """
+        return
         tproxy: Optional[PbTaskProxy]
         tp_id, tproxy = self.store_node_fetcher(itask.tokens)
         if not tproxy:
@@ -2591,6 +2602,7 @@ class DataStoreMgr:
     # -----------
     def delta_job_msg(self, tokens: Tokens, msg: str) -> None:
         """Add message to job."""
+        return
         j_id, job = self.store_node_fetcher(tokens)
         if not job:
             return
@@ -2614,6 +2626,7 @@ class DataStoreMgr:
         attr_val: Any,
     ) -> None:
         """Set job attribute."""
+        return
         j_id, job = self.store_node_fetcher(tokens)
         if not job:
             return
@@ -2631,6 +2644,7 @@ class DataStoreMgr:
         status: str,
     ) -> None:
         """Set job state."""
+        return
         j_id, job = self.store_node_fetcher(tokens)
         if not job or status not in JOB_STATUS_SET:
             return
@@ -2654,6 +2668,7 @@ class DataStoreMgr:
 
         Set values of both event_key + '_time' and event_key + '_time_string'.
         """
+        return
         j_id, job = self.store_node_fetcher(tokens)
         if not job:
             return
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index 46266a9d4..1fce1e189 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -1986,8 +1986,8 @@ class Scheduler:
         # Publish any existing before potentially creating more
-        self._publish_deltas()
+        #self._publish_deltas()
         # Collect/apply data store updates/deltas
-        self.data_store_mgr.update_data_structure()
-        self._publish_deltas()
+        #self.data_store_mgr.update_data_structure()
+        #self._publish_deltas()
         # Database update
         self.workflow_db_mgr.put_task_pool(self.pool)

and the graph still goes up:

[mprof plot: memory still increases with the data-store bypassed]

So, once again, it's not the data-store.

dwsutherland commented May 09 '25

(I deleted several comments, redoing my tests with the datastore "turned off").

hjoliver commented May 22 '25

OK, I turned the datastore off by returning from most of its methods immediately.

16k tasks in sim mode:

[scheduling]
    initial cycle point = 20200101T00
    final cycle point = 20210101  # run for one year
    [[graph]]
        PT12H = "foo<m> => bar<m>"  # m = 0..10

With default run length = PT0S:

[mprof plot for the PT0S run]

With default run length = PT5S I don't get a flat profile:

[mprof plot for the PT5S run: flat for much of the run, then a sudden increase]

🤯

It stays flat until a good way through the workflow run, and then suddenly goes up. That suggests, I think, we don't have a memory leak in the scheduling process - otherwise it should go up more gradually (this is a pretty fine-grained workflow).

So maybe this is just memory fragmentation - Python allocates a chunk of memory and recycles that (for many objects) until fragmentation eventually clogs up the chunk with useless small holes, at which point it has to allocate another chunk.

Fragmentation can cause increasing memory footprint, even if there are no memory leaks.

Cylc is probably a prime candidate for this - a large long-running application that is constantly allocating and deallocating memory for thousands of small objects (task proxies etc.).
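A minimal standalone sketch of that effect (not Cylc code; it assumes psutil is installed, which it will be wherever memory_profiler is): allocate a lot of small objects, free every second one, and watch RSS stay near its peak because the survivors pin the allocator's arenas. Exact numbers vary by platform and Python version.

import os
import psutil  # assumed available; memory_profiler depends on it

proc = psutil.Process(os.getpid())

def rss_mb():
    return proc.memory_info().rss / 1024 ** 2

print(f"start:      {rss_mb():7.1f} MiB")

# Churn through lots of small objects, loosely mimicking task proxies.
objs = [{"name": f"task{i}", "point": i} for i in range(1_000_000)]
print(f"allocated:  {rss_mb():7.1f} MiB")

# Free every second object. Half the memory is logically free, but the
# survivors are scattered across the allocator's arenas, so those arenas
# cannot be returned to the OS and RSS stays close to the peak.
del objs[::2]
print(f"half freed: {rss_mb():7.1f} MiB")

# Once everything is released the arenas empty out and RSS drops back.
del objs
print(f"all freed:  {rss_mb():7.1f} MiB")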

A blast from the past: https://github.com/cylc/cylc-flow/issues/1222#issuecomment-73823085

hjoliver commented May 22 '25

I tried the same again with live mode sleep 5 instead of simulation sleep 5, and got a more gradual increase over time:

[mprof plot of the live-mode run]

[the same plot with the y-axis expanded to show detail]

hjoliver commented May 22 '25

I tried the same again with live mode sleep 5 instead of simulation sleep 5, and got a more gradual increase over time:

This is a revealing result. I'm assuming the live plot is the lower one?

I'm guessing this ramping up doesn't appear in cylc vip --main-loop='log memory' results? (note the profiler results will accumulate with this plugin, but that should be the only thing).

I think we should look into the job submission process and subprocess management for the source of the difference.

oliver-sanders commented May 27 '25

Actually they're both the same plot, the lower one just expands the y-axis to see more detail (should have mentioned that, sorry).

hjoliver commented May 27 '25

Here are two confusing plots generated using the above example.

Pympler identified slow memory accumulation in the config, so I took a peek and it turned out that schd.config.edges is gradually accumulating memory:

[Pympler plot: schd.config.edges gradually accumulating memory]

But on closer inspection I found that these edges were not being modified. So I commented them out of the code, only to find that the memory growth switched to the taskdefs:

[Pympler plot: the same growth now attributed to the taskdefs]

Notice the exact same notched pattern in both graphs.

It looks like Pympler has identified some form of memory accumulation, but can't work out what to associate it with?
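For context, this is roughly the kind of per-attribute size tracking that produces plots like the above (a sketch, not the exact instrumentation used here). Because pympler.asizeof follows references, a growing object that is merely reachable from config.edges or the taskdefs gets charged to whichever attribute is being measured, which could explain the growth hopping between the two.

from pympler import asizeof

def log_config_sizes(schd, log):
    """Log deep sizes of some scheduler config attributes.

    ``schd`` is the running Scheduler; call this once per main loop
    iteration (e.g. from a custom main loop plugin).  Note asizeof is
    expensive and follows references, so shared objects are counted
    against every attribute that can reach them.
    """
    for attr in ("edges", "taskdefs"):
        obj = getattr(schd.config, attr, None)
        if obj is None:
            continue
        log.info(
            "config.%s deep size: %.1f KiB",
            attr,
            asizeof.asizeof(obj) / 1024,
        )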

oliver-sanders commented Jun 11 '25

It looks like Pympler has identified some form of memory accumulation, but can't work out what to associate it with?

Wow, yeah, promising though... do they share the same references in one direction or another?

And that's not Pympler's own accumulation? (I assume not.)

dwsutherland commented Jul 01 '25

I think lru_cache is a large part of this. I get a different-looking memory graph when I turn it off.

If you test using this profiler with [scheduler][main loop][log tracemalloc]interval = PT1M, the lru_cache caches show up as the biggest user.
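For reference, a self-contained tracemalloc sketch (independent of the Cylc plugin) showing how an unbounded lru_cache ends up at the top of a snapshot diff; expensive() is just a stand-in for a cached helper:

import functools
import tracemalloc

@functools.lru_cache(maxsize=None)
def expensive(n):
    # Stand-in for a cached helper: every distinct argument adds a new
    # entry that stays alive inside the lru_cache wrapper.
    return "x" * 1000 + str(n)

tracemalloc.start(25)  # keep 25 frames so callers are visible
before = tracemalloc.take_snapshot()

for i in range(20_000):
    expensive(i)

after = tracemalloc.take_snapshot()
for stat in after.compare_to(before, "lineno")[:5]:
    print(stat)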

It's possible that the memory will stabilise over a longer run.

Note, the cache in lru_cache is a floating variable inside the wrapper; it doesn't get assigned to the function itself or to any object within the application, so it will not show up in most memory traces. We should probably have an easy switch to turn lru_cache off for profiling...
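In the meantime, one possible hack for a "no caching" profiling run (an assumption, not an existing Cylc option) is to replace functools.lru_cache with a pass-through decorator before any cylc.flow module is imported, since the decorators are applied at import time:

import functools

def _no_cache(maxsize=128, typed=False):
    # Handle both @lru_cache and @lru_cache(...) usage.
    if callable(maxsize):
        return maxsize
    return lambda func: func

functools.lru_cache = _no_cache
functools.cache = lambda func: func  # the Python 3.9+ alias

# Only now import cylc.flow and start the scheduler.  Caveat: anything
# that calls cache_info()/cache_clear() on a wrapped function would need
# extra handling with this stub.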

oliver-sanders commented Oct 17 '25