Integrate `BatchCoalescer` into `RepartitionExec` and remove from `CoalesceBatches` optimization rule
Is your feature request related to a problem or challenge?
- Part of https://github.com/apache/datafusion/issues/18779
The `LimitedBatchCoalescer` can now be used to integrate batch coalescing directly in `ExecutionPlan` implementations, which is better than using an external operator for the reasons described in https://github.com/apache/datafusion/issues/18779
Currently `RepartitionExec` needs a `CoalesceBatches` operator after it for performance; see https://github.com/apache/datafusion/blob/bd30fe2375318d13fc5b66e06e53da63ca4d0928/datafusion/physical-optimizer/src/coalesce_batches.rs#L64
We should integrate coalescing directly into the operator
Describe the solution you'd like
- Add batch coalescing into `RepartitionExec` (a minimal sketch of the coalescer mechanics follows this list)
- Remove `RepartitionExec` from the `CoalesceBatches` optimizer rule: https://github.com/apache/datafusion/blob/bd30fe2375318d13fc5b66e06e53da63ca4d0928/datafusion/physical-optimizer/src/coalesce_batches.rs#L64
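For a sense of the mechanics, here is a minimal sketch of the push/drain pattern of arrow's `BatchCoalescer` kernel from `arrow-select` (the kernel `LimitedBatchCoalescer` wraps). How this is threaded through `RepartitionExec` is exactly what this issue leaves open; the schema and loop below are illustrative only.

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::record_batch::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use arrow_select::coalesce::BatchCoalescer;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("flag", DataType::Int32, false)]));

    // Target output size; DataFusion's default `batch_size` is 8192.
    let mut coalescer = BatchCoalescer::new(schema.clone(), 8192);

    // Simulate a stream of tiny batches, as RepartitionExec's output
    // partitions often see after hash partitioning.
    for i in 0..10_000 {
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![i]))],
        )?;
        coalescer.push_batch(batch)?;
        // Drain any batch that has reached the target size.
        while let Some(big) = coalescer.next_completed_batch() {
            println!("completed batch: {} rows", big.num_rows());
        }
    }

    // Flush the remainder once the input is exhausted.
    coalescer.finish_buffered_batch()?;
    if let Some(rest) = coalescer.next_completed_batch() {
        println!("final batch: {} rows", rest.num_rows());
    }
    Ok(())
}
```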
Describe alternatives you've considered
@Dandandan did this for `FilterExec` in these two PRs, so we can probably follow a similar model
- https://github.com/apache/datafusion/pull/18604
- https://github.com/apache/datafusion/pull/18630
Additional context
No response
Not sure about this one, closing for now
take
Hi @alamb just double checking, I noticed this comment https://github.com/apache/datafusion/pull/18630/files#r2526799488, but regardless we still want to integrate to remove the dependency on `CoalesceBatchesExec`? Thanks!
Hi @alamb just double checking, I noticed this comment https://github.com/apache/datafusion/pull/18630/files#r2526799488, but regardless we still want to integrate to remove the dependency on `CoalesceBatchesExec`? Thanks!
If you mean:

Yes - I tried to apply this optimization to `RepartitionExec` but unfortunately got some regressions (my feeling is it is due to some decreased parallelism / work distribution).

Yes, I guess I am hoping that someone could figure out what was going on / optimize the code so that we could combine them
Hi @alamb wanted to check my understanding and discuss the approach a little bit. Please let me know your thoughts and correct me if anything sounds off.
I thought about two places where we could potentially coalesce batches:
- Coalesce hash-partitioned input batches for the same output partition in an input stream before sending to the output channel, e.g. coalesce in `RepartitionExec::pull_from_input` before `channel.sender.send`.
  - For the order-preserving case, this would result in larger input batches for merge-sort, and as merge-sort needs to wait for batches from all input partition streams to be ready, I wonder if this would impact performance.
- Coalesce batches in the output stream when `poll_next` is called, e.g.
  - For the non order-preserving case, the output stream is a `PerPartitionStream`; for the order-preserving case, the output stream is a `SortPreservingMergeStream` whose input is a vector of `PerPartitionStream`s.
  - If coalescing in e.g. `PerPartitionStream::poll_next_inner`, it also means larger input batches for merge-sort.
  - Alternatively, could create a wrapper stream around `PerPartitionStream` that coalesces while calling `PerPartitionStream::poll_next`. This would look very much like a `CoalesceBatches` but not as a separate node (see the sketch after this list).
  - For the order-preserving case, maybe coalesce inside `SortPreservingMergeStream::poll_next_inner`.
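To make the wrapper-stream alternative concrete, here is a minimal hedged sketch assuming arrow-select's `BatchCoalescer`; `CoalescingStream` and the `ArrowError` item type are illustrative stand-ins, not DataFusion's actual stream types.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use arrow_schema::SchemaRef;
use arrow_select::coalesce::BatchCoalescer;
use futures::{Stream, StreamExt};

/// Hypothetical wrapper that coalesces small batches from `inner`
/// into batches of roughly `target_batch_size` rows.
struct CoalescingStream<S> {
    inner: S,
    coalescer: BatchCoalescer,
    input_done: bool,
}

impl<S> CoalescingStream<S> {
    fn new(inner: S, schema: SchemaRef, target_batch_size: usize) -> Self {
        Self {
            inner,
            coalescer: BatchCoalescer::new(schema, target_batch_size),
            input_done: false,
        }
    }
}

impl<S> Stream for CoalescingStream<S>
where
    S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin,
{
    type Item = Result<RecordBatch, ArrowError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        loop {
            // Emit any batch the coalescer has already completed.
            if let Some(batch) = this.coalescer.next_completed_batch() {
                return Poll::Ready(Some(Ok(batch)));
            }
            if this.input_done {
                return Poll::Ready(None);
            }
            match this.inner.poll_next_unpin(cx) {
                Poll::Ready(Some(Ok(batch))) => {
                    // Buffer the input; completed batches are picked up
                    // at the top of the loop.
                    if let Err(e) = this.coalescer.push_batch(batch) {
                        return Poll::Ready(Some(Err(e)));
                    }
                }
                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                Poll::Ready(None) => {
                    // Input exhausted: flush whatever is still buffered.
                    this.input_done = true;
                    if let Err(e) = this.coalescer.finish_buffered_batch() {
                        return Poll::Ready(Some(Err(e)));
                    }
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}
```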
Thanks a lot!
Coalesce batches in the output stream when `poll_next` is called, e.g.
This would be my personal suggestion as a place to start because:
- It (should) be the same effect as running `CoalesceBatches` in `RepartitionExec` (if you coalesce on input it may affect the execution order and affect performance in unexpected ways)
- It sounds relatively simple
If coalescing in e.g. `PerPartitionStream::poll_next_inner`, it also means larger input batches for merge-sort.
This is a good thing, I think
I don't have any great suggestion about whether a new stream would be better than integrating the coalescer into the existing streams -- I think we would have to try them both out and see which looked best
Thank you for working on this @jizezhang 🙏
Hi @alamb, I wonder if I may confirm with you the behavior of one test https://github.com/apache/datafusion/blob/fc77be94570e3ada7e28db8c5412125f54e0b96d/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs#L267 with respect to integrating `BatchCoalescer`.
I noticed that this test runs two versions of the same query, one with the DataFrame API and one with `SessionContext::sql`. The logical plans resulting from the two are slightly different, in particular the projection part:

- With the DataFrame API:

```text
[2025-11-22T21:29:47Z DEBUG datafusion_optimizer::utils] Final optimized plan:
Aggregate: groupBy=[[]], aggr=[[count(?table?.flag)]]
  TableScan: ?table? projection=[flag], full_filters=[?table?.flag = Int32(0)]
```

- With `sql`:

```text
[2025-11-22T21:31:15Z DEBUG datafusion_optimizer::utils] Final optimized plan:
Projection: count(Int64(1)) AS count(*)
  Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
    TableScan: data projection=[], full_filters=[data.flag = Int32(0)]
```
The `SessionContext::sql` version had an issue when using the arrow `BatchCoalescer` kernel via `LimitedBatchCoalescer`. The reason (I think) is that the custom table provider `CustomProvider` in the test has branching logic on the schema depending on the projection https://github.com/apache/datafusion/blob/fc77be94570e3ada7e28db8c5412125f54e0b96d/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs#L208-L210,
thus the resulting physical plans differ in schema:
- With the DataFrame API:

```text
AggregateExec: mode=Final, gby=[], aggr=[count(?table?.flag)], schema=[count(?table?.flag):Int64]
  CoalescePartitionsExec, schema=[count(?table?.flag)[count]:Int64]
    AggregateExec: mode=Partial, gby=[], aggr=[count(?table?.flag)], schema=[count(?table?.flag)[count]:Int64]
      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, schema=[flag:Int32]
        CooperativeExec, schema=[flag:Int32]
          CustomPlan: batch_size=1, schema=[flag:Int32]
```

- With `sql`:

```text
ProjectionExec: expr=[count(Int64(1))@0 as count(*)], schema=[count(*):Int64]
  AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))], schema=[count(Int64(1)):Int64]
    CoalescePartitionsExec, schema=[count(Int64(1))[count]:Int64]
      AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))], schema=[count(Int64(1))[count]:Int64]
        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, schema=[]
          CooperativeExec, schema=[]
            CustomPlan: batch_size=1, schema=[]
```
However, the batches returned by the associated custom execution plan `CustomPlan` are always full (not projected) https://github.com/apache/datafusion/blob/fc77be94570e3ada7e28db8c5412125f54e0b96d/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs#L139
The arrow `BatchCoalescer` checks that the schema provided to the coalescer matches the actual batch schema https://github.com/apache/arrow-rs/blob/a67d49758b1faee7d42fe3b215e226d6d560f237/arrow-select/src/coalesce.rs#L428, thus the `SessionContext::sql` version would panic.
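A tiny repro sketch of that mismatch, with hypothetical schemas standing in for the test's projected vs. unprojected ones (assuming arrow-select's `BatchCoalescer`; the panic is as reported in this discussion):

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::record_batch::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use arrow_select::coalesce::BatchCoalescer;

fn main() {
    // The plan-side schema after projection pushdown: no columns.
    let projected = Arc::new(Schema::empty());
    let mut coalescer = BatchCoalescer::new(projected, 8192);

    // But the custom plan returns unprojected batches with a `flag` column.
    let full = Arc::new(Schema::new(vec![Field::new("flag", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(full, vec![Arc::new(Int32Array::from(vec![0]))])
        .expect("valid batch");

    // The schema check linked above fires here: per the discussion, this
    // panics because the batch schema != the coalescer schema.
    let _ = coalescer.push_batch(batch);
}
```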
The issue went away when I modified `CustomPlan::execute` to return projected batches, but I wanted to check whether the behavior is expected and whether the test should be updated, or otherwise what approaches to take. Thanks a lot!
Actually, the behavior of one other test, `test_preserve_order_with_spilling`, may also be affected by this change. When reserving memory for an array, e.g. here https://github.com/apache/datafusion/blob/d24eb4a23156b7814836e765d5890186ab40682f/datafusion/physical-plan/src/sorts/stream.rs#L247-L250, the buffer size is computed using capacity for primitive arrays https://github.com/apache/arrow-rs/blob/a8a63c28d14b99d8f50b32f3184ab986bad15e50/arrow-array/src/array/primitive_array.rs#L1242.
When the arrow `BatchCoalescer` coalesces batches, it copies rows from the input batches into its internal `in_progress_arrays`, and for `InProgressPrimitiveArray` this involves an `ensure_capacity` call that takes `batch_size` https://github.com/apache/arrow-rs/blob/a67d49758b1faee7d42fe3b215e226d6d560f237/arrow-select/src/coalesce/primitive.rs#L58. With the default batch size of 8192, 64 B is not enough for this test. I plan to adjust the batch size but wanted to mention it here just in case this does not make sense. Thanks.
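To make the arithmetic concrete: an Int32 column with capacity reserved for 8192 values holds 8192 × 4 B = 32 KiB, regardless of how few rows were actually copied in. A hedged sketch of that effect, assuming arrow-select's `BatchCoalescer`, the capacity-based `Array::get_buffer_memory_size`, and that the finished buffer keeps its reserved capacity (as the linked code suggests):

```rust
use std::sync::Arc;

use arrow::array::{Array, Int32Array};
use arrow::record_batch::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use arrow_select::coalesce::BatchCoalescer;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let small = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // `ensure_capacity` reserves room for `batch_size` values up front,
    // so the value buffer can occupy ~8192 * 4 B = 32 KiB even though
    // only 3 rows were copied in.
    let mut coalescer = BatchCoalescer::new(schema, 8192);
    coalescer.push_batch(small)?;
    coalescer.finish_buffered_batch()?;
    let out = coalescer.next_completed_batch().expect("one batch");

    // get_buffer_memory_size is capacity-based for primitive arrays,
    // which is what the merge stream's reservation accounting sees --
    // hence 64 B reserved per array is not enough.
    println!(
        "{} rows, {} bytes",
        out.num_rows(),
        out.column(0).get_buffer_memory_size()
    );
    Ok(())
}
```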
The issue went away when I modified `CustomPlan::execute` to return projected batches, but I wanted to check whether the behavior is expected and whether the test should be updated, or otherwise what approaches to take. Thanks a lot!
I don't fully understand the case you laid out -- I think it would be easier to comment if / when you have code / test changes
What I would suggest for this issue is a two-part change:
- A PR that adds batch coalescing to `RepartitionExec` (but doesn't change the optimization rule)
- A second PR that removes `RepartitionExec` from the optimizer rule
That way we can better isolate the effects of different changes
Hi @alamb, I was running benchmarks with my PR https://github.com/apache/datafusion/pull/19002 (and some variations for comparison), though it seemed that the results varied a bit across runs; e.g. even with the same main branch, one query can show as faster in one run and slower in the next. I just wonder if there are any tips on running benchmarks or interpreting the results, and/or any requirements on the instance itself (e.g. number of cores, memory, etc.). Thanks a lot!
Hi @alamb, I was running benchmarks with my PR #19002 (and some variations for comparison), though it seemed that the results varied a bit across runs; e.g. even with the same main branch, one query can show as faster in one run and slower in the next. I just wonder if there are any tips on running benchmarks or interpreting the results, and/or any requirements on the instance itself (e.g. number of cores, memory, etc.). Thanks a lot!
Yes, I agree there is some variation. I haven't found a good way to handle this other than to make sure the results are reproducible run to run (aka if a performance difference is detected, try rerunning the same benchmarks and see if the same difference is seen)
It would be great to have a more stable benchmarking environment, but I think that would require a more isolated environment than I have available (either a GCP machine or my laptop)
@alamb I put out a second PR https://github.com/apache/datafusion/pull/19239 to remove `RepartitionExec` from the optimizer rule. Please take a look when you get a chance. Thank you.
Awesome -- thank you @jizezhang
It seems like after https://github.com/apache/datafusion/pull/19239 all we have left now is to integrate `BatchCoalescer` into the `AsyncFuncExec` and we really could remove the `CoalesceBatches` optimization rule 🤔
It seems like after #19239 all we have left now is to integrate `BatchCoalescer` into the `AsyncFuncExec` and we really could remove the `CoalesceBatches` optimization rule 🤔
Yes, is someone looking into that? Or if not yet, I can take a stab.
It seems like after #19239 all we have left now is to integrate `BatchCoalescer` into the `AsyncFuncExec` and we really could remove the `CoalesceBatches` optimization rule 🤔

Yes, is someone looking into that? Or if not yet, I can take a stab.
No one I know of is looking at it.
Thank you 🙏 I filed this ticket to track the work:
- https://github.com/apache/datafusion/issues/19331