Add single-step versions of ExecutionPlan::execute() and PhysicalExpr::evaluate() methods

Open iajoiner opened this issue 2 years ago • 3 comments

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

We would like to implement new ExecutionPlans and PhysicalExprs that are very similar to existing ones in DataFusion but with some side effects. This requires revising ExecutionPlan::execute() and PhysicalExpr::evaluate() methods. The problem we have is that we end up re-evaluating and re-executing a lot just for our additional side effects to happen.

Describe the solution you'd like

We need single-step, public versions of the ExecutionPlan::execute() and PhysicalExpr::evaluate() methods that take the execution/evaluation results of a node's children as parameters.

Describe alternatives you've considered

I have considered copying the execute and evaluate code en masse, for example by implementing our own slightly tweaked versions of datafusion::physical_plan::ProjectionStream, which would fix the problem in the short term. However, this would make upgrading almost impossible.

Additional context

I will file a PR to contribute this feature if accepted.

iajoiner avatar Aug 02 '22 19:08 iajoiner

@iajoiner I don't quite follow what you want to do...

Is it that you want a way to override PhysicalExpr::evaluate for all nodes?

Have you considered a wrapper that does your side effect and then calls into the node, something like:


struct PhysicalExprWithEffects {
    inner: Arc<dyn PhysicalExpr>,
}

impl PhysicalExpr for PhysicalExprWithEffects {
    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        // TODO: add your side effects here
        // then call the inner expr:
        self.inner.evaluate(batch)
    }
    ...
}

Then you could use the same operators, etc
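
To make the wrapper idea concrete without pulling in DataFusion, here is a minimal, self-contained sketch. The `Expr` trait, `Column`, `Negate`, `WithEffects`, and `demo_root_only` are hypothetical stand-ins invented for illustration (they evaluate a row of integers rather than a `RecordBatch`); they are not DataFusion APIs.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical, simplified stand-in for `PhysicalExpr`:
/// evaluates against a row of integers instead of a `RecordBatch`.
trait Expr {
    fn evaluate(&self, row: &[i64]) -> i64;
}

/// Leaf expression: reads one column from the row.
struct Column(usize);

impl Expr for Column {
    fn evaluate(&self, row: &[i64]) -> i64 {
        row[self.0]
    }
}

/// Non-leaf expression: negates the value of its child.
struct Negate {
    arg: Arc<dyn Expr>,
}

impl Expr for Negate {
    fn evaluate(&self, row: &[i64]) -> i64 {
        -self.arg.evaluate(row)
    }
}

/// Wrapper that performs a side effect (here: bumping a counter)
/// and then delegates to the wrapped expression.
struct WithEffects {
    inner: Arc<dyn Expr>,
    calls: Arc<AtomicUsize>,
}

impl Expr for WithEffects {
    fn evaluate(&self, row: &[i64]) -> i64 {
        self.calls.fetch_add(1, Ordering::SeqCst);
        self.inner.evaluate(row)
    }
}

/// Wraps only the root of `-col0` and evaluates it once.
/// Returns (result, number of side effects observed).
fn demo_root_only() -> (i64, usize) {
    let calls = Arc::new(AtomicUsize::new(0));
    let root = WithEffects {
        inner: Arc::new(Negate { arg: Arc::new(Column(0)) }),
        calls: Arc::clone(&calls),
    };
    let value = root.evaluate(&[7]);
    (value, calls.load(Ordering::SeqCst))
}
```

In this sketch `demo_root_only()` yields `(-7, 1)`: the side effect fires once for the wrapped root, while the `Column` child is evaluated directly by `Negate` and never passes through a wrapper.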

One challenge with this approach is that at the moment there is no good way to traverse down a tree of PhysicalExpr so you may have to add something like

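pub trait PhysicalExpr {
....
  // returns owned handles; a bare `[&dyn PhysicalExpr]` is unsized and cannot be returned by value
  fn children(&self) -> Vec<Arc<dyn PhysicalExpr>>;
}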

to the trait

alamb avatar Aug 03 '22 13:08 alamb

@alamb I'm already using wrappers on my side. The issue I currently have is that self.inner.evaluate(batch) evaluates all the children of the raw PhysicalExpr, but none of the side effects for the children actually take place. So right now I have to manually call evaluate on the wrapped children, which leads to evaluate being called far too many times for inner nodes.

What I want to achieve is to add one public method to every non-leaf node type.

E.g. for IsNullExpr:

https://github.com/apache/arrow-datafusion/blob/master/datafusion/physical-expr/src/expressions/is_null.rs

pub fn single_step_evaluate(&self, input: ColumnarValue) -> Result<ColumnarValue> {
    match input {
        ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new(
            compute::is_null(array.as_ref())?,
        ))),
        ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
            ScalarValue::Boolean(Some(scalar.is_null())),
        )),
    }
}

Then evaluate() is simply

fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
    let arg = self.arg.evaluate(batch)?;
    self.single_step_evaluate(arg)
}

That way I will simply call single_step_evaluate in IsNullExprWithSideEffects::evaluate() and handle the issue of evaluating children myself, avoiding repeated evaluation of nodes.
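
The split described above can be sketched in a self-contained way. Everything here (`Value`, `Expr`, `Column`, `IsNull`, `IsNullWithSideEffects`, `demo_single_step`) is a hypothetical toy model assumed for illustration, not DataFusion code; `Column` counts its own evaluations so that repeated evaluation would be visible.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Toy value type standing in for `ColumnarValue` (assumption).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Int(i64),
    Bool(bool),
}

/// Toy stand-in for `PhysicalExpr` (assumption).
trait Expr {
    fn evaluate(&self, row: &[Value]) -> Value;
}

/// Leaf that counts how many times it is evaluated.
struct Column {
    index: usize,
    evals: Arc<AtomicUsize>,
}

impl Expr for Column {
    fn evaluate(&self, row: &[Value]) -> Value {
        self.evals.fetch_add(1, Ordering::SeqCst);
        row[self.index].clone()
    }
}

struct IsNull {
    arg: Arc<dyn Expr>,
}

impl IsNull {
    /// Single-step version: the child's result is passed in,
    /// so this method never evaluates the child itself.
    fn single_step_evaluate(&self, input: Value) -> Value {
        Value::Bool(input == Value::Null)
    }
}

impl Expr for IsNull {
    fn evaluate(&self, row: &[Value]) -> Value {
        let arg = self.arg.evaluate(row);
        self.single_step_evaluate(arg)
    }
}

/// Side-effecting variant: evaluates the child once, performs its
/// side effect, then reuses the single-step logic of `IsNull`.
struct IsNullWithSideEffects {
    inner: IsNull,
    effects: Arc<AtomicUsize>,
}

impl Expr for IsNullWithSideEffects {
    fn evaluate(&self, row: &[Value]) -> Value {
        // The child could itself be a side-effecting wrapper.
        let arg = self.inner.arg.evaluate(row);
        self.effects.fetch_add(1, Ordering::SeqCst);
        self.inner.single_step_evaluate(arg)
    }
}

/// Returns (result, child evaluations, side effects) for `col0 IS NULL`.
fn demo_single_step(input: Value) -> (Value, usize, usize) {
    let evals = Arc::new(AtomicUsize::new(0));
    let effects = Arc::new(AtomicUsize::new(0));
    let expr = IsNullWithSideEffects {
        inner: IsNull {
            arg: Arc::new(Column { index: 0, evals: Arc::clone(&evals) }),
        },
        effects: Arc::clone(&effects),
    };
    let value = expr.evaluate(&[input]);
    (value, evals.load(Ordering::SeqCst), effects.load(Ordering::SeqCst))
}
```

With this shape, `demo_single_step(Value::Null)` gives `(Value::Bool(true), 1, 1)`: the child is evaluated exactly once and the side effect fires exactly once.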

iajoiner avatar Aug 03 '22 19:08 iajoiner

@iajoiner I see -- you want to annotate the entire tree

Here is what I recommend:

  1. add children() and new_with_children to the PhysicalExpr trait that will allow you to walk the tree, following the model of ExecutionPlan:

https://docs.rs/datafusion/10.0.0/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.children https://docs.rs/datafusion/10.0.0/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.with_new_children

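pub trait PhysicalExpr {
....
  // returns owned handles; a bare `[&dyn PhysicalExpr]` is unsized and cannot be returned by value
  fn children(&self) -> Vec<Arc<dyn PhysicalExpr>>;
}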
  2. write a function that recursively wraps all nodes in a tree with your wrapper (bonus points for writing something like ExprRewriter but for PhysicalExprs; see https://docs.rs/datafusion/10.0.0/datafusion/physical_plan/trait.ExecutionPlanVisitor.html and https://docs.rs/datafusion/10.0.0/datafusion/logical_plan/trait.ExprRewriter.html for inspiration)

fn wrap_one_expr(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
 // make whatever wrapper type you want here
 Arc::new(MyWrapper(expr))
}

/// Calls `wrap_one_expr` on all children in a tree of `PhysicalExpr`:
fn wrap_all_nodes(root: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
  // recursively wrap all children:
  let wrapped_children = root
    .children()
    .into_iter()
    .map(wrap_all_nodes)
    .collect::<Vec<_>>();
  
  // create newly wrapped root node
  wrap_one_expr(root.new_with_children(wrapped_children))
}
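
A runnable toy version of this recursion may help show why every node ends up evaluated once with its side effect applied. The `Expr` trait below, including its `children` and `new_with_children` methods, and the `Column`, `Add`, `Counted`, and `demo_wrap_all` items are assumptions made for this sketch; the real `PhysicalExpr` trait did not have these methods at the time of this discussion.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Toy stand-in for `PhysicalExpr` with the two proposed methods (assumption).
trait Expr {
    fn evaluate(&self, row: &[i64]) -> i64;
    fn children(&self) -> Vec<Arc<dyn Expr>>;
    fn new_with_children(&self, children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr>;
}

struct Column(usize);

impl Expr for Column {
    fn evaluate(&self, row: &[i64]) -> i64 {
        row[self.0]
    }
    fn children(&self) -> Vec<Arc<dyn Expr>> {
        Vec::new()
    }
    fn new_with_children(&self, _children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr> {
        Arc::new(Column(self.0))
    }
}

struct Add {
    left: Arc<dyn Expr>,
    right: Arc<dyn Expr>,
}

impl Expr for Add {
    fn evaluate(&self, row: &[i64]) -> i64 {
        self.left.evaluate(row) + self.right.evaluate(row)
    }
    fn children(&self) -> Vec<Arc<dyn Expr>> {
        vec![Arc::clone(&self.left), Arc::clone(&self.right)]
    }
    fn new_with_children(&self, children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr> {
        let mut it = children.into_iter();
        let left = it.next().expect("Add needs two children");
        let right = it.next().expect("Add needs two children");
        Arc::new(Add { left, right })
    }
}

/// Wrapper whose side effect is incrementing a shared counter.
struct Counted {
    inner: Arc<dyn Expr>,
    calls: Arc<AtomicUsize>,
}

impl Expr for Counted {
    fn evaluate(&self, row: &[i64]) -> i64 {
        self.calls.fetch_add(1, Ordering::SeqCst);
        self.inner.evaluate(row)
    }
    fn children(&self) -> Vec<Arc<dyn Expr>> {
        self.inner.children()
    }
    fn new_with_children(&self, children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr> {
        Arc::new(Counted {
            inner: self.inner.new_with_children(children),
            calls: Arc::clone(&self.calls),
        })
    }
}

/// Wraps children first, rebuilds the node on top of the wrapped
/// children, then wraps the rebuilt node itself.
fn wrap_all_nodes(root: Arc<dyn Expr>, calls: &Arc<AtomicUsize>) -> Arc<dyn Expr> {
    let wrapped_children: Vec<Arc<dyn Expr>> = root
        .children()
        .into_iter()
        .map(|child| wrap_all_nodes(child, calls))
        .collect();
    let rebuilt = root.new_with_children(wrapped_children);
    Arc::new(Counted { inner: rebuilt, calls: Arc::clone(calls) })
}

/// Evaluates `col0 + col1` on the row [2, 3] after wrapping every node.
/// Returns (result, number of side effects observed).
fn demo_wrap_all() -> (i64, usize) {
    let calls = Arc::new(AtomicUsize::new(0));
    let plain: Arc<dyn Expr> = Arc::new(Add {
        left: Arc::new(Column(0)),
        right: Arc::new(Column(1)),
    });
    let wrapped = wrap_all_nodes(plain, &calls);
    let value = wrapped.evaluate(&[2, 3]);
    (value, calls.load(Ordering::SeqCst))
}
```

In this sketch `demo_wrap_all()` returns `(5, 3)`: three nodes, three side effects, and no node evaluated more than once, because the rebuilt `Add` holds the wrapped children rather than the raw ones.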

What do you think?

alamb avatar Aug 03 '22 21:08 alamb