[EPIC] Substrait: Add producer and consumer for physical plans
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
I would like to use Substrait with physical plans. I plan to have an initial PR up this weekend.
Describe the solution you'd like
- Create new consumer and producer for physical plans
Describe alternatives you've considered
Additional context
Substrait support for DataFusion's logical plans is tracked at https://github.com/apache/arrow-datafusion/issues/8149
Tasks:
- [x] https://github.com/apache/arrow-datafusion/pull/5176
- [ ] https://github.com/apache/arrow-datafusion/issues/9299
- [ ] https://github.com/apache/arrow-datafusion/issues/9347
- [ ] https://github.com/apache/arrow-datafusion/issues/8698
- [ ] https://github.com/apache/arrow-datafusion/issues/8361
- [x] https://github.com/apache/datafusion/issues/10815
- [x] https://github.com/apache/datafusion/issues/10817
- [ ] https://github.com/apache/datafusion/issues/9727
- [ ] https://github.com/apache/datafusion/issues/10710
- [ ] https://github.com/apache/datafusion/issues/10864
@waynexia @nseekhao fyi
What is the expected behavior when converting "LogicalPlan -> Substrait -> PhysicalPlan" or "PhysicalPlan -> Substrait -> LogicalPlan"? Is that even allowed?
I renamed this ticket to an epic and started collecting the tasks needed for better physical plan support.
@andygrove @alamb Could you recommend the best path for implementing these tasks? We're building a distributed query engine based on DataFusion, which requires splitting a physical plan into pipelines, so we're willing to contribute improvements to DataFusion's current Substrait support.
Hi @niebayes, I recommend coordinating with @vbarua, @Blizzara, @wackywendell, and others who I think use Substrait with physical plans.
I think we may already have a physical plan consumer/producer; see: https://docs.rs/datafusion-substrait/45.0.0/datafusion_substrait/physical_plan/index.html
The first task might be to go through the existing tickets and see which ones are still relevant.
@alamb Thanks for your advice. I'll start with a few small tickets to become more familiar with the codebase.
There is indeed a producer and consumer for physical plans, but from a quick look they seem very limited. Currently I'm only interested in logical plans, and I think the same applies to Victor, at least from what I've seen (but I may be wrong there).
I don't know much about physical plans overall, so I don't know whether that work could reuse parts of the logical plan work, but at least the logical plan consumer/producer can serve as inspiration :)
@Blizzara Thanks for your reply. I initially chose the physical plan because more of the computation can be distributed to executors in a distributed query engine.
Consider this SQL query:
select avg(value) from sx1 group by sid having sid > 1;
The corresponding logical plan might be:
Projection: avg(sx1.value)
  Aggregate: groupBy=[[sx1.sid]], aggr=[[avg(CAST(sx1.value AS Float64))]]
    Filter: sx1.sid > Int8(1)
      TableScan: sx1 projection=[sid, value], partial_filters=[sx1.sid > Int8(1)]
And the physical plan might look like:
ProjectionExec: expr=[avg(sx1.value)@1 as avg(sx1.value)]
  AggregateExec: mode=FinalPartitioned, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([sid@0], 8), input_partitions=8
        AggregateExec: mode=Partial, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
          CoalesceBatchesExec: target_batch_size=8192
            FilterExec: sid@0 > 1
              ParquetExec: file_groups = [..]
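The two `AggregateExec` nodes in this plan (mode=Partial below the `RepartitionExec`, mode=FinalPartitioned above it) are what make the aggregation distributable. Below is a minimal std-only Rust sketch of that two-phase AVG, not DataFusion code. It assumes the partial state is a running sum plus a row count, which is the usual way to make AVG mergeable. `AvgState`, `partial_avg`, `hash_partition`, and `final_avg` are hypothetical names, and std's `DefaultHasher` stands in for whatever hash function DataFusion actually uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hypothetical partial state for AVG: running sum and row count.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
struct AvgState {
    sum: f64,
    count: u64,
}

/// Phase 1 (like `AggregateExec: mode=Partial`): each input partition
/// aggregates its own rows independently, keyed by `sid`.
fn partial_avg(rows: &[(i64, f64)]) -> HashMap<i64, AvgState> {
    let mut states: HashMap<i64, AvgState> = HashMap::new();
    for &(sid, value) in rows {
        let s = states.entry(sid).or_default();
        s.sum += value;
        s.count += 1;
    }
    states
}

/// Exchange (like `RepartitionExec: partitioning=Hash([sid@0], n)`):
/// pick an output partition by hashing the grouping key.
fn hash_partition(key: i64, n: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % n as u64) as usize
}

/// Phase 2 (like `AggregateExec: mode=FinalPartitioned`): merge the partial
/// states that landed in one output partition, then compute sum / count.
fn final_avg(states: Vec<(i64, AvgState)>) -> HashMap<i64, f64> {
    let mut merged: HashMap<i64, AvgState> = HashMap::new();
    for (sid, s) in states {
        let m = merged.entry(sid).or_default();
        m.sum += s.sum;
        m.count += s.count;
    }
    merged
        .into_iter()
        .map(|(sid, s)| (sid, s.sum / s.count as f64))
        .collect()
}

fn main() {
    // Two input partitions of (sid, value) rows, already filtered by `sid > 1`.
    let inputs: Vec<Vec<(i64, f64)>> = vec![
        vec![(2, 10.0), (3, 1.0), (2, 20.0)],
        vec![(2, 30.0), (3, 3.0)],
    ];
    let n = 2; // number of output partitions after the exchange
    let mut buckets: Vec<Vec<(i64, AvgState)>> = vec![Vec::new(); n];
    for part in &inputs {
        for (sid, s) in partial_avg(part) {
            buckets[hash_partition(sid, n)].push((sid, s));
        }
    }
    let mut result: Vec<(i64, f64)> = buckets.into_iter().flat_map(final_avg).collect();
    result.sort_by_key(|&(sid, _)| sid);
    println!("{:?}", result); // prints [(2, 20.0), (3, 2.0)]
}
```

Because every partial state for a given `sid` hashes to the same output partition, each final task sees all of the states for its keys. Only the small per-group states cross the exchange, not the raw rows.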
From studying the datafusion-ballista project, I learned that we can split the execution plan at pipeline breakers (including RepartitionExec, SortPreservingMergeExec, CoalescePartitionsExec, etc.). So the above execution plan would be split into two parts (a.k.a. pipelines):
Pipeline 1:
ProjectionExec: expr=[avg(sx1.value)@1 as avg(sx1.value)]
  AggregateExec: mode=FinalPartitioned, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
    CoalesceBatchesExec: target_batch_size=8192
      MergePipelinesExec: pipeline_ids = [...]

Pipeline 2:
AggregateExec: mode=Partial, gby=[sid@0 as sid], aggr=[avg(sx1.value)]
  CoalesceBatchesExec: target_batch_size=8192
    FilterExec: sid@0 > 1
      ParquetExec: file_groups = [..]
As you can see, the first (partial) stage of the two-phase aggregation can be distributed across multiple executors, which improves resource utilization. If we split the logical plan instead, it seems we can't distribute the aggregation, not even part of it. I'm not sure whether my understanding is correct.
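The split at exchange boundaries can be sketched over a toy plan tree. This is a std-only illustration, not DataFusion or Ballista code: `Node` is a hypothetical stand-in for an `ExecutionPlan` tree, `split` and `example_plan` are hypothetical helpers, and `MergePipelinesExec` is the placeholder operator from the example above, not an existing DataFusion operator. Operator arguments are omitted, and the aggregate mode is folded into the node name.

```rust
/// Hypothetical toy plan node: an operator name plus its children.
#[derive(Debug, Clone, PartialEq)]
struct Node {
    name: String,
    children: Vec<Node>,
}

impl Node {
    fn new(name: &str, children: Vec<Node>) -> Self {
        Node { name: name.to_string(), children }
    }
}

/// Operators treated as pipeline boundaries, following the list above.
const BOUNDARIES: [&str; 3] = ["RepartitionExec", "CoalescePartitionsExec", "SortPreservingMergeExec"];

/// Recursively cut `node` at every boundary operator. Each subtree below a
/// boundary becomes its own pipeline (pushed onto `pipelines`, so upstream
/// pipelines always get lower ids), and the boundary itself is replaced by
/// a placeholder leaf that reads from those pipelines.
fn split(node: &Node, pipelines: &mut Vec<Node>) -> Node {
    if BOUNDARIES.contains(&node.name.as_str()) {
        let mut ids = Vec::new();
        for child in &node.children {
            let child_root = split(child, pipelines);
            pipelines.push(child_root);
            ids.push(pipelines.len() - 1);
        }
        return Node::new(&format!("MergePipelinesExec: pipeline_ids={:?}", ids), vec![]);
    }
    let children = node.children.iter().map(|c| split(c, pipelines)).collect();
    Node { name: node.name.clone(), children }
}

/// The physical plan from the example above.
fn example_plan() -> Node {
    let scan = Node::new("ParquetExec", vec![]);
    let filter = Node::new("FilterExec", vec![scan]);
    let batches = Node::new("CoalesceBatchesExec", vec![filter]);
    let partial = Node::new("AggregateExec(Partial)", vec![batches]);
    let repartition = Node::new("RepartitionExec", vec![partial]);
    let batches2 = Node::new("CoalesceBatchesExec", vec![repartition]);
    let final_agg = Node::new("AggregateExec(FinalPartitioned)", vec![batches2]);
    Node::new("ProjectionExec", vec![final_agg])
}

fn main() {
    let mut pipelines = Vec::new();
    let root = split(&example_plan(), &mut pipelines);
    // `pipelines` holds the partial-aggregation pipeline; `root` ends in a
    // MergePipelinesExec placeholder that reads from pipeline 0.
    println!("upstream pipelines: {:#?}", pipelines);
    println!("root pipeline: {:#?}", root);
}
```

A real split also has to carry the exchange's partitioning scheme (here `Hash([sid@0], 8)`) into the upstream pipeline, so that each `FinalPartitioned` task receives all partial states for its keys; the sketch drops that detail. Ballista handles this with shuffle writer/reader operators at the stage boundary.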
By the way, datafusion-ballista is good for OLAP workloads, and it assumes executors are stateless. In my scenario, however, executors are stateful: each executor maintains an in-memory buffer containing the most recently written data (historical data is stored in shared object storage). So when the scheduler is about to construct a physical plan, it has to query each executor for the latest statistics, which are required for query optimization. I wonder whether this is the standard approach to distributed querying with DataFusion, since the implementation seems complicated.
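One way the scheduler could combine the freshest executor statistics with those of the historical data is sketched below. Everything here is hypothetical (`Stats`, `merge`, `table_stats`), and it assumes each row lives in exactly one place, either an executor's buffer or object storage. If a flush can make the same data visible in both, the merged row count is only an upper bound.

```rust
/// Hypothetical per-source statistics: row count plus min/max of `sid`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Stats {
    num_rows: u64,
    min_sid: i64,
    max_sid: i64,
}

/// Merge statistics of two disjoint sources: row counts add up, and the
/// combined range is the min of the mins and the max of the maxes.
fn merge(a: Stats, b: Stats) -> Stats {
    Stats {
        num_rows: a.num_rows + b.num_rows,
        min_sid: a.min_sid.min(b.min_sid),
        max_sid: a.max_sid.max(b.max_sid),
    }
}

/// Combine the statistics of historical data in object storage with the
/// statistics each executor reports for its in-memory buffer.
/// Returns `None` when there are no sources at all.
fn table_stats(object_store: Option<Stats>, executors: &[Stats]) -> Option<Stats> {
    object_store
        .into_iter()
        .chain(executors.iter().copied())
        .reduce(merge)
}

fn main() {
    let history = Some(Stats { num_rows: 1_000, min_sid: 1, max_sid: 50 });
    // Statistics returned by two executors when the scheduler polls them.
    let buffers = [
        Stats { num_rows: 20, min_sid: 40, max_sid: 60 },
        Stats { num_rows: 5, min_sid: 0, max_sid: 3 },
    ];
    // prints Some(Stats { num_rows: 1025, min_sid: 0, max_sid: 60 })
    println!("{:?}", table_stats(history, &buffers));
}
```

In DataFusion itself, the merged values would presumably be reported through its own statistics types and marked as inexact, since the buffers keep changing after the scheduler polls them.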
I really hope the DataFusion community can provide some recommendations on building a distributed query engine based on DataFusion.
Hi @niebayes, this PR from @robtandy may be quite relevant to your work building a distributed execution engine:
- https://github.com/apache/datafusion-ray/pull/60
The larger project is here: https://github.com/apache/datafusion-ray
I consolidated the outstanding Substrait work in a new epic, so let's close this one and continue the discussion there:
- https://github.com/apache/datafusion/issues/17159