dask-yarn
Specify scheduler node
Is it possible to specify which node should be used for the YARN scheduler? That would be really handy for integrating it into Jupyter using nbproxy, since I'm not able to reach the port directly.
According to the Skein documentation, it is possible for a service to be run on node(s) with a specific YARN node_label (https://jcrist.github.io/skein/api.html?highlight=service#skein.Service), so maybe it would be possible to add an option to the YarnCluster constructor that optionally lets a user specify a target node label for the scheduler. What do you think about that?
I'd even prefer specifying hostnames, so no cluster configuration change would be needed. I opened an issue in the skein repo ( jcrist/skein#89 ) to check whether this would be possible. Looking forward to opinions on that.
> That would be really handy to integrate it into Jupyter using nbproxy, since I'm not able to reach the port directly
The scheduler always runs in a YARN container, while (I assume) your Jupyter instance is running on an edge node. From what I understand, nbserverproxy only works when both Jupyter and the application are running on the same node (I've never set up nbserverproxy myself, so I may be wrong here).
When you say you can't reach the port directly, do you mean the dask scheduler, or the web UI? dask-yarn is written assuming your client is running on the edge node, and currently I've just been ssh-tunneling out the dashboard when needed (admittedly rough).
There's no cleaner solution currently; in the future I'd like to fix the dask dashboard to work through the YARN proxy (we'd need to optionally use AJAX requests instead of web sockets). As the dashboard is just a Bokeh app, I wrote a proof of concept here.
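For reference, the ssh-tunnel workaround mentioned above might look something like the following. This is only a sketch: `edge-node`, `scheduler-host`, and the user are placeholders for your own setup, and 8787 is Dask's default dashboard port.

```shell
# Forward the remote dashboard port to your local machine.
# "scheduler-host" is the host where the scheduler container landed;
# "edge-node" is a node you can reach via ssh. Both are placeholders.
ssh -N -L 8787:scheduler-host:8787 user@edge-node
# Then open http://localhost:8787 in a local browser.
```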
Thanks for your explanation! I'm running the Jupyter notebook directly on one of the Spark nodes, so it would help to just be able to pin the scheduler to one specific node, in order to ensure it runs on the same node as Jupyter. This is also the reason why nbproxy would work for me.
I admit this is kind of specific to my use case; edge nodes might be a more common scenario.
This is now doable with the latest release, although the API isn't as clean as I'd like it to be. Skein's specification has many options, and I'm not sure all of them should be exposed through the YarnCluster constructor.
Currently you can set a node restriction by creating your own specification and passing it to YarnCluster.from_specification, or by specifying it in the configuration file. The specification field you'd want to add is here.
A few ideas to make this nicer for the user:

1. Fully add all fields to the YarnCluster constructor. That's a lot of options to consider, but it may be worth doing.
2. Parametrize potentially useful fields by configuration values (e.g. yarn.scheduler.node_label), but don't expose these in the constructor. Expert users can set these in their configuration file, or using dask.config.set({'yarn.scheduler.node_label': ...}). This keeps the constructor simpler, but still allows setting these parameters without creating your own full specification.
3. Expose the _make_specification function (https://github.com/dask/dask-yarn/blob/master/dask_yarn/core.py#L30). Users could use this to build a base specification, and then mutate that specification to add/change whatever bits they needed. We may want to break the function up into smaller composable pieces (e.g. one for a worker, one for a scheduler, one for the whole thing), not sure.
4. Leave as is and better document how to define your own specification (I suspect we'll want to do this anyway). I've made a start at this by cleaning up and getting some basic docs for our deployment scripts.

An example specification adding a node restriction would be:
```yaml
name: dask
queue: default

services:
  dask.scheduler:
    instances: 1
    max_restarts: 0
    nodes:
      # Require scheduler runs on mynode1
      - mynode1
    resources:
      memory: 2 GiB
      vcores: 1
    files:
      environment: /path/to/your/environment.tar.gz
    commands:
      - source environment/bin/activate
      - dask-yarn scheduler

  dask.worker:
    instances: 0
    max_restarts: -1
    resources:
      memory: 2 GiB
      vcores: 1
    depends:
      - dask.scheduler
    files:
      environment: /path/to/your/environment.tar.gz
    commands:
      - source environment/bin/activate
      - dask-yarn worker
```
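To experiment with such a specification programmatically, you could also assemble the service definition as a plain dict before handing it to skein/dask-yarn. The sketch below only mirrors the YAML above; `scheduler_service` is a hypothetical helper, not part of dask-yarn, and the trailing comment shows (but does not run) the hand-off to the cluster.

```python
# Sketch: build the dask.scheduler service definition (mirroring the YAML
# above) as a plain dict. scheduler_service is a hypothetical helper, not
# part of dask-yarn; the node restriction goes in the "nodes" field.
def scheduler_service(node, memory="2 GiB", vcores=1):
    """Return a dask.scheduler service spec pinned to `node`."""
    return {
        "instances": 1,
        "max_restarts": 0,
        "nodes": [node],  # require the scheduler to run on this node
        "resources": {"memory": memory, "vcores": vcores},
        "files": {"environment": "/path/to/your/environment.tar.gz"},
        "commands": [
            "source environment/bin/activate",
            "dask-yarn scheduler",
        ],
    }

svc = scheduler_service("mynode1")
print(svc["nodes"])  # ['mynode1']

# With the full spec saved as YAML (and skein/dask-yarn installed), the
# hand-off mentioned above would then look something like:
#   spec = skein.ApplicationSpec.from_file("spec.yaml")
#   cluster = YarnCluster.from_specification(spec)
```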
Right now I'm against option 1, and am currently thinking options 2 and 3 combined may be the nicest way forward (easy config for common things, expose functions for expert users).
Thoughts?
Sorry for the super-late reply! Awesome that this is implemented now!
Since the node option is really specific to the Spark cluster deployment, IMO the best options would be 2 or 4: users who want to use it have to know the cluster deployment anyway (otherwise there's no point in using it), and it might be confusing for people who don't have to care about the actual nodes in the cluster if it were exposed in the constructor.
Hi @sauercrowd, you mentioned that this feature is already implemented. Could you please let me know which option was implemented? I have the same requirement now with Apache Airflow, and I need to know where the scheduler is deployed.