Package upload plugin
Dask's remote task execution is very straightforward as long as the submitted function does not depend on external packages. However, the most common use case relies on already-installed libraries or on the project's own packages. There are two types of dependencies:
- pip/poetry-installed packages
- dependencies located in the project source/package
To deliver both types of dependencies to workers, we do the following (a minimal client-side sketch follows the list):
- We build a custom Dask image that contains all the required extra pip packages. The primary assumption is that we don't change dependencies often, so project-specific Dask images can remain untouched for a while and we rarely rebuild them. To simplify the process, we use some automation that extracts all required packages with `poetry export -f requirements.txt --output requirements.txt` and builds a Docker image remotely using the Kubernetes driver. The `PipInstall` plugin is another way to do it, but it can slow down cluster start-up to several minutes; in our case, start-up takes less than a minute once the image is warmed up on the Kubernetes nodes.
- The project source is more dynamic and has to be uploaded each time we spin up a cluster. We use the existing `client.upload_file()` function, which relies on the `UploadFile` plugin. To clarify, we keep the cluster running only while the Python script executes and tear it down when the script finishes.
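For reference, here is a minimal sketch of that client-side workflow; the image name, package list, and archive name are placeholders, while `KubeCluster`, `PipInstall`, `register_plugin()`, and `upload_file()` are existing Dask/distributed APIs:

```python
from dask_kubernetes.operator import KubeCluster
from distributed.diagnostics.plugin import PipInstall

# Custom image with the pinned pip dependencies already baked in (placeholder name)
cluster = KubeCluster(image="my-registry/my-project-dask:latest")
client = cluster.get_client()

# Alternative to baking packages into the image: install them at worker start-up.
# Simpler to maintain, but can add minutes to cluster start-up time.
client.register_plugin(PipInstall(packages=["pyarrow", "s3fs"]))

# Ship the frequently changing project source as a zip/egg that workers can import.
client.upload_file("my_project_source.zip")
```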
While we successfully solved the delivery of extra dependencies to remote worker nodes, this requires a deep understanding of Dask cluster deployment plus extra helper functions that do not come with Dask out of the box. I propose improving the developer experience in this direction, focusing first on delivering local sources to worker nodes. To be more specific:
- Creating a new function `upload_package(module: ModuleType)` as a complement to the existing `upload_file(path)` (a rough sketch of how it could be layered on top of `upload_file()` follows this list).
- Automated egg file creation by the new `upload_package()` function.
- The possibility to update existing `venv` packages, such as Dask-specific modules, on remote worker nodes, which should simplify the debugging process. In the scope of the #11160 investigation, I already proved that this is possible (please see https://github.com/dask/dask/issues/11160#issuecomment-2158877551).
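As an illustration only, here is a minimal sketch of what such an `upload_package()` helper could do today on top of the existing `client.upload_file()` API; the helper itself, its signature, and the zip-based packaging are assumptions for the sake of the example, not our actual prototype:

```python
import os
import tempfile
import zipfile
from types import ModuleType


def upload_package(client, module: ModuleType) -> None:
    """Hypothetical helper: zip a locally importable top-level package
    and ship it to all workers via the existing upload_file() API."""
    pkg_dir = os.path.dirname(module.__file__)    # directory of the package
    pkg_name = module.__name__.split(".")[0]      # top-level package name
    archive = os.path.join(tempfile.mkdtemp(), f"{pkg_name}.zip")
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as z:
        for root, _, files in os.walk(pkg_dir):
            for fname in files:
                if fname.endswith(".pyc"):
                    continue
                full = os.path.join(root, fname)
                # Keep paths relative to the package parent so the import resolves.
                z.write(full, os.path.relpath(full, os.path.join(pkg_dir, "..")))
    # upload_file() accepts .py, .egg, and .zip files and distributes them to all workers.
    client.upload_file(archive)
```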
We already have a working prototype of a Worker/Scheduler plugin that does everything described above. If there is demand for such a plugin, we look forward to contributing our source. Any comments and suggestions are very welcome 🤗
Here are some usage examples:
Uploading the project source to all workers:
```python
import my_project_source

cluster = KubeCluster()
client = cluster.get_client()

# Upload the entire project source to all worker nodes in a very convenient way
client.register_plugin(UploadModule(my_project_source))

# It will be even more convenient with a new client function
client.upload_package(my_project_source)
```
We can replace part of the Dask source on all worker nodes for debugging purposes:
```python
from dask.dataframe import backends

client.upload_package(backends)
```
Here is an example of an adjusted function: https://github.com/dask/dask/issues/11160#issuecomment-2158877551
I would be very interested in such a plugin; there is no easy way at the moment to use a Dask cluster in "development" mode, i.e. when you are working on unpackaged source code.
I spent a few days dealing with these Python package upload issues and env/dependency management. I came up with this solution to upload my current working directory's project, and it seems to be working:
`dask_plugins.py`:
```python
from distributed.diagnostics.plugin import SchedulerPlugin, WorkerPlugin
from dask.utils import tmpfile
import os
import sys
import uuid
import zipfile


# Modeled after the UploadDirectory nanny plugin.
# Needed since the ~2023 Dask serialize/deserialize architecture now requires
# both the scheduler and the workers to have dependencies present.
class UploadDirectory_Base:
    """A plugin template to upload a local directory to the scheduler/workers.

    Parameters
    ----------
    path: str
        A path to the directory to upload
    skip_words:
        Subfolders to ignore
    skip:
        Predicates selecting files to ignore
    update_path:
        Whether to add the uploaded directory to the Python path
    """

    def __init__(
        self,
        path,
        skip_words=(".git", ".github", ".pytest_cache", "tests", "docs"),
        skip=(lambda fn: os.path.splitext(fn)[1] == ".pyc",),
        update_path=False,
    ):
        """Initialize the plugin by zipping the given directory into an in-memory buffer."""
        path = os.path.expanduser(path)
        self.path = os.path.split(path)[-1]
        self.name = "upload-directory-" + os.path.split(path)[-1]
        self.update_path = update_path

        with tmpfile(extension="zip") as fn:
            with zipfile.ZipFile(fn, "w", zipfile.ZIP_DEFLATED) as z:
                for root, dirs, files in os.walk(path):
                    for file in files:
                        filename = os.path.join(root, file)
                        if any(predicate(filename) for predicate in skip):
                            continue
                        dirs = filename.split(os.sep)
                        if any(word in dirs for word in skip_words):
                            continue
                        archive_name = os.path.relpath(
                            os.path.join(root, file), os.path.join(path, "..")
                        )
                        z.write(filename, archive_name)
            with open(fn, "rb") as f:
                self.data = f.read()

    async def extract(self, local_directory):
        """Extract the buffered zip archive into a local directory."""
        fn = os.path.join(local_directory, f"tmp-{uuid.uuid4()}.zip")
        with open(fn, "wb") as f:
            f.write(self.data)

        tmp_dir = os.path.join(local_directory, f"tmp-{uuid.uuid4()}")
        os.mkdir(tmp_dir)
        with zipfile.ZipFile(fn) as z:
            z.extractall(path=tmp_dir)

        if self.update_path:
            # Add local_directory/tmp_dir to the path
            if tmp_dir not in sys.path:
                sys.path.insert(0, tmp_dir)
            # Add local_directory/tmp_dir/{uploaded directory name} to the path
            if os.path.join(tmp_dir, self.path) not in sys.path:
                sys.path.insert(0, os.path.join(tmp_dir, self.path))

        os.remove(fn)


class UploadDirectory_Scheduler(UploadDirectory_Base, SchedulerPlugin):
    async def start(self, scheduler):
        await self.extract(scheduler.local_directory)


class UploadDirectory_Worker(UploadDirectory_Base, WorkerPlugin):
    async def setup(self, worker):
        await self.extract(worker.local_directory)
```
In a file that uses the Dask client:
```python
import os

import cloudpickle

from . import dask_plugins

# Register by value so the plugin classes travel with the pickled plugin
# instances and the scheduler/workers don't need dask_plugins.py installed.
cloudpickle.register_pickle_by_value(dask_plugins)

...

# Upload the current working directory to both the scheduler and the workers.
plugin = dask_plugins.UploadDirectory_Scheduler(os.getcwd(), update_path=True)
client.register_plugin(plugin)

plugin = dask_plugins.UploadDirectory_Worker(os.getcwd(), update_path=True)
client.register_plugin(plugin)
```
I think all dependency uploaders now need to upload dependencies to BOTH the scheduler and workers due to new serialization changes (mentioned here): https://github.com/dask/distributed/issues/7797
I've added our implementation in PR https://github.com/dask/distributed/pull/8884. I'll add more details about the implementation later.