dask-sql

[ENH] Track data already partitioned/shuffled by a column

Status: Open. ayushdg opened this issue 2 years ago (0 comments).

Is your feature request related to a problem? Please describe.
In a variety of SQL applications, the dataset is already pre-partitioned by a given key (especially for Parquet and ORC datasets), and many operations, such as shuffle-based joins, shuffle the dataset by a join key. In these situations it would be good to track when a DataFrame is already partitioned by a column and use fast paths for operations like joins and groupbys where applicable.
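
To make "already partitioned by a column" concrete, here is a minimal plain-Python sketch (not dask or dask-sql code). It assumes a DataFrame can be modelled as a list of partitions, each a list of row dicts; `hash_partition` and `is_partitioned_by` are hypothetical helpers. The property a fast path would rely on is that no key value appears in more than one partition.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]
Partitions = List[List[Row]]


def hash_partition(rows: List[Row], key: str, npartitions: int) -> Partitions:
    """Shuffle rows so that all rows sharing a key value land in the same partition."""
    parts: Partitions = [[] for _ in range(npartitions)]
    for row in rows:
        parts[hash(row[key]) % npartitions].append(row)
    return parts


def is_partitioned_by(parts: Partitions, key: str) -> bool:
    """Return True if no value of `key` appears in more than one partition."""
    owner: Dict[Any, int] = {}  # key value -> index of the first partition it was seen in
    for i, part in enumerate(parts):
        for row in part:
            if owner.setdefault(row[key], i) != i:
                return False
    return True


rows = [{"key": k % 5, "val": k} for k in range(20)]
print(is_partitioned_by(hash_partition(rows, "key", 3), "key"))  # True
print(is_partitioned_by([rows[:10], rows[10:]], "key"))  # False: every key value is in both halves
```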

Describe the solution you'd like
Add partition information to the DataContainer object that the user can also set during context.create_table. This attribute could also be set to the key column whenever a groupby or shuffle-based join is performed. We could then add map_partitions fast paths for groupby/sort when the key columns are already partitioned in the input DataFrame, plus a custom inner-merge implementation that does a map_partitions merge without shuffling when the join key is already partitioned on the input and the npartitions of this input is greater than the npartitions of the table it is being joined with.
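
The proposal above can be sketched in plain Python under some loud assumptions: `PartitionedTable` is a hypothetical stand-in for DataContainer, partitions are lists of row dicts, and `partitioned_by` means the rows were hash-partitioned with `hash(value) % npartitions`. `groupby_sum` skips the shuffle when the table is already partitioned on the key and aggregates each partition independently, as a map_partitions call would. `inner_merge` leaves an already-partitioned left input untouched and only repartitions the other table to the left side's npartitions; this is one possible reading of the proposed merge fast path, not dask-sql's implementation.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def hash_partition(rows: List[Row], key: str, npartitions: int) -> List[List[Row]]:
    """Route every row to partition hash(row[key]) % npartitions."""
    parts: List[List[Row]] = [[] for _ in range(npartitions)]
    for row in rows:
        parts[hash(row[key]) % npartitions].append(row)
    return parts


@dataclass
class PartitionedTable:
    """Hypothetical stand-in for a DataContainer that carries partition info."""
    partitions: List[List[Row]]
    # Column the rows are hash-partitioned on (hash(value) % npartitions), or None if unknown.
    partitioned_by: Optional[str] = None

    @property
    def npartitions(self) -> int:
        return len(self.partitions)

    def shuffle(self, key: str, npartitions: Optional[int] = None) -> "PartitionedTable":
        """Full shuffle on `key`; the result records its new partitioning."""
        rows = [row for part in self.partitions for row in part]
        n = npartitions or self.npartitions
        return PartitionedTable(hash_partition(rows, key, n), partitioned_by=key)


def groupby_sum(table: PartitionedTable, key: str, col: str) -> Dict[Any, Any]:
    # Fast path: skip the shuffle when the table is already partitioned on `key`.
    if table.partitioned_by != key:
        table = table.shuffle(key)
    result: Dict[Any, Any] = {}
    for part in table.partitions:  # plays the role of map_partitions
        partial: Dict[Any, Any] = defaultdict(int)
        for row in part:
            partial[row[key]] += row[col]
        result.update(partial)  # safe: no key value spans two partitions
    return result


def inner_merge(left: PartitionedTable, right: PartitionedTable,
                lkey: str, rkey: str) -> PartitionedTable:
    # Shuffle a side only if it is not already partitioned on its join key;
    # `right` must also match left's npartitions so partition i holds the same keys.
    if left.partitioned_by != lkey:
        left = left.shuffle(lkey)
    if right.partitioned_by != rkey or right.npartitions != left.npartitions:
        right = right.shuffle(rkey, left.npartitions)
    out: List[List[Row]] = []
    for lpart, rpart in zip(left.partitions, right.partitions):  # partition-wise merge
        index: Dict[Any, List[Row]] = defaultdict(list)
        for rrow in rpart:
            index[rrow[rkey]].append(rrow)
        out.append([{**lrow, **rrow} for lrow in lpart for rrow in index.get(lrow[lkey], [])])
    # The merged output is still partitioned on the join key, so later operations can reuse it.
    return PartitionedTable(out, partitioned_by=lkey)


rows = [{"cust": i % 4, "amount": i} for i in range(12)]
orders = PartitionedTable(hash_partition(rows, "cust", 3), partitioned_by="cust")
customers = PartitionedTable([[{"cust_id": c, "name": f"c{c}"} for c in range(4)]])
print(groupby_sum(orders, "cust", "amount"))  # {0: 12, 3: 21, 1: 15, 2: 18}
joined = inner_merge(orders, customers, "cust", "cust_id")
print(joined.partitioned_by, [len(p) for p in joined.partitions])  # cust [6, 3, 3]
```

Here `orders` is never reshuffled: only `customers` is repartitioned, and the join output keeps the same partition layout as `orders`. In real Dask, reusing a declared partitioning is only safe if both sides agree on how key values map to partitions, which is why this sketch ties `partitioned_by` to one fixed hash scheme.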

Describe alternatives you've considered
One alternative to implementing this in dask-sql is to have Dask DataFrames implement this tracking mechanism directly.

Additional context
This should significantly improve performance for shuffle-based joins in situations where Parquet datasets are already partitioned by the key column that users want to join on. It should also improve performance for queries like SELECT * FROM a, b, c WHERE a.key = b.key1 AND a.key = c.key1 by avoiding an intermediate shuffle on the output of the join of a and b.
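
The three-way query in the last paragraph can be illustrated at the planning level. The sketch below is a hypothetical planner (`Plan`, `plan_join` and `plan_query` are not dask-sql names) that records which columns each intermediate result is known to be partitioned on and lists the shuffles it would schedule. It assumes every input uses the same hash scheme and npartitions, and it only models inner equi-joins, where both join columns are equal on every output row.

```python
from dataclasses import dataclass
from typing import FrozenSet, List


@dataclass(frozen=True)
class Plan:
    """Hypothetical plan node: a relation plus the columns it is known to be partitioned on."""
    name: str
    partitioned_on: FrozenSet[str] = frozenset()


def plan_join(left: Plan, right: Plan, lkey: str, rkey: str, shuffles: List[str]) -> Plan:
    """Plan an inner equi-join, shuffling only inputs not already partitioned on their key."""
    if lkey not in left.partitioned_on:
        shuffles.append(f"{left.name} on {lkey}")
    if rkey not in right.partitioned_on:
        shuffles.append(f"{right.name} on {rkey}")
    # lkey == rkey on every output row, so the result is partitioned on both columns.
    return Plan(f"({left.name} JOIN {right.name})", frozenset({lkey, rkey}))


def plan_query(a: Plan, b: Plan, c: Plan) -> List[str]:
    """SELECT * FROM a, b, c WHERE a.key = b.key1 AND a.key = c.key1"""
    shuffles: List[str] = []
    ab = plan_join(a, b, "a.key", "b.key1", shuffles)
    plan_join(ab, c, "a.key", "c.key1", shuffles)
    return shuffles


print(plan_query(Plan("a"), Plan("b"), Plan("c")))
# ['a on a.key', 'b on b.key1', 'c on c.key1']
print(plan_query(Plan("a", frozenset({"a.key"})), Plan("b"), Plan("c")))
# ['b on b.key1', 'c on c.key1']
```

Without this tracking, the intermediate result of joining a and b would be shuffled again on a.key before the second join, giving four shuffles instead of three; if a is already partitioned on a.key (for example, as declared at create_table time), only two remain.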

ayushdg, Dec 14 '22 14:12