[core][experimental] Accelerated DAG should execute work on actor's main thread
Description
Currently actor tasks in the DAG execute on a background thread on the actor, which is a bit of a gotcha when the actor has some thread-local state that is initialized before the DAG is created. We should move the execution loop to the main thread and execute other system-level tasks on a background thread.
Default behavior: the DAG should run on the main actor thread
Additional API: if the user specifies a thread name during experimental_compile, then we run the DAG on that thread
If a thread is already being used by some other DAG, throw an error during compile.
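The proposed compile-time check could be kept as a small per-actor registry that maps thread names to the DAG that owns them. This is a minimal sketch under that assumption; DagThreadRegistry, claim, and release are hypothetical names, not part of Ray's experimental_compile implementation.

```python
import threading


class DagThreadRegistry:
    """Hypothetical per-actor registry: thread name -> id of the DAG that owns it."""

    DEFAULT_THREAD = "MainThread"  # default behavior: run the DAG on the main actor thread

    def __init__(self):
        self._lock = threading.Lock()
        self._owner_by_thread = {}

    def claim(self, dag_id, thread_name=None):
        """Reserve a thread for dag_id at compile time; raise if another DAG holds it."""
        name = thread_name or self.DEFAULT_THREAD
        with self._lock:
            owner = self._owner_by_thread.get(name)
            if owner is not None and owner != dag_id:
                raise ValueError(
                    f"Thread {name!r} is already used by DAG {owner!r}; "
                    f"cannot compile DAG {dag_id!r} on it"
                )
            self._owner_by_thread[name] = dag_id
            return name

    def release(self, dag_id):
        """Free every thread held by dag_id, e.g. when the DAG is torn down."""
        with self._lock:
            for name in [n for n, o in self._owner_by_thread.items() if o == dag_id]:
                del self._owner_by_thread[name]


registry = DagThreadRegistry()
print(registry.claim("dag-1"))             # MainThread
print(registry.claim("dag-2", "compute"))  # compute
try:
    registry.claim("dag-3", "compute")     # same thread, different DAG
except ValueError as err:
    print("compile error:", err)
registry.release("dag-2")
print(registry.claim("dag-3", "compute"))  # compute
```

Raising at claim time mirrors the proposal of failing during compile rather than at execution time.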
I will be working on this.
System-level tasks, e.g., closing or resizing a channel.
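A minimal sketch of the intended split, assuming system-level tasks can be handed to a single background thread while the execution loop keeps the calling thread. resize_channel, close_channel, run_execution_loop, and the channels dict are hypothetical stand-ins, not Ray APIs.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical channel table: channel name -> buffer size in bytes.
channels = {"ch0": 1024, "ch1": 1024}


def resize_channel(name, size):
    """Hypothetical system-level task; reports which thread it ran on."""
    channels[name] = size
    return threading.current_thread().name


def close_channel(name):
    """Hypothetical system-level task; reports which thread it ran on."""
    channels.pop(name, None)
    return threading.current_thread().name


def run_execution_loop(num_iterations, system_executor):
    """Stand-in for the DAG loop: runs on the calling thread and hands
    system-level work to the background executor instead of doing it inline."""
    loop_thread = threading.current_thread().name
    pending = []
    for i in range(num_iterations):
        if i == 1:
            pending.append(system_executor.submit(resize_channel, "ch0", 4096))
        # ... read inputs, execute the actor method, write outputs ...
    pending.append(system_executor.submit(close_channel, "ch1"))
    return loop_thread, [f.result() for f in pending]


with ThreadPoolExecutor(max_workers=1, thread_name_prefix="dag-system") as system_executor:
    loop_thread, system_threads = run_execution_loop(3, system_executor)

print(loop_thread)     # MainThread (when run as a script)
print(system_threads)  # ['dag-system_0', 'dag-system_0']
print(channels)        # {'ch0': 4096}
```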
@stephanie-wang I took a look at the issue. The problem is that normal tasks submitted to actors in the graph never complete (so a ray.get on them hangs), because the execution loop blocks the main thread indefinitely.
It does not seem worth supporting access to thread-local state if it prevents users from submitting normal tasks.
def test_simulate_pipeline_parallelism(ray_start_regular, single_fetch):
    ...
    worker_0 = Worker.remote(0)
    worker_1 = Worker.remote(1)
    # Worker 0: FFFBBB
    # Worker 1: BBB
    with InputNode() as inp:
        w0_input = worker_0.read_input.bind(inp)
        ...
        output_dag = MultiOutputNode([d03, d04, d05])
    output_dag = output_dag.experimental_compile()
    res = output_dag.execute([0, 1, 2])
    ...
    assert ray.get(res) == [0, 1, 2]
    assert ray.get(worker_0.get_logs.remote()) == [  # <-- blocks here
        "FWD rank-0, batch-0",
        "FWD rank-0, batch-1",
        "FWD rank-0, batch-2",
        "BWD rank-0, batch-0",
        "BWD rank-0, batch-1",
        "BWD rank-0, batch-2",
    ]
I used py-spy to inspect the main thread of worker_0: it is stuck in get_objects while ray.get(worker_0.get_logs.remote()) blocks. I also checked the core worker log of the driver process. By the time the driver submits the get_logs task, 9 tasks have already been submitted and one of them is still in flight. The 9 tasks are 1 get_node_id, 7 do_allocate_channel, and 1 for the execution loop, which is the one still in flight.
Maybe it is not worth supporting access to thread-local state if doing so blocks normal tasks.
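The hang can be reproduced by analogy in plain Python, assuming (as the analysis above suggests) that the actor's main thread runs queued tasks one at a time. A one-worker executor stands in for that thread; execution_loop and get_logs are hypothetical stand-ins, and no Ray API is used.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError

# One worker = one thread that must run every task submitted to it, in order.
actor_main_thread = ThreadPoolExecutor(max_workers=1)
stop_loop = threading.Event()
logs = ["FWD rank-0, batch-0"]


def execution_loop():
    # Stand-in for the compiled-DAG loop: it never returns on its own.
    while not stop_loop.is_set():
        stop_loop.wait(0.01)


def get_logs():
    return list(logs)


loop_future = actor_main_thread.submit(execution_loop)
logs_future = actor_main_thread.submit(get_logs)  # queued behind the loop
try:
    logs_future.result(timeout=0.5)
    blocked = False
except TimeoutError:
    blocked = True
print("get_logs blocked:", blocked)  # get_logs blocked: True

stop_loop.set()              # tear the loop down; only now can get_logs run
print(logs_future.result())  # ['FWD rank-0, batch-0']
actor_main_thread.shutdown()
```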
https://github.com/ray-project/ray/pull/50032 attempts to move the execution loop to the default executor thread (the thread that runs actor tasks when concurrency_group is not specified in options).
- The execution loop can't run on the MainThread.
  - The task_execution_service_ on the actor's core worker process is responsible not only for control-plane logic, such as posting the CoreWorker.HandlePushTaskActor instance, but also for executing actor tasks when no concurrency group is set. Therefore, if the execution loop never finishes, no other actor tasks can be executed, even if the incoming tasks belong to different concurrency groups, because the main thread responsible for control-plane logic is busy with task execution.
  - TLDR: If the execution loop is on the MainThread, normal tasks can't be submitted, regardless of whether they belong to the same concurrency group.
  - Solution: In #50032, I always create a new thread for actor tasks, allowing the main thread to focus solely on control-plane logic.
- In the current Ray implementation, the actor's __init__ always runs on the MainThread. In the following example, the constructor runs on the MainThread and compute runs on another thread, so compute cannot see the thread-local values set in __init__.

import threading

import ray


@ray.remote(concurrency_groups={"io": 2})
class ThreadLocalActor:
    @ray.method(concurrency_group="io")
    def __init__(self):
        # data local to actor default executor thread
        current_thread = threading.current_thread()
        print(current_thread)
        self.local_data = threading.local()
        self.local_data.value = 42
        self.local_data.thread_id = threading.get_ident()

    @ray.method(concurrency_group="io")
    def compute(self, value):
        current_thread = threading.current_thread()
        print(current_thread)
        assert threading.get_ident() == self.local_data.thread_id
        return value + self.local_data.value


actor = ThreadLocalActor.remote()
ref = actor.compute.remote(1)
print(ray.get(ref))
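The tradeoff in the two points above can be modeled in plain Python, assuming one single-worker executor per actor thread (main_executor, loop_executor, and ActorState are hypothetical names). Moving the loop off the main thread unblocks normal tasks, but the loop thread then cannot see threading.local values the constructor set on the main thread.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical layout: "main" handles the constructor and normal tasks,
# "dag-loop" is a dedicated thread for the DAG execution loop.
main_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="main")
loop_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="dag-loop")
stop_loop = threading.Event()


class ActorState:
    def __init__(self):
        # Like the actor constructor: runs on the "main" thread.
        self.local_data = threading.local()
        self.local_data.value = 42


def execution_loop(state):
    # threading.local attributes are per thread, so the value set in
    # __init__ on the "main" thread is not visible here.
    visible = hasattr(state.local_data, "value")
    stop_loop.wait()  # keep the loop thread busy until teardown
    return visible


state = main_executor.submit(ActorState).result()
loop_future = loop_executor.submit(execution_loop, state)
# The main thread is free, so a normal task completes while the loop runs.
print(main_executor.submit(lambda: "normal task ran").result(timeout=1))  # normal task ran
stop_loop.set()
print(loop_future.result())  # False
main_executor.shutdown()
loop_executor.shutdown()
```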
Given these two constraints (the execution loop can't run on the MainThread, and __init__ always runs on the MainThread), it is difficult to achieve our goal of running the execution loop and __init__ on the same thread. One possible solution is to move the constructor's execution to a thread other than the MainThread.
Commit https://github.com/ray-project/ray/pull/50032/commits/b0006205b664cafbeb28e2e1dec3f5f54b18e6d3 moves the constructor to run on the default executor. I ran the following script with it.
Both __init__ and compute run on the same thread, and local_data has the same address in both, but local_data.value doesn't exist in compute. It looks like Ray updates the thread context between calls.
import ray


def test_execute_on_actor_thread(shutdown_only):
    import threading

    @ray.remote
    class ThreadLocalActor:
        def __init__(self):
            # data local to actor default executor thread
            print("init")
            current_thread = threading.current_thread()
            print(current_thread)
            self.local_data = threading.local()
            print("local data", self.local_data)
            self.local_data.value = 42
            print("local data value", self.local_data.value)
            print("value", id(self.local_data.value))

        def compute(self, value):
            print("compute")
            current_thread = threading.current_thread()
            print(current_thread)
            print("local data", self.local_data)
            if hasattr(self.local_data, "value"):
                print("self.local_data.value", self.local_data.value)
            else:
                print("self.local_data.value is not set")
            return value + self.local_data.value

    actor = ThreadLocalActor.remote()
    assert ray.get(actor.compute.remote(1)) == 43
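The combination observed above (same thread name, same local_data address, missing value) is also what plain CPython produces when two distinct Python threads that share a name run one after the other. This sketch is only an analogy, assuming Ray re-enters Python with a fresh thread state per task; it does not exercise Ray, and the thread name "default-executor" is hypothetical.

```python
import threading

shared = threading.local()  # one object, like self.local_data on the actor
observations = []


def init_like():
    shared.value = 42
    observations.append(("init", threading.current_thread().name, id(shared), hasattr(shared, "value")))


def compute_like():
    observations.append(("compute", threading.current_thread().name, id(shared), hasattr(shared, "value")))


# Two different Python threads with the same name, run sequentially.
for target in (init_like, compute_like):
    t = threading.Thread(target=target, name="default-executor")
    t.start()
    t.join()

for obs in observations:
    print(obs)  # same name and id in both; hasattr is True, then False
```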
In the following script, the destructor of MyObject is called immediately after f1 finishes, so f2 cannot access self.threading_data.value. Based on my current observations, Ray's executor threads behave differently from normal Python threads. In a normal Python thread, once the thread stops, it cannot be restarted, and its thread-local data is released. In Ray, however, the underlying C++ thread keeps running, but it is only treated as a Python thread while it executes Python functions. From Python's perspective, this looks as if a stopped thread is being restarted.
import threading

import ray


class MyObject:
    def __init__(self, value):
        self.value = value

    def __del__(self):
        print("MyObject.__del__", self.value)


@ray.remote(concurrency_groups={"io": 1})
class Actor:
    def __init__(self):
        pass

    @ray.method(concurrency_group="io")
    def f1(self):
        print("f1")
        current_thread = threading.current_thread()
        print(f"current_thread: {current_thread}")
        self.threading_data = threading.local()
        self.threading_data.value = MyObject("f1")
        print(f"threading_data: {self.threading_data}, value: {self.threading_data.value}")

    @ray.method(concurrency_group="io")
    def f2(self):
        print("f2")
        current_thread = threading.current_thread()
        print(f"current_thread: {current_thread}")
        # Per the observation above, threading_data.value is gone here, so this raises AttributeError.
        print(f"threading_data: {self.threading_data}, value: {self.threading_data.value}")


a = Actor.remote()
ray.get(a.f1.remote())
ray.get(a.f2.remote())
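For comparison with the Ray behavior described above, this plain-CPython sketch (no Ray involved) shows the normal-thread semantics: when a thread exits, its threading.local entries are released and the stored object's __del__ runs. It polls briefly instead of relying on exactly when that teardown happens relative to join(); the events and refs lists are illustrative helpers.

```python
import gc
import threading
import time
import weakref

events = []


class MyObject:
    def __init__(self, value):
        self.value = value

    def __del__(self):
        events.append(f"MyObject.__del__ {self.value}")


data = threading.local()
refs = []


def f1():
    data.value = MyObject("f1")
    refs.append(weakref.ref(data.value))  # observe the object without keeping it alive
    events.append("f1 finished")


t = threading.Thread(target=f1)
t.start()
t.join()

# Wait (up to 2 s) for the exited thread's thread-local entries to be freed.
deadline = time.monotonic() + 2.0
while refs[0]() is not None and time.monotonic() < deadline:
    gc.collect()
    time.sleep(0.01)

print(events)
print(hasattr(data, "value"))  # False: the main thread never set its own value
```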