distributed

Worker plugin cannot be registered on a worker unless its entire package source is uploaded to the server

Open • dbalabka opened this issue • 7 comments

Describe the issue: Let's say I've created a plugin under my_package.dask.plugins.MyWorkerPlugin.

Registering the plugin:

from my_package.dask.plugins import MyWorkerPlugin
client.register_plugin(MyWorkerPlugin())

fails with:

  File "/opt/conda/lib/python3.10/site-packages/distributed/core.py", line 970, in _handle_comm
  File "/opt/conda/lib/python3.10/site-packages/distributed/scheduler.py", line 7893, in register_worker_plugin
  File "/opt/conda/lib/python3.10/site-packages/distributed/scheduler.py", line 6488, in broadcast
  File "/opt/conda/lib/python3.10/site-packages/distributed/utils.py", line 251, in All
  File "/opt/conda/lib/python3.10/site-packages/distributed/scheduler.py", line 6466, in send_message
  File "/opt/conda/lib/python3.10/site-packages/distributed/core.py", line 1181, in send_recv
Exception: ModuleNotFoundError("No module named 'my_package'")

It would be useful if Dask provided a more meaningful error message or uploaded the package automatically. Alternatively, the serialization mechanism could ship the plugin in a form that workers can use without having the package installed.

Environment:

  • Dask version: 2024.5.2
  • Python version: 3.10
  • Operating System: WSL
  • Install method (conda, pip, source): poetry

dbalabka • Jun 12 '24

You should be able to register your module with cloudpickle so that it is pickled by value, which forces Dask to upload it:

https://github.com/cloudpipe/cloudpickle?tab=readme-ov-file#overriding-pickles-serialization-mechanism-for-importable-constructs
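The difference between pickling by reference and by value can be reproduced without a cluster. The sketch below is a hypothetical, self-contained simulation (the module name demo_worker_plugins and its MyWorkerPlugin class are made up for illustration): it writes a throwaway module to a temporary directory, pickles a plugin instance with cloudpickle once by reference and once after cloudpickle.register_pickle_by_value, then makes the module unimportable before unpickling, roughly the situation of a worker that lacks the package.

```python
import importlib
import pickle
import sys
import tempfile
import textwrap
from pathlib import Path

import cloudpickle

MODULE_NAME = "demo_worker_plugins"  # hypothetical stand-in for my_package.dask.plugins
MODULE_SOURCE = textwrap.dedent(
    """
    class MyWorkerPlugin:
        def setup(self, worker=None):
            return "setup called"
    """
)


def pickle_both_ways():
    """Return (by_reference, by_value) pickles of a MyWorkerPlugin instance."""
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, MODULE_NAME + ".py").write_text(MODULE_SOURCE)
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            module = importlib.import_module(MODULE_NAME)
            plugin = module.MyWorkerPlugin()
            # Default: the payload only names "demo_worker_plugins.MyWorkerPlugin".
            by_reference = cloudpickle.dumps(plugin)
            # By value: the class definition is embedded in the payload.
            cloudpickle.register_pickle_by_value(module)
            try:
                by_value = cloudpickle.dumps(plugin)
            finally:
                cloudpickle.unregister_pickle_by_value(module)
        finally:
            # Simulate a "worker" that does not have the module installed.
            sys.path.remove(tmp)
            sys.modules.pop(MODULE_NAME, None)
    return by_reference, by_value


def demo():
    by_reference, by_value = pickle_both_ways()
    try:
        pickle.loads(by_reference)
        ref_error = None
    except ModuleNotFoundError as exc:
        # Same kind of error as in the traceback in the issue description.
        ref_error = str(exc)
    # Unpickling the by-value payload does not need to import the module.
    restored = pickle.loads(by_value)
    return ref_error, restored.setup()


if __name__ == "__main__":
    print(demo())
```

Running it should print the ModuleNotFoundError message for the by-reference payload next to the result of calling setup() on the by-value copy.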

fjetter • Jun 13 '24

@fjetter, thanks for the quick reply. I will try it and get back to you with feedback.

dbalabka • Jun 13 '24

@fjetter, thanks, it works!

import cloudpickle
from my_project import dask_plugins
from my_project.dask_plugins import MyModule

# Pickle everything defined in dask_plugins by value instead of by reference
cloudpickle.register_pickle_by_value(dask_plugins)
client.register_plugin(MyModule())

A few comments on this issue:

  1. Should this be documented on the Dask Distributed plugins page (https://distributed.dask.org/en/latest/plugins.html)? The only mention I found is here: https://github.com/dask/distributed/blob/d8dc8ad2172ff34113e4bc47d57ae55401cd6705/docs/source/protocol.rst#cloudpickle-for-functions-and-some-data
  2. The workaround is a bit verbose. Should this be the responsibility of client.register_plugin(...)? Could it be encapsulated in register_plugin?
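On the verbosity point, a user-side helper can at least scope the registration to a single call. The sketch below is hypothetical (pickled_by_value is not part of Dask or cloudpickle) and only uses cloudpickle.register_pickle_by_value and cloudpickle.unregister_pickle_by_value. It assumes the plugin is serialized before client.register_plugin returns, which should hold for a synchronous client because the call waits for the scheduler's reply. Anything pickled after the block exits, such as later tasks that use functions from the same module, goes back to being pickled by reference.

```python
from contextlib import contextmanager

import cloudpickle


@contextmanager
def pickled_by_value(*modules):
    """Temporarily register modules with cloudpickle for pickling by value.

    Assumes none of the modules was registered beforehand: on exit, every
    module passed in is unregistered again.
    """
    registered = []
    try:
        # dict.fromkeys drops duplicates so each module is unregistered once.
        for module in dict.fromkeys(modules):
            cloudpickle.register_pickle_by_value(module)
            registered.append(module)
        yield
    finally:
        for module in registered:
            cloudpickle.unregister_pickle_by_value(module)


# Hypothetical usage with the names from the snippet above:
#
#     from my_project import dask_plugins
#     from my_project.dask_plugins import MyModule
#
#     with pickled_by_value(dask_plugins):
#         client.register_plugin(MyModule())
```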

dbalabka • Jun 13 '24

This is not a shortcoming of the plugin system. You face exactly the same problem when you submit functions as ordinary tasks, so this is a problem of the serialization system. I'm not sure what would be a better place to document this. I wouldn't mind if we simply cross-referenced the protocol section from the worker plugin docs (for reference: you linked to the source file; the rendered documentation is hosted at https://distributed.dask.org/en/stable/protocol.html, since dask/dask and dask/distributed have different docs domains). Are you interested in contributing a PR for the documentation?

Encapsulating this in register_plugin isn't easily possible. It's not just that the problem isn't exclusive to plugins; we simply do not know, and cannot infer from your machine, whether the workers have a given module. Serialization by value is expensive and should only be used as a last resort, so we cannot make it the default. Likewise, we cannot perform a cluster-wide environment check for all packages (and submodules, and functions, ...) on every call.

fjetter • Jun 17 '24

@fjetter, another possible solution would be to provide a more convenient way to deliver a project's source package to Dask nodes, so that developers upload the whole project source before using any of its plugins, functions, or classes on worker nodes. Here is a ticket describing the idea: https://github.com/dask/distributed/issues/8698
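Until something like that exists, distributed already offers Client.upload_file, which sends a .py, .egg, or .zip file to the cluster and adds it to the workers' Python path. The sketch below uses a hypothetical helper (zip_package is not a Dask function) that bundles a source package into a zip whose root holds the package directory, so that `import my_package` can be resolved from the uploaded archive. It assumes a pure-Python package and only ships .py files.

```python
import zipfile
from pathlib import Path


def zip_package(package_dir, zip_path):
    """Zip every .py file under package_dir, keeping the package name as the
    top-level directory inside the archive (e.g. my_package/dask/plugins.py).

    With the archive on sys.path, zipimport can resolve `import my_package`.
    """
    package_dir = Path(package_dir).resolve()
    root = package_dir.parent
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for source in sorted(package_dir.rglob("*.py")):
            # Store paths relative to the package's parent directory.
            archive.write(source, source.relative_to(root).as_posix())
    return Path(zip_path)


# Hypothetical usage (paths are placeholders for your project layout):
#
#     archive = zip_package("src/my_package", "my_package.zip")
#     client.upload_file(str(archive))
```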

dbalabka • Jun 17 '24

@fjetter, I will contribute the documentation changes.

dbalabka • Jun 17 '24

@fjetter, I read through the Plugins documentation and noticed that it describes two distinct ways to register a plugin. What is the difference between the distributed.scheduler.Scheduler.add_plugin, distributed.scheduler.Scheduler.register_scheduler_plugin, and distributed.client.Client.register_plugin methods, and which one should be used?

dbalabka • Jul 01 '24