Forward `multiprocessing.current_process().authkey` to workers
I'm using a `multiprocessing` Manager to enable communication between the main process and its workers (for progress reporting). I also want to support `dask_jobqueue.SLURMCluster` as a way to distribute work. My problem is the following:
To connect to the manager in the main process, a client needs an authkey. However, when a proxy is pickled, the authkey is deliberately dropped for security reasons.
Python's "regular" multiprocessing makes sure that `multiprocessing.current_process().authkey` is propagated to spawned or forked processes, so they can still communicate with the main process.
With `dask_jobqueue.SLURMCluster`, however, getting the main process's authkey to the workers requires some manual setup.
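For contrast, here is a minimal sketch of the local case that works out of the box (the `report` function and the queue are purely illustrative): the child inherits `current_process().authkey` from the parent, so the pickled queue proxy, which arrives without its authkey, can still authenticate against the manager.

```python
import multiprocessing


def report(progress_queue):
    # The proxy was pickled without its authkey, but the child process
    # inherited current_process().authkey from the parent, so the proxy
    # can reconnect to the manager and authenticate.
    progress_queue.put("worker made progress")


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    progress_queue = manager.Queue()

    worker = multiprocessing.Process(target=report, args=(progress_queue,))
    worker.start()
    worker.join()

    print(progress_queue.get())  # -> "worker made progress"
```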
I suggest implementing a secure way to propagate `multiprocessing.current_process().authkey` to the workers.
Related issue: https://github.com/python/cpython/issues/139801
For now, I solved this with a WorkerPlugin:
```python
import multiprocessing
import os

import distributed.diagnostics.plugin


class AuthkeyFromEnvPlugin(distributed.diagnostics.plugin.WorkerPlugin):
    """
    A Dask WorkerPlugin that sets the multiprocessing authkey on the worker process.

    The key is read from the MULTIPROCESSING_AUTHKEY environment variable;
    by default, `sbatch` forwards all of the user's environment to the job.
    """

    def setup(self, worker):
        print("Setting authkey on worker process")
        multiprocessing.current_process().authkey = os.environ[
            "MULTIPROCESSING_AUTHKEY"
        ].encode("ascii")
```
In the main process, generate the key, export it, and register the plugin:

```python
import multiprocessing
import os
import secrets

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# Generate a random authkey shared by the main process and all workers.
authkey = secrets.token_hex(32)
os.environ["MULTIPROCESSING_AUTHKEY"] = authkey
multiprocessing.current_process().authkey = authkey.encode("ascii")

with (
    SLURMCluster(
        memory="1g",
        processes=1,
        cores=1,
        queue=queue,  # site-specific SLURM settings, defined elsewhere
        account=account,
        interface=interface,
        name="richer-progress-test",
    ) as cluster,
    Client(cluster) as client,
):
    client.register_plugin(AuthkeyFromEnvPlugin())
    ...
```
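With the plugin registered, tasks running on the SLURM workers can use manager proxies passed from the main process. Roughly like this (the task, the queue, `work_items`, and the manager address are illustrative; it assumes the manager listens on a hostname/port that the compute nodes can reach):

```python
from multiprocessing.managers import SyncManager


def long_running_task(item, progress_queue):
    # Runs on a SLURM worker. The pickled queue proxy arrives without an
    # authkey, so it falls back to current_process().authkey, which the
    # AuthkeyFromEnvPlugin has already set to match the main process.
    progress_queue.put(f"started {item}")
    # ... actual work ...
    progress_queue.put(f"finished {item}")


# In the main process, inside the `with` block above. The manager must bind
# to an address reachable from the compute nodes (the default unix-socket
# address of multiprocessing.Manager() would not be).
manager = SyncManager(address=("head-node.example.com", 50000),
                      authkey=authkey.encode("ascii"))
manager.start()
progress_queue = manager.Queue()

futures = client.map(long_running_task, work_items, progress_queue=progress_queue)
```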
Maybe this can be integrated into dask-jobqueue (or maybe distributed)?
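One possible shape for such an integration (purely an illustrative sketch, not an existing distributed API) would be to carry the key in the plugin's own pickled state instead of the job environment, so it travels over dask's control channel; note that this is only confidential in transit if the cluster uses TLS:

```python
import multiprocessing

from distributed.diagnostics.plugin import WorkerPlugin


class PropagateAuthkeyPlugin(WorkerPlugin):
    """Illustrative sketch: ship the main process's authkey as plugin state."""

    def __init__(self):
        # Captured in the main process when the plugin is instantiated.
        # AuthenticationString refuses to be pickled, so convert to plain bytes.
        self._authkey = bytes(multiprocessing.current_process().authkey)

    def setup(self, worker):
        # Runs on each worker; restores the main process's key there.
        multiprocessing.current_process().authkey = self._authkey
```

Usage would simply be `client.register_plugin(PropagateAuthkeyPlugin())`, with no environment variable involved.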
Thanks for writing this up and also documenting your workaround. This problem feels broad enough that we should put this in distributed. Do you have any interest in opening a PR that handles this as part of the core distributed functionality?
Sure!