
Integrate `BatchCoalescer` into `RepartitionExec` and remove from `CoalesceBatches` optimization rule

Open alamb opened this issue 1 month ago • 13 comments

Is your feature request related to a problem or challenge?

  • Part of https://github.com/apache/datafusion/issues/18779

The LimitedBatchCoalescer can now be used to integrate batch coalescing directly into ExecutionPlan implementations, which is better than using an external operator for the reasons described in https://github.com/apache/datafusion/issues/18779

Currently RepartitionExec needs a CoalesceBatches operator after it for good performance, see https://github.com/apache/datafusion/blob/bd30fe2375318d13fc5b66e06e53da63ca4d0928/datafusion/physical-optimizer/src/coalesce_batches.rs#L64

We should integrate coalescing directly into the operator.

Describe the solution you'd like

  1. Add batch coalescing into RepartitionExec
  2. Remove RepartitionExec from the CoalesceBatches optimizer rule: https://github.com/apache/datafusion/blob/bd30fe2375318d13fc5b66e06e53da63ca4d0928/datafusion/physical-optimizer/src/coalesce_batches.rs#L64
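At a high level, step 1 boils down to buffering small batches inside the operator until a target row count is reached, then emitting one larger batch. A std-only sketch of that buffering logic (Vec<i32> stands in for RecordBatch; the type and method names are hypothetical, not DataFusion's actual API):

```rust
// Simplified model of in-operator batch coalescing: accumulate rows from
// small input batches and emit an output batch only once a target row
// count is reached. The real implementation would use arrow's
// BatchCoalescer over RecordBatches.
struct SimpleCoalescer {
    target_rows: usize,
    buffer: Vec<i32>,
}

impl SimpleCoalescer {
    fn new(target_rows: usize) -> Self {
        Self { target_rows, buffer: Vec::new() }
    }

    /// Push one input "batch"; returns a completed output batch once the
    /// buffered row count reaches the target, otherwise None.
    fn push(&mut self, batch: &[i32]) -> Option<Vec<i32>> {
        self.buffer.extend_from_slice(batch);
        if self.buffer.len() >= self.target_rows {
            Some(std::mem::take(&mut self.buffer))
        } else {
            None
        }
    }

    /// Flush whatever is buffered at end of input.
    fn finish(&mut self) -> Option<Vec<i32>> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}
```

With this shape, the operator's output stream calls push on every batch it produces and finish when its input is exhausted, so downstream operators only ever see batches near the target size.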

Describe alternatives you've considered

@Dandandan did this for FilterExec in these two PRs, so we can probably follow a similar model

  • https://github.com/apache/datafusion/pull/18604
  • https://github.com/apache/datafusion/pull/18630

Additional context

No response

alamb avatar Nov 17 '25 18:11 alamb

Not sure about this one, closing for now

alamb avatar Nov 17 '25 18:11 alamb

take

jizezhang avatar Nov 18 '25 08:11 jizezhang

Hi @alamb, just double-checking: I noticed this comment https://github.com/apache/datafusion/pull/18630/files#r2526799488, but regardless, do we still want to integrate coalescing to remove the dependency on CoalesceBatchesExec? Thanks!

jizezhang avatar Nov 19 '25 07:11 jizezhang

Hi @alamb, just double-checking: I noticed this comment https://github.com/apache/datafusion/pull/18630/files#r2526799488, but regardless, do we still want to integrate coalescing to remove the dependency on CoalesceBatchesExec? Thanks!

If you mean

Yes - I tried to apply this optimization to RepartitionExec but unfortunately got some regressions (my feeling is due to some decreased parallelism / work distribution).

Yes, I guess I am hoping that someone could figure out what was going on / optimize the code so that we could combine them

alamb avatar Nov 19 '25 18:11 alamb

Hi @alamb wanted to check my understanding and discuss the approach a little bit. Please let me know your thoughts and correct me if anything sounds off.

I thought about two places where we could potentially coalesce batches:

  • Coalesce hash-partitioned input batches for the same output partition in an input stream before sending them to the output channel, e.g. coalesce in RepartitionExec::pull_from_input before channel.sender.send.
    • For the order-preserving case, this would result in larger input batches for merge-sort, and since merge-sort needs to wait for batches from all input partition streams to be ready, I wonder if this would impact performance.
  • Coalesce batches in the output stream when poll_next is called.
    • For the non-order-preserving case, the output stream is a PerPartitionStream; for the order-preserving case, it is a SortPreservingMergeStream whose input is a vector of PerPartitionStreams.
    • If coalescing in e.g. PerPartitionStream::poll_next_inner, this also means larger input batches for merge-sort.
    • Alternatively, we could create a wrapper stream around PerPartitionStream that coalesces while calling PerPartitionStream::poll_next. This would look very much like CoalesceBatches, but not as a separate node.
    • For the order-preserving case, we could maybe coalesce inside SortPreservingMergeStream::poll_next_inner.
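The "wrapper stream" variant can be modeled with a std-only adapter over an iterator of batches (Iterator stands in for Stream, Vec<i32> for RecordBatch; the type name is hypothetical, not an actual DataFusion type):

```rust
// Simplified model of a coalescing wrapper: it wraps any batch-producing
// source and keeps pulling small batches until enough rows are buffered,
// mirroring how a Stream impl would loop on Poll::Ready in poll_next.
struct CoalescingAdapter<I> {
    inner: I,
    target_rows: usize,
    buffer: Vec<i32>,
    done: bool,
}

impl<I: Iterator<Item = Vec<i32>>> CoalescingAdapter<I> {
    fn new(inner: I, target_rows: usize) -> Self {
        Self { inner, target_rows, buffer: Vec::new(), done: false }
    }
}

impl<I: Iterator<Item = Vec<i32>>> Iterator for CoalescingAdapter<I> {
    type Item = Vec<i32>;

    fn next(&mut self) -> Option<Vec<i32>> {
        if self.done {
            return None;
        }
        loop {
            match self.inner.next() {
                Some(batch) => {
                    self.buffer.extend_from_slice(&batch);
                    if self.buffer.len() >= self.target_rows {
                        // Enough rows buffered: emit one large batch.
                        return Some(std::mem::take(&mut self.buffer));
                    }
                }
                None => {
                    // Inner source exhausted: flush any remaining rows.
                    self.done = true;
                    return if self.buffer.is_empty() {
                        None
                    } else {
                        Some(std::mem::take(&mut self.buffer))
                    };
                }
            }
        }
    }
}
```

This is essentially CoalesceBatchesExec's behavior folded into the operator's own output stream rather than a separate plan node.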

Thanks a lot!

jizezhang avatar Nov 20 '25 17:11 jizezhang

Coalesce batches in the output stream when poll_next is called.

This would be my personal suggestion as a place to start because:

  1. It should have the same effect as running CoalesceBatches after RepartitionExec. (If you coalesce on input, it may change the execution order and affect performance in unexpected ways.)
  2. It sounds relatively simple

If coalescing in e.g. PerPartitionStream::poll_next_inner, it also means larger input batches for merge-sort.

This is a good thing I think

I don't have any great suggestion about whether a new stream would be better than integrating the coalescer into the existing streams -- I think we would have to try both out and see which looks best

Thank you for working on this @jizezhang 🙏

alamb avatar Nov 21 '25 15:11 alamb

Hi @alamb, I'd like to confirm with you the behavior of one test https://github.com/apache/datafusion/blob/fc77be94570e3ada7e28db8c5412125f54e0b96d/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs#L267 with respect to integrating BatchCoalescer.

I noticed that this test runs two versions of the same query, one with the DataFrame API and one with SessionContext::sql. The logical plans resulting from the two are slightly different, in particular the projection part:

  • With DataFrame API
    [2025-11-22T21:29:47Z DEBUG datafusion_optimizer::utils] Final optimized plan:
      Aggregate: groupBy=[[]], aggr=[[count(?table?.flag)]]
        TableScan: ?table? projection=[flag], full_filters=[?table?.flag = Int32(0)]
    
  • With sql,
    [2025-11-22T21:31:15Z DEBUG datafusion_optimizer::utils] Final optimized plan:
      Projection: count(Int64(1)) AS count(*)
        Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
          TableScan: data projection=[], full_filters=[data.flag = Int32(0)]
    

The SessionContext::sql version had an issue when using the arrow BatchCoalescer kernel via LimitedBatchCoalescer. The reason (I think) is that the custom table provider CustomProvider in the test has branching logic on the schema depending on the projection https://github.com/apache/datafusion/blob/fc77be94570e3ada7e28db8c5412125f54e0b96d/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs#L208-L210 and thus the resulting physical plans differ in schema:

  • With DataFrame API,
    AggregateExec: mode=Final, gby=[], aggr=[count(?table?.flag)], schema=[count(?table?.flag):Int64]
    CoalescePartitionsExec, schema=[count(?table?.flag)[count]:Int64]
      AggregateExec: mode=Partial, gby=[], aggr=[count(?table?.flag)], schema=[count(?table?.flag)[count]:Int64]
        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, schema=[flag:Int32]
          CooperativeExec, schema=[flag:Int32]
            CustomPlan: batch_size=1, schema=[flag:Int32]
    
  • With sql,
    ProjectionExec: expr=[count(Int64(1))@0 as count(*)], schema=[count(*):Int64]
    AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))], schema=[count(Int64(1)):Int64]
      CoalescePartitionsExec, schema=[count(Int64(1))[count]:Int64]
        AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))], schema=[count(Int64(1))[count]:Int64]
          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, schema=[]
            CooperativeExec, schema=[]
              CustomPlan: batch_size=1, schema=[]
    

However, the batches returned by the associated custom execution plan CustomPlan are always full (not projected) https://github.com/apache/datafusion/blob/fc77be94570e3ada7e28db8c5412125f54e0b96d/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs#L139

Arrow's BatchCoalescer checks that the schema provided to the coalescer matches the actual batch schema https://github.com/apache/arrow-rs/blob/a67d49758b1faee7d42fe3b215e226d6d560f237/arrow-select/src/coalesce.rs#L428, thus the SessionContext::sql version would panic.
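The mismatch can be illustrated with a minimal, std-only model (string slices stand in for Arrow schemas; check_schema is illustrative of the assertion the coalescer performs when a batch is pushed, not the actual arrow-rs API):

```rust
// Minimal model of the schema check: the coalescer is constructed with the
// plan's declared (possibly projected) schema, but is handed batches that
// still carry the full, unprojected schema.
fn check_schema(coalescer_schema: &[&str], batch_schema: &[&str]) -> Result<(), String> {
    if coalescer_schema == batch_schema {
        Ok(())
    } else {
        Err(format!(
            "schema mismatch: coalescer expects {:?}, batch has {:?}",
            coalescer_schema, batch_schema
        ))
    }
}
```

In the DataFrame API plan the declared schema and the batch schema are both [flag], so the check passes; in the SessionContext::sql plan the declared schema is the empty projection [] while CustomPlan still returns batches with [flag], so the check fails.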

The issue went away when I modified CustomPlan::execute to return projected batches, but I wanted to check whether the behavior is expected and whether the test should be updated, or otherwise what approaches to take. Thanks a lot!

jizezhang avatar Nov 23 '25 18:11 jizezhang

Actually, the behavior of one other test, test_preserve_order_with_spilling, may also be affected by this change. When reserving memory for an array, e.g. here https://github.com/apache/datafusion/blob/d24eb4a23156b7814836e765d5890186ab40682f/datafusion/physical-plan/src/sorts/stream.rs#L247-L250 the buffer size is computed from the capacity for primitive arrays https://github.com/apache/arrow-rs/blob/a8a63c28d14b99d8f50b32f3184ab986bad15e50/arrow-array/src/array/primitive_array.rs#L1242.

When arrow's BatchCoalescer coalesces batches, it copies rows from the input batches into the internal in_progress_arrays, and for InProgressPrimitiveArray this involves an ensure_capacity call that takes batch_size https://github.com/apache/arrow-rs/blob/a67d49758b1faee7d42fe3b215e226d6d560f237/arrow-select/src/coalesce/primitive.rs#L58. With the default batch size of 8192, 64B is not enough for this test. I plan to adjust the batch size, but wanted to mention it here in case this does not make sense. Thanks.
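A back-of-the-envelope check of why the 64B reservation is too small once the coalescer pre-allocates for a full default batch (assuming an 8-byte primitive column such as Int64; the function name is illustrative):

```rust
// Rough sizing model: ensure_capacity pre-allocates room for a full target
// batch of values, regardless of how small the incoming batch is. For an
// 8-byte primitive type that is target_batch_size * 8 bytes.
fn coalescer_preallocation_bytes(target_batch_size: usize) -> usize {
    target_batch_size * std::mem::size_of::<i64>()
}
```

With the default batch size of 8192 this is 64 KiB, three orders of magnitude more than the 64 bytes the test reserves, which is why shrinking the batch size in the test sidesteps the problem.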

jizezhang avatar Nov 25 '25 07:11 jizezhang

The issue went away when I modified CustomPlan::execute to return projected batches, but I wanted to check whether the behavior is expected and whether the test should be updated, or otherwise what approaches to take. Thanks a lot!

I don't fully understand the case you laid out -- I think it would be easier to comment if / when you have code / tests changes

alamb avatar Nov 25 '25 14:11 alamb

What I would suggest for this issue is a two part change:

  1. A first PR that adds batch coalescing to RepartitionExec (but doesn't change the optimization rule)
  2. A second PR that removes RepartitionExec from the optimizer rule

That way we can better isolate the effects of different changes

alamb avatar Nov 25 '25 14:11 alamb

Hi @alamb, I was running benchmarks with my PR https://github.com/apache/datafusion/pull/19002 (and some variations for comparison), though the results seemed to vary somewhat across runs, e.g. even on the same main branch, one query can show as faster in one run and slower in the next. I wonder if there are any tips on running benchmarks or interpreting the results, and/or any requirements on the instance itself (e.g. number of cores, memory, etc.). Thanks a lot!

jizezhang avatar Dec 01 '25 05:12 jizezhang

Hi @alamb, I was running benchmarks with my PR #19002 (and some variations for comparison), though the results seemed to vary somewhat across runs, e.g. even on the same main branch, one query can show as faster in one run and slower in the next. I wonder if there are any tips on running benchmarks or interpreting the results, and/or any requirements on the instance itself (e.g. number of cores, memory, etc.). Thanks a lot!

Yes, I agree there is some variation. I haven't found a good way to handle this other than making sure the results are reproducible run to run (i.e. if a performance difference is detected, rerun the same benchmarks and see if the same difference appears)

It would be great to have a more stable benchmarking environment, but I think that would require a more isolated environment than I have available (either a GCP machine or my laptop)

alamb avatar Dec 01 '25 19:12 alamb

@alamb I put out a second PR https://github.com/apache/datafusion/pull/19239 to remove RepartitionExec from the optimizer rule. Please take a look when you get a chance. Thank you.

jizezhang avatar Dec 10 '25 17:12 jizezhang

Awesome --thank you @jizezhang

alamb avatar Dec 11 '25 22:12 alamb

It seems like after https://github.com/apache/datafusion/pull/19239 all we have left now is to integrate BatchCoalescer into the AsyncFuncExec and we really could remove the CoalesceBatches optimization rule 🤔

alamb avatar Dec 14 '25 11:12 alamb

It seems like after #19239 all we have left now is to integrate BatchCoalescer into the AsyncFuncExec and we really could remove the CoalesceBatches optimization rule 🤔

Yes, is someone looking into that? If not, I can take a stab.

jizezhang avatar Dec 14 '25 16:12 jizezhang

It seems like after #19239 all we have left now is to integrate BatchCoalescer into the AsyncFuncExec and we really could remove the CoalesceBatches optimization rule 🤔

Yes, is someone looking into that? If not, I can take a stab.

No one I know of is looking at it.

Thank you 🙏 I filed this ticket to track the work:

  • https://github.com/apache/datafusion/issues/19331

alamb avatar Dec 15 '25 11:12 alamb