datafusion icon indicating copy to clipboard operation
datafusion copied to clipboard

incorrect results when using NOT physical expression in `physical_expr::analyze`

Open rroelke opened this issue 1 month ago • 0 comments

Describe the bug

I am using the physical_expr::analyze function on arbitrary expressions in order to infer bounds which I can push down into another library. When the expression tree contains a NotExpr, this function erroneously refines the input intervals to None, which, according to the documentation for ExprBoundaries, means that the expression is not satisfiable within the input interval.

Example:

/// Tests analysis of `NOT (a = 0.0)` when the input domain of `a`
/// is an interval which strictly contains zero.
#[test]
fn analyze_not_eq_around() -> DataFusionResult<()> {
    let is_zero = Expr::Column(test_column()).eq(Expr::Literal(0f32.into(), None));

    // `a = 0.0` is expected to narrow `[-1.0, 1.0]` down to the single point `[0.0, 0.0]`.
    let eq_bounds = analyze(bounded(-1.0, 1.0), &is_zero)?;
    let zero_point = Interval::make::<f32>(Some(0.0), Some(0.0))?;
    assert_eq!(Some(zero_point), eq_bounds.interval);

    // `NOT (a = 0.0)` could be true over the whole input domain, so the analysis
    // result should be some interval encompassing the input domain.
    let is_not_zero = is_zero.not();
    let not_eq_bounds = analyze(bounded(-1.0, 1.0), &is_not_zero)?;
    let input_domain = Interval::make::<f32>(Some(-1.0), Some(1.0))?;
    assert_eq!(Some(input_domain), not_eq_bounds.interval);

    Ok(())
}

Executing this unit test results in:

thread 'analyze_not_eq_around' (242611) panicked at src/lib.rs:83:5:
assertion `left == right` failed
  left: Some(Interval { lower: Float32(-1), upper: Float32(1) })
 right: None

To Reproduce

.rs is not a supported file type apparently, so below are the contents of my minimal reproducer, which can be executed using cargo test as a standalone src/lib.rs.

//! Docs for `ExprBoundaries::interval`:
//!
//! "Minimum and maximum values this expression can have. A None value indicates that evaluating
//! the given column results in an empty set. For example, if the column a has values in the
//! range [10, 20], and there is a filter asserting that a > 50, then the resulting interval
//! range of a will be None."
//!
//! We are passing a set of input bounds and a predicate to `physical_expr::analyze` and then
//! make an inference from the `interval` fields of the returned bounds.
//! We make the following inferences:
//! - if a column's output interval is a subset of the input interval of the same column,
//!   then the predicate is always false in the part of the input interval that lies outside
//!   of the output interval, and could be either true or false within the output interval
//! - if a column's output interval is `None` then the predicate is always false
//!   over the entirety of the input interval
//! - otherwise, we can make no inference as to how the predicate could evaluate
//!   within the input interval
//!
//! This module provides some unit tests demonstrating some examples where
//! those inferences do not appear to hold.

#[cfg(test)]
mod tests {

    use std::ops::Not;

    use datafusion::common::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::common::stats::Precision;
    use datafusion::common::{
        Column, ColumnStatistics, DFSchema, Result as DataFusionResult, ScalarValue,
    };
    use datafusion::logical_expr::Expr;
    use datafusion::logical_expr_common::interval_arithmetic::Interval;
    use datafusion::physical_expr::{self, AnalysisContext, ExprBoundaries};

    /// Analyzes a predicate over the single `Float32` column `a`, seeding the
    /// analysis with the bounds described by `stats`.
    ///
    /// Returns the `ExprBoundaries` which the analysis reports for column `a`.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while building the schema, deriving the initial
    /// column boundaries from `stats`, creating the physical expression, or running
    /// the analysis itself.
    fn analyze(stats: ColumnStatistics, pred: &Expr) -> DataFusionResult<ExprBoundaries> {
        let schema =
            DFSchema::try_from(Schema::new(vec![Field::new("a", DataType::Float32, true)]))?;
        let analysis_context = AnalysisContext::new(vec![ExprBoundaries::try_from_column(
            schema.inner(),
            &stats,
            0,
        )?]);
        let phys = physical_expr::create_physical_expr(pred, &schema, &Default::default())?;

        // The input context is not used again, so it is moved into the call rather than cloned.
        let analysis_out = physical_expr::analyze(&phys, analysis_context, schema.as_arrow())?;
        // Index 0 is column `a`, the only column in the schema built above.
        Ok(analysis_out.boundaries[0].clone())
    }

    /// Tests analysis of `NOT (a = 0.0)` when the input domain of `a`
    /// is the single point `[0.0, 0.0]`.
    #[test]
    fn analyze_not_eq_clamped() -> DataFusionResult<()> {
        let is_zero = Expr::Column(test_column()).eq(Expr::Literal(0f32.into(), None));

        // `a = 0.0` is expected to leave the degenerate domain `[0.0, 0.0]` untouched.
        let eq_bounds = analyze(bounded(0.0, 0.0), &is_zero)?;
        let zero_point = Interval::make::<f32>(Some(0.0), Some(0.0))?;
        assert_eq!(Some(zero_point), eq_bounds.interval);

        // `NOT (a = 0.0)` is always false over the input domain `[0.0, 0.0]`,
        // so the analysis result should be `None`.
        let is_not_zero = is_zero.not();
        let not_eq_bounds = analyze(bounded(0.0, 0.0), &is_not_zero)?;
        assert_eq!(None, not_eq_bounds.interval);

        Ok(())
    }

    /// Tests analysis of `NOT (a = 0.0)` when the input domain of `a`
    /// is an interval which strictly contains zero.
    #[test]
    fn analyze_not_eq_around() -> DataFusionResult<()> {
        let is_zero = Expr::Column(test_column()).eq(Expr::Literal(0f32.into(), None));

        // `a = 0.0` is expected to narrow `[-1.0, 1.0]` down to the single point `[0.0, 0.0]`.
        let eq_bounds = analyze(bounded(-1.0, 1.0), &is_zero)?;
        let zero_point = Interval::make::<f32>(Some(0.0), Some(0.0))?;
        assert_eq!(Some(zero_point), eq_bounds.interval);

        // `NOT (a = 0.0)` could be true over the whole input domain, so the analysis
        // result should be some interval encompassing the input domain.
        let is_not_zero = is_zero.not();
        let not_eq_bounds = analyze(bounded(-1.0, 1.0), &is_not_zero)?;
        let input_domain = Interval::make::<f32>(Some(-1.0), Some(1.0))?;
        assert_eq!(Some(input_domain), not_eq_bounds.interval);

        Ok(())
    }

    /// Tests analysis of a negated `a BETWEEN low AND high` when the
    /// input domain of `a` is exactly `[low, high]`.
    ///
    /// The plain `BETWEEN` should keep the whole input domain. Both spellings of the
    /// negation (`a NOT BETWEEN low AND high` and `NOT (a BETWEEN low AND high)`) can
    /// never be true over that domain, so both are expected to analyze to `None`.
    #[test]
    fn analyze_not_between_clamped() -> DataFusionResult<()> {
        let literal = |value: f32| Expr::Literal(ScalarValue::Float32(Some(value)), None);

        let between = Expr::Column(test_column()).between(literal(-1.0), literal(1.0));
        let between_bounds = analyze(bounded(-1.0, 1.0), &between)?;
        let input_domain = Interval::make::<f32>(Some(-1.0), Some(1.0))?;
        assert_eq!(Some(input_domain), between_bounds.interval);

        // Negation expressed directly as `a NOT BETWEEN -1.0 AND 1.0`.
        let negated_between =
            Expr::Column(test_column()).not_between(literal(-1.0), literal(1.0));
        let negated_bounds = analyze(bounded(-1.0, 1.0), &negated_between)?;
        assert_eq!(None, negated_bounds.interval);

        // Negation expressed by wrapping the `BETWEEN` predicate in `NOT`.
        let not_of_between = between.not();
        let not_bounds = analyze(bounded(-1.0, 1.0), &not_of_between)?;
        assert_eq!(None, not_bounds.interval);

        Ok(())
    }

    /// Tests analysis of a negated `a BETWEEN low AND high` when the input domain
    /// of `a`, say `[x_low, x_high]`, entirely contains `[low, high]`.
    ///
    /// The analysis of `a BETWEEN low AND high` should refine `a` to `[low, high]` exactly.
    ///
    /// The negated predicate is always true outside of `[low, high]`, i.e. on
    /// `[x_low, low) U (high, x_high]`. Analysis can only report a single interval,
    /// so that interval should simply be `[x_low, x_high]`: no refinement occurred.
    #[test]
    fn analyze_not_between_containing() -> DataFusionResult<()> {
        let (x_low, x_high) = (-2.0f32, 2.0f32);
        let (low, high) = (-1.0f32, 1.0f32);
        let literal = |value: f32| Expr::Literal(ScalarValue::Float32(Some(value)), None);

        let between = Expr::Column(test_column()).between(literal(low), literal(high));
        let between_bounds = analyze(bounded(x_low, x_high), &between)?;
        let refined = Interval::make::<f32>(Some(low), Some(high))?;
        assert_eq!(Some(refined), between_bounds.interval);

        // Negation expressed directly as `a NOT BETWEEN low AND high`.
        let negated_between = Expr::Column(test_column()).not_between(literal(low), literal(high));
        let negated_bounds = analyze(bounded(x_low, x_high), &negated_between)?;
        let unrefined = Interval::make::<f32>(Some(x_low), Some(x_high))?;
        assert_eq!(Some(unrefined), negated_bounds.interval);

        // Negation expressed by wrapping the `BETWEEN` predicate in `NOT`.
        let not_of_between = between.not();
        let not_bounds = analyze(bounded(x_low, x_high), &not_of_between)?;
        let unrefined = Interval::make::<f32>(Some(x_low), Some(x_high))?;
        assert_eq!(Some(unrefined), not_bounds.interval);

        Ok(())
    }

    /// Returns an unqualified reference to the column named `a`.
    fn test_column() -> Column {
        Column::new_unqualified("a")
    }

    /// Builds column statistics whose only known facts are an exact minimum of
    /// `low` and an exact maximum of `high`; everything else starts out unknown.
    fn bounded(low: f32, high: f32) -> ColumnStatistics {
        ColumnStatistics::new_unknown()
            .with_min_value(Precision::Exact(ScalarValue::from(low)))
            .with_max_value(Precision::Exact(ScalarValue::from(high)))
    }
}

Expected behavior

If the input column is bounded in [in_low, in_high], and the expression which is negated refines the bounds to [out_low, out_high], then the most precise result would be for NotExpr to be able to return [in_low, out_low) U (out_high, in_high].

This almost certainly requires API changes, so an acceptable result would be to return [in_low, in_high] instead, which at least would not exclude any of the region where the expression could evaluate to true.

Additional context

I am able to work around the issue by walking the physical expression tree and checking for NotExpr to avoid bounds inference.

rroelke avatar Dec 10 '25 17:12 rroelke