[EPIC] Substrait: Add producer and consumer for physical plans

Open andygrove opened this issue 2 years ago • 3 comments

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

I would like to use substrait with physical plans. I plan on having an initial PR up this weekend.

Describe the solution you'd like

  • Create new consumer and producer for physical plans

Describe alternatives you've considered

Additional context

Substrait to DataFusion's logical plan is tracked at https://github.com/apache/arrow-datafusion/issues/8149

Tasks:

  • [x] https://github.com/apache/arrow-datafusion/pull/5176
  • [ ] https://github.com/apache/arrow-datafusion/issues/9299
  • [ ] https://github.com/apache/arrow-datafusion/issues/9347
  • [ ] https://github.com/apache/arrow-datafusion/issues/8698
  • [ ] https://github.com/apache/arrow-datafusion/issues/8361
  • [x] https://github.com/apache/datafusion/issues/10815
  • [x] https://github.com/apache/datafusion/issues/10817
  • [ ] https://github.com/apache/datafusion/issues/9727
  • [ ] https://github.com/apache/datafusion/issues/9347
  • [ ] https://github.com/apache/datafusion/issues/9299
  • [ ] https://github.com/apache/datafusion/issues/10710
  • [ ] https://github.com/apache/datafusion/issues/10864

andygrove avatar Feb 04 '23 01:02 andygrove

@waynexia @nseekhao fyi

andygrove avatar Feb 04 '23 01:02 andygrove

What is the expected behavior for converting "LogicalPlan -> Substrait -> PhysicalPlan" or "PhysicalPlan -> Substrait -> LogicalPlan"? Or is it allowed?

waynexia avatar Mar 02 '23 06:03 waynexia

I renamed this ticket to be an epic and started collecting tasks needed for better support

alamb avatar Feb 26 '24 20:02 alamb

@andygrove @alamb Could you recommend the best path for implementing these tasks? Since we’re building a distributed query engine based on DataFusion, which requires splitting a physical plan into pipelines, we’re willing to contribute to enhancing the current Substrait functionality in DataFusion.

niebayes avatar Feb 18 '25 03:02 niebayes

Hi @niebayes -- I recommend coordinating with @vbarua, @Blizzara, @wackywendell, and others who I think use substrait with physical plans

I think we maybe already have physical consumer/producers, see: https://docs.rs/datafusion-substrait/45.0.0/datafusion_substrait/physical_plan/index.html

The first task might be to go through the existing tickets and see which ones are still relevant

alamb avatar Feb 19 '25 14:02 alamb

@alamb Thanks for your advice. I will first pick a few small tickets to become more familiar with the codebase.

niebayes avatar Feb 20 '25 03:02 niebayes

There is indeed some kind of producer and consumer for physical plans, but from a quick check they seem very limited. My interest is only in logical plans currently, and I think the same applies to Victor, at least from what I've seen (but I may be wrong there).

I don't know much about physical plans overall so dunno if it could reuse parts of the logical plans work, but at least the logical plan consumer/producer can be used as inspiration :)

Blizzara avatar Feb 20 '25 09:02 Blizzara

@Blizzara Thanks for your reply. I initially chose the physical plan because more of the computation can be distributed to executors in a distributed query engine.

Take this SQL query as an example:

select avg(value) from sx1 group by sid having sid > 1;

The corresponding logical plan might be:

Projection: avg(sx1.value)
  Aggregate: groupBy=[[sx1.sid]], aggr=[[avg(CAST(sx1.value AS Float64))]]
    Filter: sx1.sid > Int8(1)
      TableScan: sx1 projection=[sid, value], partial_filters=[sx1.sid > Int8(1)]

And the physical plan might look like:

ProjectionExec: expr=[avg(sx1.value)@1 as avg(sx1.value)]
  AggregateExec: mode=FinalPartitioned, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([sid@0], 8), input_partitions=8
        AggregateExec: mode=Partial, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
          CoalesceBatchesExec: target_batch_size=8192
            FilterExec: sid@0 > 1
              ParquetExec: file_groups = [..]
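
To illustrate the Partial and FinalPartitioned aggregation modes in the plan above, here is a minimal standalone sketch of a two-phase average. It does not use DataFusion's API: `AvgState`, `partial_avg`, and `final_avg` are hypothetical names, the `sid > 1` filter is folded into the partial phase, and the hash repartition between the two phases is omitted for brevity.

```rust
use std::collections::HashMap;

/// Hypothetical partial state for `avg`: running sum and row count per group.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
struct AvgState {
    sum: f64,
    count: u64,
}

/// Partial phase: each partition filters (`sid > 1`) and aggregates its own
/// rows independently, so this step can run on separate executors.
fn partial_avg(rows: &[(i64, f64)]) -> HashMap<i64, AvgState> {
    let mut states: HashMap<i64, AvgState> = HashMap::new();
    for &(sid, value) in rows {
        if sid > 1 {
            let state = states.entry(sid).or_default();
            state.sum += value;
            state.count += 1;
        }
    }
    states
}

/// Final phase: merge the partial states per group key, then divide.
fn final_avg(partials: &[HashMap<i64, AvgState>]) -> HashMap<i64, f64> {
    let mut merged: HashMap<i64, AvgState> = HashMap::new();
    for partial in partials {
        for (&sid, state) in partial {
            let m = merged.entry(sid).or_default();
            m.sum += state.sum;
            m.count += state.count;
        }
    }
    merged
        .into_iter()
        .map(|(sid, s)| (sid, s.sum / s.count as f64))
        .collect()
}

fn main() {
    // Two partitions of (sid, value) rows, as if scanned by two executors.
    let partition_a: Vec<(i64, f64)> = vec![(1, 10.0), (2, 4.0), (3, 9.0)];
    let partition_b: Vec<(i64, f64)> = vec![(2, 6.0), (3, 3.0), (2, 8.0)];
    let partials = vec![partial_avg(&partition_a), partial_avg(&partition_b)];
    let result = final_avg(&partials);
    println!("avg for sid=2: {:?}", result.get(&2));
}
```

Only the small per-group `(sum, count)` states need to cross the partition boundary, which is why the partial phase is a natural unit of distributed work.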

From studying the datafusion-ballista project, I learned that we can split the execution plan at pipeline breakers (including RepartitionExec, SortPreservingMergeExec, CoalescePartitionsExec, etc.). So the above execution plan would be split into two parts (i.e., pipelines):

ProjectionExec: expr=[avg(sx1.value)@1 as avg(sx1.value)]
  AggregateExec: mode=FinalPartitioned, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
    CoalesceBatchesExec: target_batch_size=8192
      MergePipelinesExec: pipeline_ids = [...]
        AggregateExec: mode=Partial, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
          CoalesceBatchesExec: target_batch_size=8192
            FilterExec: sid@0 > 1
              ParquetExec: file_groups = [..]

As you can see, the first stage of the parallel aggregation algorithm can be distributed to multiple executors, which improves resource utilization. If we chose to split the logical plan instead, it seems we couldn't distribute the aggregation, or even part of it. I don't know if my understanding is right.
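
The splitting idea can be sketched on a toy plan tree. This is only an illustration under stated assumptions, not Ballista's or DataFusion's actual implementation: `PlanNode`, `is_pipeline_breaker`, and `split` are hypothetical names, operators are matched by name only, and the `MergePipelinesExec` placeholder is the illustrative node from the plan above rather than a real DataFusion operator.

```rust
/// Toy stand-in for a physical plan node (not DataFusion's `ExecutionPlan`).
#[derive(Debug, Clone, PartialEq)]
struct PlanNode {
    name: String,
    children: Vec<PlanNode>,
}

impl PlanNode {
    fn new(name: &str, children: Vec<PlanNode>) -> Self {
        PlanNode { name: name.to_string(), children }
    }
}

/// Operators treated as pipeline breakers in this sketch (an assumption).
fn is_pipeline_breaker(name: &str) -> bool {
    matches!(
        name,
        "RepartitionExec" | "SortPreservingMergeExec" | "CoalescePartitionsExec"
    )
}

/// Rewrites `node`, cutting the tree at each breaker. Every input of a
/// breaker is pushed onto `pipelines`, and the breaker itself is replaced by
/// a placeholder leaf that records the ids of the pipelines it reads from.
fn split(node: &PlanNode, pipelines: &mut Vec<PlanNode>) -> PlanNode {
    if is_pipeline_breaker(&node.name) {
        let mut ids = Vec::new();
        for child in &node.children {
            let sub = split(child, pipelines);
            pipelines.push(sub);
            ids.push(pipelines.len() - 1);
        }
        let label = format!("MergePipelinesExec: pipeline_ids={:?}", ids);
        return PlanNode::new(&label, vec![]);
    }
    let children: Vec<PlanNode> =
        node.children.iter().map(|c| split(c, pipelines)).collect();
    PlanNode::new(&node.name, children)
}

fn main() {
    // Simplified version of the physical plan above (operator names only).
    let scan = PlanNode::new("ParquetExec", vec![]);
    let filter = PlanNode::new("FilterExec", vec![scan]);
    let partial = PlanNode::new("AggregateExec(Partial)", vec![filter]);
    let repartition = PlanNode::new("RepartitionExec", vec![partial]);
    let coalesce = PlanNode::new("CoalesceBatchesExec", vec![repartition]);
    let final_agg = PlanNode::new("AggregateExec(FinalPartitioned)", vec![coalesce]);
    let plan = PlanNode::new("ProjectionExec", vec![final_agg]);

    let mut pipelines = Vec::new();
    let root = split(&plan, &mut pipelines);
    pipelines.push(root); // the root pipeline gets the last id
    for (id, pipeline) in pipelines.iter().enumerate() {
        println!("pipeline {id}: {pipeline:?}");
    }
}
```

In this sketch the partial aggregation subtree becomes its own pipeline, which a scheduler could then assign to multiple executors, while the root pipeline only reads from the placeholder.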

By the way, datafusion-ballista is good for OLAP workloads, and it assumes executors are stateless. However, in my scenario, executors are stateful: each executor maintains an in-memory buffer containing the most recently written data (historical data is stored in shared object storage). So, when the scheduler is about to construct a physical plan, it has to query each executor for the latest statistics, which are required for query optimization. I wonder whether this is the standard approach to distributed query execution based on DataFusion, since the implementation seems complicated.

I really hope the DataFusion community can provide some recommendations on building a distributed query engine based on DataFusion.

niebayes avatar Feb 20 '25 09:02 niebayes

Hi @niebayes, this PR from @robtandy may be quite relevant to your work building a distributed execution engine:

  • https://github.com/apache/datafusion-ray/pull/60

The larger project is here https://github.com/apache/datafusion-ray

alamb avatar Feb 22 '25 23:02 alamb

I consolidated the outstanding substrait work in a new epic, so let's close this one and continue discussion there

  • https://github.com/apache/datafusion/issues/17159

alamb avatar Aug 12 '25 19:08 alamb