Implement a way to preserve partitioning through `UnionExec` without losing ordering
Is your feature request related to a problem or challenge?
The `EnforceDistribution` physical optimizer pass in DataFusion will, in some cases, introduce an `InterleaveExec` to increase partitioning when data passes through a `UnionExec`:
https://github.com/apache/datafusion/blob/22311835bc1b4bd83b50e1c3875b0e725622b872/datafusion/core/src/physical_optimizer/enforce_distribution.rs#L1196-L1226
Here is what `InterleaveExec` does: https://github.com/apache/datafusion/blob/4edbdd7d09d97f361748c086afbd7b3dda972f76/datafusion/physical-plan/src/union.rs#L286-L317
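For intuition, here is a minimal sketch of that behavior (illustrative names only, not DataFusion's actual code): each output partition of the interleave polls the corresponding partition of every child and yields batches in whatever order they arrive, which is exactly why any pre-existing sort order is lost.

```rust
use futures::stream::{select_all, BoxStream, StreamExt};

// Illustrative sketch, not DataFusion's implementation: combine the i-th
// partition of every child into one output partition. `select_all` yields
// items in arrival order, so any input sort order is not preserved.
fn interleave_partition<'a, T: Send + 'a>(
    children_partition_i: Vec<BoxStream<'a, T>>,
) -> BoxStream<'a, T> {
    select_all(children_partition_i).boxed()
}
```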
However, this has the potential downside of destroying any pre-existing ordering, and keeping that ordering is sometimes preferable to increased / improved partitioning (e.g. see https://github.com/apache/datafusion/issues/10257 and the `datafusion.optimizer.prefer_existing_sort` setting)
Describe the solution you'd like
I would like there to be some way to preserve the partitioning after a `UnionExec` without losing the ordering information, and then to remove the `prefer_existing_union` flag
Describe alternatives you've considered
One possibility is to add a `preserve_order` flag to `InterleaveExec`, in the same way that `RepartitionExec` has a `preserve_order` flag: https://github.com/apache/datafusion/blob/4edbdd7d09d97f361748c086afbd7b3dda972f76/datafusion/physical-plan/src/repartition/mod.rs#L328-L417
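For illustration, here is a minimal, self-contained sketch of what such a flag could switch between (a synchronous, hypothetical stand-in for the `streaming_merge` machinery `RepartitionExec` uses, not the real async implementation): with `preserve_order` set and sorted inputs, do a k-way merge per output partition; otherwise combine inputs in arrival order as today.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hedged sketch (hypothetical, synchronous stand-in for `streaming_merge`):
/// combine one partition's worth of input from each child.
fn combine_partition<T: Ord>(children: Vec<Vec<T>>, preserve_order: bool) -> Vec<T> {
    if preserve_order {
        // k-way merge: a min-heap holds each child's next element, so the
        // output stays sorted as long as every input was sorted.
        let mut iters: Vec<_> = children.into_iter().map(|c| c.into_iter()).collect();
        let mut heap = BinaryHeap::new();
        for (i, it) in iters.iter_mut().enumerate() {
            if let Some(v) = it.next() {
                heap.push(Reverse((v, i)));
            }
        }
        let mut out = Vec::new();
        while let Some(Reverse((v, i))) = heap.pop() {
            out.push(v);
            if let Some(next) = iters[i].next() {
                heap.push(Reverse((next, i)));
            }
        }
        out
    } else {
        // Today's interleave-like behavior: batches are emitted in arrival
        // order (modeled here as simple concatenation), losing sortedness.
        children.into_iter().flatten().collect()
    }
}
```

For example, `combine_partition(vec![vec![1, 3], vec![2, 4]], true)` yields `[1, 2, 3, 4]`, while with `false` the result is just the concatenation `[1, 3, 2, 4]`.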
Additional context
We encountered this while working on https://github.com/apache/datafusion/pull/10259, where @mustafasrepo and @phillipleblanc pointed out that the config flag `prefer_existing_union` was effectively the same as `prefer_existing_sort`
Hi @alamb, I am trying to work on this.
I am not very familiar with `InterleaveExec` in the optimizer. As an initial thought, `InterleaveExec` acts like a repartition with an equal number of input and output partitions, so a natural idea is to reuse `streaming_merge` with respect to the input size. Wdyt?
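To make that framing concrete, here is a hypothetical, synchronous sketch (not real `streaming_merge` plumbing): with N children that each expose the same K partitions, output partition `i` is built by merging the i-th partition of every child, so both the partition count and the ordering survive.

```rust
/// Hypothetical sketch of the idea above: `children[n][i]` holds
/// partition `i` of child `n`; the output keeps K partitions, each
/// built by merging the matching partition across all children.
fn build_output_partitions<T: Clone>(
    children: &[Vec<Vec<T>>],
    merge_sorted: impl Fn(Vec<Vec<T>>) -> Vec<T>,
) -> Vec<Vec<T>> {
    let k = children[0].len();
    (0..k)
        .map(|i| merge_sorted(children.iter().map(|c| c[i].clone()).collect()))
        .collect()
}
```

Under these assumptions, `merge_sorted` could be any order-preserving merge, e.g. the `combine_partition(inputs, true)` sketch above.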
Hi @xinlifoobar -- this sounds like it is on the right track
Hi @alamb, I found another interesting case while testing. I am not very sure: do you think `InterleaveExec` could be applied here, given that both union inputs have the same ORDER BY expressions?
explain select count(*) from ((select distinct c1, c2 from t3 order by c1 ) union all (select distinct c1, c2 from t4 order by c1)) group by cube(c1,c2);
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: COUNT(*) |
| | Aggregate: groupBy=[[CUBE (t3.c1, t3.c2)]], aggr=[[COUNT(Int64(1)) AS COUNT(*)]] |
| | Union |
| | Sort: t3.c1 ASC NULLS LAST |
| | Aggregate: groupBy=[[t3.c1, t3.c2]], aggr=[[]] |
| | TableScan: t3 projection=[c1, c2] |
| | Sort: t4.c1 ASC NULLS LAST |
| | Aggregate: groupBy=[[t4.c1, t4.c2]], aggr=[[]] |
| | TableScan: t4 projection=[c1, c2] |
| physical_plan | ProjectionExec: expr=[COUNT(*)@2 as COUNT(*)] |
| | AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1, c2@1 as c2], aggr=[COUNT(*)], ordering_mode=PartiallySorted([0]) |
| | SortExec: expr=[c1@0 ASC NULLS LAST], preserve_partitioning=[true] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([c1@0, c2@1], 14), input_partitions=14 |
| | RepartitionExec: partitioning=RoundRobinBatch(14), input_partitions=2 |
| | AggregateExec: mode=Partial, gby=[(c1@0 as c1, c2@1 as c2), (NULL as c1, c2@1 as c2), (c1@0 as c1, NULL as c2), (NULL as c1, NULL as c2)], aggr=[COUNT(*)] |
| | UnionExec |
| | CoalescePartitionsExec |
| | AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1, c2@1 as c2], aggr=[] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([c1@0, c2@1], 14), input_partitions=1 |
| | AggregateExec: mode=Partial, gby=[c1@0 as c1, c2@1 as c2], aggr=[] |
| | MemoryExec: partitions=1, partition_sizes=[0] |
| | CoalescePartitionsExec |
| | AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1, c2@1 as c2], aggr=[] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([c1@0, c2@1], 14), input_partitions=1 |
| | AggregateExec: mode=Partial, gby=[c1@0 as c1, c2@1 as c2], aggr=[] |
| | MemoryExec: partitions=1, partition_sizes=[0] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
With `InterleaveExec`, the physical plan would instead look like:
ProjectionExec:
AggregateExec:
InterleaveExec:
SortExec:
AggregateExec:
SortExec:
AggregateExec: