
Move `create_physical_expr` to `physical-expr-common`

Open jayzhan211 opened this issue 1 year ago • 16 comments

Is your feature request related to a problem or challenge?

https://github.com/apache/arrow-datafusion/blob/671cef85c550969ab2c86d644968a048cb181c0c/datafusion/functions-aggregate/src/first_last.rs#L102-L117

I found that we need to match `Alias` cases there; it seems we should try to reuse `create_physical_sort_exprs` if possible.

We need to move `create_physical_expr` to `physical-expr-common`, and this might involve moving lots of other things to `physical-expr-common` too. I'm not sure yet whether that is reasonable.
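To illustrate why matching `Alias` cases by hand is awkward, here is a minimal sketch with hypothetical, simplified types (the `Expr` enum and `PhysicalColumn` struct below are stand-ins, not DataFusion's real `Expr` or `PhysicalExpr`). Every hand-written logical-to-physical conversion has to remember to unwrap aliases, which is the kind of duplication that reusing a shared conversion such as `create_physical_sort_exprs` would avoid.

```rust
/// Hypothetical, simplified logical expression (not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Alias(Box<Expr>, String),
    Literal(i64),
}

/// Hypothetical physical expression: a column resolved to its index in the input schema.
#[derive(Debug, Clone, PartialEq)]
struct PhysicalColumn {
    name: String,
    index: usize,
}

/// Hand-written conversion, as an aggregate might do it locally.
/// Forgetting the `Alias` arm would make aliased sort expressions fail.
fn to_physical_column(expr: &Expr, schema: &[&str]) -> Result<PhysicalColumn, String> {
    match expr {
        Expr::Column(name) => schema
            .iter()
            .position(|field| *field == name.as_str())
            .map(|index| PhysicalColumn { name: name.clone(), index })
            .ok_or_else(|| format!("column {name} not found in schema")),
        // An alias only renames the output; unwrap it and convert the inner expression.
        Expr::Alias(inner, _alias) => to_physical_column(inner, schema),
        other => Err(format!("unsupported ordering expression: {other:?}")),
    }
}
```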

Describe the solution you'd like

There are two places where we need `create_physical_expr`:

  1. Converting to physical exprs in the accumulator with `create_physical_sort_exprs`.
  2. `reverse_expr`: converting the logical aggregate expr to a physical aggregate expr with `create_physical_expr`, since `AggregateFunction` expects to get an `Arc<dyn AggregateExpr>`.

Describe alternatives you've considered

~Actually, we already have what we need for the physical sort expressions (`ordering_req`) in `AggregateFunctionExpr`. But since `AccumulatorArgs` and `AggregateUDFImpl` live in the `datafusion_expr` layer, we can't get the value from there.~

~I still think it would be nice to have a design that can deal with physical-expr for UDAF 😞 Should we move AggregateUDFImpl to a crate that imports both datafusion-expr and datafusion-physical-expr-common?~ 🤔

No response

Additional context

No response

jayzhan211 • Apr 14 '24 03:04

I would like to continue the discussion from #9972, since I don't have a good reason not to support directly mutating physical-expr. The solution there (moving the conversion to an optimizer rule) seems specific to FIRST/LAST to me, and I'm not sure moving all the functions, including `reverse_expr`, to an optimizer rule is a good idea.

> I still think it would be nice to have a design that can deal with physical-expr for UDAF 😞 Should we move AggregateUDFImpl to a crate that imports both datafusion-expr and datafusion-physical-expr-common

I think we can consider moving `AggregateUDFImpl` to `functions-aggregate-common`, which imports both `datafusion-expr` and `datafusion-physical-expr-common`.

The advantage is that we avoid converting the ordering twice: it is already computed while creating the UDAF expr, and the accumulator would otherwise compute it again. https://github.com/apache/arrow-datafusion/blob/4ad4f90d86c57226a4e0fb1f79dfaaf0d404c273/datafusion/core/src/physical_planner.rs#L1926-L1933

Not only the accumulator but also `reverse_expr` has a similar issue.

https://github.com/apache/arrow-datafusion/blob/4ad4f90d86c57226a4e0fb1f79dfaaf0d404c273/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs#L138-L150

If we only have `sort_exprs: Vec<Expr>`, we need to compute the `PhysicalSortExpr`s again.
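A minimal sketch of the cost argument, using hypothetical stand-in types and a `Planner` that simply counts conversions (none of these names are DataFusion APIs). Keeping only the logical `sort_exprs` forces both the accumulator and `reverse_expr` to convert again, while converting once when the aggregate expr is created lets both reuse the stored physical ordering.

```rust
use std::cell::Cell;

/// Hypothetical logical sort expression: a column name plus direction.
#[derive(Debug, Clone, PartialEq)]
struct LogicalSortExpr {
    column: String,
    asc: bool,
}

/// Hypothetical physical sort expression: a resolved column index plus direction.
#[derive(Debug, Clone, PartialEq)]
struct PhysicalSortExpr {
    index: usize,
    descending: bool,
}

/// Stand-in for the planner; `conversions` counts logical-to-physical conversions.
struct Planner<'s> {
    schema: &'s [&'s str],
    conversions: Cell<usize>,
}

impl Planner<'_> {
    fn convert(&self, sort: &[LogicalSortExpr]) -> Vec<PhysicalSortExpr> {
        self.conversions.set(self.conversions.get() + 1);
        sort.iter()
            .map(|s| PhysicalSortExpr {
                index: self
                    .schema
                    .iter()
                    .position(|f| *f == s.column.as_str())
                    .expect("sort column must exist in schema"),
                descending: !s.asc,
            })
            .collect()
    }
}

/// Flip the direction of every sort key.
fn reversed(ordering: &[PhysicalSortExpr]) -> Vec<PhysicalSortExpr> {
    ordering
        .iter()
        .map(|s| PhysicalSortExpr { index: s.index, descending: !s.descending })
        .collect()
}

/// Option A: only the logical `sort_exprs` are kept, so every consumer converts again.
struct KeepsLogical {
    sort_exprs: Vec<LogicalSortExpr>,
}

impl KeepsLogical {
    fn accumulator_ordering(&self, p: &Planner) -> Vec<PhysicalSortExpr> {
        p.convert(&self.sort_exprs)
    }
    fn reverse_ordering(&self, p: &Planner) -> Vec<PhysicalSortExpr> {
        reversed(&p.convert(&self.sort_exprs))
    }
}

/// Option B: convert once when the aggregate expr is created, then reuse the result.
struct KeepsPhysical {
    ordering_req: Vec<PhysicalSortExpr>,
}

impl KeepsPhysical {
    fn new(sort: &[LogicalSortExpr], p: &Planner) -> Self {
        Self { ordering_req: p.convert(sort) }
    }
    fn accumulator_ordering(&self) -> Vec<PhysicalSortExpr> {
        self.ordering_req.clone()
    }
    fn reverse_ordering(&self) -> Vec<PhysicalSortExpr> {
        reversed(&self.ordering_req)
    }
}
```

With option A, each consumer (accumulator, `reverse_expr`) pays for a conversion; with option B the conversion happens exactly once, no matter how many consumers read the ordering.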

I think the benefit of avoiding recomputation is larger than the complexity of introducing yet another crate, `functions-aggregate-common`. Also note that aggregate functions are used in window functions too, so if we follow this design we may end up with `functions-window-common` and `functions-window` crates as well.

Unless the cost of re-converting logical exprs to physical exprs is negligible, I think being able to mutate physical exprs directly is better in the long term.

jayzhan211 • Apr 17 '24 12:04

I played around with it, and I now think moving `AggregateUDFImpl` is not possible, so I have given up on the idea of directly mutating physical-expr.

Back to the original issue: to support `reverse_expr` we need to create a physical-expr from a logical-expr, so we need `create_physical_expr` in `physical-expr-common`.

jayzhan211 • Apr 20 '24 02:04

🤔 it seems like the core challenge is that the implementation of the aggregate functions (specifically https://github.com/apache/arrow-datafusion/blob/671cef85c550969ab2c86d644968a048cb181c0c/datafusion/functions-aggregate/src/first_last.rs#L102-L117)

needs some way to create a `PhysicalExpr` without directly depending on `create_physical_expr`.

Rather than moving `create_physical_expr` around so the aggregates could depend on it directly (which I think is going to be tough to do without creating a dependency cycle), what about passing in a callback?

Something like

```rust
pub struct AccumulatorArgs<'a> {
    // ... existing fields ...
    /// Function for converting Exprs to PhysicalExprs, if needed
    create_physical_expression: &'a dyn Fn(&Expr) -> Result<Arc<dyn PhysicalExpr>>,
}
```
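A minimal, self-contained sketch of the callback idea, assuming hypothetical stand-in types: modules stand in for crates, and `physical_ordering`, `Column`, and the `Result` alias are illustrative names, not DataFusion APIs. The aggregate code only calls the function it is handed, so it never depends on the planner's `create_physical_expr`. Note that `AccumulatorArgs` still has to name the `PhysicalExpr` trait, so whichever crate defines `AccumulatorArgs` must be able to see that trait.

```rust
// Modules stand in for crates; all types here are simplified, hypothetical versions.
mod physical_expr_common {
    use std::fmt::Debug;

    pub trait PhysicalExpr: Debug {
        fn name(&self) -> String;
    }
}

mod expr {
    use super::physical_expr_common::PhysicalExpr;
    use std::sync::Arc;

    pub type Result<T> = std::result::Result<T, String>;

    #[derive(Debug, Clone)]
    pub enum Expr {
        Column(String),
    }

    pub struct AccumulatorArgs<'a> {
        pub sort_exprs: &'a [Expr],
        /// Function for converting Exprs to PhysicalExprs, if needed
        pub create_physical_expression: &'a dyn Fn(&Expr) -> Result<Arc<dyn PhysicalExpr>>,
    }
}

mod functions_aggregate {
    use super::expr::{AccumulatorArgs, Result};
    use super::physical_expr_common::PhysicalExpr;
    use std::sync::Arc;

    /// An aggregate (e.g. a FIRST_VALUE-like one) converts its ordering through
    /// the callback instead of calling the planner directly.
    pub fn physical_ordering(args: &AccumulatorArgs) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
        args.sort_exprs
            .iter()
            .map(|e| (args.create_physical_expression)(e))
            .collect()
    }
}

mod planner {
    use super::expr::{Expr, Result};
    use super::physical_expr_common::PhysicalExpr;
    use std::sync::Arc;

    #[derive(Debug)]
    pub struct Column(pub String);

    impl PhysicalExpr for Column {
        fn name(&self) -> String {
            self.0.clone()
        }
    }

    /// Stand-in for `create_physical_expr`, living above the aggregate crate.
    pub fn create_physical_expr(e: &Expr) -> Result<Arc<dyn PhysicalExpr>> {
        match e {
            Expr::Column(name) => Ok(Arc::new(Column(name.clone()))),
        }
    }
}
```

The planner supplies the conversion when it builds `AccumulatorArgs`, which inverts the dependency: the aggregate crate needs the trait, but not the planner.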

alamb • Apr 25 '24 14:04

@alamb `AccumulatorArgs` is in the `expr` crate, while `PhysicalExpr` is in the `physical-expr-common` crate. And AccumulatorArgs is used in AggregateUDFImpl, so it cannot be moved out of `expr`, and `PhysicalExpr` belongs in the physical layer. So I think this idea does not work.

jayzhan211 • Apr 25 '24 23:04

> AccumulatorArgs is used in AggregateUDFImpl

I think this is the key problem

Can we pull AccumulatorArgs into physical-expr-common?

alamb • Apr 26 '24 18:04

Moving `AccumulatorArgs` into `physical-expr-common` means we need to import `physical-expr-common` in `expr`, because `AggregateUDFImpl` is in `expr`. If we instead consider moving `AggregateUDFImpl` to `physical-expr-common`, then `AggregateUDF` and many other things would need to move too, which does not make sense.

jayzhan211 • Apr 27 '24 01:04

Hmm this is a tricky refactor -- it is like a ball knot in a piece of string -- we just need to keep tugging at it and at some point it will unravel.

alamb • Apr 27 '24 10:04

How about we introduce a physical aggregate function trait, `AggregateUDFPhysicalImpl`? I expect `AggregateUDFPhysicalImpl` to handle the physical-expr side, for example `accumulator` and `reverse_expr`.

We create the logical aggregate and handle optimizations in the logical layer, then create the physical aggregate via `create_physical_expr`. Once it is created, we never need to convert between logical-expr and physical-expr again. For `reverse_expr`, we just create another physical-expr that implements `AggregateUDFPhysicalImpl`.
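A minimal sketch of the proposed `AggregateUDFPhysicalImpl` idea. Only the trait name comes from the comment above; its methods, the `FirstValue`/`LastValue`/`Sum` types, and the simplified `PhysicalSortExpr` are hypothetical. It shows that once the physical aggregate exists, `reverse_expr` can build the reversed aggregate directly from physical state, without converting any logical expression again.

```rust
use std::sync::Arc;

/// Hypothetical physical sort expression: column index plus direction.
#[derive(Debug, Clone, PartialEq)]
struct PhysicalSortExpr {
    column: usize,
    descending: bool,
}

fn reverse_ordering(ordering: &[PhysicalSortExpr]) -> Vec<PhysicalSortExpr> {
    ordering
        .iter()
        .map(|s| PhysicalSortExpr { column: s.column, descending: !s.descending })
        .collect()
}

/// Sketch of a physical-layer aggregate trait (methods are assumptions for illustration).
trait AggregateUDFPhysicalImpl {
    fn name(&self) -> &str;
    fn ordering_req(&self) -> &[PhysicalSortExpr];
    /// By default an aggregate cannot be reversed.
    fn reverse_expr(&self) -> Option<Arc<dyn AggregateUDFPhysicalImpl>> {
        None
    }
}

struct FirstValue {
    ordering: Vec<PhysicalSortExpr>,
}
struct LastValue {
    ordering: Vec<PhysicalSortExpr>,
}
struct Sum;

impl AggregateUDFPhysicalImpl for FirstValue {
    fn name(&self) -> &str {
        "first_value"
    }
    fn ordering_req(&self) -> &[PhysicalSortExpr] {
        &self.ordering
    }
    // FIRST_VALUE under an ordering picks the same row as LAST_VALUE under the
    // reversed ordering (ignoring ties and NULL placement in this sketch).
    fn reverse_expr(&self) -> Option<Arc<dyn AggregateUDFPhysicalImpl>> {
        Some(Arc::new(LastValue { ordering: reverse_ordering(&self.ordering) }))
    }
}

impl AggregateUDFPhysicalImpl for LastValue {
    fn name(&self) -> &str {
        "last_value"
    }
    fn ordering_req(&self) -> &[PhysicalSortExpr] {
        &self.ordering
    }
    fn reverse_expr(&self) -> Option<Arc<dyn AggregateUDFPhysicalImpl>> {
        Some(Arc::new(FirstValue { ordering: reverse_ordering(&self.ordering) }))
    }
}

impl AggregateUDFPhysicalImpl for Sum {
    fn name(&self) -> &str {
        "sum"
    }
    // Order-insensitive, so no ordering requirement and the default `reverse_expr`.
    fn ordering_req(&self) -> &[PhysicalSortExpr] {
        &[]
    }
}
```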

jayzhan211 • Apr 29 '24 09:04

> Hmm this is a tricky refactor -- it is like a ball knot in a piece of string -- we just need to keep tugging at it and at some point it will unravel.

Moving `AggregateUDFImpl` means that we need to move `Expr::AggregateFunction` as well, and we would lose the logical aggregate function in the logical layer, so we should consider another approach.

jayzhan211 • Apr 29 '24 09:04

I'm thinking of moving AggregateUDFImpl to aggregate-common, which imports both physical-common and expr-common. Then, we import AggregateUDFImpl in datafusion-expr.

The overall idea is to enable us to import common things into `Expr::AggregateFunction`, which lives in `datafusion-expr`. Those common things mainly include `AggregateUDFImpl` and, of course, the physical-expr trait `AggregateExpr`. However, having a physical-expr concept in the logical layer is quite counter-intuitive. But if we consider it a common-level thing, it is a possible solution. I need to play around with it to see if I am missing something.

```mermaid
graph TD;
    functions-aggregates-common-->expr-common;
    functions-aggregates-common-->physical-expr-common;
    functions-aggregates-->functions-aggregates-common;
    functions-aggregates-->expr;
    physical-expr-common-->expr-common;
    expr-->expr-common;
    expr-->functions-aggregates-common;
    physical-expr-->physical-expr-common;
    core-->functions-aggregates;
    core-->physical-expr;
    third-parties-aggregate --> functions-aggregates-common;
```

jayzhan211 • May 01 '24 05:05

@alamb I try the idea in #10327 and it seems promising!

I would like to split the large #10327 into several smaller PRs for easier review. Before that, I would like to know your thoughts on the design and the dependencies in the above graph. Does it look good?

My thoughts on how to categorize these crates:

- `expr-common`: things other than `Expr` and `LogicalPlan` can be placed here.
- `expr`: mainly for `Expr` and `LogicalPlan`; imports `functions-aggregate-common` for UDAFs.
- `physical-common`: physical expr traits or other common things, similar to what it is now.
- `functions-aggregate-common`: imports `physical-common` and `expr-common`, so other users can build their own UDAFs.
- `functions-aggregate`: DataFusion's builtin functions.

With `functions-aggregate-common`, we can finally return a physical-expr from `AggregateUDFImpl`:

```rust
pub trait AggregateUDFImpl: Debug + Send + Sync {
    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
        None
    }
}
```

And since we import `functions-aggregate-common` into `expr`, we can have our new `AggregateUDF` in the logical layer, but with user-defined physical-expr interaction!

```rust
pub enum AggregateFunctionDefinition {
    BuiltIn(aggregate_function::AggregateFunction),
    /// Resolved to a user defined aggregate function
    UDF(Arc<function::AggregateUDF>),
    /// An aggregation function constructed with name. This variant can not be executed directly
    /// and instead must be resolved to one of the other variants prior to physical planning.
    Name(Arc<str>),
}
```

jayzhan211 • May 01 '24 12:05

> The overall idea is to enable us to import common things to Expr::AggregateFunction which lives in datafusion-expr.

In the chart you have in https://github.com/apache/datafusion/issues/10074#issuecomment-2088004947, it seems like `datafusion-expr` will depend (indirectly) on `datafusion-physical-expr`, which I thought was the dependency we are trying to avoid.

I am sorry I am so slow to respond and am getting lost here. Can we take a step back and figure out what problem we are trying to solve?

Can you remind me why datafusion-functions-aggregate (the implementation of aggregate functions) shouldn't depend on datafusion-physical-expr and datafusion-expr? I thought the thing we are trying to do is avoid datafusion-expr depending on datafusion-physical-expr 🤔

What I believe we are trying to do is to pull the aggregate functions out of datafusion-physical-expr

I am sorry if you have explained this to me already.

alamb • May 01 '24 21:05

@alamb First of all, we plan to pull the aggregate functions out of `datafusion-physical-expr`. We have also split `datafusion-physical-expr-common` from `datafusion-physical-expr`, so users can import only the minimum they need, but this is not enough. We want to handle physical-expr in `UDFImpl`, which is not possible with the current crate graph, because `UDFImpl` has to live in `datafusion-expr` and we previously restricted ourselves from importing `datafusion-physical-expr` and `datafusion-physical-expr-common` into `datafusion-expr`.

But, what if we allow datafusion-expr to import physical-expr-common 🤔

We now treat the common crates as the higher-level crates. `datafusion-common` is the highest. We introduce `expr-common` as the second level, `datafusion-physical-common` as the third (importing `common` and `expr-common`), and `aggregate-common` as the fourth, which imports all the other common crates. `datafusion-expr` is allowed to import all of the common crates, including `datafusion-physical-common` and `aggregate-common`, so we can handle the physical-expr concept in `UDFImpl`.

As long as we don't introduce things that depend on `datafusion-expr` into `physical-expr-common` (if that happens, I believe either moving those dependencies to `datafusion-expr-common` or keeping them in `physical-expr` would help), I think we don't need to strictly avoid porting physical-expr concepts into `datafusion-expr`.
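The layering rule described above can be phrased as a simple check: each crate gets a level, and a dependency may only point to a more foundational (lower) level, which also rules out cycles. This is a minimal sketch; levels 1 to 4 follow the comment, while the levels assigned to `expr`, `physical-expr`, and `functions-aggregate` are assumptions made for illustration, and `level`/`dependency_allowed` are hypothetical helpers.

```rust
/// Hypothetical layer levels: lower means more foundational.
/// Levels 1-4 follow the proposal; the rest are assumptions for this sketch.
fn level(krate: &str) -> Option<u8> {
    Some(match krate {
        "common" => 1,
        "expr-common" => 2,
        "physical-expr-common" => 3,
        "functions-aggregate-common" => 4,
        "expr" => 5,
        "physical-expr" | "functions-aggregate" => 6,
        _ => return None,
    })
}

/// A dependency is allowed only if it points to a strictly lower level.
/// Unknown crates are rejected.
fn dependency_allowed(krate: &str, dep: &str) -> bool {
    match (level(krate), level(dep)) {
        (Some(k), Some(d)) => d < k,
        _ => false,
    }
}
```

Under this rule `expr` may depend on `physical-expr-common`, but anything in `physical-expr-common` that needed `expr` would be rejected, which is the case the comment says should move to `expr-common` or stay in `physical-expr`.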

> Can you remind me why datafusion-functions-aggregate (the implementation of aggregate functions) shouldn't depend on datafusion-physical-expr and datafusion-expr

datafusion-functions-aggregate can import datafusion-expr and datafusion-physical-expr-common but not datafusion-physical-expr to keep the minimum dependencies for the user.

> I thought the thing we are trying to do is avoid datafusion-expr depending on datafusion-physical-expr

This idea is what is blocking us! Now, I would allow a `datafusion-physical-expr-common` dependency for `datafusion-expr`.

jayzhan211 • May 02 '24 00:05

> But, what if we allow datafusion-expr to import physical-expr-common 🤔

I see -- this makes sense. What I am worried about is that physical-expr-common would end up with all the code from physical-expr so while it has a different name it ends up being the same issue.

I get the sense that part of the problem is that `first_value` and `last_value` are some of the most complex aggregate functions (as they can be used by window functions, and they have various other optimizations like reversal and sort awareness).

I wonder if we could try starting with some of the simpler and less used aggregates like corr, stddev, covariance that use fewer APIs and have fewer features.

That might make it easier to incrementally discover how to structure the dependencies so we could extract the aggregates out of the core 🤔

alamb • May 03 '24 12:05

> I get the sense that part of the problem is that first_value and last_value are some of the most complex aggregate functions

Actually, that is the reason why I worked on it first: to ensure the design is solid for most of the functions.

> What I am worried about is that physical-expr-common would end up with all the code from physical-expr

I also thought about this, but I don't think there is much more code we need to move into common. `first_value` covers all the cases except the grouping accumulator, which is not so different from a normal accumulator from the perspective of moving code to `physical-expr-common`: both return an accumulator. I now believe that what I had tried to do, moving `create_physical_expr` to `physical-expr-common`, was the wrong step, as it moved out unnecessary things, including the `Column` physical-expr, which has already been moved into common.

I agree that the design deserves more discussion. I will work on covariance first.

jayzhan211 • May 03 '24 12:05

> Actually, that is the reason why I work on it first, to ensure the design is solid for most of the functions.

> I agree that the design deserves more discussion. I will work on covariance first.

That makes sense to me -- thank you. I will attempt to review those PRs in a timely manner so you can iterate more quickly. I often feel I am slowing you down, as I can't review the PRs fast enough.

alamb • May 03 '24 17:05