dask-jobqueue
start of work to add flux
This is the start of work to add Flux Framework to dask. For some context, we are looking to run Dask in the Flux Operator so I'm looking into how to make that possible!
Update: everything working locally! I'll tweak here as necessary for linting / tests to pass.
I am really happy to see this! I think adding Flux support brings a ton of possibilities since it would allow us to run our own scheduler inside a larger allocation (which is great for my use case in jobqueue_features)
Okay! Did some more work / debugging today and I learned the following:
- Flux works better when given a full path to an executable file (not a relative path in /tmp that isn't marked executable)
- Writing the file within a context manager creates a race condition: it's not always the case that the file has been read and submitted by the time the context exits. To get around this I removed the context manager, and we will need to do the cleanup later / elsewhere.
- For the above, I've adapted the job close function to use the job id to retrieve the jobspec and then clean up the original job file (roughly the pattern sketched below).
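To make that concrete, here's roughly the pattern I mean - a minimal sketch, not the exact PR code; the helper names and the temp path are just for illustration:

```python
import os
import stat
import subprocess
import tempfile

def submit_job_script(script_text, tmpdir="/tmp"):
    # Write the script to a real (absolute) path and make it executable,
    # since Flux is happier with an executable file than a bare temp path.
    fd, path = tempfile.mkstemp(suffix=".sh", dir=tmpdir)
    with os.fdopen(fd, "w") as fh:
        fh.write(script_text)
    os.chmod(path, os.stat(path).st_mode | stat.S_IEXEC)

    # No context manager around the file during submit, to avoid the race
    # where the script is cleaned up before Flux has read it.
    out = subprocess.run(
        ["flux", "batch", path], capture_output=True, text=True, check=True
    )
    return out.stdout.strip(), path  # (job id, script path)

def cleanup_job_script(job_id, path):
    # Later / elsewhere: the jobspec is still retrievable from the job id,
    # so the original script file can be removed safely.
    jobspec = subprocess.run(
        ["flux", "job", "info", job_id, "jobspec"],
        capture_output=True, text=True, check=False,
    ).stdout
    if os.path.exists(path):
        os.remove(path)
    return jobspec
```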
I think I'm making progress and will push some updates soon! I also don't think we should have this kind of race condition between submitting and getting the job id back - I'll at least report it to the Flux team to see if it's something we can fix (so the behavior is similar to the other managers).
Update: updates are pushed!
Either GitHub is having issues (I saw something similar with Flux earlier this week), or some change I introduced / the docker compose setup with Flux is causing the other jobs to stall - although strangely htcondor seems to have worked okay? I'll keep watching!
@ocaisa I am totally new to Dask - could you tell me about jobqueue_features? The MPI mention has piqued my interest - Flux is very good at that! :laughing:
If the Flux job queue here works out and you want to point me to a vanilla example there, I could definitely try it out or help think about it.
Doing more testing (with an actual Kubernetes cluster and Flux!) I think the cleanup might still not be in the right place - I was running a distributed training job and everything looked okay, but I started seeing job errors (failures) because the job file is reported as not found.

I'm testing a slightly modified branch to not do the cleanup and will report back.
okay, update - what I'm seeing is that the model actually finishes before all the jobs are done, maybe because it launches more workers than it needs, decides it's finished at some threshold, and then doesn't need the rest (since they are technically just workers waiting to receive work)? Here is the output printed, which happens after the joblib context:

and here is another terminal that is just watching the flux queue - green "CD" means completed and most are running

When I look at one of the running workers it seems to just be registered / waiting

First I tried changing the cluster and client to be used as context managers, and left the cleanup in. This still led to a few errored jobs, so I removed it for now (PR updated here). I also noticed that cluster.wait_for_workers seems to launch one job and then wait for workers as processes on a single node? I'm not sure if there is a "wait for the entire cluster" function, but that isn't super relevant, I think. I also noticed that when I didn't clean up the temporary space, the next round would time out starting the cluster, I think (maybe) because node/worker information is cached there and then it never finds the workers.
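For reference, the pattern I was testing looks roughly like this - a minimal sketch where `FluxCluster` is the class added in this PR, the resource keyword arguments are placeholders, and the joblib bits stand in for my actual training script:

```python
from dask.distributed import Client
from dask_jobqueue import FluxCluster  # the class added in this PR
import joblib

# Placeholder resource values; the real workflow uses the operator's sizes.
with FluxCluster(cores=2, memory="2GB", walltime="00:30:00") as cluster:
    cluster.scale(jobs=4)              # each job starts a dask worker via flux batch
    with Client(cluster) as client:
        client.wait_for_workers(4)     # waits on dask workers, not flux jobs
        with joblib.parallel_backend("dask"):
            # model.fit(...) / cross-validation goes here
            pass
```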
TLDR: my guess is that the training finishes while some scheduled jobs are still queued, and by the time those jobs actually run, the script has long since been cleaned up. For the time being (until we have a better idea) it makes sense to disable the cleanup. I don't see any errors in this case:

And since one of those job scripts is technically just starting a worker, e.g.,
fluxuser@flux-sample-0:/tmp/workflow$ cat tmp/flux-job-okvkaeiy.sh
#!/usr/bin/env bash
#flux: --job-name=dask-worker
#flux: -n 1
#flux: --cores-per-task=2
#flux: -t 00:30:00
/opt/conda/bin/python3 -m distributed.cli.dask_worker tcp://10.244.0.64:38251 --nthreads 0 --nworkers 4 --memory-limit 476.84MiB --name FluxCluster-4 --nanny --death-timeout 60
It probably doesn't hurt that it just runs anyway - the worker seems to start and then wait around! In this context, when the job finishes (and the broker exits with 0), the rest of the jobs/pods exit too, so it largely doesn't matter. Here is the full workflow and documentation I'm working on:
https://github.com/flux-framework/flux-operator/blob/add/dask-example-scikit-learn/examples/machine-learning/dask/scikit-learn/README.md
Specifically check out the notes section where I've added more of the comments above. I'd like to understand the best practices for ending / cleaning up so we don't have dangling jobs or workers. Thank you!
Another update: I was wrong to use flux submit; it should be flux batch! I thought the directives worked with either (but they don't). I'm working on fixing it now.
okay, updated to use flux batch - and thanks @grondo and @garlick for the immense help today! The reason it took me so long is that flux only detected one core per docker compose container, so when I tried specifying two, it gave me an error. Instead of adding a deploy section with resources, I just manually set the cores per container to 2. That works locally and seems to work here (and will mirror the slurm tests). I'm testing again in the operator now.
I think I found the bug for why the worker jobs weren't cleaning up - the cleanup catches any runtime error (assuming the worker might already be done), but lo and behold, flux cancel isn't a command in the build I used. I'm a bit confused, because I was told here https://github.com/flux-framework/cheat-sheet/pull/3 that we should be using flux cancel (which doesn't seem to be a command here). So for now I'm falling back to flux job cancel.
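For context, the fallback I mean looks roughly like this - an illustrative sketch, not the exact code in the PR:

```python
import subprocess

def cancel_flux_job(job_id):
    # Prefer `flux cancel` (available in newer flux-core releases) and fall
    # back to `flux job cancel` on builds where the short form doesn't exist.
    try:
        subprocess.run(["flux", "cancel", job_id], check=True, capture_output=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        subprocess.run(["flux", "job", "cancel", job_id], check=False)
```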
FYI: flux cancel became a thing in flux-core 0.48 (released March 8).
This is interesting - with this change (so cancel should work now), I get an almost immediate failure (it doesn't wait 120s) when I do a client.restart():
broker.info[0]: quorum-full: quorum->run 32.7692s
nodes: 4
cores: 2
timeout: 60
Preparing cluster with 4 workers
Client: <Client: 'tcp://10.244.0.30:38095' processes=4 threads=8, memory=7.44 GiB>
Value of n is 8
Iteration 0 tasks
2023-04-24 00:05:46,519 - distributed.core - ERROR - 4/4 nanny worker(s) did not shut down within 120s
Traceback (most recent call last):
File "/opt/conda/lib/python3.10/site-packages/distributed/utils.py", line 752, in wrapper
return await func(*args, **kwargs)
File "/opt/conda/lib/python3.10/site-packages/distributed/scheduler.py", line 5962, in restart
raise TimeoutError(
asyncio.exceptions.TimeoutError: 4/4 nanny worker(s) did not shut down within 120s
okay, confirmed that when cancel works, it actually cancels the batch job before it has been waited for! I just pushed a change to test removing the weakref in the start command; I'll report back.
I think I'm going to need some feedback or insight from a Dask developer here - when I remove the cleanup weakref line in start, my jobs run as I'd expect, but now the tests are failing. I think we probably want a way to do the cleanup without killing the workers before the cluster has been waited for!
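For anyone skimming, this is roughly the pattern I'm talking about - an illustrative sketch of how the finalizer in `start` interacts with cleanup, paraphrased rather than copied from the dask-jobqueue code, so the method names here are approximate:

```python
import weakref

class FluxJob:
    async def start(self):
        # Submit the generated job script and record the job id.
        out = await self._submit_job(self.job_script_path)
        self.job_id = self._job_id_from_submit_output(out)
        # The cleanup finalizer: when this Job object is garbage collected,
        # the flux job is cancelled. Removing this line kept my worker jobs
        # alive long enough to do work, but it also breaks the tests that
        # expect automatic cleanup.
        weakref.finalize(self, self._close_job, self.job_id)
```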
@vsoch There's a tutorial for jobqueue_features (yes, it is a terrible name, but it didn't seem to matter much at the time) at https://github.com/E-CAM/jobqueue_features_workshop_materials, and just looking at the notebooks there should give you a quick idea of what is intended. The one big benefit it can bring is that you get access to the memory space of the root process in the cluster, so you can avoid using disk/stdout to communicate between tasks. To be fully capable, I believe this feature would require PEP 554, because right now, using this MPI mode (rather than forking), your task can hold the main thread for too long and fail to return the Dask heartbeat. It's not mature, more like a hobby project, but with your PR I might come back to it again.
I'm interested in multiscale problems and that frequently means coupling different codes together within an ensemble approach. I wanted the ability to resize the resources I'm using for one or the other (MPI-enabled) codes, and Dask does this pretty nicely by allowing me to define multiple clusters. With flux support I could take a "big allocation" on an exascale resource, start up flux and use flux clusters to execute the various components of the workflow. Nesting would be cool too, spinning up jobs that add to the resource pool for the flux clusters which do the actual work.
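To make that concrete, the kind of usage I have in mind is something like this - purely a hypothetical sketch on top of this PR's FluxCluster, with invented component names and sizes:

```python
from dask.distributed import Client
from dask_jobqueue import FluxCluster

# Inside one big Flux allocation, define two differently sized clusters
# for two coupled components of a multiscale workflow.
solver_cluster = FluxCluster(cores=8, memory="16GB")
solver_cluster.scale(jobs=16)       # the large MPI-enabled code

analysis_cluster = FluxCluster(cores=2, memory="4GB")
analysis_cluster.scale(jobs=4)      # lightweight analysis tasks

solver_client = Client(solver_cluster)
analysis_client = Client(analysis_cluster)
# submit work to each client independently, resizing the clusters as needed
```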
@ocaisa do you know how the review process works here? Is the project even active?
Hi @vsoch, sorry I have not found the time to review your work here, and as I don't know anything about Flux, I need to take some time to dig into it. Please be patient 🙂.
oh hey!! Yeah I'm totally good with that! I just wanted to make sure I didn't muck this up by putting it in the wrong place, etc. Thank you for the response @guillaumeeb I didn't mean to be impatient (more anticipating that I did something wrong, really, and wanting to fix it) :laughing:
Thank you @ocaisa ! It's been a while since I tested this, but I think I had some questions about the expected functionality in my original description / post (for example, the worker cleanup). If you want me to go back and walk through a simple case, maybe that would be a good start?
Starting work on this now! We have had some changes the Flux Operator in the last month and a half, so I'm going to start fresh and walk through the tests / feedback above. Stay tuned!
If / when tests pass please don't merge yet! I'm testing in the Flux operator and trying to optimize for the new design. I'll post an update when I'm done.
okay, this is a bad design and I can't support moving forward without more thinking. Right now, dask requires a shared temporary space to write the job scripts and to use as scratch space for itself. I was doing a hack to make a host volume at this location (via TMPDIR), but this gets ugly really quickly because other applications grab onto it, and all of a sudden we need to clean up the dask stuff but not the other temporary files (e.g., a flux local socket), and then if you aren't absolutely careful, the entire thing breaks (and it's a really bad setup / standard to start from). If we use this approach:
# Ensure the place dask writes files is shared by all nodes!
dask.config.set(temporary_directory='/tmp/workflow/tmp')
That works for scratch, but not for the job files, so we run the job and the other workers (different pods) can't find them. And my current situation is that I can't get it working, period, under any circumstances. I'm seeing all kinds of wonky stuff from flux, trying both interactive submit and headless (which should theoretically be the same, but the latter gives less time for things to find one another):
2023-06-04 21:19:55,430 - distributed.nanny - INFO - Start Nanny at: 'tcp://10.244.0.59:40305'
Jun 04 21:19:56.070879 broker.err[0]: rc2.0: /run/flux/jobtmp-0-FDhiv7Fd/script Hangup (rc=129) 1.7s
Jun 04 21:19:56.115549 broker.err[0]: cleanup.0: flux queue stop --quiet --all --nocheckpoint Hangup (rc=129) 0.0s
Jun 04 21:05:23.607221 broker.err[0]: rc1.0: /etc/flux/rc1 Hangup (rc=129) 2.0s
Jun 04 21:05:23.640004 broker.err[0]: rc3.0: /etc/flux/rc3 Hangup (rc=129) 0.0s
Jun 04 21:05:23.903494 broker.err[0]: flux_respond to insmod resource: Function not implemented
E: (flux-broker) 23-06-04 21:05:24 [197]dangling 'PAIR' socket created at shmem.c:148
E: (flux-broker) 23-06-04 21:05:24 [197]dangling 'PAIR' socket created at shmem.c:148
E: (flux-broker) 23-06-04 21:05:24 [197]dangling 'PAIR' socket created at shmem.c:148
E: (flux-broker) 23-06-04 21:05:24 [197]dangling 'PAIR' socket created at shmem.c:148
E: (flux-broker) 23-06-04 21:05:24 [197]dangling 'PAIR' socket created at shmem.c:148
I don't see a way forward with this design - I got it working maybe twice when I first opened the PR, and it seemed to work under some mysterious circumstances once today, but it's just not working consistently and I'm out of ideas. I think we likely need to step back, rethink this design, and (for the time being) remove the example from the operator because it doesn't work. Let me know your thoughts!
It's been a long time since this PR had any activity, and given that you don't want to continue with the current design, I'm going to close it out.
Would dask ever work without having this shared location for the workers? A shared filesystem is nontrivial in Kubernetes.
The worker temporary directory doesn't need to be shared between all workers. Each worker uses its own directory.
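For example, something like this should be enough - a rough sketch assuming dask-jobqueue's standard `local_directory` / `log_directory` parameters rather than anything Flux-specific, and untested with this branch:

```python
import dask
from dask_jobqueue import FluxCluster  # as in the earlier PR

# Worker scratch space can be node-local; it does not need to be shared.
dask.config.set(temporary_directory="/tmp/dask-scratch")

cluster = FluxCluster(
    cores=2,
    memory="2GB",
    local_directory="/tmp/dask-scratch",  # per-worker spill/scratch directory
    log_directory="/tmp/dask-logs",       # also node-local
)
```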
Gotcha - this PR is old enough that it would be worth trying again. I'll start fresh and open a new one if I have any success. Thanks for keeping on top of this @jacobtomlinson.