[ActorPoolProject] Pipeline of multiple actor pool projects throttles later stages if earlier stages have low concurrency
Describe the bug
When running the following code:
```python
import os
import time

import daft


@daft.udf(return_dtype=daft.DataType.string())
class MySlowUdf:
    def __init__(self):
        print(f"I am process ({os.getpid()}), initializing...")
        time.sleep(10)
        print(f"I am process ({os.getpid()}), initialized!")
        self.prefix = "Hello"

    def __call__(self, data):
        time.sleep(60)
        return [f"{self.prefix} {d}" for d in data.to_pylist()]


@daft.udf(return_dtype=daft.DataType.string())
class MyFastUdf:
    def __init__(self):
        pass

    def __call__(self, data):
        time.sleep(1)
        return data


MySlowUdfWithReplicas = MySlowUdf.with_concurrency(4)
MyFastUdfWithReplicas = MyFastUdf.with_concurrency(1)

df = daft.from_pydict({"data": ["bob", "alice", "micky", "jack", "jay", "sammy", "kevin"]})
df = df.repartition(7)
df = df.with_columns(
    {
        "a": MyFastUdfWithReplicas(df["data"]),
        "b": MyFastUdfWithReplicas(df["data"]),
        "c": MySlowUdfWithReplicas(df["data"]),
        "d": MyFastUdfWithReplicas(df["data"]),
    }
)
```
We notice that the stage running MySlowUdfWithReplicas only ever runs 2 tasks in parallel, even though it has 4 replicas. It is likely being "throttled" by the earlier MyFastUdfWithReplicas stages: if we instead set MyFastUdfWithReplicas = MyFastUdf.with_concurrency(4), we do not observe this behavior.
My guess is that this is caused by capping the maximum size of the child buffer at 2 * concurrency: https://github.com/Eventual-Inc/Daft/blob/main/daft/execution/physical_plan.py#L219
We could try disabling or relaxing that cap.
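A rough back-of-the-envelope model of this hypothesis (not Daft's actual scheduler, just a sketch of the suspected bound): if each upstream stage only buffers `2 * concurrency` outputs for its child, then a chain of stages caps how many inputs can ever be in flight at the slow stage, no matter how many replicas the slow stage has. The `max_in_flight` helper below is hypothetical, for illustration only.

```python
def max_in_flight(stage_concurrencies, buffer_factor=2):
    """Hypothetical upper bound on how many tasks the final stage can run
    in parallel, assuming every upstream stage's child buffer is capped at
    buffer_factor * that stage's concurrency."""
    bound = float("inf")
    # Each upstream stage can hand at most buffer_factor * concurrency
    # outputs downstream at once, so the tightest upstream cap wins.
    for concurrency in stage_concurrencies[:-1]:
        bound = min(bound, buffer_factor * concurrency)
    # The final stage is also limited by its own number of replicas.
    return min(bound, stage_concurrencies[-1])


# Three fast stages with concurrency 1 feeding the slow stage with concurrency 4:
print(max_in_flight([1, 1, 1, 4]))  # -> 2, matching the 2 parallel tasks observed
# With concurrency 4 on the fast stages, the slow stage is no longer throttled:
print(max_in_flight([4, 4, 4, 4]))  # -> 4
```

This would explain why bumping the fast UDFs to concurrency 4 makes the throttling disappear: the upstream buffer cap rises from 2 to 8, above the slow stage's 4 replicas.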
Also make sure to check the behavior with .show(), and ensure that it doesn't materialize a large number of child partitions just to display a small set of rows.