
Environment Setting in the Nanny

quasiben opened this issue 4 years ago • 26 comments

cuDF recently changed so that it now creates a CUDA context at import. This is necessary for a variety of reasons, most importantly validation -- cuDF wants to verify that it can run on the device. This change breaks some assumptions we currently make in the dask-gpu ecosystem, namely the dask-cuda-worker context initialization, which is done via a dask-worker preload script.

The environment variable necessary for our use case is CUDA_VISIBLE_DEVICES, which must be set prior to creating a CUDA context. Again, this was not an issue in the past because dask-cuda was responsible for creating the context. Now, however, we need to ensure it is set before we create any processes. When using LocalCluster or LocalCUDACluster, the startup looks like the following:

  1. start nanny process
  2. spawn thread for worker

In the current setup, the environment variable does not get set until step 2. If that is indeed the case, we'd like to explore how environment variable setting might happen during step 1 instead. A solution here would also benefit other libraries which need to be configured on the main thread before they are imported, such as OpenMP or BLAS.
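
To make the ordering concrete, here is a minimal sketch (plain multiprocessing, not Dask code) of the behavior we are after: a spawned child picks up whatever is in the parent's os.environ at spawn time, so the variable has to be in place before the process is created.

import os
import multiprocessing as mp

def worker():
    # any GPU library imported here would already see the variable
    print("child sees:", os.environ.get("CUDA_VISIBLE_DEVICES"))

if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    os.environ["CUDA_VISIBLE_DEVICES"] = "0,2"  # set in the parent first (step 1)
    proc = ctx.Process(target=worker)           # only then spawn the process
    proc.start()
    proc.join()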

quasiben avatar Apr 07 '20 18:04 quasiben

It sounds like you're asking for hooks to specify environment variables with preload scripts earlier on in the startup process. Is this correct?

If so

https://github.com/dask/distributed/pull/3673 https://github.com/dask/distributed/pull/3678

mrocklin avatar Apr 07 '20 18:04 mrocklin

That is correct, but I think those PRs handle preloading after the original nanny process has started. Additionally, I think there may also be a race condition between the multiple workers spawning.

I think what is actually happening can be reduced to the following example -- Dask does something like the following:

import os
import rmm
import cudf  # importing cuDF initializes the CUDA driver in the parent process
import multiprocessing as mp
import time


def f(name, env):
    # the child only updates CUDA_VISIBLE_DEVICES after it has already started
    os.environ.update(env)
    rmm.reinitialize(
        pool_allocator=True, managed_memory=False, initial_pool_size=30_000_000_000
    )
    print(env)
    time.sleep(2)


if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    p1 = ctx.Process(target=f, args=("bob", {"CUDA_VISIBLE_DEVICES": "0,2"}))
    p2 = ctx.Process(target=f, args=("bob", {"CUDA_VISIBLE_DEVICES": "2,0"}))
    p1.start()
    # succeeds when the following line is uncommented
    # os.environ.update({"CUDA_VISIBLE_DEVICES": "2,0"})
    p2.start()
    p1.join()
    p2.join()

The above script fails with cuDF, because cuDF now creates a context prior to the new process being created, which makes things rather challenging. The new process will still effectively have some CUDA environment remnants lying around, which is why Dask took some care to create contexts only after spawning.

If one uncomments the os.environ.update({"CUDA_VISIBLE_DEVICES": "2,0"}) line, both Python and CUDA are 😊 because we are now changing the environment before spawning new processes.

I am not sure what the best course of action is here. We could contextualize the process creation, but this may be challenging. @kkraus14 @pentschev do you have thoughts here?

quasiben avatar Apr 08 '20 02:04 quasiben

So for some additional context (pun intended!): cuDF doesn't create a CUDA context on import, but we do initialize the CUDA Driver API, which seems to cache the value of CUDA_VISIBLE_DEVICES for when a runtime context is created.

We also explored using the cudaSetDevice runtime API to set the targeted device instead of relying on CUDA_VISIBLE_DEVICES. While calling it from various libraries correctly affects other libraries in limited testing with cudf, cupy, and numba, it's somewhat common for libraries to call cudaSetDevice(0), which would then have unintended side effects for us (e.g. xgboost does this; unsure of the interaction with UCX).

Also, in general, forking a process (even a C process, no Python) after creating a CUDA context causes problems, as the CUDA context can't exist in two processes' memory spaces simultaneously, so we can focus our conversation on the spawn method of launching processes.

I also went digging to see if there was a way to have multiprocessing run a piece of code or set environment variables before it starts to recreate the parent namespace or execute the target function, but I didn't see anything.

kkraus14 avatar Apr 08 '20 02:04 kkraus14

Thanks for the information gents. I'll take a look tomorrow and see if I can think of anything.

mrocklin avatar Apr 08 '20 03:04 mrocklin

Thanks for the minimal example, @quasiben. I think the right way would indeed be to set environment variables just at the time a new process is forked/spawned, before the child has the chance to execute any other code. However, I don't know if that's possible from Python (or even C), so this is something we should explore.

pentschev avatar Apr 08 '20 09:04 pentschev

The environment variable necessary for our use case is CUDA_VISIBLE_DEVICES which must be set prior to creating a cuda context. Again, this was not an issue in the past because dask-cuda was responsible for creating the context. Now, however, we need to ensure this is set before we create any processes

I'm not sure I understand this. Don't we need to set the environment variable just before we make a CUDA context, rather than before the process is created? I don't understand why a preload script running on the Dask worker doesn't work. Is this because UCX or cuDF is creating a CUDA context at import time? If so, what is triggering those imports? I would not expect any code within Dask to trigger an import of any library that creates a CUDA context before your preload script has a chance to run.

mrocklin avatar Apr 08 '20 15:04 mrocklin

So what if you set the preload value to the following:

distributed:
  worker:
    preload:
    - "CUDA_VISIBLE_DEVICES=0,2"
    - dask_cuda.initialize

(although presumably you're not using yaml here, but generating this dynamically and passing it directly from within dask-cuda-worker)

mrocklin avatar Apr 08 '20 15:04 mrocklin

Or sorry, I guess it should be the following:

set_env = """
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0,2"
"""

nanny = Nanny(..., preload=[set_env, "dask_cuda.initialize"], ...)

mrocklin avatar Apr 08 '20 15:04 mrocklin

Some docs on preloads, including some features (like accepting raw Python code) that weren't previously advertised: https://github.com/dask/dask/pull/6077

mrocklin avatar Apr 08 '20 15:04 mrocklin

Is this because UCX or cudf is creating a CUDA context at import time?

Yes, cuDF is.

If so, what is triggering those imports?

This is done today to ensure that devices are supported and correct CUDA versions are available.

I would not expect any code within Dask would trigger an import of any library that creates a CUDA context before you have a chance for your preload script to run.

This is happening at import time, so LocalCUDACluster will spawn new processes that will start and immediately import cuDF, always creating a context on GPU 0, before the preload script has a chance to run.

So what if you set the preload value to the following: ...

That is what we currently do, but the preload script runs too late for the case I mentioned just above. For reference, this is where it happens today: https://github.com/rapidsai/dask-cuda/blob/branch-0.14/dask_cuda/local_cuda_cluster.py#L246-L254.

pentschev avatar Apr 08 '20 17:04 pentschev

Yes, cuDF is.

We actually aren't creating a CUDA context, but we are initializing the driver which caches the device enumeration from CUDA_VISIBLE_DEVICES.

May I suggest we hop on a video call to have a higher bandwidth discussion about this?

kkraus14 avatar Apr 08 '20 19:04 kkraus14

Sure. I'm free most of the afternoon.

mrocklin avatar Apr 08 '20 19:04 mrocklin

@quasiben @kkraus14 @pentschev and I had a quick chat. There is some bizarre import behavior happening that I couldn't explain. @quasiben, when you have a moment, could you include your minimal example with time.sleep and the foo module?

We were also curious about how to create a Python process with a set of environment variables. This was posted as a Stack Overflow question here: https://stackoverflow.com/questions/61109571/how-do-i-set-environment-variables-in-a-new-python-process
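
For reference, one way to start a fresh Python process with a chosen environment (outside of multiprocessing) is to pass env= to subprocess, so the variable is in place before the child interpreter imports anything; worker_script.py below is just an illustrative placeholder.

import os
import subprocess
import sys

# the child sees CUDA_VISIBLE_DEVICES from its very first import
env = dict(os.environ, CUDA_VISIBLE_DEVICES="2,0")
subprocess.run([sys.executable, "worker_script.py"], env=env, check=True)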

mrocklin avatar Apr 08 '20 20:04 mrocklin

Also, for completeness, here is the script I wrote up to show that the spawn mechanism doesn't carry over imports:

# foo.py
def time_to_import_pandas():
    import time
    start = time.time()
    import pandas
    stop = time.time()
    print(stop - start)

# main.py
import pandas
import multiprocessing
import foo

if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")
    proc = ctx.Process(target=foo.time_to_import_pandas)
    proc.start()  # prints about 1s, rather than 0s which we would expect if pandas had already been imported
    proc.join()

mrocklin avatar Apr 08 '20 20:04 mrocklin

FWIW I looked into the environment variables issue. I haven't figured out a way to set env vars with multiprocessing.Process, but multiprocessing.Pool has an initializer parameter that allows you to run setup code when a process is started. For example,

import os
import multiprocessing

def set_env(*args):
    for name, value in args:
        os.environ[name] = value

def print_env():
    from pprint import pprint
    for name in ["FOO", "BAR"]:
        print(f"{name} = {os.environ.get(name)}")

if __name__ == '__main__':
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(processes=1, initializer=set_env, initargs=(("FOO", "1"), ("BAR", "2"))) as pool:
        print("In main process:")
        print_env()
        
        print("In process pool:")
        result = pool.apply_async(print_env)
        result.get()

        print("Back in main process:")
        print_env()

outputs

In main process:
FOO = None
BAR = None
In process pool:
FOO = 1
BAR = 2
Back in main process:
FOO = None
BAR = None

Not sure if that's directly applicable to this situation, but it seemed at least potentially useful.

jrbourbeau avatar Apr 09 '20 20:04 jrbourbeau

This still doesn't work, unfortunately. Here's a clearer version of @mrocklin's example from above:

# foo.py
def do_nothing():
    pass

# bar.py
print("I was imported!")

# main.py
import foo
import bar
import multiprocessing

if __name__ == '__main__':
    ctx = multiprocessing.get_context("spawn")
    proc = ctx.Process(target=foo.do_nothing)
    proc.start()
    proc.join()

# output
I was imported!
I was imported!

Using the same foo and bar above with a pool:

# main.py
import foo
import bar
import multiprocessing

if __name__ == '__main__':
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(processes=1, initializer=print, initargs=("initializer",)) as pool:
        result = pool.apply_async(foo.do_nothing)
        result.get()

# output
I was imported!
I was imported!
initializer

kkraus14 avatar Apr 09 '20 21:04 kkraus14

@pitrou do you have any idea on what might be going on here? This breaks my understanding of how Python's multiprocessing works.

mrocklin avatar Apr 09 '20 21:04 mrocklin

multiprocessing always re-imports the main module in child processes. I'm not sure why that is, but there's probably a reason (it's been like that for as long as I remember).

pitrou avatar Apr 09 '20 21:04 pitrou

multiprocessing always re-imports the main module in child processes. I'm not sure why that is, but there's probably a reason (it's been like that for as long as I remember).

Do you know of any way to either set environment variables or run some type of user defined code before this happens?

kkraus14 avatar Apr 09 '20 21:04 kkraus14

No. However, you'll see that the main module is imported as __mp_main__ in the child process, not as __main__ (a __main__ alias is created afterwards). For example, add this code at the top of your main.py above:

print("I was imported! My name is:", __name__)

(which also means that everything under if __name__ == '__main__': doesn't get executed in the child)
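
For example, with the spawn start method the parent reports __main__ while the child's re-import reports __mp_main__; a minimal sketch:

import multiprocessing

print("I was imported! My name is:", __name__)

def noop():
    pass

if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")
    proc = ctx.Process(target=noop)
    proc.start()
    proc.join()
    # expected output (roughly):
    #   I was imported! My name is: __main__
    #   I was imported! My name is: __mp_main__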

pitrou avatar Apr 09 '20 21:04 pitrou

I believe what multiprocessing does is serialize the file, spawn a new process, deserialize the file, and then execute the target function. I heard some rumors about how cloudpickle could be used in place of pickle for the serialization process, but this may no longer be available to us. cloudpickle might have been useful here in that we could potentially have scoped it to only serializing the function. Perhaps this helps jog the memory of @pitrou, should he recall these kinds of things.

quasiben avatar Apr 10 '20 13:04 quasiben

I've been thinking more about locking. Assuming that we're only locking in-process (presumably os.environ is the protected resource), then if we can arrange things so that the lock around setting os.environ plus process creation is held for a short enough time, I could be in favor of that. Creating many Nannies in one process is relatively rare. We would want to see what the slowdown is like, especially on a system with an NFS. We would probably also want to only do this if env was specified.
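
For concreteness, a rough sketch of that idea, assuming an in-process threading.Lock; the names (spawn_with_env, _spawn_lock) are illustrative and not anything in distributed:

import os
import threading
import multiprocessing as mp

_spawn_lock = threading.Lock()

def spawn_with_env(target, env):
    """Set env vars in the parent, spawn, then restore, all under one lock."""
    ctx = mp.get_context("spawn")
    with _spawn_lock:  # serialize concurrent process creation in this process
        saved = {k: os.environ.get(k) for k in env}
        os.environ.update(env)  # the spawned child inherits these
        try:
            proc = ctx.Process(target=target)
            proc.start()
        finally:
            # restore the parent's environment so later spawns aren't affected
            for k, v in saved.items():
                if v is None:
                    os.environ.pop(k, None)
                else:
                    os.environ[k] = v
    return proc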

cc @pentschev

mrocklin avatar Apr 10 '20 15:04 mrocklin

Can this be closed now that #6841 is in?

crusaderky avatar Aug 11 '22 10:08 crusaderky

Thanks @crusaderky for the reminder here. I was doing a bit of testing and I think you're right: we could close this, but we must expose pre_spawn_env in Nanny's constructor first. Allow me to elaborate why.

In Dask-CUDA, specifically LocalCUDACluster, we launch multiple workers and each must have a different environment variable, which is the subject of this thread. This is achieved by returning an updated spec for each worker with the appropriate env kwarg, which is consumed during scale. Because dask.config is global, each call ends up overwriting the previous one, and the last one to run is what gets passed to new processes. By adding a pre_spawn_env kwarg to Nanny we could keep passing the correct variable per worker at the now-correct time (pre-spawn).

Are there any objections to implementing that kwarg? If not, I can send a PR later today; otherwise I'm open to suggestions on handling this with dask.config.
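
For illustration only, since the kwarg doesn't exist yet, usage might look roughly like this (the scheduler address is a placeholder):

from distributed import Nanny

# hypothetical: pre_spawn_env would be applied in the parent before the
# worker process is spawned, one value per nanny rather than via dask.config
nanny = Nanny(
    "tcp://127.0.0.1:8786",
    pre_spawn_env={"CUDA_VISIBLE_DEVICES": "2,0,1"},
)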

pentschev avatar Aug 12 '22 12:08 pentschev

The whole reason why we split env and pre-spawn-env is that in the latter you can't have different values for different workers - unless you are very careful to spawn one at a time, at which point you might as well call dask.config.set instead.

crusaderky avatar Aug 12 '22 14:08 crusaderky

The whole reason why we split env and pre-spawn-env is that in the latter you can't have different values for different workers - unless you are very careful to spawn one at a time, at which point you might as well call dask.config.set instead.

Yes, you are right. I actually ran it and was successful, but it's a concurrency game either way; specifically, in https://github.com/dask/distributed/blob/9021d57f338128ea488285d5c37f5dfcc33f6482/distributed/nanny.py#L685-L688 we could have multiple processes overwriting each other's variables. In https://github.com/dask/distributed/issues/6749#issuecomment-1190739588 I attempted to ensure synchronization, but the cost of launching workers becomes far too prohibitive.

Unfortunately, that means https://github.com/dask/distributed/pull/6841 doesn't solve this issue, and we should keep it open.

pentschev avatar Aug 12 '22 17:08 pentschev