ray icon indicating copy to clipboard operation
ray copied to clipboard

[core][experimental] Accelerated DAG should execute work on actor's main thread

Open stephanie-wang opened this issue 1 year ago • 2 comments
trafficstars

Description

Currently actor tasks in the DAG execute on a background thread on the actor, which is a bit of a gotcha when the actor has some thread-local state that is initialized before the DAG is created. We should move the execution loop to the main thread and execute other system-level tasks on a background thread.

Use case

No response

stephanie-wang avatar Jun 29 '24 01:06 stephanie-wang

Default behavior: the DAG should run on the main actor thread

Additional API: if user specifies a thread name during experimental_compile, then we run the DAG on that thread

If a thread is already being used by some other DAG, throw an error during compile.

stephanie-wang avatar Oct 16 '24 17:10 stephanie-wang

I will be working on this.

xslingcn avatar Oct 18 '24 23:10 xslingcn

system-level tasks => ex: close channel, resize channel

kevin85421 avatar Jan 14 '25 22:01 kevin85421

@stephanie-wang I took a look at the issue. The problem is that any normal ray.get call to the actors belonging to the graph will not work because the execution loop blocks the main thread indefinitely.

It does not seem worth it to support accessing thread-local state while disabling users from submitting normal tasks.

def test_simulate_pipeline_parallelism(ray_start_regular, single_fetch):
    ...
    worker_0 = Worker.remote(0)
    worker_1 = Worker.remote(1)

    # Worker 0: FFFBBB
    # Worker 1: BBB
    with InputNode() as inp:
        w0_input = worker_0.read_input.bind(inp)
        ...
        output_dag = MultiOutputNode([d03, d04, d05])

    output_dag = output_dag.experimental_compile()
    res = output_dag.execute([0, 1, 2])
    ...
    assert ray.get(res) == [0, 1, 2]

    assert ray.get(worker_0.get_logs.remote()) == [ # <-- block here
        "FWD rank-0, batch-0",
        "FWD rank-0, batch-1",
        "FWD rank-0, batch-2",
        "BWD rank-0, batch-0",
        "BWD rank-0, batch-1",
        "BWD rank-0, batch-2",
    ]

I used py-spy to check the main thread of worker_0. It is stuck at get_objects when ray.get(worker_0.get_logs.remote()) blocks. In addition, I check the log of the core driver process. When the core driver process submits the task get_logs, 9 tasks have already been submitted, and one of them is still in-flight. The 9 tasks include 1 get_node_id, 7 do_allocate_channel, and 1 for execution loop which is still in-flight.

Image

Image

kevin85421 avatar Jan 21 '25 08:01 kevin85421

Maybe it is not worth supporting access to thread-local state if it blocks normal tasks.

kevin85421 avatar Jan 21 '25 09:01 kevin85421

https://github.com/ray-project/ray/pull/50032 attempts to move the execution loop to the same thread as the default thread (the thread used when concurrency_group is not specified in options).

  • The execution loop can't be running on MainThread

    • The task_execution_service_ on the actor's core worker process is not only responsible for handling control plane logic, such as posting the CoreWorker.HandlePushTaskActor instance, but also for executing actor tasks if no concurrency group is set. Therefore, if the execution doesn't finish, no other actor tasks can be executed, even if the incoming tasks belong to different concurrency groups, because the main thread responsible for control plane logic is currently busy with task execution.
    • TLDR: If the execution loop is on the MainThread, normal tasks can't be submitted, regardless of whether they belong to the same concurrency group or not.
    • Solution: In #50032, I always create a new thread for actor tasks, allowing the main thread to focus solely on control plane logic.
  • In current Ray implementations, the __init__ of the actor instance is always running on the MainThread. In the following example, the constructor will be running on the MainThread and compute will be running on another thread.

     import threading
     import ray
    
     # @ray.remote(concurrency_groups={"io": 2})
     @ray.remote(concurrency_groups={"io": 2})
     class ThreadLocalActor:
         @ray.method(concurrency_group="io")
         def __init__(self):
             # data local to actor default executor thread
             current_thread = threading.current_thread()
             print(current_thread)
             self.local_data = threading.local()
             self.local_data.value = 42
             self.local_data.thread_id = threading.get_ident()
    
         @ray.method(concurrency_group="io")
         def compute(self, value):
             current_thread = threading.current_thread()
             print(current_thread)
             assert threading.get_ident() == self.local_data.thread_id
             return value + self.local_data.value
    
     actor = ThreadLocalActor.remote()
     ref = actor.compute.remote(1)
     print(ray.get(ref))
    

Based on the two conclusions mentioned above, it is difficult to achieve our goal of putting the execution loop and __init__ in the same thread. One possible solution is to consider moving the constructor's execution to a thread other than the MainThread.

kevin85421 avatar Jan 23 '25 08:01 kevin85421

https://github.com/ray-project/ray/pull/50032/commits/b0006205b664cafbeb28e2e1dec3f5f54b18e6d3 this commit moves the constructor to run on the default executor. I ran the following script and the output is attached below.

Both __init__ and compute are running on the same thread and local_data has the same address, but local_data.value doesn't exist. It looks like Ray updates the thread context.

def test_execute_on_actor_thread(shutdown_only):
    import threading

    @ray.remote
    class ThreadLocalActor:
        def __init__(self):
            # data local to actor default executor thread
            print("init")
            current_thread = threading.current_thread()
            print(current_thread)
            self.local_data = threading.local()
            print("local data",self.local_data)
            self.local_data.value = 42
            print("local data value",self.local_data.value)
            print("value", id(self.local_data.value))

        def compute(self, value):
            print("compute")
            current_thread = threading.current_thread()
            print(current_thread)
            print("local data",self.local_data)
            if hasattr(self.local_data, "value"):
                print("self.local_data.value", self.local_data.value)
            else:
                print("self.local_data.value is not set")
            return value + self.local_data.value

    actor = ThreadLocalActor.remote()
    assert ray.get(actor.compute.remote(1)) == 43
Image

kevin85421 avatar Jan 24 '25 09:01 kevin85421

The destructor of MyObject will be called immediately after f1 finishes, so f2 cannot access self.thread_data.value. Based on my current observation, our Python thread behaves differently from a normal Python thread. In a normal Python thread, once the thread stops, it cannot be restarted, and thread-local data is released. However, in Ray, the underlying C++ thread is always running, but it is only considered a Python thread when it executes Python functions. From Python's perspective, this appears as if a stopped thread is being restarted.

import ray
import threading


class MyObject:
    def __init__(self, value):
        self.value = value

    def __del__(self):
        print("MyObject.__del__", self.value)


@ray.remote(concurrency_groups={"io": 1})
class Actor:
    def __init__(self):
        pass

    @ray.method(concurrency_group="io")
    def f1(self):
        print("f1")
        current_thread = threading.current_thread()
        print(f"current_thread: {current_thread}")
        self.threading_data = threading.local()
        self.threading_data.value = MyObject("f1")
        print(f"threading_data: {self.threading_data}, value: {self.threading_data.value}")

    @ray.method(concurrency_group="io")
    def f2(self):
        print("f2")
        current_thread = threading.current_thread()
        print(f"current_thread: {current_thread}")
        print(f"threading_data: {self.threading_data}, value: {self.threading_data.value}")

a = Actor.remote()
ray.get(a.f1.remote())
ray.get(a.f2.remote())
Image

kevin85421 avatar Jan 24 '25 18:01 kevin85421