[Data] Execution hangs when there are many operators in the pipeline
What happened + What you expected to happen
I have a plan with a long chain of operators, as shown below:
InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> ActorPoolMapOperator[Map(StatefulFn1)] -> ActorPoolMapOperator[MapBatches(StatefulFn2)] -> ActorPoolMapOperator[Map(StatefulFn3)] -> ActorPoolMapOperator[MapBatches(StatefulFn4)] -> ActorPoolMapOperator[Map(StatefulFn2)] -> ActorPoolMapOperator[Map(StatefulFn3)] -> ActorPoolMapOperator[MapBatches(StatefulFn1)] -> ActorPoolMapOperator[MapBatches(StatefulFn2)] -> ActorPoolMapOperator[MapBatches(StatefulFn3)] -> ActorPoolMapOperator[Map(StatefulFn1)] -> ActorPoolMapOperator[Map(StatefulFn2)] -> ActorPoolMapOperator[MapBatches(StatefulFn3)]
The streaming pipeline runs for a while and then seems to hang once it enters the Map(StatefulFn3) stage. My script reproduces the hang reliably. Apart from the settings shown in the script, I use the defaults.
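To tell "hung" apart from "very slow" when running the reproduction, the blocking call can be wrapped in a timeout. The following is only a minimal sketch using the Python standard library; `run_with_timeout` is a hypothetical helper name, not part of Ray, and it cannot stop the stuck work, it only reports that the call did not return in time.

```python
import threading
import time


def run_with_timeout(fn, timeout_s):
    """Run fn() in a daemon thread and wait at most timeout_s seconds.

    Returns (finished, result). If fn() is still running after the
    timeout, returns (False, None) and leaves the thread running.
    """
    box = {}

    def target():
        try:
            box["result"] = fn()
        except BaseException as exc:  # surface errors in the caller
            box["error"] = exc

    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    thread.join(timeout_s)
    if thread.is_alive():
        return False, None
    if "error" in box:
        raise box["error"]
    return True, box.get("result")


# Stand-in workload; in the reproduction this would be the take_all() call.
finished, result = run_with_timeout(lambda: time.sleep(0.2) or "done", timeout_s=5.0)
print(finished, result)
```

With a generous timeout (for example several minutes for the 10-row input), a `finished == False` result suggests the pipeline is stuck rather than just slow.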
Versions / Dependencies
Ray 2.30.0
Reproduction script
import time
import numpy as np
import ray
class StatefulFn1:
    def __init__(self):
        self.num_reuses = 0

    def __call__(self, x):
        r = self.num_reuses
        time.sleep(1.5)
        self.num_reuses += 1
        return {"id": np.array([r])}
class StatefulFn2:
    def __init__(self):
        self.num_reuses = 0

    def __call__(self, x):
        r = self.num_reuses
        time.sleep(1.5)
        self.num_reuses += 1
        # mock a large output
        r = list(range(1000))
        return {"id": np.array(r)}
class StatefulFn3:
    def __init__(self):
        print("StatefulFn3 init")
        self.num_reuses = 0

    def __call__(self, x):
        r = self.num_reuses
        # mock a compute-heavy operation
        for i in range(1000):
            for j in range(1000):
                r += i * j
        time.sleep(1.5)
        self.num_reuses += 1
        return {"id": np.array([r])}
class StatefulFn4:
    def __init__(self):
        self.num_reuses = 0

    def __call__(self, x):
        r = self.num_reuses
        time.sleep(1.5)
        self.num_reuses += 1
        return {"id": np.array([r])}
min_concurrency = 1
max_concurrency = 100


def test_case(shutdown_only):
    ray.init(num_cpus=100)
    ts = time.time()
    n = 10
    ds = ray.data.range(n)
    # Every stage uses the same autoscaling actor pool range.
    concurrency = (min_concurrency, max_concurrency)
    actor_reuse = (
        ds.map(StatefulFn1, num_cpus=1, concurrency=concurrency)
        .map_batches(StatefulFn2, num_cpus=2, batch_size=1, concurrency=concurrency)
        .map(StatefulFn3, num_cpus=1, concurrency=concurrency)
        .map_batches(StatefulFn4, num_cpus=2, batch_size=1, concurrency=concurrency)
        .map(StatefulFn2, num_cpus=1, concurrency=concurrency)
        .map(StatefulFn3, num_cpus=1, concurrency=concurrency)
        .map_batches(StatefulFn1, num_cpus=2, batch_size=1, concurrency=concurrency)
        .map_batches(StatefulFn2, num_cpus=1, batch_size=1, concurrency=concurrency)
        .map_batches(StatefulFn3, num_cpus=4, batch_size=1, concurrency=concurrency)
        .map(StatefulFn1, num_cpus=1, concurrency=concurrency)
        .map(StatefulFn2, num_cpus=1, concurrency=concurrency)
        .map_batches(StatefulFn3, num_cpus=1, batch_size=1, concurrency=concurrency)
    )
    # take_all() returns a list of rows, so keep the Dataset around for stats().
    actor_reuse.take_all()
    print(f"cost time {time.time() - ts} seconds")
    print(actor_reuse.stats())
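For context on the resource budget, here is a rough back-of-the-envelope sketch of how many CPUs the actor pools in this script request. The per-stage `num_cpus` values, the concurrency range, and the 100-CPU cluster size are copied from the script; the helper name `cpus_needed` is hypothetical. This is arithmetic only, not a diagnosis: it ignores the CPUs used by the ReadRange tasks and says nothing about object store limits or how Ray actually schedules the pools.

```python
# (operator, num_cpus per actor), in pipeline order, copied from the script.
STAGES = [
    ("Map(StatefulFn1)", 1),
    ("MapBatches(StatefulFn2)", 2),
    ("Map(StatefulFn3)", 1),
    ("MapBatches(StatefulFn4)", 2),
    ("Map(StatefulFn2)", 1),
    ("Map(StatefulFn3)", 1),
    ("MapBatches(StatefulFn1)", 2),
    ("MapBatches(StatefulFn2)", 1),
    ("MapBatches(StatefulFn3)", 4),
    ("Map(StatefulFn1)", 1),
    ("Map(StatefulFn2)", 1),
    ("MapBatches(StatefulFn3)", 1),
]
CLUSTER_CPUS = 100      # ray.init(num_cpus=100)
MIN_CONCURRENCY = 1
MAX_CONCURRENCY = 100


def cpus_needed(stages, actors_per_stage):
    """Total CPUs if every stage runs `actors_per_stage` actors."""
    return sum(num_cpus * actors_per_stage for _, num_cpus in stages)


min_cpus = cpus_needed(STAGES, MIN_CONCURRENCY)
max_cpus = cpus_needed(STAGES, MAX_CONCURRENCY)
print(f"CPUs for minimum-size pools: {min_cpus} of {CLUSTER_CPUS}")
print(f"CPUs if every pool scaled to its maximum: {max_cpus} of {CLUSTER_CPUS}")
```

The minimum-size pools fit within the 100 CPUs, while the combined maximum far exceeds them, so the stages necessarily compete for CPUs once the pools start scaling up.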
Issue Severity
High: It blocks me from completing my task.