Docs: Add a tutorial for how to use Prefect with a job scheduler (e.g. Slurm) on an HPC machine
First check
- [X] I added a descriptive title to this issue.
- [X] I used GitHub search to find a similar request and didn't find it 😇
Describe the issue
There are currently no examples for how to use Prefect with a job scheduling system (e.g. SLURM, PBS, MOAB) on an HPC machine. I think this is a pretty important omission because many academics and users of the top supercomputers may not be aware of how they can use Prefect.
Describe the proposed change
Add a tutorial.
Additional context
There's mention of spinning up a Dask cluster but no representative example of how this is done in practice with a given job scheduling system.
I'm all in for a tutorial on how to use Prefect with a stack using `submitit`, `hydra`, and possibly `dora`!
I must admit that I have no clue what any of those 3 are 😅 I was thinking more of the recommended approach of using a `prefect-dask.DaskTaskRunner` instantiated with a `dask-jobqueue.SLURMCluster`, which would be passed to the `task_runner` kwarg in the `@flow` decorator.
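For concreteness, a minimal sketch of what that pattern might look like (the Slurm settings here are placeholders you'd adjust for your machine):

```python
from dask_jobqueue import SLURMCluster
from prefect import flow
from prefect_dask import DaskTaskRunner

# Placeholder Slurm settings; adjust for your scheduler and machine.
runner = DaskTaskRunner(
    cluster_class=SLURMCluster,
    cluster_kwargs={"cores": 16, "memory": "32 GB", "walltime": "00:10:00"},
)

@flow(task_runner=runner)
def my_flow():
    ...  # tasks submitted here run on the Slurm-provisioned Dask workers
```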
I would also be interested in such a tutorial. In particular, a simple tutorial that takes a Prefect workflow, submits it as a Slurm job, and monitors its execution. I'm able to help all y'all, and can test it at NERSC.
I'm also interested in embarrassingly parallel tasks, and tasks that communicate using MPI across nodes.
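For the embarrassingly parallel case, something like Prefect's task mapping presumably covers it; a minimal illustrative sketch (the task and inputs are made up, and you'd pair this with a `DaskTaskRunner` to fan out over Slurm workers):

```python
from prefect import flow, task

@task
def process_one(x: float) -> float:
    # Illustrative per-item work; replace with your real computation.
    return x**2

@flow  # add task_runner=DaskTaskRunner(...) to distribute over a cluster
def fan_out(inputs: list[float]) -> list[float]:
    futures = process_one.map(inputs)  # one task run per input element
    return [future.result() for future in futures]
```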
@JBlaschke: Here is an example I wrote up for Perlmutter. Feel free to use this and do whatever you'd like with it. I think the challenge I ran into for practical use was that you can only apply a single task runner for the entire flow, meaning if some of the individual tasks have different compute requirements, you're mostly out of luck (unless I missed something). There is also the fact that the compute nodes need to be able to communicate with the Prefect server, which not all HPC machines can do if using Prefect Cloud, but that's not a problem at NERSC.
Note to future me: the content below was taken from my 0.2.0 release of quacc.
Some Utility Functions
```python
from __future__ import annotations

from typing import TYPE_CHECKING, Callable

if TYPE_CHECKING:
    from dask_jobqueue.core import JobQueueCluster
    from prefect_dask.task_runners import DaskTaskRunner


def make_dask_runner(
    cluster_kwargs: dict,
    cluster_class: Callable | None = None,
    adapt_kwargs: dict[str, int | None] | None = None,
    client_kwargs: dict | None = None,
    temporary: bool = False,
) -> DaskTaskRunner:
    """
    Make a DaskTaskRunner for use with Prefect workflows.

    Parameters
    ----------
    cluster_kwargs
        Keyword arguments to pass to `cluster_class`.
    cluster_class
        The Dask cluster class to use. Defaults to `dask_jobqueue.SLURMCluster`.
    adapt_kwargs
        Keyword arguments to pass to `cluster.adapt` of the form
        `{"minimum": int, "maximum": int}`. If `None`, no adaptive scaling
        will be done.
    client_kwargs
        Keyword arguments to pass to `dask.distributed.Client`.
    temporary
        Whether to use a temporary cluster. If `True`, the cluster will be
        terminated once the `Flow` is finished. If `False`, the cluster will
        run until the walltime is reached and can run multiple `Flow`s.

    Returns
    -------
    DaskTaskRunner
        A DaskTaskRunner object for use with Prefect workflows.
    """
    from dask_jobqueue import SLURMCluster
    from prefect_dask.task_runners import DaskTaskRunner

    if cluster_class is None:
        cluster_class = SLURMCluster

    # Make the one-time-use DaskTaskRunner
    if temporary:
        return DaskTaskRunner(
            cluster_class=cluster_class,
            cluster_kwargs=cluster_kwargs,
            adapt_kwargs=adapt_kwargs,
            client_kwargs=client_kwargs,
        )

    # Make the Dask cluster
    cluster = _make_dask_cluster(cluster_class, cluster_kwargs)

    # Set up adaptive scaling
    if adapt_kwargs and (adapt_kwargs["minimum"] or adapt_kwargs["maximum"]):
        cluster.adapt(minimum=adapt_kwargs["minimum"], maximum=adapt_kwargs["maximum"])

    # Return the DaskTaskRunner with the cluster address
    return DaskTaskRunner(address=cluster.scheduler_address)


def _make_dask_cluster(
    cluster_class: Callable, cluster_kwargs: dict, verbose: bool = True
) -> JobQueueCluster:
    """
    Make a Dask cluster for use with Prefect workflows.

    Parameters
    ----------
    cluster_class
        The Dask cluster class to use, e.g. `dask_jobqueue.SLURMCluster`.
    cluster_kwargs
        Keyword arguments to pass to `cluster_class`.
    verbose
        Whether to print the job script to stdout.
    """
    cluster = cluster_class(**cluster_kwargs)

    if verbose:
        print(
            f"Workers are submitted with the following job script:\n{cluster.job_script()}"
        )
        print(f"Scheduler is running at {cluster.scheduler.address}")
        print(f"Dashboard is located at {cluster.dashboard_link}")

    return cluster
```
Example Usage
```python
from prefect import flow

n_slurm_jobs = 1  # Number of Slurm jobs to launch in parallel.
n_nodes_per_calc = 1  # Number of nodes to reserve for each Slurm job.
n_cores_per_node = 48  # Number of CPU cores per node.
mem_per_node = "64 GB"  # Total memory per node.

cluster_kwargs = {
    # Dask worker options
    "n_workers": n_slurm_jobs,
    "cores": n_cores_per_node,
    "memory": mem_per_node,
    # SLURM options
    "shebang": "#!/bin/bash",
    "account": "AccountName",
    "walltime": "00:10:00",
    "job_mem": "0",
    "job_script_prologue": [
        "source ~/.bashrc",
        "conda activate MyEnv",
    ],
    "job_directives_skip": ["-n", "--cpus-per-task"],
    "job_extra_directives": [f"-N {n_nodes_per_calc}", "-q debug", "-C cpu"],
    "python": "python",
}

runner = make_dask_runner(cluster_kwargs, temporary=True)

@flow(task_runner=runner)
def workflow(*args, **kwargs):
    ...
```
When the workflow is run from the login node, it will be submitted to the job scheduling system (Slurm by default), and the results will be sent back to Prefect Cloud once completed. Refer to the Dask-Jobqueue documentation for the available `cluster_kwargs` that can be defined and how they relate to a typical job script.
To asynchronously spawn a Slurm job that continually pulls in work for the duration of its walltime (rather than starting and terminating over the lifetime of the associated `Flow`), you can instead call `make_dask_runner` without the `temporary` keyword argument:
```python
runner = make_dask_runner(cluster_kwargs)
```
Additionally, you can have the generated Dask cluster adaptively scale based on the amount of work available by setting `adapt_kwargs` as follows:
```python
runner = make_dask_runner(cluster_kwargs, adapt_kwargs={"minimum": 1, "maximum": 5})
```
This will ensure that at least one Slurm job is always running, but the number of jobs will scale up to 5 if there is enough work available.
@Andrew-S-Rosen thank you -- I'll dust this off and condense it into a tutorial for our docs.
Here is a worked example for those curious. It is meant to work on the Perlmutter machine at NERSC, but the insights are largely machine-agnostic.
Preliminary Steps
On the login node:
```bash
pip install prefect prefect-dask dask-jobqueue
prefect cloud login
```
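(If your machine can't reach Prefect Cloud, the same steps work against a self-hosted Prefect server instead; as a sketch, with a placeholder URL:)

```bash
# Hypothetical self-hosted alternative to `prefect cloud login`;
# replace the placeholder URL with your server's API endpoint.
prefect config set PREFECT_API_URL="http://my-prefect-server:4200/api"
```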
Basic Example
Prefect has Dask and Ray backends, but only Dask (via `dask-jobqueue`) interfaces cleanly with Slurm. We'll use `dask-jobqueue` as the backend for simplicity.

We start by defining the `SLURMCluster()` object in `dask-jobqueue`. Namely, we're outlining what goes in the Slurm submission script. This is the part that you'll need to adjust for your machine.
```python
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster_kwargs = {
    "cores": 128,
    "memory": "64 GB",
    "shebang": "#!/bin/bash",
    "account": "MyAccountName",
    "walltime": "00:10:00",
    "job_mem": "0",
    "job_script_prologue": ["source ~/.bashrc"],
    "job_directives_skip": ["-n", "--cpus-per-task"],
    "job_extra_directives": ["-q debug", "-C cpu"],
}

cluster = SLURMCluster(**cluster_kwargs)
print(cluster.job_script())
```
Now we define how many Slurm jobs we want with those specs and instantiate the Dask cluster. This will immediately submit a job to the queue even though we don't have any compute tasks to run just yet.
```python
slurm_jobs = 1
cluster.scale(jobs=slurm_jobs)
client = Client(cluster)
```
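Since the Slurm job may sit in the queue for a while, it can help to block until at least one Dask worker has actually started before submitting work. This is optional and uses plain `dask.distributed` functionality:

```python
# Optional: block until the Slurm job starts and at least one worker connects.
client.wait_for_workers(1)
```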
Now we'll define our Prefect workflow (`@flow`) and tell it to run the individual `@task`s via our active Dask cluster. This can be done using the `DaskTaskRunner` in Prefect, which allows you to pass an address for the Dask cluster.
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(address=client.scheduler.address))
def workflow(a: float, b: float) -> float:
    output1 = add.submit(a, b)
    output2 = mult.submit(output1, b)
    return output2

@task
def add(a: float, b: float) -> float:
    return a + b

@task
def mult(a: float, b: float) -> float:
    return a * b
```
Now we instantiate and execute the workflow. The progress can be traced in Prefect Cloud.
```python
output = workflow(1, 2)
print(output.result())
```
Since the Dask cluster remains alive until the walltime is reached (or it is otherwise killed), we can run another workflow if we want.
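For example, reusing the flow defined above and then tearing the cluster down explicitly when we're done:

```python
# The cluster is still alive, so a second flow run reuses the same Slurm job.
output = workflow(3, 4)
print(output.result())

# When finished, release the Slurm allocation rather than waiting for walltime.
client.close()
cluster.close()
```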
Temporary Dask Cluster
Some users may prefer to spin up a Dask cluster (i.e. a Slurm job) for each individual `@flow`. This is also possible in Prefect. (Note that the example below will complain about an open port if you have already instantiated a Dask cluster above.)
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(cluster_class=SLURMCluster, cluster_kwargs=cluster_kwargs))
def workflow(a: float, b: float) -> float:
    output1 = add.submit(a, b)
    output2 = mult.submit(output1, b)
    return output2

@task
def add(a: float, b: float) -> float:
    return a + b

@task
def mult(a: float, b: float) -> float:
    return a * b
```
```python
workflow(1, 2).result()
```
At this point, since the `@flow` is finished, the Slurm job is no longer running.
Limitations
Prefect Cloud is Fine at NERSC But Not Everywhere
The compute nodes need a network connection that can reach the Prefect server.
Killed Slurm Jobs Aren't Reflected in the UI
If you `scancel` a Slurm job, it's possible that the `@flow` may be stuck in a running state in the UI.
Pilot Job Behavior is Limited
The main limitation of Prefect in this kind of setup is that it inherits the limitations of `dask-jobqueue`. Most notably, you can't distribute `@task`s over multiple nodes in a single Slurm allocation. For instance, the following is not possible:

- Running 256 single-core tasks over two 128-core CPU nodes in a single Slurm allocation. `dask-jobqueue` only knows how to do this over 1 node.
- Running two concurrent VASP jobs requiring 1 GPU node each across 2 GPU nodes in a single Slurm allocation. `dask-jobqueue` does not know how to split up the underlying MPI processes over multiple nodes in a single allocation.
Some details:
- https://github.com/dask/dask-jobqueue/issues/459
- https://github.com/dask/dask-jobqueue/issues/616
Concurrent `@flow`s Are Not Yet Supported
If you want to run multiple concurrent `@flow`s without blocking in a single Python process, you can't (yet). You have to submit each `@flow` sequentially or in separate Python processes (sketched below). This is kind of annoying, but there is an open issue about it.
Details:
- https://github.com/PrefectHQ/prefect/issues/6689
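For reference, a minimal sketch of the separate-process workaround mentioned above. It assumes the toy `workflow` from earlier lives in a hypothetical importable module (`my_flows.py`); whether this plays nicely with your Prefect version is worth testing:

```python
# Sketch: run two flows concurrently in separate Python processes.
from concurrent.futures import ProcessPoolExecutor

from my_flows import workflow  # hypothetical module holding the @flow

def run_flow(args: tuple[float, float]) -> float:
    # Each process executes one flow run to completion.
    return workflow(*args).result()

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as pool:
        results = list(pool.map(run_flow, [(1, 2), (3, 4)]))
    print(results)
```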