
A More General Approach for Optimizing Projections in Physical Plans

Open berkaysynnada opened this issue 1 year ago • 9 comments

Is your feature request related to a problem or challenge?

The current version of ProjectionPushdown has some algorithmic limitations and is not easy to extend. Within a pure "pushing down" strategy, it is not possible to reach the theoretical optimum for this optimization, so we need another approach. Any custom plan should also be easy to integrate with this optimization.

Describe the solution you'd like

The new rule aims to achieve the most effective use of projections in plans. It ensures that query plans are free of unnecessary projections and that no unused columns are propagated between plans. The rule is designed to enhance query performance by:

  1. Preventing the transfer of unused columns from leaves to root.
  2. Ensuring projections are used only when they contribute to narrowing the schema, or when necessary for evaluation or aliasing.

The optimization is conducted in two phases:

Top-down Phase:

  • Traverses the plan from root to leaves. If the node is:
    1. A projection node, the rule may:
       a) Merge it with its input projection, if the merge is beneficial.
       b) Remove the projection, if it is redundant.
       c) Narrow the projection, if possible.
       d) Nest the projection into the source.
       e) Do nothing, otherwise.
    2. A non-projection node, then either:
       a) The schema needs pruning: insert the necessary projections into the children.
       b) All fields are required: do nothing.
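The merge and removal cases for projection nodes can be illustrated with a minimal sketch. It assumes a projection that only selects columns by index, whereas a real `ProjectionExec` carries arbitrary expressions and aliases. The names `merge_projections` and `is_redundant` are hypothetical and not part of DataFusion.

```rust
/// Composes a column-only projection `outer` over `inner`, where output
/// column `i` of a projection reads input column `projection[i]`.
/// The result reads directly from the input of `inner`.
/// Returns `None` if `outer` refers to a column that `inner` does not produce.
fn merge_projections(outer: &[usize], inner: &[usize]) -> Option<Vec<usize>> {
    outer.iter().map(|&i| inner.get(i).copied()).collect()
}

/// A column-only projection is redundant when it forwards every input
/// column unchanged and in the same order.
fn is_redundant(projection: &[usize], input_width: usize) -> bool {
    projection.len() == input_width
        && projection
            .iter()
            .enumerate()
            .all(|(output_index, &input_index)| output_index == input_index)
}
```

For instance, `merge_projections(&[1, 0], &[2, 5, 7])` yields `Some(vec![5, 2])`, and `is_redundant(&[0, 1, 2], 3)` is `true`, so such a projection could be removed.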

Bottom-up Phase:

This pass is required because modifying a plan node can change the column indices used by its output nodes (the nodes above it). When such a change occurs, we store the old and new indices of the columns in the node's state. We then proceed from the leaves to the root, updating the column indices in the plans by referencing these mapping records. Some unnecessary projections may also emerge after the top-down phase. When a projection checks its input's schema mapping, it can remove itself and assign a new schema mapping to the node that was formerly its input.
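The index bookkeeping described here can be sketched as follows. This is a simplified illustration that assumes columns are identified by plain `usize` indices rather than by `Column` values; `build_schema_mapping` and `remap_columns` are hypothetical helper names.

```rust
use std::collections::{BTreeSet, HashMap};

/// Builds the old-index -> new-index mapping that results from keeping only
/// the `required` columns of a child, preserving their relative order.
fn build_schema_mapping(required: &BTreeSet<usize>) -> HashMap<usize, usize> {
    required
        .iter()
        .enumerate()
        .map(|(new_index, &old_index)| (old_index, new_index))
        .collect()
}

/// Rewrites the column indices referenced by a parent node using the mapping
/// reported by its child. Returns `None` if a referenced column was pruned,
/// which would indicate that the required columns were computed incorrectly.
fn remap_columns(referenced: &[usize], mapping: &HashMap<usize, usize>) -> Option<Vec<usize>> {
    referenced.iter().map(|old| mapping.get(old).copied()).collect()
}
```

If a child keeps only columns 0, 2 and 5, a parent expression that referenced columns `[5, 0]` is rewritten to reference `[2, 0]`.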

The designed node structure is:

struct ProjectionOptimizer {
    pub plan: Arc<dyn ExecutionPlan>,
    /// The node above expects it can reach these columns.
    pub required_columns: HashSet<Column>,
    /// The nodes above will be updated according to these matches. First element indicates
    /// the initial column index, and the second element is for the updated version.
    pub schema_mapping: HashMap<Column, Column>,
    pub children_nodes: Vec<ProjectionOptimizer>,
}

To summarize, with two state variables per plan node (one for passing down the required columns and one for reporting changes in column indices) and two passes (in practice a single pass, since the bottom-up pass happens implicitly while the transformed children are attached to their parent node), we will have a future-proof projection optimizer rule.
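To make the "one pass" remark concrete, here is a toy sketch of a single recursive function: required columns flow down through the arguments, and the index mapping flows back up through the return value. The `Plan` enum and `prune` function are assumptions made for illustration only; they are not DataFusion's `ExecutionPlan` or the proposed `ProjectionOptimizer`.

```rust
use std::collections::{BTreeSet, HashMap};

/// Toy plan nodes (hypothetical, for illustration only).
#[derive(Debug, PartialEq)]
enum Plan {
    /// A source that can project by itself: output `i` is source column `columns[i]`.
    Scan { columns: Vec<usize> },
    /// Filters rows on `predicate_column` and passes all input columns through.
    Filter { predicate_column: usize, input: Box<Plan> },
}

/// Prunes `plan` given the output columns its parent requires. Returns the new
/// plan and a mapping from old output indices to new output indices.
fn prune(plan: Plan, required: &BTreeSet<usize>) -> (Plan, HashMap<usize, usize>) {
    match plan {
        Plan::Scan { columns } => {
            // Nest the projection into the source: keep only required columns.
            let mut mapping = HashMap::new();
            let mut kept = Vec::new();
            for (old_index, &source_column) in columns.iter().enumerate() {
                if required.contains(&old_index) {
                    mapping.insert(old_index, kept.len());
                    kept.push(source_column);
                }
            }
            (Plan::Scan { columns: kept }, mapping)
        }
        Plan::Filter { predicate_column, input } => {
            // Top-down: the filter also needs its own predicate column.
            let mut child_required = required.clone();
            child_required.insert(predicate_column);
            let (new_input, mapping) = prune(*input, &child_required);
            // Bottom-up (implicit): fix our own column reference while
            // re-attaching the child. Panics if the column does not exist.
            let predicate_column = mapping[&predicate_column];
            let plan = Plan::Filter { predicate_column, input: Box::new(new_input) };
            (plan, mapping)
        }
    }
}
```

With a four-column scan under a filter on column 2, and a parent that requires only column 3, the scan is narrowed to columns 2 and 3 and the filter's predicate index becomes 0. The toy filter still outputs its predicate column; a complete rule would decide whether a narrowing projection above it is worthwhile.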

Describe alternatives you've considered

No response

Additional context

I am currently working on this issue. I plan to open a PR for suggestions, especially on how to update the ExecutionPlan API to get rid of the if-else structure over all plans. It will likely be ready next week. It is not expected to significantly alter our current plans, but it will be a solid step towards optimizing the plans that result from other existing and future optimizations.

berkaysynnada avatar Feb 02 '24 11:02 berkaysynnada

(copying my comment from elsewhere)

FWIW there is a version of pushdown in InfluxDB (written by @crepererum ) that may serve as a useful reference https://github.com/influxdata/influxdb/blob/main/iox_query/src/physical_optimizer/projection_pushdown.rs

alamb avatar Feb 02 '24 20:02 alamb

I have two questions:

  1. Do you know of any examples of "algorithmic limitations" (e.g. plans where unnecessary columns are carried through)?
  2. How does this compare to how pushdown is done during LogicalPlanning (e.g. https://github.com/apache/arrow-datafusion/blob/main/datafusion/optimizer/src/push_down_projection.rs)? Are you planning changes / extensions to that logic too? One of the reasons to do pushdown at that level is that it is less complicated (e.g. output indexes aren't used).

The idea of supporting ProjectionPushdown for user-defined plans sounds like a good idea to me 👍 . If that is indeed the use case, adding a test showing it doesn't work today and works after your changes is 🆗

I plan to open a PR for suggestions, especially on how to update the ExecutionPlan API to get rid of the if-else structure over all plans.

I assume you are referring to this code: https://github.com/apache/arrow-datafusion/blob/7641a3228156aab0e48c4bab5a6834b44f722d89/datafusion/core/src/physical_optimizer/projection_pushdown.rs#L105-L143

Maybe you could add an API like this:

trait ExecutionPlan {
...
  /// Return a copy of this plan that only produces the specified outputs, if possible, in the specified order.
  /// The projection is a BTreeSet as it is unique and in increasing order (as some nodes like HashAggregateExec
  /// can't reorder their outputs).
  ///
  /// For plans such as `ProjectionExec`, projecting a subset of columns will reduce the
  /// expression list.
  ///
  /// For some plans such as `HashAggregateExec`, projection may not be possible (e.g. it is not possible to remove
  /// group columns from the output).
  ///
  /// By default, returns Ok(None)
  fn try_project(&self, projection: BTreeSet<usize>) -> Result<Option<Arc<dyn ExecutionPlan>>> { Ok(None) }
...
}

🤔

alamb avatar Feb 02 '24 20:02 alamb

I have two questions:

  1. Do you know of any examples of "algorithmic limitations" (e.g. plans where unnecessary columns are carried through)?
  2. How does this compare to how pushdown is done during LogicalPlanning (e.g. https://github.com/apache/arrow-datafusion/blob/main/datafusion/optimizer/src/push_down_projection.rs)? Are you planning changes / extensions to that logic too? One of the reasons to do pushdown at that level is that it is less complicated (e.g. output indexes aren't used).

  1. What I meant by algorithmic limitations concerns the current rule. The main problem occurs in a situation like this:
     ProjectionA <- OperatorA <- OperatorB
     The pushdown mechanism tries to push the projection between OperatorA and OperatorB. When it cannot push it down, the rule gives up. However, there are cases where the best solution looks like this:
     ProjectionB <- OperatorA <- ProjectionC <- OperatorB

Another problem occurs when a projection is inserted into the input of some operator to decrease the load; that insertion may change the indices of some columns at the operators consuming its output. In such a case, all operators subject to a column index change must be rewritten. I would like to eliminate these kinds of restrictions by extending the capabilities of the current rule, and I believe the pushdown mechanism falls short of that.

  2. I haven't deeply examined the rule in the logical plan, but I know the rule there is less complicated. However, after all the rules have been applied to a physical plan, it would be reassuring to have such a rule double-check the plan.

Maybe you could add an API like this:

The API requirements of the new version are a little different. I will ask for your opinions again when the PR is ready. Thanks for your evaluation and suggestions.

berkaysynnada avatar Feb 05 '24 07:02 berkaysynnada

I haven't deeply examined the rule in the logical plan, but I know the rule there is less complicated. However, after all the rules have been applied to a physical plan, it would be reassuring to have such a rule double-check the plan.

I was thinking the LogicalPlan pushdown rule might be interesting to you, as it implements the type of pushdown you describe (pushing only the columns that are needed, without having to push them all).

Perhaps you can avoid having to invent a new algorithm

alamb avatar Feb 05 '24 10:02 alamb

If I remember correctly, the LogicalPlan rule works with qualified column names. Consider a plan such as A <- B <- C, where B refers to SomeColumn at index 2. If a projection is inserted between B and C, and SomeColumn then comes from index 1, we should update both operator A and operator B. All expressions can be rewritten if a change occurs in the plan because of this kind of index change. However, LogicalPlans do not deal with indices. Otherwise, you are right. Some kind of partial pushdown could be enough with a few iterations on the current rule.

berkaysynnada avatar Feb 05 '24 11:02 berkaysynnada

All expressions can be rewritten if a change occurs in the plan because of this kind of index change.

Right -- I think the difference is that in the Physical realm, projection pushdown could maintain the mapping of "is output column N used" where N is the index.

Then the trick will be to implement the transformation for each type of ExecutionPlan to both:

  1. Rewrite itself if one of its output columns is not used
  2. Describe which of the columns on the input are required to create its output

With these two functions I think you could implement a pretty simple pushdown rule
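The two per-operator functions could be sketched as a trait like the one below. The trait name `PrunableOperator`, its method names, and the toy `ColumnProjection` operator are all hypothetical; this is one possible shape under those assumptions, not an existing DataFusion API.

```rust
use std::collections::BTreeSet;

/// Hypothetical trait capturing the two capabilities each operator would need.
trait PrunableOperator: Sized {
    /// Describes which input columns are required to produce `used_outputs`.
    fn required_inputs(&self, used_outputs: &BTreeSet<usize>) -> BTreeSet<usize>;

    /// Rewrites the operator so that it produces only `used_outputs`
    /// (in increasing order), or returns `None` if that is not possible.
    fn prune_outputs(&self, used_outputs: &BTreeSet<usize>) -> Option<Self>;
}

/// Toy column-only projection: output `i` reads input column `inputs[i]`.
#[derive(Debug, PartialEq)]
struct ColumnProjection {
    inputs: Vec<usize>,
}

impl PrunableOperator for ColumnProjection {
    fn required_inputs(&self, used_outputs: &BTreeSet<usize>) -> BTreeSet<usize> {
        // Unknown output indices are ignored here; `prune_outputs` rejects them.
        used_outputs
            .iter()
            .filter_map(|&output| self.inputs.get(output).copied())
            .collect()
    }

    fn prune_outputs(&self, used_outputs: &BTreeSet<usize>) -> Option<Self> {
        let inputs = used_outputs
            .iter()
            .map(|&output| self.inputs.get(output).copied())
            .collect::<Option<Vec<usize>>>()?;
        Some(ColumnProjection { inputs })
    }
}
```

A generic rule could then call `required_inputs` on the way down to compute what each child must provide, and `prune_outputs` to narrow any operator whose parent does not use all of its outputs.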

alamb avatar Feb 05 '24 11:02 alamb

Yes, as you said. In short, the rule consists of tracking the required columns across the plan and rewriting the plans with new expressions that use the updated indices.

berkaysynnada avatar Feb 05 '24 12:02 berkaysynnada

If I remember correctly, the LogicalPlan rule works with qualified column names. Consider a plan such as A <- B <- C, where B refers to SomeColumn at index 2. If a projection is inserted between B and C, and SomeColumn then comes from index 1, we should update both operator A and operator B. All expressions can be rewritten if a change occurs in the plan because of this kind of index change. However, LogicalPlans do not deal with indices. Otherwise, you are right. Some kind of partial pushdown could be enough with a few iterations on the current rule.

Is it like FunctionalDependence in DFSchema ?

Is it like FunctionalDependence in DFSchema ?

I couldn't see where you think the similarity lies. Can you give more detail?

berkaysynnada avatar Feb 14 '24 06:02 berkaysynnada