Dask Integration for Analysis Grand Challenge

Open goseind opened this issue 1 year ago • 2 comments

Motivation

As we talked about in our Analysis Grand Challenge meeting today, we would like to enhance REANA to support Dask for distributed analysis on Kubernetes as a backend.

Modifications

This would involve allowing the user to easily enable the usage of a Dask cluster in their workflow file, e.g.:

Dask: true
# some more options tbd

In the background, this would create a Dask cluster from the user pod. It would also require additional values in the Helm chart to configure the Dask cluster backend in the first place, e.g. the target Kubernetes cluster.
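
For illustration, such Helm values could look as follows (a sketch only; all names here are hypothetical and to be decided):

dask:
  enabled: true
  scheduler:
    image: "ghcr.io/dask/dask:2023.1.0"
  workers:
    replicas: 2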

Refer also to this CodiMD for more details on today's meeting.

goseind avatar May 30 '23 12:05 goseind

To be tested on the AGC Dask example: https://github.com/root-project/analysis-grand-challenge

egazzarr avatar May 30 '23 13:05 egazzarr

I have done a proof-of-concept test on my local REANA deployment. Everything works well.

  1. Bring up a local Dask service inside the REANA cluster:
$ helm install dask dask/dask --version 2023.1.0
  2. Detect the Dask scheduler connection URI:
$ DASK_SCHEDULER_URI=$(kubectl get services dask-scheduler | awk 'NR==2 {print "tcp://" $3 ":" substr($5,1,4)}')
$ echo $DASK_SCHEDULER_URI
tcp://10.96.4.51:8786
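
The same connection URI can also be composed via kubectl's JSONPath output, which avoids depending on awk column positions and on the length of the port string (assuming the scheduler is exposed as the first port of the service):
$ DASK_SCHEDULER_URI="tcp://$(kubectl get service dask-scheduler -o jsonpath='{.spec.clusterIP}'):$(kubectl get service dask-scheduler -o jsonpath='{.spec.ports[0].port}')"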
  3. Create a simple Dask-based user analysis example for the proof-of-concept:
import os

import dask.array
import dask.distributed

# Connect to the Dask scheduler; the URI comes from the environment so
# that the same script works both locally and inside a REANA job.
DASK_SCHEDULER_URI = os.getenv("DASK_SCHEDULER_URI", "tcp://127.0.0.1:8080")
client = dask.distributed.Client(DASK_SCHEDULER_URI)

# Build a small task graph: a random matrix, its symmetrised sum, and
# a row-wise mean over a slice.
x = dask.array.random.random((10000, 10000), chunks=(1000, 1000))
y = x + x.T
z = y[::2, 5000:].mean(axis=1)

# Execute the graph on the distributed cluster and print the result.
result = z.compute()
print(result)
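
As a quick sanity check, the same script can also be pointed at a throwaway local cluster before submitting anything to REANA (a sketch, assuming dask and distributed are installed locally):
import dask.distributed

# Start a small in-process Dask cluster and print its scheduler address,
# to be exported as DASK_SCHEDULER_URI when running dask_demo.py.
cluster = dask.distributed.LocalCluster(n_workers=2, threads_per_worker=1)
print(cluster.scheduler_address)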

  4. Create an accompanying reana.yaml workflow specification using the above Dask scheduler connection information:
inputs:
  files:
    - dask_demo.py
workflow:
  type: serial
  specification:
    steps:
      - name: mystep
        environment: 'ghcr.io/dask/dask:2023.1.0'
        commands:
        - DASK_SCHEDULER_URI=tcp://10.96.4.51:8786 python dask_demo.py > dask_demo.res
outputs:
  files:
    - dask_demo.res
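
Optionally, the workflow specification can be checked first with reana-client's built-in validator (reading reana.yaml from the current directory):
$ reana-client validate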
  5. Run this example on REANA:
$ reana-client run -w dask
$ sleep 60
$ reana-client status -w dask
$ reana-client ls -w dask
$ reana-client download -w dask dask_demo.res
$ tail dask_demo.res
[0.99483301 0.99318256 1.0114246  ... 0.99163413 0.99748661 1.01648798]

Note

Beware of versioning. In the above example, we are using Dask version 2023.1.0 for both the cluster and the client. If a user happened to use, say, the latest Dask 2023.5.1 in their job image, then the above workflow would fail due to client-vs-cluster version inconsistencies:

$ kubectl logs service/dask-scheduler | tail
  File "/opt/conda/lib/python3.8/site-packages/distributed/scheduler.py", line 5213, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/opt/conda/lib/python3.8/site-packages/distributed/core.py", line 904, in handle_stream
    handler(**merge(extra, msg))
TypeError: update_graph() got an unexpected keyword argument 'graph_header'
2023-05-31 12:46:26,523 - distributed.scheduler - INFO - Receive client connection: Client-279c10b7-ffb1-11ed-800d-4285cf89ac3c
2023-05-31 12:46:26,523 - distributed.core - INFO - Starting established connection to tcp://10.244.0.43:60864
2023-05-31 12:46:26,624 - distributed.core - INFO - Connection to tcp://10.244.0.43:60864 has been closed.
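
On the client side, such mismatches can also be surfaced eagerly: distributed's Client.get_versions(check=True) compares the versions reported by the client, the scheduler, and the workers, and raises an error on inconsistency. A job could run this before doing any real work:
import dask.distributed

client = dask.distributed.Client("tcp://10.96.4.51:8786")
# Raises an error if client, scheduler and workers report different versions.
client.get_versions(check=True)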

This means that:

  • If a REANA site were to use a statically pre-created Dask cluster for user jobs, then reana-client validate would have to carefully check user images for Dask-related version consistency before accepting user jobs.

  • Ideally, though, the user should be able to specify the desired Dask resource version in their reana.yaml, for example:

    resources:
      dask:
        version: 2023.1.0
    

    REANA would then dynamically create a new individual Dask cluster with the desired version for this particular user at analysis start-up time, run the analysis, and bring the Dask cluster back down afterwards, as sketched below. (All of this will need careful error checking when REANA manages the requested resources.)
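
    For illustration, such dynamic cluster management could be built on top of the dask-kubernetes operator. A minimal sketch, assuming the operator is deployed in the REANA cluster and using hypothetical names:

    from dask_kubernetes.operator import KubeCluster

    # Hypothetical per-workflow Dask cluster, created at analysis
    # start-up time with the Dask version requested in reana.yaml.
    cluster = KubeCluster(
        name="reana-dask-myworkflow",        # hypothetical per-workflow name
        image="ghcr.io/dask/dask:2023.1.0",  # version taken from reana.yaml
        n_workers=2,
    )
    # ... run the user's analysis against cluster.scheduler_address ...
    cluster.close()  # tear the cluster down once the workflow finishes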

tiborsimko avatar May 31 '23 14:05 tiborsimko