Worker plugin cannot be registered on workers unless its entire package source is uploaded to them
Describe the issue:
Let's say I've created a plugin under my_package.dask.plugins.MyWorkerPlugin.
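For context, the plugin class itself is ordinary; a minimal sketch of what it might look like (the setup body here is hypothetical, only the import path matters for this issue):

# my_package/dask/plugins.py (illustrative contents)
from distributed.diagnostics.plugin import WorkerPlugin

class MyWorkerPlugin(WorkerPlugin):
    def setup(self, worker):
        # hypothetical per-worker initialisation
        worker.log_event("my-plugin", "setup complete")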
The plugin registration:
from my_package.dask.plugins import MyWorkerPlugin
client.register_plugin(MyWorkerPlugin())
will lead to:
File "/opt/conda/lib/python3.10/site-packages/distributed/core.py", line 970, in _handle_comm
File "/opt/conda/lib/python3.10/site-packages/distributed/scheduler.py", line 7893, in register_worker_plugin
File "/opt/conda/lib/python3.10/site-packages/distributed/scheduler.py", line 6488, in broadcast
File "/opt/conda/lib/python3.10/site-packages/distributed/utils.py", line 251, in All
File "/opt/conda/lib/python3.10/site-packages/distributed/scheduler.py", line 6466, in send_message
File "/opt/conda/lib/python3.10/site-packages/distributed/core.py", line 1181, in send_recv
Exception: ModuleNotFoundError("No module named 'my_package'")
It would be useful if Dask could provide a more meaningful error message here, or upload the package automatically. In any case, the serialisation mechanism should make it possible to actually use the registered plugin on the workers.
Environment:
- Dask version: 2024.5.2
- Python version: 3.10
- Operating System: WSL
- Install method (conda, pip, source): poetry
You should be able to register your module with cloudpickle so that it is pickled by value, which forces Dask to upload it:
https://github.com/cloudpipe/cloudpickle?tab=readme-ov-file#overriding-pickles-serialization-mechanism-for-importable-constructs
@fjetter , thanks for the quick reply. I will try it and get back to you with feedback.
@fjetter , thanks, it works!
import cloudpickle

from my_project import dask_plugins
from my_project.dask_plugins import MyModule

cloudpickle.register_pickle_by_value(dask_plugins)
client.register_plugin(MyModule())
A few comments regarding this issue:
- Should this be documented on the Dask Distributed plugins page? https://distributed.dask.org/en/latest/plugins.html The only mention is here: https://github.com/dask/distributed/blob/d8dc8ad2172ff34113e4bc47d57ae55401cd6705/docs/source/protocol.rst#cloudpickle-for-functions-and-some-data
- The solution is a bit verbose. Should it be the responsibility of the client.register_plugin(...) method? Can it be encapsulated into register_plugin?
This is not a shortcoming of the plugin system. You face the exact same problem if you submit functions as ordinary tasks, so this is a property of the serialization system. I'm not sure what would be a better place to document this; I wouldn't mind if we just cross-referenced that protocol section from the worker plugin docs. (For reference, you linked to the code; the rendered documentation is hosted at https://distributed.dask.org/en/stable/protocol.html, since dask/dask and dask/distributed have different docs domains.) Are you interested in contributing a PR for the documentation?
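To illustrate that this is not plugin-specific, here is a minimal sketch of the same failure with an ordinary task (the module and function names are hypothetical):

# hypothetical: any function living in the locally installed package
from my_package.utils import my_function

future = client.submit(my_function, 1)
future.result()  # fails with ModuleNotFoundError("No module named 'my_package'") on the worker

# same fix as for the plugin: pickle the module by value
import cloudpickle
import my_package.utils
cloudpickle.register_pickle_by_value(my_package.utils)
client.submit(my_function, 1).result()  # now works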
Regarding the encapsulation in register_plugin, this isn't easily possible. It's not just that this isn't a plugin-exclusive problem; we simply do not know, and cannot infer on your machine, which modules are importable on the workers. Serialization by value is expensive and should only be used as a last resort, so we cannot make it the default. Likewise, we cannot perform a cluster-wide environment check for all packages (and submodules, and functions, ...) on every call.
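If the verbosity is the main concern, a small helper on the user side can wrap the two calls; this is only a sketch of a hypothetical convenience function, not an API proposal:

import cloudpickle

def register_plugin_by_value(client, plugin, module):
    # Hypothetical wrapper: force `module` to be pickled by value so the
    # plugin's code travels with it, then register the plugin as usual.
    cloudpickle.register_pickle_by_value(module)
    client.register_plugin(plugin)

# usage
from my_project import dask_plugins
register_plugin_by_value(client, dask_plugins.MyModule(), dask_plugins)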
@fjetter, another possible solution might be to provide a more convenient way to deliver a project's source package to Dask nodes. The developer would then upload the whole project source before using any plugins/functions/classes on the worker nodes. Here is a ticket which describes the idea: https://github.com/dask/distributed/issues/8698
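For reference, one mechanism that exists today for shipping source to workers is Client.upload_file (for a zipped package) or the UploadDirectory nanny plugin; a rough sketch, assuming the project is packaged as my_project.zip or lives in a local directory:

# upload a zipped copy of the package so workers can import it
client.upload_file("my_project.zip")

# or upload a source directory via the nanny plugin
from distributed.diagnostics.plugin import UploadDirectory
client.register_plugin(UploadDirectory("path/to/my_project", update_path=True))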
@fjetter , I will contribute the documentation adjustments.
@fjetter , I read through the Plugins documentation and noticed that it describes several distinct ways to register a plugin. What is the difference between the distributed.scheduler.Scheduler.add_plugin, distributed.scheduler.Scheduler.register_scheduler_plugin, and distributed.client.Client.register_plugin methods, and which one should be used?