Dask Integration for Analysis Grand Challenge
Motivation
As we discussed in our Analysis Grand Challenge meeting today, we would like to enhance REANA to support Dask as a backend for distributed analysis on Kubernetes.
Modifications
This would involve allowing the user to easily enable the usage of a Dask cluster in their workflow file, e.g.:
Dask: true
# some more options tbd
In the background, this would create a Dask cluster from the user pod. Further, this would require additional Helm values in the REANA Helm chart to configure the Dask cluster backend in the first place, e.g. the target Kubernetes cluster.
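For illustration, such Helm values could look roughly like this (a hypothetical sketch; the exact value names are to be decided):

dask:
  enabled: true
  cluster:
    image: "ghcr.io/dask/dask:2023.1.0"  # hypothetical default scheduler/worker image
    workers: 2                           # hypothetical default number of workers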
Refer also to this CodiMD for more details on today's meeting.
To be tested on the AGC Dask example: https://github.com/root-project/analysis-grand-challenge
I have done a proof-of-concept test on my local REANA deployment. Everything works well.
- Bring up a local Dask service inside the REANA cluster:
$ helm repo add dask https://helm.dask.org   # if the Dask chart repository is not yet configured
$ helm install dask dask/dask --version 2023.1.0
- Detect the Dask scheduler connection URI:
$ DASK_SCHEDULER_URI=$(kubectl get services dask-scheduler | awk 'NR==2 {print "tcp://" $3 ":" substr($5,1,4)}')
$ echo $DASK_SCHEDULER_URI
tcp://10.96.4.51:8786
- Create a simple Dask-based user analysis example for the proof-of-concept:
import os

import dask.array
import dask.distributed

# Connect to the Dask scheduler inside the REANA cluster (falls back
# to a local default when the environment variable is not set):
DASK_SCHEDULER_URI = os.getenv("DASK_SCHEDULER_URI", "tcp://127.0.0.1:8080")
client = dask.distributed.Client(DASK_SCHEDULER_URI)

# Run a simple distributed computation on a large random array:
x = dask.array.random.random((10000, 10000), chunks=(1000, 1000))
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
result = z.compute()
print(result)
- Create an accompanying reana.yaml workflow specification using the above Dask scheduler connection information:
inputs:
  files:
    - dask_demo.py
workflow:
  type: serial
  specification:
    steps:
      - name: mystep
        environment: 'ghcr.io/dask/dask:2023.1.0'
        commands:
        - DASK_SCHEDULER_URI=tcp://10.96.4.51:8786 python dask_demo.py > dask_demo.res
outputs:
  files:
    - dask_demo.res
- Run this example on REANA:
$ reana-client run -w dask
$ sleep 60
$ reana-client status -w dask
$ reana-client ls -w dask
$ reana-client download -w dask dask_demo.res
$ tail dask_demo.res
[0.99483301 0.99318256 1.0114246 ... 0.99163413 0.99748661 1.01648798]
Note
Beware of versioning. In the above example, we are using Dask version 2023.1.0 in both the cluster and the client. If a user happens to use, say, the latest Dask 2023.5.1 in their job image, then the above workflow would fail due to client vs. cluster version inconsistencies:
$ kubectl logs service/dask-scheduler | tail
File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 5213, in add_client
await self.handle_stream(comm=comm, extra={"client": client})
File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 904, in handle_stream
handler(**merge(extra, msg))
TypeError: update_graph() got an unexpected keyword argument 'graph_header'
2023-05-31 12:46:26,523 - distributed.scheduler - INFO - Receive client connection: Client-279c10b7-ffb1-11ed-800d-4285cf89ac3c
2023-05-31 12:46:26,523 - distributed.core - INFO - Starting established connection to tcp://10.244.0.43:60864
2023-05-31 12:46:26,624 - distributed.core - INFO - Connection to tcp://10.244.0.43:60864 has been closed.
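Such a mismatch can also be caught on the client side before submitting any work; a minimal sketch, reusing the demo client from above with distributed's built-in version check:

import os

import dask.distributed

# Connect to the same scheduler as the analysis code:
DASK_SCHEDULER_URI = os.getenv("DASK_SCHEDULER_URI", "tcp://127.0.0.1:8080")
client = dask.distributed.Client(DASK_SCHEDULER_URI)

# Compare package versions across client, scheduler, and workers;
# with check=True, distributed raises an error on any mismatch instead
# of failing later with an obscure scheduler-side TypeError.
client.get_versions(check=True)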
For REANA, this means that:

- If a REANA site were to use a statically pre-created Dask cluster for user jobs, then reana-client validate would have to carefully check user images for any Dask-related version inconsistencies before accepting user jobs.
- Ideally though, the user should be able to specify the desired Dask resource version in their reana.yaml, for example:

  resources:
    dask:
      version: 2023.1.0

  REANA would then dynamically create a new individual Dask cluster with the desired version for this concrete user at analysis start-up time, run the analysis, and bring the Dask cluster back down afterwards, as sketched below. (All of this will need careful error checking when REANA manages the desired resources.)
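A rough sketch of how such a dynamic per-analysis cluster lifecycle could look, assuming the dask-kubernetes operator is installed in the cluster; the cluster name, image, and worker count below are purely illustrative:

from dask_kubernetes.operator import KubeCluster

# Hypothetical lifecycle: create a dedicated Dask cluster with the
# user-requested version, run the analysis against it, tear it down.
cluster = KubeCluster(
    name="reana-run-123-dask",           # illustrative per-workflow name
    image="ghcr.io/dask/dask:2023.1.0",  # user-requested Dask version
    n_workers=2,
)
try:
    # The scheduler address would be passed to the user job
    # as DASK_SCHEDULER_URI:
    print("DASK_SCHEDULER_URI:", cluster.scheduler_address)
    # (the user workflow would run here)
finally:
    cluster.close()  # bring the Dask cluster back down afterwards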
The proof-of-concept has been done, the RFC with the desired features for the Dask integration is being prepared in #823, and we are starting regular work on implementing the features. Hence I am closing this proof-of-concept issue.