
[Data] Execution hangs when there are many operators in the pipeline

Open · njumzs opened this issue 1 year ago · 0 comments

What happened + What you expected to happen

My pipeline has a long chain of operators. The plan is:

InputDataBuffer[Input]
-> TaskPoolMapOperator[ReadRange]
-> ActorPoolMapOperator[Map(StatefulFn1)]
-> ActorPoolMapOperator[MapBatches(StatefulFn2)]
-> ActorPoolMapOperator[Map(StatefulFn3)]
-> ActorPoolMapOperator[MapBatches(StatefulFn4)]
-> ActorPoolMapOperator[Map(StatefulFn2)]
-> ActorPoolMapOperator[Map(StatefulFn3)]
-> ActorPoolMapOperator[MapBatches(StatefulFn1)]
-> ActorPoolMapOperator[MapBatches(StatefulFn2)]
-> ActorPoolMapOperator[MapBatches(StatefulFn3)]
-> ActorPoolMapOperator[Map(StatefulFn1)]
-> ActorPoolMapOperator[Map(StatefulFn2)]
-> ActorPoolMapOperator[MapBatches(StatefulFn3)]
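
The twelve actor stages can also be written as data and applied in a loop, which makes it easy to shorten the chain when narrowing down where execution stops. This is a sketch, not part of the original report: `STAGES` and `build_chain` are hypothetical names, and the `num_cpus` and `batch_size` values are copied from the reproduction script. It only assumes what the script already relies on, namely that `Dataset.map` and `Dataset.map_batches` accept a callable class plus `num_cpus`, `concurrency`, and (for `map_batches`) `batch_size` keyword arguments.

```python
# Hypothetical helper. Each entry: (Dataset method, UDF class name, num_cpus,
# batch_size or None), in the same order as the plan above.
STAGES = [
    ("map", "StatefulFn1", 1, None),
    ("map_batches", "StatefulFn2", 2, 1),
    ("map", "StatefulFn3", 1, None),
    ("map_batches", "StatefulFn4", 2, 1),
    ("map", "StatefulFn2", 1, None),
    ("map", "StatefulFn3", 1, None),
    ("map_batches", "StatefulFn1", 2, 1),
    ("map_batches", "StatefulFn2", 1, 1),
    ("map_batches", "StatefulFn3", 4, 1),
    ("map", "StatefulFn1", 1, None),
    ("map", "StatefulFn2", 1, None),
    ("map_batches", "StatefulFn3", 1, 1),
]


def build_chain(ds, udfs, stages=STAGES, concurrency=(1, 100)):
    """Apply each stage to `ds` in order and return the resulting dataset.

    `udfs` maps a class name such as "StatefulFn1" to the class itself.
    """
    for method, name, num_cpus, batch_size in stages:
        kwargs = {"num_cpus": num_cpus, "concurrency": concurrency}
        if batch_size is not None:
            kwargs["batch_size"] = batch_size
        ds = getattr(ds, method)(udfs[name], **kwargs)
    return ds
```

For example, `build_chain(ray.data.range(10), {c.__name__: c for c in (StatefulFn1, StatefulFn2, StatefulFn3, StatefulFn4)}, STAGES[:3])` would build only the first three actor stages.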

The streaming pipeline runs for a while and then seems to hang once it enters the Map(StatefulFn3) stage. My script reproduces this reliably. Apart from the settings in the script, I use the defaults.
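
Because the pipeline only *seems* to hang, it can help to separate a true stall from very slow progress. A minimal, Ray-independent sketch: the helper name `iter_with_stall_timeout` is hypothetical (it is not a Ray API). It wraps any iterator, consumes it on a background thread, and raises `TimeoutError` if no new item arrives within `timeout_s` seconds.

```python
import queue
import threading


def iter_with_stall_timeout(iterable, timeout_s):
    """Yield items from `iterable`, raising TimeoutError if no item arrives
    within `timeout_s` seconds of the previous one.

    The iterable is consumed on a daemon thread; any exception it raises is
    re-raised in the caller.
    """
    q = queue.Queue()

    def producer():
        try:
            for item in iterable:
                q.put((False, item))
        except BaseException as exc:  # forward any failure to the consumer
            q.put((True, exc))
        else:
            q.put((True, None))  # normal end of iteration

    threading.Thread(target=producer, daemon=True).start()
    while True:
        try:
            done, payload = q.get(timeout=timeout_s)
        except queue.Empty:
            raise TimeoutError(f"no item for {timeout_s} s; the pipeline may be stalled") from None
        if done:
            if payload is not None:
                raise payload
            return
        yield payload
```

Applied to the dataset's `iter_rows()` with a generous `timeout_s`, this turns "seems to hang" into a concrete failure point, assuming the Ray Data iterator can be consumed from a non-main thread. The background thread keeps consuming after the timeout fires, so this is a diagnostic aid only.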


Versions / Dependencies

Ray 2.30.0

Reproduction script

import time
import numpy as np
import ray


class StatefulFn1:
    def __init__(self):
        self.num_reuses = 0

    def __call__(self, x):
        r = self.num_reuses
        time.sleep(1.5)
        self.num_reuses += 1
        return {"id": np.array([r])}


class StatefulFn2:
    def __init__(self):
        self.num_reuses = 0

    def __call__(self, x):
        r = self.num_reuses
        time.sleep(1.5)
        self.num_reuses += 1
        # mock a large output
        r = list(range(1000))
        return {"id": np.array(r)}


class StatefulFn3:
    def __init__(self):
        print("StatefulFn3 init")
        self.num_reuses = 0

    def __call__(self, x):
        r = self.num_reuses
        # mock a compute-heavy operation
        for i in range(1000):
            for j in range(1000):
                r += i * j
        time.sleep(1.5)
        self.num_reuses += 1
        return {"id": np.array([r])}


class StatefulFn4:
    def __init__(self):
        self.num_reuses = 0

    def __call__(self, x):
        r = self.num_reuses
        time.sleep(1.5)
        self.num_reuses += 1
        return {"id": np.array([r])}


min_concurrency = 1
max_concurrency = 100


def test_case(shutdown_only):
    # shutdown_only is a pytest fixture from Ray's own test suite; it is unused here.
    ray.init(num_cpus=100)
    ts = time.time()

    n = 10
    ds = ray.data.range(n)
    concurrency = (min_concurrency, max_concurrency)

    ds = (
        ds.map(StatefulFn1, num_cpus=1, concurrency=concurrency)
        .map_batches(StatefulFn2, num_cpus=2, batch_size=1, concurrency=concurrency)
        .map(StatefulFn3, num_cpus=1, concurrency=concurrency)
        .map_batches(StatefulFn4, num_cpus=2, batch_size=1, concurrency=concurrency)
        .map(StatefulFn2, num_cpus=1, concurrency=concurrency)
        .map(StatefulFn3, num_cpus=1, concurrency=concurrency)
        .map_batches(StatefulFn1, num_cpus=2, batch_size=1, concurrency=concurrency)
        .map_batches(StatefulFn2, num_cpus=1, batch_size=1, concurrency=concurrency)
        .map_batches(StatefulFn3, num_cpus=4, batch_size=1, concurrency=concurrency)
        .map(StatefulFn1, num_cpus=1, concurrency=concurrency)
        .map(StatefulFn2, num_cpus=1, concurrency=concurrency)
        .map_batches(StatefulFn3, num_cpus=1, batch_size=1, concurrency=concurrency)
    )
    # take_all() returns a plain list of rows, which has no .stats(); keep the
    # Dataset object so its execution stats can be printed afterwards.
    ds.take_all()  # triggers execution; the rows themselves are not used
    print(f"cost time {time.time() - ts} seconds")
    print(ds.stats())


if __name__ == "__main__":
    test_case(shutdown_only=None)
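
Two quick calculations from the script's own parameters may help size what the pipeline is being asked to do. This is a sketch, not a diagnosis, and it rests on assumptions: (1) `map` emits one row per input row, while a `map_batches` UDF that returns `{"id": array}` emits one row per element of that array, so each `map_batches(StatefulFn2)` call turns one row into 1000; (2) each actor holds its `num_cpus` while alive, with each pool sized between `min_concurrency` and `max_concurrency`. The `ReadRange` tasks are not counted. `FAN_OUT`, `NUM_CPUS_PER_ACTOR`, and both helper functions are hypothetical names.

```python
# Assumption: output rows per input row for each actor stage, in plan order.
# map() emits one row per row; map_batches(batch_size=1) emits len(batch["id"])
# rows, which is 1000 for StatefulFn2 (it returns list(range(1000))) and 1 otherwise.
FAN_OUT = [1, 1000, 1, 1, 1, 1, 1, 1000, 1, 1, 1, 1]

# num_cpus per actor for each actor stage, in plan order (from the script).
NUM_CPUS_PER_ACTOR = [1, 2, 1, 2, 1, 1, 2, 1, 4, 1, 1, 1]

N_INPUT_ROWS = 10                # ray.data.range(n) with n = 10
SLEEP_PER_CALL_S = 1.5           # every UDF calls time.sleep(1.5) once per call
MIN_ACTORS, MAX_ACTORS = 1, 100  # concurrency=(min_concurrency, max_concurrency)
CLUSTER_CPUS = 100               # ray.init(num_cpus=100)


def rows_entering_each_stage(n_rows, fan_out):
    """Number of rows each stage receives, given the per-stage fan-out."""
    counts = []
    for factor in fan_out:
        counts.append(n_rows)
        n_rows *= factor
    return counts


def cpu_footprint(num_cpus_per_actor, min_actors, max_actors):
    """CPUs reserved with every pool at its minimum size and at its maximum size."""
    per_actor_total = sum(num_cpus_per_actor)
    return per_actor_total * min_actors, per_actor_total * max_actors


rows_in = rows_entering_each_stage(N_INPUT_ROWS, FAN_OUT)
low, high = cpu_footprint(NUM_CPUS_PER_ACTOR, MIN_ACTORS, MAX_ACTORS)
# Stages (0-based) whose pool alone could ask for more CPUs than the cluster has.
oversized = [i for i, c in enumerate(NUM_CPUS_PER_ACTOR) if c * MAX_ACTORS > CLUSTER_CPUS]

print("rows entering each stage:", rows_in)
print("sleep seconds in stage 3 (first Map(StatefulFn3)):", rows_in[2] * SLEEP_PER_CALL_S)
print(f"CPUs with all pools at min/max: {low}/{high} (cluster has {CLUSTER_CPUS})")
print("stages whose max pool exceeds the cluster:", oversized)
```

Under these assumptions, the first of the two Map(StatefulFn3) stages receives 10,000 rows rather than 10, which adds up to at least 15,000 seconds of `time.sleep` spread across its actors, and the last four stages receive 10,000,000 rows each. The pools need 18 CPUs at their minimum size but could request 1,800 at their maximum. Whether either figure explains the observed behaviour, or whether there is a genuine scheduling stall, is not established by this arithmetic.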

Issue Severity

High: It blocks me from completing my task.

njumzs, Jun 28 '24 07:06