Outdated and/or unclear documentation about SchedulerPlugin
Describe the issue: The current documentation on SchedulerPlugin with full task state access gives the following code as an example:
from distributed.diagnostics.plugin import SchedulerPlugin

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        # Get full TaskState
        ts = self.scheduler.tasks[key]

@click.command()
def dask_setup(scheduler):
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)
The dask_setup function seems to appear out of nowhere, and click isn't imported. Moreover, this approach does not seem to register the plugin correctly. For example, the following code runs successfully:
import click
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed import Client, LocalCluster

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        ts = self.scheduler.tasks[key]
        raise Exception

@click.command()
def dask_setup(scheduler):
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)

def job(i):
    return i*2

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)
    ret = client.submit(job, 1).result()
    print(ret)
The console log reads "2", while we would expect an Exception to be raised.
It is unclear how to register a scheduler plugin that can access the full task state. client.scheduler does not seem to be serializable, so the following code does not work and crashes with the error TypeError: cannot pickle 'coroutine' object:
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed import Client, LocalCluster

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        ts = self.scheduler.tasks[key]
        raise Exception

def job(i):
    return i*2

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)
    plugin = MyPlugin(client.scheduler)
    client.register_scheduler_plugin(plugin)
    ret = client.submit(job, 1).result()
    print(ret)
Environment:
- Dask version: 2024.4.2
- Python version: 3.10.12
- Operating System: Linux Mint 21.2
- Install method (conda, pip, source): pip
@RaphaelRobidas: Thanks for reporting this issue. There are several things going on here, so let me try to unpack them:
> The current documentation regarding SchedulerPlugin with full task state access gives the following code as example:
>
> from distributed.diagnostics.plugin import SchedulerPlugin
>
> class MyPlugin(SchedulerPlugin):
>     def __init__(self, scheduler):
>         self.scheduler = scheduler
>
>     def transition(self, key, start, finish, *args, **kwargs):
>         # Get full TaskState
>         ts = self.scheduler.tasks[key]
>
> @click.command()
> def dask_setup(scheduler):
>     plugin = MyPlugin(scheduler)
>     scheduler.add_plugin(plugin)
>
> The dask_setup function seems to appear out of nowhere and click isn't imported.
This is a good catch; the documentation is a bit unclear here. Note that the RabbitMQ Example contains this line:
Run with:
dask scheduler --preload <filename.py>
From what I can tell, the example you mention copies that pattern, but we could improve it by importing click and making it clear how to run it (e.g., by simply copying the line I quoted above). Would you be interested in contributing a PR for this?
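For illustration, a complete preload file along those lines might look like the sketch below. The filename my_preload.py and the logging call inside transition are assumptions added here; the rest follows the documentation example, with click imported.

```python
# my_preload.py (hypothetical filename)
import logging

import click
from distributed.diagnostics.plugin import SchedulerPlugin

logger = logging.getLogger(__name__)


class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        # In a preload script this is the real Scheduler object.
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        # Get full TaskState
        ts = self.scheduler.tasks[key]
        # Illustrative only: record the transition in the scheduler log.
        logger.info("%s: %s -> %s (%r)", key, start, finish, ts)


@click.command()
def dask_setup(scheduler):
    # Called by the scheduler when this file is passed via --preload.
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)
```

Under these assumptions the file would be run with dask scheduler --preload my_preload.py, and a client would then connect to that scheduler's address. Nothing happens if the file is only executed as a regular script, because nothing calls dask_setup in that case.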
> Moreover, this approach does not seem to correctly register the plugin. For example, the following code runs successfully:
>
> import click
> from distributed.diagnostics.plugin import SchedulerPlugin
> from distributed import Client, LocalCluster
>
> class MyPlugin(SchedulerPlugin):
>     def __init__(self, scheduler):
>         self.scheduler = scheduler
>
>     def transition(self, key, start, finish, *args, **kwargs):
>         ts = self.scheduler.tasks[key]
>         raise Exception
>
> @click.command()
> def dask_setup(scheduler):
>     plugin = MyPlugin(scheduler)
>     scheduler.add_plugin(plugin)
>
> def job(i):
>     return i*2
>
> if __name__ == "__main__":
>     cluster = LocalCluster()
>     client = Client(cluster)
>     ret = client.submit(job, 1).result()
>     print(ret)
>
> The console log reads "2", while we would expect an Exception to be raised.
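Your script does indeed not register the scheduler plugin: dask_setup is only called when the file is loaded as a scheduler preload, and nothing in the script calls it. That is why you don't see an exception being logged.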
As an aside, raising an exception within Scheduler.transition does not raise an exception on the client but instead logs the exception on the scheduler: https://github.com/dask/distributed/blob/03035dafb939166d241fb0db09433c2bb3bc369c/distributed/scheduler.py#L2072-L2077
This is by design to avoid scheduler corruption by faulty user code.
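To make that behavior concrete, here is a simplified, standalone sketch of the guard pattern. It is not the actual distributed source; FaultyPlugin, notify_plugins, and the logger name are hypothetical and only mimic the idea of logging a plugin failure instead of propagating it.

```python
import logging

logger = logging.getLogger("sketch.scheduler")


class FaultyPlugin:
    """Hypothetical plugin whose transition hook always fails."""

    def transition(self, key, start, finish, *args, **kwargs):
        raise RuntimeError("bug in user code")


def notify_plugins(plugins, key, start, finish):
    """Call every plugin hook, logging failures instead of propagating them."""
    failures = 0
    for plugin in list(plugins):
        try:
            plugin.transition(key, start, finish)
        except Exception:
            # The error is recorded in the log; the caller keeps running.
            logger.info("Plugin failed with exception", exc_info=True)
            failures += 1
    return failures


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    n = notify_plugins([FaultyPlugin()], "job-1", "processing", "memory")
    print(f"{n} plugin call(s) failed; the caller was not interrupted")
```

With a guard like this, a raise inside transition can only ever show up in the scheduler's log, never as an exception from client.submit(...).result().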
> It is unclear how to register a scheduler plugin that can access the full task state. client.scheduler does not seem to be serializable, so the following code does not work and crashes with the error TypeError: cannot pickle 'coroutine' object:
>
> from distributed.diagnostics.plugin import SchedulerPlugin
> from distributed import Client, LocalCluster
>
> class MyPlugin(SchedulerPlugin):
>     def __init__(self, scheduler):
>         self.scheduler = scheduler
>
>     def transition(self, key, start, finish, *args, **kwargs):
>         ts = self.scheduler.tasks[key]
>         raise Exception
>
> def job(i):
>     return i*2
>
> if __name__ == "__main__":
>     cluster = LocalCluster()
>     client = Client(cluster)
>     plugin = MyPlugin(client.scheduler)
>     client.register_scheduler_plugin(plugin)
>     ret = client.submit(job, 1).result()
>     print(ret)
There are two ways to register a plugin: with a scheduler preload script (which is what the example you mention in the beginning does) and via Client.register_plugin. The two work slightly differently. For example, the dask_setup function in the preload script would hand you a Scheduler object, whereas Client.scheduler is merely an RPC object that you can use to call remote procedures on the scheduler. For Client.register_plugin, SchedulerPlugin.start() will get called upon registration on the scheduler, and this would hand you a Scheduler object as well. This adds to the confusion here. I've added a ticket aimed at improving our documentation around this here: #8721
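A minimal sketch of the Client.register_plugin route, assuming a recent distributed release where that method is available: the plugin is created without a scheduler and picks up the Scheduler object in start(). The class name TaskStatePlugin and the seen list are hypothetical and only there for illustration.

```python
from distributed import Client, LocalCluster
from distributed.diagnostics.plugin import SchedulerPlugin


class TaskStatePlugin(SchedulerPlugin):
    """Hypothetical plugin that records transitions along with the TaskState."""

    def __init__(self):
        # No scheduler yet: the plugin is created on the client side and
        # serialized, so it must not hold on to client.scheduler.
        self.scheduler = None
        self.seen = []

    async def start(self, scheduler):
        # Runs on the scheduler at registration and receives the Scheduler.
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        # Full TaskState, or None if the key is not tracked.
        ts = self.scheduler.tasks.get(key)
        self.seen.append((key, start, finish, ts))


def job(i):
    return i * 2


if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)
    client.register_plugin(TaskStatePlugin())
    print(client.submit(job, 1).result())
    client.close()
    cluster.close()
```

Because the plugin instance lives on the scheduler after registration, anything it collects (such as seen above) stays there and is not visible on the client-side object.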
@hendrikmakait Thanks for the detailed reply, see PR #8729!