Outdated and/or unclear documentation about SchedulerPlugin

Open RaphaelRobidas opened this issue 1 year ago • 2 comments

Describe the issue: The current documentation regarding SchedulerPlugin with full task state access gives the following code as an example:

from distributed.diagnostics.plugin import SchedulerPlugin

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        # Get full TaskState
        ts = self.scheduler.tasks[key]

@click.command()
def dask_setup(scheduler):
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)

The dask_setup function seems to appear out of nowhere and click isn't imported. Moreover, this approach does not seem to correctly register the plugin. For example, the following code runs successfully:

import click
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed import Client, LocalCluster

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        ts = self.scheduler.tasks[key]
        raise Exception

@click.command()
def dask_setup(scheduler):
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)

def job(i):
    return i*2

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)
    ret = client.submit(job, 1).result()
    print(ret)

The console log reads "2", while we would expect an Exception to be raised.

It is unclear how to register a scheduler plugin that can access the full task state. client.scheduler does not seem to be serializable, so the following code does not work and crashes with the error TypeError: cannot pickle 'coroutine' object:

from distributed.diagnostics.plugin import SchedulerPlugin
from distributed import Client, LocalCluster

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        ts = self.scheduler.tasks[key]
        raise Exception

def job(i):
    return i*2

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)

    plugin = MyPlugin(client.scheduler)
    client.register_scheduler_plugin(plugin)

    ret = client.submit(job, 1).result()
    print(ret)

Environment:

  • Dask version: 2024.4.2
  • Python version: 3.10.12
  • Operating System: Linux Mint 21.2
  • Install method (conda, pip, source): pip

RaphaelRobidas, Jun 23 '24 22:06

@RaphaelRobidas: Thanks for reporting this issue. There are several things going on here, so let me try to unpack them:

The current documentation regarding SchedulerPlugin with full task state access gives the following code as an example:

from distributed.diagnostics.plugin import SchedulerPlugin

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        # Get full TaskState
        ts = self.scheduler.tasks[key]

@click.command()
def dask_setup(scheduler):
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)

The dask_setup function seems to appear out of nowhere and click isn't imported.

This is a good catch; the documentation is a bit unclear here. Note that the RabbitMQ Example contains this line:

Run with: dask scheduler --preload <filename.py>

From what I can tell, the example you mention copies that pattern, but we could improve it by importing click and making it clear how to run this (e.g., by simply copying the line I quoted above). Would you be interested in contributing a PR for this?
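For illustration, a preload file along those lines might look like the sketch below. It is the documentation example with the click import added; it assumes click is installed alongside distributed and is not necessarily the wording the docs will end up with. The file would be passed to the scheduler with the command quoted above, dask scheduler --preload <filename.py>.

```python
import click
from distributed.diagnostics.plugin import SchedulerPlugin


class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        # In a preload script this is the real Scheduler object
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        # Get full TaskState
        ts = self.scheduler.tasks[key]


@click.command()
def dask_setup(scheduler):
    # Called by the scheduler when this file is loaded as a preload
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)
```

Running the file directly with python does nothing visible, because dask_setup is only invoked by the scheduler's preload machinery.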

Moreover, this approach does not seem to correctly register the plugin. For example, the following code runs successfully:

import click
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed import Client, LocalCluster

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        ts = self.scheduler.tasks[key]
        raise Exception

@click.command()
def dask_setup(scheduler):
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)

def job(i):
    return i*2

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)
    ret = client.submit(job, 1).result()
    print(ret)

The console log reads "2", while we would expect an Exception to be raised.

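Your script indeed does not register the scheduler plugin, which is why you don't see an exception being logged. Nothing in the script calls dask_setup; that function is only invoked when the file is loaded as a scheduler preload.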

As an aside, raising an exception within a plugin's transition method does not raise an exception on the client; instead, the scheduler logs the exception: https://github.com/dask/distributed/blob/03035dafb939166d241fb0db09433c2bb3bc369c/distributed/scheduler.py#L2072-L2077 This is by design, to avoid scheduler corruption by faulty user code.
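The isolation pattern can be sketched in plain Python as follows. This is a simplified illustration, not the distributed source: notify_plugins, FaultyPlugin, RecordingPlugin, and the logger name are all hypothetical names made up for the example.

```python
import logging

logger = logging.getLogger("sketch.scheduler")  # hypothetical logger name


class FaultyPlugin:
    def transition(self, key, start, finish, *args, **kwargs):
        raise Exception("boom")


class RecordingPlugin:
    def __init__(self):
        self.seen = []

    def transition(self, key, start, finish, *args, **kwargs):
        self.seen.append((key, start, finish))


def notify_plugins(plugins, key, start, finish):
    """Call every plugin; log failures instead of propagating them."""
    failures = 0
    for plugin in plugins:
        try:
            plugin.transition(key, start, finish)
        except Exception:
            # The error is logged on the scheduler side and then swallowed,
            # so one faulty plugin cannot break the caller or other plugins.
            logger.exception("Plugin failed with exception")
            failures += 1
    return failures


recorder = RecordingPlugin()
failed = notify_plugins([FaultyPlugin(), recorder], "job-1", "processing", "memory")
```

Here the faulty plugin's exception only reaches the log, and the second plugin still receives the transition. The same reasoning explains why a client would keep getting its results even when a registered plugin raises.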

It is unclear how to register a scheduler plugin that can access the full task state. client.scheduler does not seem to be serializable, so the following code does not work and crashes with the error TypeError: cannot pickle 'coroutine' object:

from distributed.diagnostics.plugin import SchedulerPlugin
from distributed import Client, LocalCluster

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        ts = self.scheduler.tasks[key]
        raise Exception

def job(i):
    return i*2

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)

    plugin = MyPlugin(client.scheduler)
    client.register_scheduler_plugin(plugin)

    ret = client.submit(job, 1).result()
    print(ret)

There are two ways to register a plugin: with a scheduler preload script (which is what the example you mention in the beginning does) and via Client.register_plugin. The two work slightly differently. For example, the dask_setup function in the preload script would hand you a Scheduler object, whereas Client.scheduler is merely an RPC object that you can use to call remote procedures on the scheduler. For Client.register_plugin, SchedulerPlugin.start() will get called upon registration on the scheduler, and this would hand you a Scheduler object as well. This adds to the confusion here. I've added a ticket aimed at improving our documentation around this here: #8721

hendrikmakait, Jun 24 '24 09:06

@hendrikmakait Thanks for the detailed reply, see PR #8729!

RaphaelRobidas, Jun 24 '24 14:06