distributed icon indicating copy to clipboard operation
distributed copied to clipboard

P2P shuffle is slow with string dtypes

Open mrocklin opened this issue 2 years ago • 4 comments

import coiled
import dask.dataframe as dd
from dask.distributed import wait

cluster = coiled.Cluster(
    n_workers=30,
    worker_cpu=4,
    region="us-east-2",  # start workers close to data to minimize costs
    arm=True,
)

client = cluster.get_client()
# this takes 1m21s
df = dd.read_parquet("s3://coiled-datasets/uber-lyft-tlc/")
df = df.set_index("request_datetime", shuffle="tasks").persist()
_ = wait(df)
# this takes 2m12s
df = dd.read_parquet("s3://coiled-datasets/uber-lyft-tlc/")
df = df.set_index("request_datetime", shuffle="p2p").persist()
_ = wait(df)

GIL contention is very high during the p2p shuffle (also during tasks) and cpu usage is at 100+%, implying, maybe, that the creation/deletion of lots of Python objects is slowing us down considerably.

cc @hendrikmakait @jrbourbeau

mrocklin avatar Jun 02 '23 20:06 mrocklin

I'm wondering how to address this best. In your example, the string columns are of type string[python], so converting back to that type feels like the right thing to do for me even if it's overly expensive. Maybe converting string[python] to string[arrow] during a P2P shuffle if dataframe.convert-string is set and raising a warning of we encounter a string[python] column and it's not set would be the right way?

hendrikmakait avatar Jun 05 '23 17:06 hendrikmakait

Sorry, I had the config option set. Dtypes coming in were string[pyarrow].

mrocklin avatar Jun 05 '23 20:06 mrocklin

dask.config.set({"dataframe.convert-string": True})  # use PyArrow strings by default

mrocklin avatar Jun 05 '23 20:06 mrocklin

Brief update about this in case somebody stumbled over it.

with current main we're mostly at the same performance as tasks. However, when disabling disk (dask.config.set({"distributed.p2p.disk": True})) entirely we're at 10s so disk appears to slow us down much more than one would naively assume (this is trying to be addressed in https://github.com/dask/distributed/pull/8323)

Method Duration
tasks 1min 8s
p2p 1min 15s
p2p (w/out disk) 10s

fjetter avatar Nov 16 '23 17:11 fjetter