
Docs: Add a tutorial for how to use Prefect with a job scheduler (e.g. Slurm) on an HPC machine

Open Andrew-S-Rosen opened this issue 1 year ago • 7 comments

First check

  • [X] I added a descriptive title to this issue.
  • [X] I used GitHub search to find a similar request and didn't find it 😇

Describe the issue

There are currently no examples of how to use Prefect with a job scheduling system (e.g. SLURM, PBS, MOAB) on an HPC machine. I think this is a pretty important omission because most academics and users of the top supercomputers might not be aware of how they can use Prefect.

Describe the proposed change

Add a tutorial.

Additional context

There's mention of spinning up a Dask cluster but no representative example of how this is done in practice with a given job scheduling system.

Andrew-S-Rosen avatar Jul 01 '23 21:07 Andrew-S-Rosen

I'm all in for a tutorial on how to use Prefect with a stack using submitit, hydra, and possibly dora!

alexisthual avatar Jul 03 '23 20:07 alexisthual

I must admit that I have no clue what any of those 3 are 😅 I was thinking more of the recommended approach of using a prefect-dask.DaskTaskRunner instantiated with a dask-jobqueue.SLURMCluster, which would be passed to the task_runner kwarg in the @flow decorator.
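
For concreteness, a rough sketch of that pattern might look like the following (the Slurm settings are placeholders, not recommendations):

from dask_jobqueue import SLURMCluster
from prefect import flow, task
from prefect_dask import DaskTaskRunner

# Placeholder Slurm settings; adjust for your machine
cluster_kwargs = {"cores": 48, "memory": "64 GB", "account": "MyAccount", "walltime": "00:10:00"}

@task
def add(a: float, b: float) -> float:
    return a + b

# Each flow run spins up (and tears down) its own SLURMCluster
@flow(task_runner=DaskTaskRunner(cluster_class=SLURMCluster, cluster_kwargs=cluster_kwargs))
def workflow(a: float, b: float) -> float:
    return add.submit(a, b)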

Andrew-S-Rosen avatar Jul 03 '23 20:07 Andrew-S-Rosen

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

github-actions[bot] avatar Aug 02 '23 21:08 github-actions[bot]

I would also be interested in such a tutorial, in particular a simple one that takes a Prefect workflow, submits it as a Slurm job, and monitors its execution. I'm able to help y'all and can test it at NERSC.

I'm also interested in embarrassingly parallel tasks, and tasks that communicate using MPI across nodes.

JBlaschke avatar Nov 29 '23 19:11 JBlaschke

@JBlaschke: Here is an example I wrote up for Perlmutter. Feel free to use this and do whatever you'd like with it. I think the challenge I ran into for practical use was that you can only apply a single task runner for the entire flow, meaning if some of the individual tasks have different compute requirements, you're mostly out of luck (unless I missed something). There is also the fact that the compute nodes need to be able to communicate with the Prefect server, which not all HPC machines can do if using Prefect Cloud, but that's not a problem at NERSC.

Note to future me: the content below was taken from my 0.2.0 release of quacc.

Some Utility Functions

from __future__ import annotations

from typing import Callable, TYPE_CHECKING

from dask_jobqueue import SLURMCluster

if TYPE_CHECKING:
    from dask_jobqueue.core import JobQueueCluster
    from prefect_dask.task_runners import DaskTaskRunner

def make_dask_runner(
    cluster_kwargs: dict,
    cluster_class: Callable | None = None,
    adapt_kwargs: dict[str, int | None] | None = None,
    client_kwargs: dict | None = None,
    temporary: bool = False,
) -> DaskTaskRunner:
    """
    Make a DaskTaskRunner for use with Prefect workflows.

    Parameters
    ----------
    cluster_kwargs
        Keyword arguments to pass to `cluster_class`.
    cluster_class
        The Dask cluster class to use. Defaults to `dask_jobqueue.SLURMCluster`.
    adapt_kwargs
        Keyword arguments to pass to `cluster.adapt` of the form `{"minimum": int, "maximum": int}`.
        If `None`, no adaptive scaling will be done.
    client_kwargs
        Keyword arguments to pass to `dask.distributed.Client`.
    temporary
        Whether to use a temporary cluster. If `True`, the cluster will be
        terminated once the `Flow` is finished. If `False`, the cluster will
        run until the walltime is reached and can run multiple `Flow`s.

    Returns
    -------
    DaskTaskRunner
        A DaskTaskRunner object for use with Prefect workflows.
    """
    from prefect_dask.task_runners import DaskTaskRunner

    if cluster_class is None:
        cluster_class = SLURMCluster

    # Make the one-time-use DaskTaskRunner
    if temporary:
        return DaskTaskRunner(
            cluster_class=cluster_class,
            cluster_kwargs=cluster_kwargs,
            adapt_kwargs=adapt_kwargs,
            client_kwargs=client_kwargs,
        )

    # Make the Dask cluster
    cluster = _make_dask_cluster(cluster_class, cluster_kwargs)

    # Set up adaptive scaling
    if adapt_kwargs and (adapt_kwargs.get("minimum") or adapt_kwargs.get("maximum")):
        cluster.adapt(minimum=adapt_kwargs.get("minimum"), maximum=adapt_kwargs.get("maximum"))

    # Return the DaskTaskRunner with the cluster address
    return DaskTaskRunner(address=cluster.scheduler_address)


def _make_dask_cluster(
    cluster_class: Callable, cluster_kwargs: dict, verbose: bool = True
) -> JobQueueCluster:
    """
    Make a Dask cluster for use with Prefect workflows.

    Parameters
    ----------
    cluster_class
        The Dask cluster class to use, e.g. `dask_jobqueue.SLURMCluster`.
    cluster_kwargs
        Keyword arguments to pass to `cluster_class`.
    verbose
        Whether to print the job script to stdout.

    Returns
    -------
    JobQueueCluster
        The instantiated Dask cluster.
    """

    cluster = cluster_class(**cluster_kwargs)
    if verbose:
        print(
            f"Workers are submitted with the following job script:\n{cluster.job_script()}"
        )
        print(f"Scheduler is running at {cluster.scheduler.address}")
        print(f"Dashboard is located at {cluster.dashboard_link}")

    return cluster

Example Usage

n_slurm_jobs = 1 # Number of Slurm jobs to launch in parallel.
n_nodes_per_calc = 1 # Number of nodes to reserve for each Slurm job.
n_cores_per_node = 48 # Number of CPU cores per node.
mem_per_node = "64 GB" # Total memory per node.

cluster_kwargs = {
    # Dask worker options
    "n_workers": n_slurm_jobs,
    "cores": n_cores_per_node,
    "memory": mem_per_node,
    # SLURM options
    "shebang": "#!/bin/bash",
    "account": "AccountName",
    "walltime": "00:10:00",
    "job_mem": "0",
    "job_script_prologue": [
        "source ~/.bashrc",
        "conda activate MyEnv",
    ],
    "job_directives_skip": ["-n", "--cpus-per-task"],
    "job_extra_directives": [f"-N {n_nodes_per_calc}", "-q debug", "-C cpu"],
    "python": "python",
}


from prefect import flow

runner = make_dask_runner(cluster_kwargs, temporary=True)

@flow(task_runner=runner)
def workflow(*args, **kwargs):
    ...

When the workflow is run from the login node, it will be submitted to the job scheduling system (Slurm by default), and the results will be sent back to Prefect Cloud once completed. Refer to the Dask-Jobqueue Documentation for the available cluster_kwargs that can be defined and how they relate to a typical job script.

To asynchronously spawn a Slurm job that continually pulls in work for the duration of its walltime (rather than starting and terminating over the lifetime of the associated Flow), you can instead call the make_dask_runner function without the temporary keyword argument:

runner = make_dask_runner(cluster_kwargs)

Additionally, you can have the generated Dask cluster adaptively scale based on the amount of work available by setting adapt_kwargs as follows:

runner = make_dask_runner(cluster_kwargs, adapt_kwargs={"minimum": 1, "maximum": 5})

This will ensure that at least one Slurm job is always running, but the number of jobs will scale up to 5 if there is enough work available.

Andrew-S-Rosen avatar Nov 29 '23 19:11 Andrew-S-Rosen

@Andrew-S-Rosen thank you -- I'll dust this off and condense it into a tutorial for our docs.

JBlaschke avatar Nov 29 '23 22:11 JBlaschke

Here is a worked example for those curious. It is meant to work on the Perlmutter machine at NERSC, but the insights are largely machine-agnostic.

Preliminary Steps

On the login node:

pip install prefect prefect-dask dask-jobqueue
prefect cloud login

Basic Example

Prefect has Dask and Ray backends, but only Dask (via dask-jobqueue) interfaces cleanly with Slurm. We'll use dask-jobqueue as the backend for simplicity.

We start by defining the SLURMCluster() object in dask-jobqueue. Namely, we're outlining what goes in the Slurm submission script. This is the part that you'll need to adjust for your machine.

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster_kwargs = {
    # Dask worker options
    "cores": 128,  # Total cores per Slurm job
    "memory": "64 GB",
    # SLURM options
    "shebang": "#!/bin/bash",
    "account": "MyAccountName",
    "walltime": "00:10:00",
    "job_mem": "0",
    "job_script_prologue": ["source ~/.bashrc"],
    "job_directives_skip": ["-n", "--cpus-per-task"],  # Slurm directives for dask-jobqueue to omit
    "job_extra_directives": ["-q debug", "-C cpu"],  # Extra #SBATCH directives
}

cluster = SLURMCluster(**cluster_kwargs)
print(cluster.job_script())

Now we specify how many Slurm jobs we want with those specs, scale the Dask cluster accordingly, and connect a Dask client. Scaling the cluster will immediately submit a job to the queue even though we don't have any compute tasks to run just yet.

slurm_jobs = 1
cluster.scale(jobs=slurm_jobs)
client = Client(cluster)

Now we'll define our Prefect workflow (@flow) and tell it to run the individual @tasks via our active Dask cluster. This can be done using the DaskTaskRunner in Prefect, which allows you to pass an address for the Dask cluster.

from prefect import flow, task
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(address=client.scheduler.address))
def workflow(a: float, b: float) -> float:
    output1 = add.submit(a, b)
    output2 = mult.submit(output1, b)
    return output2

@task
def add(a: float, b: float) -> float:
    return a + b

@task
def mult(a: float, b: float) -> float:
    return a * b

Now we instantiate and execute the workflow. The progress can be traced in Prefect Cloud.

output = workflow(1, 2)
print(output.result())

Since the Dask cluster remains alive until its walltime is reached or it is killed, we can run another workflow against it if we want, as sketched below.
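
For example, a second (hypothetical) flow can reuse the same Slurm-backed Dask cluster without submitting a new Slurm job:

# Reuses the running Dask cluster; no new Slurm job is submitted
@flow(task_runner=DaskTaskRunner(address=client.scheduler.address))
def another_workflow(a: float, b: float) -> float:
    return mult.submit(a, b)

print(another_workflow(3, 4).result())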

Temporary Dask Cluster

Some users may prefer to spin up a Dask cluster (i.e. Slurm job) for each individual @flow. This is also possible in Prefect. (Note that the example below will complain about an open port if you have already instantiated a Dask cluster above.)

from prefect import flow, task
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(cluster_class=SLURMCluster, cluster_kwargs=cluster_kwargs))
def workflow(a: float, b: float) -> float:
    output1 = add.submit(a, b)
    output2 = mult.submit(output1, b)
    return output2

@task
def add(a: float, b: float) -> float:
    return a + b

@task
def mult(a: float, b: float) -> float:
    return a * b

workflow(1, 2).result()

At this point, since the @flow has finished, the temporary cluster is torn down and the Slurm job is no longer running.

Limitations

Prefect Cloud is Fine at NERSC But Not Everywhere

The compute nodes need an outbound network connection to reach the Prefect server. If you're using Prefect Cloud, not all HPC machines allow this, although it is not a problem at NERSC.

Killed Slurm Jobs Aren't Reflected in the UI

If you scancel a Slurm job, the associated @flow may be left stuck in a Running state in the UI.

Pilot Job Behavior is Limited

The main limitation of Prefect in this kind of setup is that it inherits the limitations of dask-jobqueue. Most notably, you can't distribute @tasks over multiple nodes within a single Slurm allocation. For instance, the following is not possible (a partial workaround is sketched after the links below):

  • Running 256 single-core tasks over two 128-core CPU nodes in a single Slurm allocation. dask-jobqueue only knows how to do this over 1 node.
  • Running two concurrent VASP jobs requiring 1 GPU node each across 2 GPU nodes in a single Slurm allocation. dask-jobqueue does not know how to split up the underlying MPI processes over multiple nodes in a single allocation.

Some details:

  • https://github.com/dask/dask-jobqueue/issues/459
  • https://github.com/dask/dask-jobqueue/issues/616
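
The partial workaround mentioned above, if separate single-node allocations are acceptable for your use case, is to request multiple single-node Slurm jobs rather than one multi-node job. A rough sketch, reusing the cluster_kwargs from the Basic Example:

# Instead of one 2-node allocation (not supported), request two single-node
# Slurm jobs, each running one 128-core Dask worker
cluster = SLURMCluster(**cluster_kwargs)
cluster.scale(jobs=2)
client = Client(cluster)
# 256 single-core tasks can now spread across the two workers, but each Slurm
# allocation is still confined to a single node.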

Concurrent @flows Are Not Yet Supported

If you want to run multiple concurrent @flows without blocking in a single Python process, you can't (yet). You have to submit each @flow sequentially or in separate Python processes (see the sketch after the link below). This is kind of annoying, but there is an open issue about it.

Details:

  • https://github.com/PrefectHQ/prefect/issues/6689
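
A rough sketch of the separate-process workaround, where run_workflow.py is a hypothetical script that calls workflow(...) with the desired inputs:

import subprocess

# Launch each flow in its own Python process so the flow runs proceed concurrently
procs = [
    subprocess.Popen(["python", "run_workflow.py", "--a", str(a), "--b", str(b)])
    for a, b in [(1, 2), (3, 4)]
]
for proc in procs:
    proc.wait()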

Andrew-S-Rosen avatar Apr 26 '24 21:04 Andrew-S-Rosen