Add upload_dir function #925
Overview:
As discussed in #925, add an `upload_dir` function to the client that allows users to batch-submit files. This function has two behaviors (see the usage sketch after this list):
- If given a directory with many top-level files (see below), it will add all files in that directory to the Python imports on the workers (essentially, as if you had called `upload_file` for each file in that directory). For example,

  ```
  src/
      a.py
      b.py
  ```

  would allow you to call `import a` on the workers after calling `upload_dir("src")`.
- If `remote_path` is specified, the contents of the directory are stored under that path on each worker. This allows for using Python's package import syntax (e.g. `import src.mypkg.subpkg.file`) on the workers. For example,

  ```
  src/
      foo/
      bar/
      baz/
          a.py
          b.py
  ```

  would allow you to call `import src.mypkg.a` after calling, for example, `upload_dir("src/baz", remote_path="src/mypkg")`.
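For illustration, a sketch of how the proposed API would be used, based on the two behaviors above (the scheduler address below is a placeholder):

```python
from distributed import Client

client = Client("tcp://scheduler:8786")  # placeholder address

# Flat upload: `import a` / `import b` become available on every worker.
client.upload_dir("src")

# Package-style upload: the contents of src/baz are stored under src/mypkg
# on each worker, so `import src.mypkg.a` works there.
client.upload_dir("src/baz", remote_path="src/mypkg")
```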
Implementation:
This function makes a compressed tarfile of the given folder and sends it to each worker. It is built on the same plugin system that `upload_file` now uses, allowing it to handle workers that are added later. I tried to reuse the `upload_file` code flow where I could. I also fixed the `save_sys_modules` test util since I don't think it was working as intended.
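For reference, a minimal sketch of the in-memory tarring step described above (not the actual PR code; the helper name is illustrative):

```python
import io
import tarfile

def tar_directory(path: str) -> bytes:
    """Build a gzip-compressed tarball of `path` entirely in memory."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        tar.add(path, arcname=".")
    return buf.getvalue()

# The resulting bytes can then be shipped to each worker and extracted there.
payload = tar_directory("src")
```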
Design Decisions:
- The tarfile is stored in memory on the local computer, similar to how `UploadFile` works. This means that there could be issues if the contents of the provided directory take up more space than available memory, but I assumed that in this case users would just be building a custom Docker image with their files in it.
- As mentioned, I tried to reuse the `upload_file` code path, so `upload_dir` shares the functions in `distributed/worker.py` and `distributed/utils.py`, though I had to make some modifications to handle the additional logic.
- When uploading a directory in package format a second time, which should force a re-import of the modules, I discovered that `pkgutil.iter_modules` won't search recursively. To force an `importlib.reload` I had to search `sys.modules` for the directory's package name (a rough sketch of this workaround is shown after this list). This was my first time getting this deep into the Python import system, so there might be a better way to do it; I unfortunately couldn't figure out another way that also passed all tests.
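A rough sketch of that `sys.modules` workaround, assuming the uploaded directory maps to a top-level package name (illustrative only, not the PR code):

```python
import importlib
import sys

def reload_package_modules(package_name: str) -> None:
    """Reload every already-imported module living under `package_name`."""
    prefix = package_name + "."
    for name in list(sys.modules):
        if name == package_name or name.startswith(prefix):
            module = sys.modules[name]
            if module is not None:
                importlib.reload(module)
```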
CI failures look unrelated
@mrocklin is there anyone to tag that would be good to review this?
Things seem to have been a bit quiet here for a while, @sonicxml have you found an alternate way of doing this without modifying dask source directly?
Can one of the admins verify this patch?
Thanks for the ping @andrzejnovak. We recently added an `UploadDirectory` plugin (https://distributed.dask.org/en/latest/plugins.html#distributed.diagnostics.plugin.UploadDirectory). You might consider trying that out to see if it suits your needs.
@sonicxml does the `UploadDirectory` plugin work for you too?
@jrbourbeau Cheers, that looks like it would work for me.
@jrbourbeau unfortunately this doesn't seem to work for me
For comparison, what I was using before was the following (where the code lives in workflows/some_modules.py):

```python
import shutil

shutil.make_archive("workflows", "zip", base_dir="workflows")
client.upload_file("workflows.zip")
```

That kind of works, but the code seems to randomly crash with `AttributeError: Can't get attribute ... on <module ... some_module.py>`. Sometimes that worked, sometimes not, possibly related to workers restarting.
Replacing the above with

```python
from distributed.diagnostics.plugin import UploadDirectory

client.register_worker_plugin(UploadDirectory("workflows"), nanny=True)
```

just crashes right away with

```
processor_instance = cloudpickle.loads(lz4f.decompress(processor_instance))
ModuleNotFoundError: No module named 'workflows'
```
Ah, I see. Could you try setting `update_path=True` when creating the `UploadDirectory` instance? That should put the directory on `sys.path`.
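Untested, but concretely that would look something like:

```python
from distributed.diagnostics.plugin import UploadDirectory

client.register_worker_plugin(
    UploadDirectory("workflows", update_path=True), nanny=True
)
```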
No luck there either (tested it earlier, but went to double check)
@jrbourbeau is there a way to query paths/environ of the workers?
You can use `client.run` to run any function directly on the workers and forward the results back to the client process. For example, something like (the code below is untested):

```python
def get_env_vars():
    import os
    return list(os.environ.keys())

client.run(get_env_vars)
```
@jrbourbeau Cheers that helped me debug a bit I think. Here's what I am dealing with. Indeed, it doesn't appear the directory gets uploaded to the worker then.
Looking at the `UploadDirectory` plugin source, the directory you're uploading is placed in the nanny's `local_directory`:

https://github.com/dask/distributed/blob/25881517d101b72944a7352acdcff8bd657bc0b4/distributed/diagnostics/plugin.py#L399-L413
Could you check in `nanny.local_directory` instead? You can retrieve that location using something like

```python
client.run(lambda dask_worker: dask_worker.local_directory, nanny=True)
```
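And to see what actually landed in that directory, something like this (untested) should work:

```python
def list_local_dir(dask_worker):
    # With nanny=True, `dask_worker` is populated with the Nanny instance.
    import os
    return os.listdir(dask_worker.local_directory)

client.run(list_local_dir, nanny=True)
```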
@jrbourbeau alright cool, calling the above with `nanny=True` shows `workflows` uploaded and on the path, so that bit works. I assumed these were propagating from the nanny to the worker, but I guess not. Strangely, switching to `UploadPlugin(..., nanny=False)` throws `TypeError: setup() got an unexpected keyword argument 'worker'` on the first cell execution (I guess the worker is not assigned), passes on the second and uploads the files, but doesn't update the path.
@jrbourbeau I tried @andrzejnovak's code snippet as well on the same facility with `nanny=True`, and I see that the workflows directory is getting uploaded everywhere, including the HTCondor workers, but I keep getting `OSError: Timed out trying to connect to tls://172.19.0.5:42245 after 30 s`, where `172.19.0.5:42245` is the address of the nanny.
On the worker I see the Nanny plugin `upload-directory-workflows`:
```
distributed.nanny - INFO - Starting Nanny plugin upload-directory-workflows
distributed.nanny - INFO - Start Nanny at: 'tls://172.19.0.5:42245'
distributed.worker - INFO - Start worker at: tls://red-c5907.unl.edu:49186
distributed.worker - INFO - Listening to: tls://red-c5907.unl.edu:49186
distributed.worker - INFO - dashboard at: red-c5907.unl.edu:33983
distributed.worker - INFO - Waiting to connect to: tls://oksana-2eshadura-40cern-2ech.dask.coffea.casa:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 2
distributed.worker - INFO - Memory: 5.72 GiB
distributed.worker - INFO - Local Directory: /var/lib/condor/execute/dir_445/dask-worker-space/worker-nlkullsk
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tls://oksana-2eshadura-40cern-2ech.dask.coffea.casa:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
```
@jrbourbeau I see that it works out of the box if we have only one k8s worker (we are running it as a sidecar), but if there are HTCondor workers then I see the timeout.
Update: @jthiltges confirmed that the 172.19 address range is used by the condor workers (in docker containers). It won't be reachable from outside the node where it's running.
@jrbourbeau, also, even though I see that the `workflows` directory is uploaded on the k8s dask workers, `client.run(get_info)` is still not able to import it. Could it be because the path was not updated?
@jrbourbeau could you please explain how this is actually supposed to work? Is the `OSError` timeout because we need to connect to the worker, or because the worker is trying to connect to the scheduler to retrieve the data? It is not clear from the message...
Is there any info about that feature? I am also missing the ability to send code that has a more complex file/path layout.
For anyone stumbling upon this, the `register_worker_plugin` function has been deprecated in favour of `register_plugin`.
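On recent distributed releases the earlier snippet would therefore look roughly like this (untested sketch; my understanding is that `register_plugin` infers the target from the plugin type, and `UploadDirectory` is a nanny plugin, so no `nanny=` argument should be needed):

```python
from distributed.diagnostics.plugin import UploadDirectory

client.register_plugin(UploadDirectory("workflows", update_path=True))
```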