P2P shuffle is slow with string dtypes
import coiled
import dask.dataframe as dd
from dask.distributed import wait
cluster = coiled.Cluster(
    n_workers=30,
    worker_cpu=4,
    region="us-east-2",  # start workers close to data to minimize costs
    arm=True,
)
client = cluster.get_client()
# this takes 1m21s
df = dd.read_parquet("s3://coiled-datasets/uber-lyft-tlc/")
df = df.set_index("request_datetime", shuffle="tasks").persist()
_ = wait(df)
# this takes 2m12s
df = dd.read_parquet("s3://coiled-datasets/uber-lyft-tlc/")
df = df.set_index("request_datetime", shuffle="p2p").persist()
_ = wait(df)
GIL contention is very high during the p2p shuffle (and also during tasks), and CPU usage is above 100%, which suggests that the creation and deletion of many Python objects may be slowing us down considerably.
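To illustrate the suspicion about Python object churn, here is a small sketch (my addition, not from the report) showing that a string[python] column stores one Python str object per element, while string[pyarrow] keeps the values in an Arrow buffer:

```python
import pandas as pd

# One Python object per element: moving these values means touching
# refcounts under the GIL for every single string.
s_py = pd.Series(["a", "b", "c"], dtype="string[python]")
print(type(s_py.array[0]))  # <class 'str'>

# Arrow-backed storage: values live in a contiguous Arrow buffer instead.
s_pa = pd.Series(["a", "b", "c"], dtype="string[pyarrow]")
print(s_pa.dtype)  # string[pyarrow]
```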
cc @hendrikmakait @jrbourbeau
I'm wondering how best to address this. In your example, the string columns are of type string[python], so converting back to that type feels like the right thing to do to me, even if it's expensive. Maybe the right approach is to convert string[python] to string[arrow] during a P2P shuffle if dataframe.convert-string is set, and to raise a warning if we encounter a string[python] column and it is not set?
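A rough sketch of that idea (my interpretation, not the actual implementation; the helper name is hypothetical): upcast string[python] columns to string[pyarrow] before shuffling when dataframe.convert-string is enabled, and warn otherwise.

```python
import warnings

import dask
import pandas as pd


def _maybe_convert_strings(partition: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical pre-shuffle hook: find object-backed string columns.
    string_cols = [
        col
        for col, dtype in partition.dtypes.items()
        if isinstance(dtype, pd.StringDtype) and dtype.storage == "python"
    ]
    if not string_cols:
        return partition
    if dask.config.get("dataframe.convert-string", default=False):
        # Upcast to Arrow-backed strings for the shuffle.
        return partition.astype({col: "string[pyarrow]" for col in string_cols})
    warnings.warn(
        "P2P shuffle received string[python] columns; consider setting "
        "dataframe.convert-string=True to use Arrow-backed strings."
    )
    return partition
```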
Sorry, I had the config option set; the dtypes coming in were string[pyarrow].
dask.config.set({"dataframe.convert-string": True}) # use PyArrow strings by default
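For context (my addition, not from the thread), with that option set the string columns read from Parquet should already come back Arrow-backed, which matches the string[pyarrow] dtypes reported above:

```python
import dask
import dask.dataframe as dd

dask.config.set({"dataframe.convert-string": True})  # use PyArrow strings by default

# dtypes come from the collection's metadata, so nothing is computed here
df = dd.read_parquet("s3://coiled-datasets/uber-lyft-tlc/")
print(df.dtypes)  # string columns show up as string[pyarrow]
```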
Brief update on this in case somebody stumbles over it.
With current main we're mostly at the same performance as tasks. However, when disabling disk entirely (dask.config.set({"distributed.p2p.disk": False})), we're at 10s, so disk appears to slow us down much more than one would naively assume (this is being addressed in https://github.com/dask/distributed/pull/8323). A sketch of the disk-disabled run follows the table below.
| Method | Duration |
|---|---|
| tasks | 1min 8s |
| p2p | 1min 15s |
| p2p (w/out disk) | 10s |
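For reference, a minimal sketch of the "p2p (w/out disk)" configuration, assuming the same cluster and dataset as the snippet at the top of the issue:

```python
import dask
import dask.dataframe as dd
from dask.distributed import wait

# Keep shuffle partitions in memory instead of spilling them to disk.
dask.config.set({"distributed.p2p.disk": False})

df = dd.read_parquet("s3://coiled-datasets/uber-lyft-tlc/")
df = df.set_index("request_datetime", shuffle="p2p").persist()
_ = wait(df)
```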