opteryx
Add flow/subplan identification to physical planner for parallel execution
Overview
This PR implements automatic flow/subplan identification in the physical planner to enable more efficient parallel execution. The planner now identifies chains of operations that can be executed together on a worker without needing to report back interim snapshots.
Problem
The parallel execution engine needs to understand which operations can be grouped together and executed as a unit. Previously, the physical plan was just a DAG of operators without any grouping information, making it difficult to optimize parallel execution and minimize communication overhead between workers.
Solution
Added an identify_flows() method to PhysicalPlan that automatically annotates each operator node with a flow_id identifying the flow it belongs to. Flows are linear sequences of stateless operators that can execute together. The algorithm breaks flows at natural boundaries:
- Stateful nodes (aggregates, sorts, distinct) - require accumulation across data batches
- Join nodes - require coordination between multiple data sources
- Branch points - nodes with multiple children (data splits)
- Merge points - nodes with multiple parents (data merges)
Boundary nodes are marked with flow_id=None to indicate they require special handling.
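The boundary rules above reduce to a single predicate over a node's operator type and its number of upstream (parent) and downstream (child) connections. The sketch below is illustrative only: the function name is_flow_boundary and the operator-name sets are hypothetical, and the real planner presumably inspects node classes rather than strings.

```python
# Hypothetical operator categories; the actual PhysicalPlan node types may differ.
STATEFUL_OPERATORS = {"AggregateAndGroup", "Aggregate", "Sort", "Distinct"}
JOIN_OPERATORS = {"Join", "InnerJoin", "LeftJoin", "CrossJoin"}


def is_flow_boundary(operator: str, in_degree: int, out_degree: int) -> bool:
    """Return True if the node must be marked flow_id=None (a boundary).

    in_degree: number of parents (upstream inputs)
    out_degree: number of children (downstream consumers)
    """
    if operator in STATEFUL_OPERATORS:  # accumulates state across batches
        return True
    if operator in JOIN_OPERATORS:  # coordinates multiple data sources
        return True
    if out_degree > 1:  # branch point: data splits to several consumers
        return True
    if in_degree > 1:  # merge point: several inputs combine here
        return True
    return False
```

For example, is_flow_boundary("Filter", 1, 1) is False, while the same Filter with two consumers (out_degree 2) becomes a branch point and therefore a boundary.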
Example
For a query with joins and aggregations:
SELECT category, COUNT(*)
FROM table1 t1
JOIN table2 t2 ON t1.id = t2.id
WHERE t1.value > 10
GROUP BY category
LIMIT 100
The physical plan creates the following flows:
Scan(t1) -> Filter(t1) -> [Flow 0]
Scan(t2) -> [Flow 1]
Join -> [boundary: flow_id=None]
Project -> [Flow 2]
AggregateAndGroup -> [boundary: flow_id=None]
Limit -> [Flow 3]
This allows the parallel engine to:
- Execute flows 0 and 1 in parallel on separate workers
- Coordinate at the join boundary
- Execute flow 2 independently after the join
- Coordinate at the aggregation boundary
- Execute flow 3 for the final result
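The ordering above (flows 0 and 1 together, then the join, then flow 2, and so on) can be derived mechanically by layering the work units topologically, in the style of Kahn's algorithm. This is a hypothetical scheduler sketch, not part of this PR: the parallel engine does not exist yet, and the function name execution_waves and the unit names are assumptions for illustration.

```python
def execution_waves(units, deps):
    """Group work units into waves; units within one wave can run in parallel.

    units: list of unit names (flows and boundary nodes)
    deps: dict mapping unit -> set of units it must wait for
    """
    remaining = {u: set(deps.get(u, ())) for u in units}
    waves = []
    while remaining:
        # Every unit whose dependencies are all satisfied is ready now.
        ready = sorted(u for u, d in remaining.items() if not d)
        if not ready:
            raise ValueError("dependency cycle between work units")
        waves.append(ready)
        for u in ready:
            del remaining[u]
        # Mark the finished units as satisfied for everything still pending.
        for pending in remaining.values():
            pending.difference_update(ready)
    return waves


units = ["flow_0", "flow_1", "join", "flow_2", "aggregate", "flow_3"]
deps = {
    "join": {"flow_0", "flow_1"},
    "flow_2": {"join"},
    "aggregate": {"flow_2"},
    "flow_3": {"aggregate"},
}
waves = execution_waves(units, deps)
# waves: [["flow_0", "flow_1"], ["join"], ["flow_2"], ["aggregate"], ["flow_3"]]
```

Only the first wave contains more than one unit here; plans with more independent scans or joins would produce wider waves.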
Implementation Details
Algorithm:
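- Uses breadth-first search (BFS) from the entry points; a non-boundary node has at most one parent, so BFS always processes that parent before the node itself
- O(N + E) complexity, where N is the number of nodes and E the number of edges (linear in the size of the plan)
- Processes every node reachable from an entry point exactly once
- Handles complex plans with multiple branches and joins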
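A minimal sketch of this traversal follows, assuming the plan is given as a dict of node id to operator name plus a list of (upstream, downstream) edges. It is not the code in opteryx/models/physical_plan.py; the names identify_flows, STATEFUL and JOINS here are hypothetical stand-ins. A node continues its parent's flow when that parent is not a boundary; otherwise it starts a new flow.

```python
from collections import deque

# Hypothetical operator names; the real planner classifies node classes.
STATEFUL = {"AggregateAndGroup", "Sort", "Distinct"}
JOINS = {"Join"}


def identify_flows(nodes, edges):
    """Return {node_id: flow_id}, with flow_id None for boundary nodes.

    nodes: dict mapping node_id -> operator name
    edges: list of (upstream_id, downstream_id) pairs
    """
    parents = {n: [] for n in nodes}
    children = {n: [] for n in nodes}
    for src, dst in edges:
        children[src].append(dst)
        parents[dst].append(src)

    def is_boundary(n):
        return (
            nodes[n] in STATEFUL
            or nodes[n] in JOINS
            or len(children[n]) > 1  # branch point
            or len(parents[n]) > 1  # merge point
        )

    flow_ids = {}
    next_flow = 0
    # Start from the entry points: nodes with no upstream inputs (e.g. scans).
    queue = deque(n for n in nodes if not parents[n])
    seen = set(queue)
    while queue:
        node = queue.popleft()
        if is_boundary(node):
            flow_ids[node] = None
        elif parents[node] and flow_ids[parents[node][0]] is not None:
            # The single parent was reached first by BFS; continue its flow.
            flow_ids[node] = flow_ids[parents[node][0]]
        else:
            # Entry point, or the parent is a boundary: start a new flow.
            flow_ids[node] = next_flow
            next_flow += 1
        for child in children[node]:
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return flow_ids


# A plan shaped like the example query: Scan/Filter on t1, Scan on t2,
# then Join -> Project -> AggregateAndGroup -> Limit.
plan_nodes = {
    "scan_t1": "Scan",
    "filter_t1": "Filter",
    "scan_t2": "Scan",
    "join": "Join",
    "project": "Project",
    "aggregate": "AggregateAndGroup",
    "limit": "Limit",
}
plan_edges = [
    ("scan_t1", "filter_t1"),
    ("filter_t1", "join"),
    ("scan_t2", "join"),
    ("join", "project"),
    ("project", "aggregate"),
    ("aggregate", "limit"),
]
flows = identify_flows(plan_nodes, plan_edges)
# scan_t1/filter_t1 -> 0, scan_t2 -> 1, project -> 2, limit -> 3;
# join and aggregate -> None
```

Each node and edge is visited once, which is where the O(N + E) bound comes from.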
Integration:
- Automatically called in create_physical_plan() after the plan is constructed
- No breaking changes to existing code
- Nodes gain a new flow_id attribute that existing code can safely ignore
Expected Benefits
- Reduced Communication Overhead - Workers can execute entire flows without reporting interim results
- Better Data Locality - Related operations stay together on the same worker
- Simplified Coordination - Only boundary nodes require synchronization between workers
- Clear Parallelization Strategy - Explicit identification of parallelizable work units
- Lower Scheduling Overhead - The engine can schedule whole flows rather than individual operators
Testing
Added a test suite covering:
- Linear chains of stateless operators forming single flows
- Stateful nodes acting as flow boundaries
- Join scenarios with separate flows for each branch
- Branch points creating new flows
- Merge points creating new flows
- Complex queries with multiple joins and aggregations
All tests verify correct flow identification and proper handling of boundary nodes.
Files Changed
- opteryx/models/physical_plan.py - Added identify_flows() method with comprehensive documentation
- opteryx/planner/physical_planner.py - Integrated flow identification into plan creation
- tests/query_execution/test_flow_identification.py - Full test coverage for all scenarios
Next Steps
The parallel execution engine can now leverage flow information to:
- Send entire flows to workers as single units of work
- Schedule independent flows to run in parallel
- Minimize data transfer between workers by executing flows locally
- Optimize resource allocation based on flow complexity and dependencies
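As one illustration of sending entire flows as single units of work, a scheduler could bucket the annotated nodes by flow_id and treat each boundary node as its own coordination step. This is a hypothetical sketch; group_work_units and the node ids are not part of this PR. Within each flow, nodes keep the order of the input mapping, so BFS-ordered input yields upstream-first chains.

```python
from collections import defaultdict


def group_work_units(flow_ids):
    """Split {node_id: flow_id} into per-flow node lists and boundary nodes."""
    flows = defaultdict(list)
    boundaries = []
    for node, flow_id in flow_ids.items():
        if flow_id is None:
            boundaries.append(node)  # needs coordination between workers
        else:
            flows[flow_id].append(node)  # runs together on one worker
    return dict(flows), boundaries


annotated = {
    "scan_t1": 0,
    "filter_t1": 0,
    "scan_t2": 1,
    "join": None,
    "project": 2,
    "aggregate": None,
    "limit": 3,
}
units, boundary_nodes = group_work_units(annotated)
# units: {0: ["scan_t1", "filter_t1"], 1: ["scan_t2"], 2: ["project"], 3: ["limit"]}
# boundary_nodes: ["join", "aggregate"]
```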
Original prompt
Can you update the physical planner so that chains of the pipeline that are able to be run in parallel are created into "flows" or "subplans"? The intention is that when we implement the parallel engine, these chains are sent to a worker to execute together without needing to report back interim snapshots if it doesn't need to.
Quality Gate passed
Issues
- 3 New issues
- 0 Accepted issues
Measures
- 0 Security Hotspots
- 0.0% Coverage on New Code
- 0.0% Duplication on New Code