
querying df.compute(concatenate=True)

Open martindurant opened this issue 9 months ago • 8 comments

https://github.com/dask/dask-expr/pull/1138 introduced the concatenate kwarg to dask-dataframe compute operations and defaulted it to True (a change in behaviour). This is now the default in core dask following the merger of dask-expr into the main repo.
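
For concreteness, the kwarg is passed straight to compute; a minimal sketch (the DataFrame here is purely illustrative):

import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=5)

# new default: partitions are concatenated into a single pandas object on the cluster
result = ddf.compute()

# previous behaviour: fetch partitions individually and concatenate on the client
result = ddf.compute(concatenate=False)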

I am concerned that the linked PR did not provide any rationale for the change, nor document under what circumstances it should not be used.

        Concatenating enables more powerful optimizations but it also incurs additional
        data transfer cost. Generally, it should be enabled.

I suggest the following contraindications:

  • worker memory limits are generally much stricter than on the client, so concatenating in-cluster can crash the worker doing the concatenation and make the workflow unrunnable
  • the concatenation task cannot begin until all of its inputs are ready, whereas the client can download each partition as it completes, so in the straggler case, concatenate=True will tend to be slower

I can see the option being useful in the case that:

  • there are a large number of small partitions in the output, and we expect the inter-worker latency to be much more favourable than the client-worker latency

I can see the option making no difference in the case that:

  • the number of partitions is small compared to the total volume of data in the output, but there is no worker memory issue

cf https://github.com/dask/community/issues/411

martindurant avatar Feb 20 '25 19:02 martindurant

This topic is a bit more complex than the community issue makes it look. Right now, the behavior is one of three different kinds depending on which API and collection type you are using.

  1. If you're using the method DataFrame.compute, the behavior is as you describe: it concatenates the results and fetches the concatenated object remotely. This behavior is unique to DataFrames; other collections just fall back to 2.
  2. If you're using the top-level function dask.compute, it skips expression optimizations entirely, fetches individual partitions, and concatenates them on the client.
  3. If you're using Client.compute, it also skips the expression optimizations but concatenates the results on the cluster. (A sketch of the three call paths follows below.)
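
For illustration, the three call paths look roughly like this (a sketch only, based on the descriptions above):

import dask
import dask.dataframe as dd
import pandas as pd
from distributed import Client

client = Client()
ddf = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=4)

# 1. DataFrame.compute: concatenates the results and fetches one object remotely
pdf = ddf.compute()

# 2. dask.compute: fetches individual partitions and concatenates them on the client
(pdf,) = dask.compute(ddf)

# 3. Client.compute: returns a future; the concatenation happens on the cluster
pdf = client.compute(ddf).result()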

Fixing this goes way beyond a keyword argument. I'm currently working on https://github.com/dask/dask/pull/11736 which will fix all of this, but my current approach will hardcode this to concatenation on the cluster. The reason is that this is actually much easier to implement consistently. I'm open to adding this feature back in a follow-up PR, but for now I care much more about consistency (and the HLG/expr pickle change that is the actual intention of that PR).

Below are a couple of comments about your assumptions:


worker memory limits are generally much stricter than on the client, so concatenating in-cluster can crash the worker doing the concatenation and make the workflow unrunnable

I doubt this is true. My notebook has 16GB of RAM (M1 MacBook) and I rarely run computations on machines with less; I don't think that is atypical. I also see Coiled users rarely running on smaller workers.

Besides, the default behavior of clients is to fetch data via the scheduler (and it's not trivial to change that for most users), i.e. in most situations where the memory on a single worker is not sufficient, the most likely outcome is that the scheduler runs OOM before the client does. The scheduler dying is pretty much the worst situation because we cannot provide proper exceptions to users in that case. However, if the concatenation happens on the worker, the scheduler is currently able to abort the finalize task and raise a proper exception if we know that the data doesn't fit.
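
For reference, a sketch of distributed's direct-to-worker options, which change that routing (this assumes the workers are network-reachable from the client, which is often not the case in practice):

import dask.dataframe as dd
import pandas as pd
from distributed import Client

# by default results travel worker -> scheduler -> client; direct-to-worker
# transfers bypass the scheduler on the return path
client = Client(direct_to_workers=True)

ddf = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=4)
futures = client.compute(ddf.to_delayed())   # one future per partition
parts = client.gather(futures, direct=True)  # fetch partitions straight from the workers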

The only marginal benefit of client-side concatenation is that we save ourselves one cluster-internal network hop, but most applications will not care about this.

the concatenation task cannot begin until all of its inputs are ready, whereas the client can download each partition as it completes, so in the straggler case, concatenate=True will tend to be slower

While technically true, this behavior isn't implemented. The only argument for speed that currently holds is that there is one more cluster-internal network hop.

fjetter avatar Feb 25 '25 12:02 fjetter

TL;DR: with https://github.com/dask/dask/pull/11736 I am proposing to hardcode this to "always concat on cluster".

fjetter avatar Feb 25 '25 12:02 fjetter

I believe that the following proves that the client is able to download partitions as they become available, at least via delayed. Note that concatenate=True makes no difference for this flow, perhaps because of the use of delayed:

import random

import dask
import dask.dataframe as dd
import pandas as pd
from distributed import Client

client = Client()

class deserme:
    # prints a pid and timestamp whenever the object goes through pickle
    def __reduce__(self):
        import time
        import os
        print(os.getppid(), time.time(), "deser!")
        return (lambda: 1), ()

def part():
    import time
    time.sleep(random.randint(1, 5))  # simulate partitions finishing at different times
    return pd.DataFrame({"a": [deserme()]})

df = dd.from_delayed([dask.delayed(part, pure=False)() for _ in range(10)])
df.compute(concatenate=False);

# produces
99299 1740494986.1009972 deser!
99299 1740494986.101203 deser!
99299 1740494986.1013372 deser!
99299 1740494986.102061 deser!
99299 1740494986.102284 deser!
99299 1740494986.102438 deser!
99299 1740494987.1030989 deser!
99299 1740494987.103334 deser!
99299 1740494987.1034791 deser!

(note that some objects arrive before others)

martindurant avatar Feb 25 '25 16:02 martindurant

Your code proves that the individual objects are serialized sequentially and that this all happens in the same process. It doesn't tell us anything about when or how the objects were transferred or deserialized.

When using a distributed client, compute goes through Client.get, which goes through Client._gather, which blocks until all futures are ready; see https://github.com/dask/distributed/blob/fe5e4311cd91c097f2e4696dfbd1fa2ef47460ff/distributed/client.py#L2406-L2412

fjetter avatar Feb 25 '25 16:02 fjetter

For future reference: there are also several cases where the optimization is fundamentally altered by adding a concat step at the very end. A simple example:

>>> from dask.datasets import timeseries

# Clear divisions is not necessary but nice for illustration
>>> ddf = timeseries().clear_divisions()

>>> ddf.sort_values("name").optimize().pprint()

# Note how this requires a shuffle (since the divisions are not known)

Fused(5a05d):
| SortValuesBlockwise: sort_function=<methodcaller: sort_values> sort_kwargs={'by': ['name'], 'ascending': True, 'na_position': 'last', 'ignore_index': False}
|   Projection: columns=['name', 'id', 'x', 'y']
      DiskShuffle: npartitions_out=26 options={}
        RepartitionToFewer: new_partitions=26
          Fused(47f4e):
          | Assign: _partitions=
          |   _SetPartitionsPreSetIndex: new_divisions='<pandas>'
          |     Projection: columns='name'
          |       ClearDivisions:
          |         ArrowStringConversion:
          |           Timeseries: dtypes={'name': <class 'str'>, 'id': <class 'int'>, 'x': <class 'float'>, 'y': <class 'float'>} partition_freq='1D' seed=982806819 columns=['name', 'id', 'x', 'y']


# If we know that the results end up in one partition anyhow we can concat
# before sorting which is **much** cheaper

>>> ddf.sort_values("name").repartition(npartitions=1).optimize().pprint()

SortValuesBlockwise: sort_function=<methodcaller: sort_values> sort_kwargs={'by': ['name'], 'ascending': True, 'na_position': 'last', 'ignore_index': False}
  RepartitionToFewer: new_partitions=1
    Fused(aced6):
    | ClearDivisions:
    |   ArrowStringConversion:
    |     Timeseries: dtypes={'name': <class 'str'>, 'id': <class 'int'>, 'x': <class 'float'>, 'y': <class 'float'>} partition_freq='1D' seed=982806819 columns=['name', 'id', 'x', 'y']

Therefore, to fetch individual partitions we'd also have to move the sort to the client side, or rather we'd have to build the optimizer so that it returns a "cluster expression" and a "client expression", which is something I don't consider in scope.
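
In other words, continuing the ddf from the example above, a user who wants the cheaper plan can already spell the final concatenation out explicitly (sketch only):

# the explicit repartition lets the optimizer concatenate first and then
# sort the single partition blockwise, as in the optimized plan above
pdf = ddf.sort_values("name").repartition(npartitions=1).compute()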

fjetter avatar Feb 26 '25 17:02 fjetter

You are right that:

  • my code didn't prove anything
  • an edited version showed that the client does not download partitions as they become available, except by using something like distributed.as_completed(client.compute(df.to_delayed())), sketched below. I am surprised by this. (I can provide the altered code, if interested)
  • you have provided one scenario where concat makes sense, when the last operation is a shuffle to one partition; are there others?
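
For reference, a sketch of the as_completed workaround mentioned above (continuing with the client and df from the earlier snippet):

from distributed import as_completed

# submit one future per partition and consume results as they finish,
# instead of blocking until every partition is ready
futures = client.compute(df.to_delayed())
for future in as_completed(futures):
    partition = future.result()  # a pandas DataFrame for one partition
    ...                          # process it as soon as it arrives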

martindurant avatar Feb 26 '25 18:02 martindurant

Is it the case that this is now also in force for arrays?

martindurant avatar May 02 '25 13:05 martindurant

See https://github.com/dask/dask/issues/11768#issuecomment-2681904615 and https://docs.dask.org/en/stable/changelog.html#breaking-changes

fjetter avatar May 05 '25 06:05 fjetter