
Instantiate a JobQueueCluster with an already started Scheduler

Open · guillaumeeb opened this issue 4 years ago · 5 comments

While looking at several issues here, I feel that adding the ability to instantiate a JobQueueCluster with an already started Scheduler could help some use cases, and be a first step towards starting the Scheduler remotely on a job (#186).

The idea would be to add the possibility to do:

cluster = FooCluster(scheduler=my_scheduler)
# or
cluster = FooCluster(scheduler_info=scheduler_file)

Then the JobQueueCluster won't instantiate a Scheduler, but just connect to an existing one.
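For reference, the Client side already supports attaching to an externally started scheduler; a minimal sketch, assuming a scheduler file written by a separately launched dask-scheduler (the path is just an example):

# What already works today: a Client can attach to an externally started
# scheduler through its scheduler file. The proposal above is to give the
# cluster managers (FooCluster) a similar ability.
from dask.distributed import Client

client = Client(scheduler_file="scheduler.json")  # example path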

I think this would provide solutions to some issues:

  • #407, we could reuse a LocalCluster Scheduler:
loc_cluster = LocalCluster(...)
job_cluster = FooCluster(scheduler=loc_cluster.scheduler)
  • #378, #381:
lowmem_cluster = FooCluster(cores=2, memory='4GB', resources=...)
highmem_cluster = FooCluster(scheduler=lowmem_cluster.scheduler, cores=2, memory='16GB', resources=...)

Any thoughts?

guillaumeeb avatar Apr 25 '20 07:04 guillaumeeb

In addition to starting a scheduler on a different node, using an existing scheduler could be a way to create heterogeneous clusters by using the same scheduler for more than one FooCluster.

But if I remember correctly, the SpecCluster class really wants to start its own scheduler.
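For context, a minimal sketch of how SpecCluster is driven today (to the best of my understanding of distributed's API): the scheduler is described as a spec dict and SpecCluster instantiates it itself, rather than accepting an already running instance:

# SpecCluster builds its own scheduler from a spec dict; there is currently
# no documented way to hand it an already running Scheduler object.
from distributed import Scheduler, Worker, SpecCluster

scheduler_spec = {"cls": Scheduler, "options": {"dashboard_address": ":8787"}}
worker_spec = {"cls": Worker, "options": {"nthreads": 1}}

cluster = SpecCluster(scheduler=scheduler_spec, worker=worker_spec)
cluster.scale(2)  # workers are created from worker_spec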

willirath avatar Apr 25 '20 15:04 willirath

Why not? I have no strong informed opinion on this one ...

The thing is that the .scale and adaptive semantics are not super well defined (do you want to scale/adapt the different parts of your cluster separately, or together with a fixed ratio between them?). Example: you have some GPU and CPU workers; do you want a fixed ratio between CPU and GPU, or to scale the GPU and CPU parts separately?
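To make the ambiguity concrete, here are two purely imaginary call signatures (neither exists in any Dask cluster manager) illustrating the two semantics:

# Hypothetical only: two possible meanings of "scaling" a heterogeneous cluster.
cluster.scale(cpu_workers=8, gpu_workers=2)     # scale each pool separately
cluster.scale(10, ratio={"cpu": 4, "gpu": 1})   # scale together, fixed 4 CPU : 1 GPU ratio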

There was the idea of worker pools mentioned in the distributed issue tracker; it may be worth having another look at the things mentioned there.

We could also have a MetaCluster object or something like this that starts a scheduler and manages multiple heterogeneous FooClusters. Maybe that would be cleaner API-wise; I am a bit worried that we are putting too many arguments into FooCluster.
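A purely hypothetical sketch of what such a MetaCluster could look like (none of this exists in dask-jobqueue, and it assumes FooCluster grows a scheduler= argument as proposed above):

from distributed import Scheduler

class MetaCluster:
    """Owns one Scheduler and hands it to several worker-only FooClusters."""

    def __init__(self):
        # In practice the scheduler would have to be started (awaited) inside
        # a running event loop; omitted here for brevity.
        self.scheduler = Scheduler()
        self.sub_clusters = {}  # name -> FooCluster-like object

    def add_cluster(self, name, cluster_cls, **worker_kwargs):
        # cluster_cls would be e.g. SLURMCluster, modified to accept an
        # existing scheduler instead of starting its own.
        self.sub_clusters[name] = cluster_cls(scheduler=self.scheduler, **worker_kwargs)

    def scale(self, name, n):
        # Scale each heterogeneous part independently.
        self.sub_clusters[name].scale(n)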

In an ideal world this kind of heterogeneous cluster would be dealt with in distributed, but at the same time dask-jobqueue has some use cases and is a good place to experiment with this kind of idea.

If anyone manages to make some progress on this, do let us know!

lesteve avatar Apr 28 '20 07:04 lesteve

The thing with this thing is that the .scale and adaptive semantics are not super-well defined

IMHO a much better semantic would be for the .scale call to take what are now the arguments of the cluster constructor. Instead of doing things like

cluster = SLURMCluster(
        cores=1,
        job_extra_directives=['--exclusive',
            '--output=test-%j.log',
            '-J test',
            ],
        memory='255GB',
        queue="regular",
        job_directives_skip=['-t', '--cpus-per-task'],
        interface='ib0')
cluster.scale(jobs=10)

and being stuck with everything in the regular queue with the requested memory, one should be able to simply do

cluster = SLURMCluster()

cluster.scale(jobs=3,
        cores=1,
        job_extra_directives=['--exclusive',
            '--output=test-%j.log',
            '-J test',
            ],
        memory='255GB',
        queue="regular",
        job_directives_skip=['-t', '--cpus-per-task'],
        interface='ib0')
cluster.scale(jobs=2,
        cores=1,
        job_extra_directives=['--exclusive',
            '--output=test-%j.log',
            '-J test',
            ],
        memory='40GB',
        queue="GPU",
        job_directives_skip=['-t', '--cpus-per-task'],
        interface='ib0')
cluster.scale(jobs=5,
        cores=1,
        job_extra_directives=['--exclusive',
            '--output=test-%j.log',
            '-J test',
            ],
        queue="preempt",
        job_directives_skip=['-t', '--cpus-per-task'],
        interface='ib0')

This way one would still get 10 jobs, but in a much more flexible way. Is there no interest in having this, or is there a showstopper preventing it from happening? From a conceptual point of view it seems very simple: just create job scripts as they are created now (a number of them instead of one) and submit them as they are submitted now (just each with a different script rather than the same one for everything). For backward compatibility one could call the new scale method flexible_scale or something like that, and have it throw an exception if it is called on a cluster created "the old way".
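A rough, purely hypothetical sketch of that flexible_scale idea (neither the method nor the flag used here exists in dask-jobqueue; it only illustrates the intended semantics):

from dask_jobqueue import SLURMCluster

class FlexibleSLURMCluster(SLURMCluster):
    def flexible_scale(self, jobs, **job_kwargs):
        # Refuse to run on a cluster configured "the old way", i.e. with job
        # options fixed at construction time (assumed flag, not a real attribute).
        if getattr(self, "_constructed_with_job_options", False):
            raise RuntimeError("flexible_scale requires a bare SLURMCluster()")
        # Conceptually: update the job options used to build the job script,
        # then submit this batch of jobs as scale() does today.
        self._job_kwargs.update(job_kwargs)
        self.scale(jobs=jobs)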

Looking at the source code, I very soon got lost in too many layers of indirection, and never found where the batch script (in this case for Slurm) is created. So I could not gain any insight into whether or not this is as easily doable as it seems to be at the conceptual level.

davide-q avatar Dec 20 '23 21:12 davide-q

Actually, even without changing the API this appears to work (but I need to do more tests to confirm there are no unintended consequences):


cluster = SLURMCluster(
        cores=1,
        job_extra_directives=['--exclusive',
            '--output=test-%j.log',
            '-J test',
            ],
        memory='255GB',
        queue="regular",
        job_directives_skip=['-t', '--cpus-per-task'],
        interface='ib0')
cluster.scale(jobs=10)

cluster._job_kwargs['queue'] = 'preempt'

cluster.scale_up(100)

It's ugly, but the implementation of scale via the job_header property, which in turn uses the _dummy_job property (implemented so that a new object is created each time), leaves only this choice if one doesn't want to modify the dask-jobqueue and/or distributed/distributed/deploy/spec.py source code.
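For what it's worth, the same workaround can be wrapped in a small helper (this relies on dask-jobqueue internals, so it may break with future versions):

def scale_with_options(cluster, n_jobs, **job_kwargs):
    # Mutate the private _job_kwargs so the next _dummy_job / job_header is
    # built with the new options, then submit the extra jobs.
    cluster._job_kwargs.update(job_kwargs)
    cluster.scale_up(n_jobs)

# e.g. scale_with_options(cluster, 100, queue='preempt')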

davide-q avatar Jan 10 '24 21:01 davide-q

Sorry for the delay to answer here...

Is there no interest in having this or is there a showstopper preventing it from happening?

There is an interest for sure. It's been too long since I dived into the code here (there are a lot of indirections, as you said); right now my only feeling is that it somewhat changes the standardisation between the different Dask cluster manager implementations, but we already have one deviation with the jobs kwarg of scale.

Given you found a workaround that already works, it seems this would not introduce that many changes, especially if we keep the default values and just override them when scaling.
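Something like the following merge semantics, purely as an illustration of "defaults plus per-call overrides" (not an actual API):

# Only shows the intended merge of constructor defaults with per-scale-call overrides.
defaults = {"queue": "regular", "memory": "255GB", "cores": 1}
overrides = {"queue": "preempt"}
effective = {**defaults, **overrides}   # queue='preempt', memory='255GB', cores=1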

The solution initially proposed here was also a bit broader, and I hoped it would be simple enough to implement, but I've never had the time to dig into it.

I'm also a bit afraid that changing the Worker specification on the fly might have some consequences for how the SpecCluster class works, especially when using adaptive. I already found some problems it might raise when scaling with the cores or memory kwargs.

But we should definitely look at this solution and see if it is possible!

guillaumeeb avatar Jan 11 '24 18:01 guillaumeeb