Worker profile limited to a short timespan

Open RaphaelRobidas opened this issue 1 year ago • 3 comments

Describe the issue: The worker profile covers only a limited time span, and older data appears to be lost. For example, with the minimal example below, the total task time (summed over all threads) is 24 hours, but the profile never contains more than 4 h 26 min. At one point during the run, the profile looks like this: [image]

30 minutes later, it looks like this: [image]

The earlier data is completely gone, as the "activity over time" graph at the bottom shows.

This issue has been occurring for several months, most recently with dask and distributed 2024.4.2. You can look at the full discussion on Discourse: https://dask.discourse.group/t/measuring-the-overall-profile-of-long-runs/1859/11

Minimal Complete Verifiable Example:

import time
import logging

from dask.distributed import (
    Client,
    LocalCluster,
    get_client,
    as_completed,
    performance_report,
)

NUM_PROCESSES = 16

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)-8s %(message)s",
)


class DummyManager:
    def run(self):
        logging.info("Starting the manager")

        jobs = list(range(1, 97))

        client = get_client()
        futs = []
        for j in jobs:
            futs.append(client.submit(self.job, j))

        asc = as_completed(futs, with_results=True)
        for fut, ret in asc:
            logging.info(f"Processing future {str(fut)}: ret={str(ret)}")
            if ret > 0:
                logging.info(f"Launching a subjob with time {ret}")
                asc.add(client.submit(self.job, ret))
            fut.release()

    def job(self, n):
        time.sleep(60 * 15)

        return 0


if __name__ == "__main__":
    cluster = LocalCluster(
        n_workers=1,
        threads_per_worker=NUM_PROCESSES,
        processes=False,
    )

    client = Client(cluster)
    with performance_report(filename=f"dask-performance_{time.time():.0f}.html"):
        try:
            manager = DummyManager()
            manager.run()
        except KeyboardInterrupt:
            logging.info("Stopping the job...")
            cluster.close()
            exit(0)
    client.close()
    cluster.close()

Environment:

  • Dask version: 2024.4.2
  • Python version: 3.10.12
  • Operating System: Linux Mint 21.2
  • Install method (conda, pip, source): pip

RaphaelRobidas • May 22 '24 18:05
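The 24-hour figure in the report can be checked directly from the parameters of the minimal example. The short sketch below only redoes that arithmetic; the constant names are chosen here for illustration. Note that the tasks call time.sleep, so the total is summed task duration rather than CPU time in the strict sense.

```python
# Parameters copied from the minimal example in the report.
NUM_JOBS = 96            # jobs = list(range(1, 97))
JOB_SECONDS = 60 * 15    # time.sleep(60 * 15) in DummyManager.job
NUM_THREADS = 16         # threads_per_worker=NUM_PROCESSES

# Task time summed over every job, regardless of parallelism.
total_task_seconds = NUM_JOBS * JOB_SECONDS

# With 16 threads the jobs run in batches; ceiling division gives the batch count.
batches = -(-NUM_JOBS // NUM_THREADS)
min_wall_seconds = batches * JOB_SECONDS

print(f"total task time:   {total_task_seconds / 3600:.1f} h")   # 24.0 h
print(f"minimum wall time: {min_wall_seconds / 3600:.1f} h")     # 1.5 h
```

The run therefore lasts roughly 1.5 hours of wall-clock time, while the profile reports time summed across threads.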

You appear to be using the profile tab to try to measure the execution duration of a single function call that takes over 24 hours. Is that right? This seems like a surprising use of the profiler to me.

With task submissions that take this long, would you not be better off with a workflow manager like Prefect running on top of Dask?

jacobtomlinson • Oct 17 '24 11:10

There's the /tasks page, which has a longer history.

mrocklin • Oct 17 '24 13:10
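The /tasks page mentioned above is served by the same dashboard as the profile tab. As a minimal sketch, the helper below (tasks_page_url is a name invented here, not part of distributed) derives that URL from a dashboard link. The example address is hypothetical; in a live session the link would normally come from client.dashboard_link.

```python
from urllib.parse import urlsplit, urlunsplit


def tasks_page_url(dashboard_link: str) -> str:
    """Return the /tasks URL next to the given dashboard link.

    A trailing "/status" segment is swapped for "/tasks"; any other path
    is kept as a prefix. Query string and fragment are dropped.
    """
    parts = urlsplit(dashboard_link)
    path = parts.path
    if path.endswith("/status"):
        base = path[: -len("/status")]
    else:
        base = path.rstrip("/")
    return urlunsplit((parts.scheme, parts.netloc, base + "/tasks", "", ""))


# Hypothetical local dashboard address, used only for illustration.
example_link = "http://127.0.0.1:8787/status"
print(tasks_page_url(example_link))
```

Keeping the path prefix matters when the dashboard sits behind a proxy, where the link may look like .../proxy/8787/status.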

Thanks for the info both of you!

@jacobtomlinson I did not know about Prefect, it sounds like exactly what I need at this stage. Looks like I have a huge refactor to do now!

As for the original issue, perhaps it's just a matter of context. I was under the impression that the profile would provide info about the overall profile for all tasks, for example to view statistics over arbitrarily large numbers of tasks during the whole lifetime of the scheduler. The odd total time made it seem like something unexpected was happening.

RaphaelRobidas • Oct 18 '24 12:10
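One possible reading of the "odd total time" of 4 h 26 min, offered as an unverified assumption rather than a diagnosis from the thread: if the worker keeps only a fixed number of recent profiling cycles, the visible total would be capped at the window length multiplied by the number of threads. The cycle length and retained-cycle count below are illustrative guesses, not values confirmed anywhere in this discussion.

```python
# ASSUMPTIONS for illustration only; neither value is confirmed in the thread.
ASSUMED_CYCLE_SECONDS = 1        # length of one profiling cycle
ASSUMED_RETAINED_CYCLES = 1000   # number of recent cycles kept in memory

NUM_THREADS = 16                 # threads_per_worker in the minimal example

# Wall-clock span covered by the retained cycles.
window_wall_seconds = ASSUMED_CYCLE_SECONDS * ASSUMED_RETAINED_CYCLES

# The profile sums time over all threads, so a full window shows this much.
window_thread_seconds = window_wall_seconds * NUM_THREADS

hours, rem = divmod(window_thread_seconds, 3600)
minutes, seconds = divmod(rem, 60)
print(f"{hours} h {minutes} min {seconds} s")   # 4 h 26 min 40 s
```

Under these assumed values the cap matches the reported figure, which would fit a rolling window rather than data loss from a bug, but this remains a guess until checked against the worker's actual configuration.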