dask-yarn icon indicating copy to clipboard operation
dask-yarn copied to clipboard

Specify scheduler node

Open sauercrowd opened this issue 6 years ago • 7 comments

Is it possible to specify which node should be used for the yarn-scheduler? That would be really handy to integrate it into Jupyter using the nbproxy, since I'm not able to reach the port directly

sauercrowd avatar Oct 22 '18 16:10 sauercrowd

According to the Skein documentation it is possible for a service to be run on node(s) with a specific yarn node_label (https://jcrist.github.io/skein/api.html?highlight=service#skein.Service), so maybe it would be possible to add an option to the yarncluster constructor which optionally let's a user specify a target node label for the scheduler. What do you think about that?

sauercrowd avatar Oct 22 '18 17:10 sauercrowd

I'd even prefer specifying hostnames, so no cluster configuration change is needed. I opened an issue in the skein Repo ( jcrist/skein#89 ) to check if this would be possible. Looking forward to opinions on that.

sauercrowd avatar Oct 22 '18 18:10 sauercrowd

That would be really handy to integrate it into Jupyter using the nbproxy, since I'm not able to reach the port directly

The scheduler always runs in a yarn container, while (I assume) your jupyter instance is running on an edge node. From what I understand nbserverproxy only works when both jupyter and the application are running on the same node (I've never directly set up nbserverproxy before, so I may be wrong here).

When you say you can't reach the port directly, do you mean the dask scheduler, or the web ui? dask-yarn is written assuming your client is running on the edge node, and currently I've just been ssh-tunneling out the dashboard when needed (admittedly rough).

There's not a cleaner solution currently, in the future I'd like to fix the dask dashboard to work through the yarn proxy (we'd need to optionally use ajax requests instead of web sockets). As the dashboard is just a bokeh app, I wrote a proof of concept here.

jcrist avatar Oct 23 '18 23:10 jcrist

Thanks for your explanation! I'm running the Jupyter notebook directly on one of the spark nodes, so it would help to just be able to stick the scheduler to one specific node in order to ensure it runs on the same node as the jupyter. This is also the reason why the nbproxy would work for me.

I admit it is kind of specific to my use-case, edge-nodes might be a more common scenario.

sauercrowd avatar Oct 24 '18 09:10 sauercrowd

This is now doable with the latest release, although the api isn't as clean as I'd like to to be. Skein's specification has many options, I'm not sure if all of them should be exposed through the YarnCluster constructor.

Currently you can set a node restriction by creating your own specification and passing it to YarnCluster.from_specification, or by specifying it in the configuration file. The specification field you'd want to add is here.

A few ideas to make this nicer for the user:

  1. Fully add all fields to the YarnCluster constructor. That's a lot of options to consider, but it may be worth doing.

  2. Parametrize potentially useful fields by configuration values (e.g. yarn.scheduler.node_label), but don't expose these in the constructor. Expert users can set these in their configuration file, or using dask.config.set({'yarn.scheduler.node_label': ...}). This keeps the constructor simpler, but still allows setting these parameters without creating your own full specification.

  3. Expose the _make_specification function (https://github.com/dask/dask-yarn/blob/master/dask_yarn/core.py#L30). Users could use this to build a base specification, and then mutate that specification to add/change whatever bits they needed. We may want to break the function up into smaller composable pieces (e.g. one for a worker, one for a scheduler, one for the whole thing), not sure.

  4. Leave as is and better document how to define your own specification (I suspect we'll want to do this anyway). I've made a start at this by cleaning up and getting some basic docs for our deployment scripts. An example specification adding a node restriction would be:

name: dask
queue: default

services:
  dask.scheduler:
    instances: 1
    max_restarts: 0
    nodes:
      # Require scheduler runs on mynode1
      - mynode1
    resources:
      memory: 2 GiB
      vcores: 1
    files:
      environment: /path/to/your/environment.tar.gz
    commands:
      - source environment/bin/activate
      - dask-yarn scheduler

  dask.worker:
    instances: 0
    max_restarts: -1
    resources:
      memory: 2 GiB
      vcores: 1
    depends:
      - dask.scheduler
    files:
      environment: /path/to/your/environment.tar.gz
    commands:
      - source environment/bin/activate
      - dask-yarn worker

Right now I'm against option 1, and am currently thinking options 2 and 3 combined may be the nicest way forward (easy config for common things, expose functions for expert users).

Thoughts?

jcrist avatar Oct 31 '18 15:10 jcrist

Sorry for the super-late reply! Awesome that this is implemented now!

Since the node option is really specific to the spark-cluster deployment IMO the best options would be 2 or 4, since users who want to use it definitely have to know the spark deployment (otherwise there's no point in using it) and it might be confusing for people who dont have to care about the actual nodes in the cluster ( when it's exposed in the constructor)

sauercrowd avatar Jan 19 '19 20:01 sauercrowd

Hi @sauercrowd, you have mentioned that feature is implemented already. Could you please let me know what is the implemented option?. I have the same requirement now to implement with Apache airflow and I need to know where the scheduler is deployed.

wilsonalbert avatar Oct 19 '20 21:10 wilsonalbert