[EPIC] Stop copying `LogicalPlan` during OptimizerPasses

Open alamb opened this issue 1 year ago • 47 comments

Is your feature request related to a problem or challenge?

Broken out from https://github.com/apache/arrow-datafusion/issues/9577 where @mustafasrepo @comphead and @jayzhan211 and I were discussing optimizer performance

TLDR: the DataFusion optimizer is slow. I did some profiling locally by running the following:

cargo bench --bench sql_planner -- physical_plan_tpch_all

My analysis is that almost 40% of the planning time is spent in SimplifyExprs and CommonSubexprEliminate, and from what I can tell most of that time is related to copying expressions.

[Screenshot 2024-03-14 at 11:07:57 AM]

Those passes themselves internally make a bunch of clones, which we are improving (e.g. @jayzhan211 on https://github.com/apache/arrow-datafusion/pull/9628), but I think there is a more fundamental structural problem.

I think a core challenge is that the OptimizerRule trait pretty much requires copying Exprs on each pass, as it gets a &LogicalPlan input, but produces a LogicalPlan output

    // Required methods
    fn try_optimize(
        &self,
        plan: &LogicalPlan,
        config: &dyn OptimizerConfig
    ) -> Result<Option<LogicalPlan>, DataFusionError>;

This means any pass that works on Exprs must clone all Exprs (by calling LogicalPlan::expressions()), rewrite them, and then create a new LogicalPlan with those new Exprs.

Here is that pattern in the expression simplifier:

https://github.com/apache/arrow-datafusion/blob/0eec5f8e1d0f55e48f5cdc628fbb5ddd89b91512/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs#L112-L123
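To make the cost concrete, here is a minimal sketch of the two signatures side by side. It is not DataFusion code: `Expr`, `Plan`, `simplify`, `optimize_borrowed` and `optimize_owned` are hypothetical toy stand-ins, chosen only to show where the clone is forced.

```rust
// Hypothetical stand-ins for DataFusion's Expr and LogicalPlan.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Literal(i64),
    Column(String),
    Add(Box<Expr>, Box<Expr>),
}

#[derive(Debug, Clone, PartialEq)]
struct Plan {
    exprs: Vec<Expr>,
}

// Fold `Literal + Literal` into a single literal, consuming the input.
fn simplify(expr: Expr) -> Expr {
    match expr {
        Expr::Add(l, r) => match (simplify(*l), simplify(*r)) {
            (Expr::Literal(a), Expr::Literal(b)) => Expr::Literal(a + b),
            (l, r) => Expr::Add(Box::new(l), Box::new(r)),
        },
        other => other,
    }
}

// Borrowed input, owned output: every expression is cloned first,
// including the ones the rule leaves untouched.
fn optimize_borrowed(plan: &Plan) -> Plan {
    Plan {
        exprs: plan.exprs.iter().cloned().map(simplify).collect(),
    }
}

// Owned input, owned output: expressions are moved, never cloned.
fn optimize_owned(plan: Plan) -> Plan {
    Plan {
        exprs: plan.exprs.into_iter().map(simplify).collect(),
    }
}
```

Both functions produce the same plan; the difference is that the borrowed version pays for a deep copy of every expression on every pass.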

Describe the solution you'd like

Find some way to avoid clone'ing exprs during LogicalPlan rewrite

Update: here are the tasks:

Infrastructure Preparation

  • [x] https://github.com/apache/arrow-datafusion/pull/9948
  • [x] https://github.com/apache/arrow-datafusion/pull/9999

Update OptimizerRules to avoid copying

  • [x] SimplifyExpressions: https://github.com/apache/arrow-datafusion/pull/9954
  • [ ] CommonSubexprEliminate: https://github.com/apache/datafusion/issues/9873
  • [x] DecorrelatePredicateSubquery: https://github.com/apache/datafusion/issues/10289
  • [x] EliminateCrossJoin: https://github.com/apache/datafusion/issues/10287
  • [x] EliminateDuplicatedExpr: https://github.com/apache/datafusion/pull/10218
  • [x] EliminateFilter: https://github.com/apache/datafusion/issues/10288
  • [x] EliminateJoin: https://github.com/apache/datafusion/pull/10184
  • [x] EliminateLimit: #10212
  • [x] EliminateNestedUnion: https://github.com/apache/datafusion/issues/10296
  • [x] EliminateOneUnion: https://github.com/apache/datafusion/pull/10184
  • [x] EliminateOuterJoin: https://github.com/apache/arrow-datafusion/pull/10081
  • [x] ExtractEquijoinPredicate: https://github.com/apache/datafusion/pull/10165
  • [x] FilterNullJoinKeys https://github.com/apache/datafusion/pull/10166
  • [x] OptimizeProjections: https://github.com/apache/datafusion/issues/10209
  • [x] PropagateEmptyRelation: https://github.com/apache/datafusion/issues/10290
  • [x] PushDownFilter: https://github.com/apache/datafusion/issues/10291
  • [x] PushDownLimit: https://github.com/apache/datafusion/issues/10292
  • [x] ReplaceDistinctWithAggregate: https://github.com/apache/datafusion/issues/10293
  • [x] RewriteDisjunctivePredicate: #10213
  • [x] ScalarSubqueryToJoin: https://github.com/apache/datafusion/issues/10294
  • [ ] SingleDistinctToGroupBy: https://github.com/apache/datafusion/issues/10295
  • [x] UnwrapCastInComparison https://github.com/apache/arrow-datafusion/pull/10087 / https://github.com/apache/arrow-datafusion/pull/10115

Update AnalyzerRules to avoid copying

  • [x] AnalyzerMisc: https://github.com/apache/arrow-datafusion/pull/9974
  • [x] InlineTableScan: https://github.com/apache/arrow-datafusion/pull/10038
  • [x] ApplyFunctionRewrites: https://github.com/apache/arrow-datafusion/pull/10032
  • [ ] TypeCoercion: https://github.com/apache/datafusion/issues/10210
  • [ ] TypeCoercion more: https://github.com/apache/datafusion/issues/10365
  • [x] CountWildcardRule: (needs a little reworking to avoid clones) https://github.com/apache/arrow-datafusion/pull/10066

Update Other to avoid copying

  • [x] https://github.com/apache/arrow-datafusion/pull/10016

Describe alternatives you've considered

No response

Additional context

We have talked about various other ways to reduce copying of LogicalPlans as well as its challenges in other tickets:

  • https://github.com/apache/arrow-datafusion/issues/4628
  • https://github.com/apache/arrow-datafusion/issues/5157
  • https://github.com/apache/arrow-datafusion/issues/9577

alamb avatar Mar 16 '24 11:03 alamb

I have two ideas before diving deep into the code:

  1. Rewrite to take an owned LogicalPlan, so that we can easily have owned expressions. <- lots of code changes
  2. Lazy clone on rewrite, similar in spirit to copy-on-write. The idea from simplify() #9304 may help here too, or we can rely on Transformed<Expr>
enum SimplifiedExpr {
  Simplified(Expr),
  Original,
}

fn simplify_expr(&self, expr: &Expr) -> SimplifiedExpr

if expr is Original, we can avoid clone.
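A minimal sketch of that lazy-clone idea on a toy expression type follows. Everything here (`Expr`, `Simplified`, `simplify`, `or_clone`) is hypothetical and only illustrates the shape: the simplifier takes `&Expr`, reports `Original` when nothing changed, and clones a subtree only when its sibling was rewritten.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Literal(i64),
    Column(String),
    Add(Box<Expr>, Box<Expr>),
}

// Outcome of simplifying by reference (hypothetical type).
#[derive(Debug, PartialEq)]
enum Simplified {
    // A new expression was built.
    Changed(Expr),
    // Nothing changed; the caller keeps using its original expression.
    Original,
}

// Use the rewritten expression if there is one, else clone the original.
fn or_clone(result: Simplified, original: &Expr) -> Expr {
    match result {
        Simplified::Changed(e) => e,
        Simplified::Original => original.clone(),
    }
}

// Fold `Literal + Literal`, cloning only what has to be rebuilt.
fn simplify(expr: &Expr) -> Simplified {
    let Expr::Add(l, r) = expr else {
        return Simplified::Original;
    };
    match (simplify(l), simplify(r)) {
        // Neither child changed: fold if possible, otherwise no clone at all.
        (Simplified::Original, Simplified::Original) => match (&**l, &**r) {
            (Expr::Literal(a), Expr::Literal(b)) => Simplified::Changed(Expr::Literal(a + b)),
            _ => Simplified::Original,
        },
        // At least one child changed: rebuild this node, cloning only
        // the child that was left untouched.
        (new_l, new_r) => match (or_clone(new_l, l), or_clone(new_r, r)) {
            (Expr::Literal(a), Expr::Literal(b)) => Simplified::Changed(Expr::Literal(a + b)),
            (l, r) => Simplified::Changed(Expr::Add(Box::new(l), Box::new(r))),
        },
    }
}
```

The trade-off is that a changed node still clones its unchanged siblings, so this saves the most when rewrites are rare.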

jayzhan211 avatar Mar 16 '24 15:03 jayzhan211

I have two ideas before diving deep into the code:

I think 2 sounds interesting

Another thing I was thinking of was something like LogicalPlan::rewrite(mut self, rewriter)

I think that along with Arc::try_unwrap could be used to minimize the places where cloning was actually needed
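As a rough illustration of the `Arc::try_unwrap` trick, here is a sketch on a toy plan type. `Plan`, `take_or_clone` and `rewrite` are hypothetical names, not DataFusion APIs; the only real API used is `std::sync::Arc::try_unwrap`, which returns the inner value when the `Arc` has exactly one strong reference.

```rust
use std::sync::Arc;

// Hypothetical stand-in for a LogicalPlan whose children sit behind Arcs.
#[derive(Debug, Clone, PartialEq)]
struct Plan {
    name: String,
    inputs: Vec<Arc<Plan>>,
}

// Take ownership of the plan behind an Arc, cloning only if it is shared.
fn take_or_clone(plan: Arc<Plan>) -> Plan {
    Arc::try_unwrap(plan).unwrap_or_else(|shared| (*shared).clone())
}

// Toy owned rewrite: upper-case every node name, recursing into the inputs.
fn rewrite(plan: Plan) -> Plan {
    let Plan { name, inputs } = plan;
    Plan {
        name: name.to_uppercase(),
        inputs: inputs
            .into_iter()
            .map(|input| Arc::new(rewrite(take_or_clone(input))))
            .collect(),
    }
}
```

When nothing else holds a reference to a child, the rewrite moves it out of its `Arc` without copying; a clone happens only for children that are genuinely shared.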

Maybe we can prototype with

trait OptimizerRule {
...

  /// Does this rule support rewriting owned plans?
  fn supports_owned(&self) -> bool { false }

  /// If supports_owned returns true, the optimizer calls try_optimize_owned
  fn try_optimize_owned(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig
    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
      not_implemented_err!("try_optimize_owned is not implemented for this rule")
    }
...

And then play around with the code that calls try_optimize here https://github.com/apache/arrow-datafusion/blob/f4107d47bb4c0260d301294ddfc7c67d96574636/datafusion/optimizer/src/optimizer.rs#L360-L368 to try and use the try_optimize_owned API (without having to rewrite all the optimizers) for SimplifyExprs

If we could show a significant performance improvement for the sql_planner benchmarks then I think it would be worth spending time reworking the other APIs

alamb avatar Mar 16 '24 23:03 alamb

@alamb I think being able to skip failed rules is the main challenge to rewriting the plan with ownership

https://github.com/apache/arrow-datafusion/blob/37253e57beb25f0f1a4412b75421a489c2cb3c6a/datafusion/optimizer/src/optimizer.rs#L325

We need to restore the original plan if the rewrite fails at some point, however, the old plan is consumed and we lose ownership. Giving up ownership too early is not a good idea if we need to restore the original plan.

Instead of Transformed, I think we need to preserve the old data and fail state to mimic Err in Result but with old data

#[derive(Debug, PartialEq, Eq)]
pub enum OptimizedState {
    Yes,
    No,
    Fail,
}

#[derive(Debug)]
pub struct Optimized<T, E = DataFusionError> {
    pub optimized_data: Option<T>,
    // Used to store the original data if optimized successfully
    pub original_data: T,
    pub optimized_state: OptimizedState,
    // Used to store the error if optimized failed, so we can early return but preserve the original data
    pub error: Option<E>,
}

impl<T, E> Optimized<T, E> {
    pub fn yes(optimized_data: T, original_data: T) -> Self {
        Self {
            optimized_data: Some(optimized_data),
            original_data,
            optimized_state: OptimizedState::Yes,
            error: None,
        }
    }

    pub fn no(original_data: T) -> Self {
        Self {
            optimized_data: None,
            original_data,
            optimized_state: OptimizedState::No,
            error: None,
        }
    }

    pub fn fail(original_data: T, e: E) -> Self {
        Self {
            optimized_data: None,
            original_data,
            optimized_state: OptimizedState::Fail,
            error: Some(e),
        }
    }
}

Then, for every optimization, we return Optimized<LogicalPlan> or Optimized<Expr>

jayzhan211 avatar Mar 17 '24 07:03 jayzhan211

@alamb I think being able to skip failed rules is the main challenge to rewriting the plan with ownership

This is an excellent point

Instead of Transformed, I think we need to preserve the old data and fail state to mimic Err in Result but with old data

If we need to preserve the old data, there is no choice but to copy the LogicalPlan on each pass 🤔 (well, I guess there could be some way to repair the partially rewritten plan, but that sounds very complicated).

Since skip_failed_rules is false by default:

https://github.com/apache/arrow-datafusion/blob/dcfe70987e98da0410146b5e1292ab20f3f118e0/datafusion/common/src/config.rs#L549

Could we maybe only do the copy when skip_failed_rules is enabled? Something like this:

let new_plan = if skip_failed_rules {
  // keep a copy so the input can be restored if the rule fails
  let original_plan = plan.clone();
  optimizer.try_optimize_owned(plan)
   .unwrap_or(original_plan)
} else {
  optimizer.try_optimize_owned(plan)?
};
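The same idea as a self-contained sketch, assuming a toy `Plan` and a rule modeled as a plain function from `Plan` to `Result<Plan, String>` (all names here are hypothetical, and the real optimizer loop carries more state than this):

```rust
#[derive(Debug, Clone, PartialEq)]
struct Plan(Vec<i64>);

// Hypothetical owned-plan rule: doubles every value, fails on negatives.
fn double_rule(plan: Plan) -> Result<Plan, String> {
    if plan.0.iter().any(|v| *v < 0) {
        return Err("negative value".to_string());
    }
    Ok(Plan(plan.0.into_iter().map(|v| v * 2).collect()))
}

// Apply one rule, copying the input only when a failed rule must be skipped.
fn apply_rule(
    plan: Plan,
    skip_failed_rules: bool,
    rule: impl Fn(Plan) -> Result<Plan, String>,
) -> Result<Plan, String> {
    if skip_failed_rules {
        // The rule consumes `plan`, so keep a copy to fall back on.
        let original = plan.clone();
        Ok(rule(plan).unwrap_or(original))
    } else {
        // Default path: no copy, errors propagate to the caller.
        rule(plan)
    }
}
```

With `skip_failed_rules` off, which is the default mentioned above, the plan is moved through the rule without any copy.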

alamb avatar Mar 17 '24 10:03 alamb

I think it is possible to avoid unnecessary clones even if we preserve the old plan, and only clone the data for the new expr and new plan, but I agree it is a little complicated; it needs the enum I showed above. We can optimize for the default path first, and the others later on.

jayzhan211 avatar Mar 17 '24 12:03 jayzhan211

Let's give it a try!

alamb avatar Mar 17 '24 14:03 alamb

note: I profiled my change and found that the time does not improve at all. I think the clone inside exprlist_to_fields may be the core issue.

jayzhan211 avatar Mar 17 '24 15:03 jayzhan211

I analyzed the sql_planner benchmark again and found that exprlist_to_fields and calc_func_dependencies_for_project are the two most time-consuming functions.

So I think #9595 might be an important step for the optimization here.

https://github.com/apache/arrow-datafusion/blob/ad8d552b9f150c3c066b0764e84f72b667a649ff/datafusion/expr/src/logical_plan/plan.rs#L1804-L1813

jayzhan211 avatar Mar 20 '24 05:03 jayzhan211

I played around with some ideas last night and it is looking promising (though not yet done). I put my draft here https://github.com/apache/arrow-datafusion/pull/9708. I hope to try and work on it more over the next few days, but I am pretty busy preparing for meetups / presentations / papers for next week. It may be a few more days

alamb avatar Mar 20 '24 13:03 alamb

I would be surprised if #9708 improves, let me see if I can improve #9658 based on it.

jayzhan211 avatar Mar 20 '24 14:03 jayzhan211

BTW here is a video of how I profile DataFusion: https://github.com/apache/arrow-datafusion/issues/9577#issuecomment-2009749007

alamb avatar Mar 20 '24 14:03 alamb

I'm curious: it seems like most of the reasoning behind Arc<LogicalPlan> and clone is due to the optimizer stage. Polars uses an intermediate representation, ALogicalPlan (arena allocated logical plan), solely for the optimizer stage. I wonder if the same technique could be applied here.
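For readers unfamiliar with the technique, an arena-allocated plan can be sketched roughly as below. This is a generic illustration, not how Polars implements ALogicalPlan; `Arena`, `Node` and `NodeId` are hypothetical names. Nodes live in one `Vec` and refer to each other by index, so a rewrite replaces a slot instead of rebuilding the parents.

```rust
// Index of a node inside the arena.
#[derive(Debug, Clone, Copy, PartialEq)]
struct NodeId(usize);

#[derive(Debug, PartialEq)]
enum Node {
    Scan(String),
    Filter { predicate: String, input: NodeId },
}

#[derive(Default)]
struct Arena {
    nodes: Vec<Node>,
}

impl Arena {
    // Store a node and return its index.
    fn add(&mut self, node: Node) -> NodeId {
        self.nodes.push(node);
        NodeId(self.nodes.len() - 1)
    }

    fn get(&self, id: NodeId) -> &Node {
        &self.nodes[id.0]
    }

    // Swap in a new node and return the old one; parents keep the same id.
    fn replace(&mut self, id: NodeId, node: Node) -> Node {
        std::mem::replace(&mut self.nodes[id.0], node)
    }
}
```

Because children are plain indices, no `Arc` or deep clone is needed while optimizing, at the cost of converting into and out of the arena representation.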

universalmind303 avatar Mar 20 '24 21:03 universalmind303

I think arena allocating is pretty common in various compiler/query optimizer systems because you know the lifetime of a plan doesn't outlive the optimizer pass (as in, you can bound the intermediate results).

I think Rust is actually ideally set up to avoid having to arena allocate (which still requires copying stuff to a new plan) by using ownership -- we just need to rejigger how DataFusion is set up to use this. I think we are close.

alamb avatar Mar 20 '24 23:03 alamb

There is one issue that we are not able to call Arc::into_inner for skip-failed-rules path since we hold a clone for restoration.

jayzhan211 avatar Mar 21 '24 00:03 jayzhan211

There is one issue that we are not able to call Arc::into_inner for skip-failed-rules path since we hold a clone for restoration.

What I did in https://github.com/apache/arrow-datafusion/pull/9708 which seems to have worked pretty well is to copy the LogicalPlan only if skip_failed_rules is set

Like this

                let prev_plan = if options.optimizer.skip_failed_rules {
                    Some(new_plan.clone())
                } else {
                    None
                };

alamb avatar Mar 21 '24 10:03 alamb

Upd: I see the latest commit that uses mem::swap for updating Arc<T> in rewrite_arc. 👍

I think when you rewrite inputs, you still need to call Arc::into_inner to get the inner LogicalPlan without a clone, and that is where things get tricky. But I played around without mut before, maybe there are ways to solve this with mut.
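One way the `&mut` variant could look is sketched below. The helper name `rewrite_arc` comes from the comment above, but this body is a guess at the idea rather than the code in #9708, and `Plan` is a toy type. It relies on `std::mem::take`, which needs `Plan: Default` so that a placeholder can sit in the slot while the old value is unwrapped.

```rust
use std::sync::Arc;

#[derive(Debug, Clone, Default, PartialEq)]
struct Plan {
    name: String,
    inputs: Vec<Arc<Plan>>,
}

// Rewrite the plan behind `arc` in place (hypothetical helper).
fn rewrite_arc(arc: &mut Arc<Plan>, f: &impl Fn(Plan) -> Plan) {
    // Move the old Arc out, leaving a default placeholder in the slot.
    let old = std::mem::take(arc);
    // Unwrap without cloning when nothing else shares the node.
    let plan = Arc::try_unwrap(old).unwrap_or_else(|shared| (*shared).clone());
    *arc = Arc::new(f(plan));
}

// Apply `f` to every direct input of `plan` through a mutable borrow.
fn rewrite_inputs(plan: &mut Plan, f: &impl Fn(Plan) -> Plan) {
    for input in plan.inputs.iter_mut() {
        rewrite_arc(input, f);
    }
}
```

The placeholder costs one small allocation per rewritten child, which is the "hack" part; the child plan itself is moved rather than copied.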

jayzhan211 avatar Mar 21 '24 11:03 jayzhan211

But I played around without mut before, maybe there are ways to solve this

Yes, this is indeed the case. The code in https://github.com/apache/arrow-datafusion/pull/9708 is pretty terrible / hacky to deal with the Arc'd inputs, but it does seem to work.

alamb avatar Mar 21 '24 12:03 alamb

With some somewhat hacky code in https://github.com/apache/arrow-datafusion/pull/9708#issuecomment-2012196911 I saw a 25% performance improvement.

Given the results I saw in https://github.com/apache/arrow-datafusion/pull/9708 here is my proposal:

  1. Update the Optimizer implementation / Optimizer APIs to rewrite existing LogicalPlans rather than make new ones
  2. Add the necessary APIs to LogicalPlan
  3. Contemplate eventually switching to use Box<LogicalPlan> instead of Arc<LogicalPlan> to store children in LogicalPlan....

I am not sure which of these APIs would be better (rewrite in place via &mut plan or take ownership (via plan)). I feel like the first one would likely be faster (as it avoids copying at all), but the second is easier to reason about 🤔

trait OptimizerRule {
...
  /// Does this rule support rewriting plans in place rather than copying them?
  fn supports_mut(&self) -> bool { false }

  /// If supports_mut returns true, rewrites `plan` in place
  fn optimize_mut(
        &self,
        plan: &mut LogicalPlan,
        config: &dyn OptimizerConfig
    ) -> Result<Transformed<()>, DataFusionError> {
      not_implemented_err!("optimize_mut is not implemented for this rule")
    }
...

trait OptimizerRule {
...

  /// Does this rule support rewriting owned plans?
  fn supports_owned(&self) -> bool { false }

  /// If supports_owned returns true, consumes the LogicalPlan and returns a newly rewritten plan
  fn try_optimize_owned(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig
    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
      not_implemented_err!("try_optimize_owned is not implemented for this rule")
    }
...
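To show how such an opt-in flag lets rules migrate one at a time, here is a sketch of a driver loop over a toy trait. `Rule`, `run`, `DropZeros` and `SortLegacy` are hypothetical, and error handling and `Transformed` are left out to keep the shape visible.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Plan(Vec<i64>);

trait Rule {
    // Legacy shape: borrowed input, optional new plan.
    fn try_optimize(&self, plan: &Plan) -> Option<Plan>;

    // Does this rule support rewriting owned plans?
    fn supports_owned(&self) -> bool {
        false
    }

    // Owned shape; the default returns the plan unchanged.
    fn try_optimize_owned(&self, plan: Plan) -> Plan {
        plan
    }
}

// Migrated rule: edits the owned plan without copying it.
struct DropZeros;
impl Rule for DropZeros {
    fn try_optimize(&self, _plan: &Plan) -> Option<Plan> {
        None
    }
    fn supports_owned(&self) -> bool {
        true
    }
    fn try_optimize_owned(&self, mut plan: Plan) -> Plan {
        plan.0.retain(|v| *v != 0);
        plan
    }
}

// Unmigrated rule: has to clone because it only sees `&Plan`.
struct SortLegacy;
impl Rule for SortLegacy {
    fn try_optimize(&self, plan: &Plan) -> Option<Plan> {
        let mut values = plan.0.clone();
        values.sort();
        Some(Plan(values))
    }
}

// Driver: use the owned path when a rule opts in, else the legacy path.
fn run(rules: &[&dyn Rule], mut plan: Plan) -> Plan {
    for rule in rules {
        plan = if rule.supports_owned() {
            rule.try_optimize_owned(plan)
        } else {
            match rule.try_optimize(&plan) {
                Some(new_plan) => new_plan,
                None => plan,
            }
        };
    }
    plan
}
```

The driver never needs to know which rules have been converted, so existing rules keep working while hot ones such as the expression simplifier move to the owned path first.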

alamb avatar Mar 21 '24 12:03 alamb

Upd: I realize we need to do the std::mem::take for Box too. 😢 Should we change to box first, so we can easily modify plan without hack?

rewrite in place via &mut plan or take ownership (via plan)

I think we can take ownership by default, and mutate in place where that is better? For example, change to mutable for rewriting new expressions and new plans.

jayzhan211 avatar Mar 24 '24 02:03 jayzhan211

Should we change to box first, so we can easily modify plan without hack?

I was exploring keeping the Arc initially for two reasons:

  1. It avoids breaking changes in downstream crates
  2. Until we fix the optimizer so it doesn't do so much copying, switching to Box will only make the problem worse (as all clones will then do deep clones of all children).

So I was thinking we could start with the hacky solution that kept Arc<LogicalPlan> and then we could potentially switch to Box<LogicalPlan> afterwards.
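The difference in clone cost can be seen on two toy node types (hypothetical `ArcPlan` and `BoxPlan`, each with at most one child): cloning a node with an `Arc` child only bumps a reference count, while cloning one with a `Box` child copies the whole subtree.

```rust
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq)]
struct ArcPlan {
    name: String,
    input: Option<Arc<ArcPlan>>,
}

#[derive(Debug, Clone, PartialEq)]
struct BoxPlan {
    name: String,
    input: Option<Box<BoxPlan>>,
}

// True if both plans point at the very same child allocation.
fn shares_child_arc(a: &ArcPlan, b: &ArcPlan) -> bool {
    match (&a.input, &b.input) {
        (Some(x), Some(y)) => Arc::ptr_eq(x, y),
        _ => false,
    }
}

// Same check for Box children, comparing the heap addresses.
fn shares_child_box(a: &BoxPlan, b: &BoxPlan) -> bool {
    match (&a.input, &b.input) {
        (Some(x), Some(y)) => std::ptr::eq(&**x, &**y),
        _ => false,
    }
}
```

A cloned `ArcPlan` shares its child with the original, whereas a cloned `BoxPlan` owns a fresh copy of every descendant, which is why switching to `Box` before removing the clones would make planning slower.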

alamb avatar Mar 24 '24 18:03 alamb

After playing around with this, I have become convinced what is really needed is a "TreeMutator" type API on TreeNode that lets us rewrite LogicalNodes in place -- if we had that API I think we can make the DataFusion Optimizer screaming fast

alamb avatar Mar 24 '24 18:03 alamb

After playing around with this, I have become convinced what is really needed is a "TreeMutator" type API on TreeNode that lets us rewrite LogicalNodes in place -- if we had that API I think we can make the DataFusion Optimizer screaming fast

It seems TreeMutator takes &mut T in the first place, while in your previous design #9708 we have &mut T for rewrite_inputs and rewrite_exprs. I think the latter switches to &mut T only when necessary, so I don't understand why the former one is better; I guess they have similar performance.

jayzhan211 avatar Mar 25 '24 02:03 jayzhan211

After playing around with this, I have become convinced what is really needed is a "TreeMutator" type API on TreeNode that lets us rewrite LogicalNodes in place -- if we had that API I think we can make the DataFusion Optimizer screaming fast

I think a LogicalPlan tree is made of Arc links between the nodes currently. So I wonder if obtaining mutable references to a LogicalPlan node's children is possible at all without cloning the children first? (I.e. can we implement this todo https://github.com/apache/arrow-datafusion/pull/9780/files#diff-9619441d9605f143a911319cea75ae5192e6c5b17acfcbc17a3c73a9e32a8e61R138 effectively?)

peter-toth avatar Mar 25 '24 08:03 peter-toth

so I don't understand why the former one is better; I guess they have similar performance.

I was having trouble actually implementing the required TreeNode API for &mut LogicalPlan -- I can't remember the actual compiler error. I don't think they will differ in performance

I think a LogicalPlan tree is made of Arc links between the nodes currently. So I wonder if obtaining mutable references to a LogicalPlan node's children is possible at all without cloning the children first?

Yes, I was able to do so (see the horrible code here to do it): https://github.com/apache/arrow-datafusion/pull/9708/files#diff-f69e720234d9da6cb3a4a178d3a5575fcd34f68191335a8c2465a172e8c4e6f1R498-R538

I also verified that it was actually unwrapping Arcs most of the time (not cloning)

(I.e. can we implement this todo https://github.com/apache/arrow-datafusion/pull/9780/files#diff-9619441d9605f143a911319cea75ae5192e6c5b17acfcbc17a3c73a9e32a8e61R138 effectively?)

I tried to implement this API and it was not going well. However, implementing a TreeNodeMutator (similar to TreeNodeRewriter but taking a &mut T rather than T) seems to be more promising.
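A rough sketch of what a mutator-style API could look like on a toy expression tree is shown below. The trait name `Mutator`, the `mutate_tree` walker and `ConstFold` are hypothetical and much simpler than whatever ends up in the TreeNode API; the point is only that nodes are edited through `&mut` and never cloned.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Literal(i64),
    Column(String),
    Add(Box<Expr>, Box<Expr>),
}

// Hypothetical visitor that edits a node through `&mut`.
trait Mutator {
    // Returns true if the node was changed.
    fn mutate(&mut self, expr: &mut Expr) -> bool;
}

// Post-order walk: children first, then the node itself, all in place.
fn mutate_tree(expr: &mut Expr, m: &mut impl Mutator) -> bool {
    let mut changed = false;
    if let Expr::Add(l, r) = expr {
        changed |= mutate_tree(l, m);
        changed |= mutate_tree(r, m);
    }
    changed | m.mutate(expr)
}

// Example mutator: fold `Literal + Literal` into a single literal.
struct ConstFold;

impl Mutator for ConstFold {
    fn mutate(&mut self, expr: &mut Expr) -> bool {
        let folded = match expr {
            Expr::Add(l, r) => match (&**l, &**r) {
                (Expr::Literal(a), Expr::Literal(b)) => Some(a + b),
                _ => None,
            },
            _ => None,
        };
        match folded {
            Some(value) => {
                // Overwrite the node in place; no other part of the tree is touched.
                *expr = Expr::Literal(value);
                true
            }
            None => false,
        }
    }
}
```

Compared with a consuming rewriter, unchanged subtrees are neither moved nor rebuilt, and the boolean result plays the role that `Transformed` plays in the owned API.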

I am traveling this week so I won't have as much time as I would like to work on this, but I hope to push some more progress to https://github.com/apache/arrow-datafusion/pull/9780 soon

alamb avatar Mar 25 '24 11:03 alamb

Before https://github.com/apache/arrow-datafusion/pull/8891 I ran some experiments in https://github.com/apache/arrow-datafusion/pull/7942 with different APIs to test if transform/rewrite could in place mutate the nodes. In place mutation indeed seemed very effective, especially on Expr trees: https://github.com/apache/arrow-datafusion/pull/7942#issuecomment-1864247136. It seemed to improve less on LogicalPlan trees: https://github.com/apache/arrow-datafusion/pull/7942#issuecomment-1866908166, but I tested the case when a reference is kept to the original tree so the Arc::try_unwrap trick wouldn't work there. But then https://github.com/apache/arrow-datafusion/pull/8891 didn't add that API and kept the TreeNodeRewriter as a node consuming and producing one for the sake of simplicity.

peter-toth avatar Mar 25 '24 13:03 peter-toth

I have the first PR for this issue here: https://github.com/apache/arrow-datafusion/pull/9780 that introduces TreeNodeMutator and uses it to avoid some copies. There is more to go but it makes a 10% improvement in planning

alamb avatar Apr 01 '24 15:04 alamb

An update here is that with a few PRs in flight, some of the planning benchmarks go 50% faster (see details on https://github.com/apache/arrow-datafusion/pull/9954)

It will take a while to get these PRs in (there are a few chained up) but the "don't copy the plan so many times" approach seems to be paying off very well. Having the TreeNode API to build off is super helpful

alamb avatar Apr 05 '24 11:04 alamb

https://github.com/apache/arrow-datafusion/pull/9999 is now ready for review

alamb avatar Apr 08 '24 15:04 alamb

Here is the next PR ready for review: https://github.com/apache/arrow-datafusion/pull/9948 (10% faster planning)

alamb avatar Apr 09 '24 14:04 alamb

Here is a PR that gets (another) 20% speed boost by not copying in Expr simplifier: https://github.com/apache/arrow-datafusion/pull/9954

alamb avatar Apr 10 '24 19:04 alamb