
Forward `multiprocessing.current_process().authkey` to workers

Open moi90 opened this issue 3 months ago • 3 comments

I'm using a multiprocessing Manager to enable communication between the main process and its workers (for progress reporting). I also want to support dask_jobqueue.SLURMCluster as a way to distribute work. My problem is the following:

To connect to the manager in the main process, the client needs an authkey. However, when a proxy is pickled, the authkey is deliberately dropped due to security concerns. Python's "regular" multiprocessing makes sure that multiprocessing.current_process().authkey is propagated to spawned or forked processes, so they are still able to communicate with the main process.

With dask_jobqueue.SLURMCluster, however, some manual setup is needed to get the main process' authkey to the workers.
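To make the failure mode concrete, here is a minimal sketch (the manager address, port, and function name are illustrative, not taken from my actual code):

import socket
from multiprocessing.managers import SyncManager

# The manager listens on TCP so that workers on other nodes can reach it;
# with the default authkey=None it uses multiprocessing.current_process().authkey.
manager = SyncManager(address=(socket.gethostname(), 50000))
manager.start()
progress_queue = manager.Queue()  # a proxy served by the manager process

def report_progress(q, item):
    # When this runs on a SLURM-launched worker, the unpickled proxy `q` has no
    # authkey and falls back to the worker's own
    # multiprocessing.current_process().authkey, which does not match the main
    # process' key.
    q.put(item)

# With a SLURMCluster-backed Client, something like
#   client.submit(report_progress, progress_queue, "step 1 done").result()
# then typically fails on the worker with multiprocessing.AuthenticationError.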

I suggest implementing a secure way to propagate multiprocessing.current_process().authkey to the workers.

Related issue: https://github.com/python/cpython/issues/139801

moi90 avatar Oct 08 '25 18:10 moi90

For now, I solved this with a WorkerPlugin:

import multiprocessing.process
import os
import secrets

import distributed.diagnostics.plugin
from dask_jobqueue import SLURMCluster
from distributed import Client


class AuthkeyFromEnvPlugin(distributed.diagnostics.plugin.WorkerPlugin):
    """
    A Dask WorkerPlugin that sets the multiprocessing authkey on the worker process.

    By default, `sbatch` forwards all of the user's environment to the job,
    so the authkey can be passed to the workers via an environment variable.
    """

    def setup(self, worker):
        import multiprocessing.process

        print("Setting authkey on worker process")

        multiprocessing.process.current_process().authkey = os.environ[
            "MULTIPROCESSING_AUTHKEY"
        ].encode("ascii")

# Generate a random authkey for the main process and all workers
authkey = secrets.token_hex(32)
os.environ["MULTIPROCESSING_AUTHKEY"] = authkey
multiprocessing.process.current_process().authkey = authkey.encode("ascii")

with (
    SLURMCluster(
        memory="1g",
        processes=1,
        cores=1,
        queue=queue,
        account=account,
        interface=interface,
        name="richer-progress-test",
    ) as cluster,
    Client(cluster) as client,
):
    client.register_plugin(AuthkeyFromEnvPlugin())

    ...
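As an aside, and only as a sketch (the `AuthkeyPlugin` name and constructor are mine, not anything that exists in distributed): if the batch system does not forward the environment (e.g. `sbatch --export=NONE`), the authkey could instead be carried by the plugin itself. The plugin is pickled and sent over the scheduler connection, so this is only as confidential as the Dask comms (use TLS if that matters):

class AuthkeyPlugin(distributed.diagnostics.plugin.WorkerPlugin):
    """Hypothetical variant: carry the authkey in the plugin instead of the environment."""

    def __init__(self, authkey):
        # Convert to plain bytes: multiprocessing's AuthenticationString
        # deliberately refuses to be pickled.
        self.authkey = bytes(authkey)

    def setup(self, worker):
        import multiprocessing.process

        multiprocessing.process.current_process().authkey = self.authkey

# Usage, inside the `with` block above:
#   client.register_plugin(AuthkeyPlugin(multiprocessing.current_process().authkey))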

Maybe this can be integrated into dask-jobqueue (or maybe distributed)?

moi90 avatar Oct 08 '25 20:10 moi90

Thanks for writing this up and also documenting your workaround. This problem feels broad enough that we should put this in distributed. Do you have any interest in opening a PR that handles this as part of the core distributed functionality?

jacobtomlinson avatar Oct 09 '25 09:10 jacobtomlinson

Sure!

moi90 avatar Oct 10 '25 10:10 moi90