partiql-lang-rust

PLR evaluates both branches of a join even if the first branch fails

Status: Open · opened by sadderchris 1 year ago · 0 comments

This is a bit of an odd corner case, but consider the following query fragment:

SELECT *
    FROM dummy_fun_1() AS a
    LEFT JOIN dummy_fun_2() AS b

where dummy_fun_1 and dummy_fun_2 are table-valued UDFs.

If dummy_fun_1() fails before producing any values, dummy_fun_2() is still executed afterwards, even though the join can no longer produce a result.

Concretely:

    use partiql_catalog::call_defs::{CallDef, CallSpec, CallSpecArg};
    use partiql_catalog::{Extension, TableFunction};
    /// Catalog extension that registers the two failing table-valued UDFs
    /// (`dummy_fun_1` and `dummy_fun_2`) used by this reproduction.
    #[derive(Debug)]
    struct DummyCatalogExtensions;

    impl Extension for DummyCatalogExtensions {
        fn name(&self) -> String {
            String::from("dummy")
        }

        /// Adds `dummy_fun_1` then `dummy_fun_2` to `catalog`, stopping at the
        /// first registration error.
        fn load(
            &self,
            catalog: &mut dyn partiql_catalog::Catalog,
        ) -> Result<(), Box<dyn std::error::Error>> {
            let table_functions = [
                TableFunction::new(Box::new(DummyFun1::new())),
                TableFunction::new(Box::new(DummyFun2::new())),
            ];
            for table_function in table_functions {
                catalog.add_table_function(table_function)?;
            }
            Ok(())
        }
    }

    /// Catalog metadata for the `dummy_fun_1` table-valued UDF.
    #[derive(Debug)]
    struct DummyFun1 {
        /// Name(s) and overload signatures exposed to the planner.
        call_def: CallDef,
    }

    impl DummyFun1 {
        /// Builds a definition with a single name and a single overload that
        /// declares one positional argument and lowers to a by-name
        /// `dummy_fun_1` call expression.
        fn new() -> Self {
            let overload = CallSpec {
                input: vec![CallSpecArg::Positional],
                output: Box::new(|args| {
                    partiql_logical::ValueExpr::Call(partiql_logical::CallExpr {
                        name: partiql_logical::CallName::ByName("dummy_fun_1".to_string()),
                        arguments: args,
                    })
                }),
            };
            let call_def = CallDef {
                names: vec!["dummy_fun_1"],
                overloads: vec![overload],
            };
            Self { call_def }
        }
    }

    impl partiql_catalog::BaseTableFunctionInfo for DummyFun1 {
        /// Produces the evaluator for `dummy_fun_1` (always [`DummyFun1Eval`]).
        fn plan_eval(&self) -> Box<dyn partiql_catalog::BaseTableExpr> {
            let evaluator = DummyFun1Eval;
            Box::new(evaluator)
        }

        /// Exposes the stored call definition.
        fn call_def(&self) -> &CallDef {
            &self.call_def
        }
    }

    /// Evaluator for `dummy_fun_1`. It always fails without producing any rows,
    /// which simulates a left join branch that errors before yielding values.
    #[derive(Debug)]
    struct DummyFun1Eval;
    impl partiql_catalog::BaseTableExpr for DummyFun1Eval {
        /// Prints a marker line and returns an extension error.
        ///
        /// The arguments and session context are intentionally ignored; the
        /// leading underscores silence `unused_variables` warnings.
        fn evaluate<'c>(
            &self,
            _args: &[std::borrow::Cow<'_, Value>],
            _ctx: &'c dyn partiql_catalog::context::SessionContext<'c>,
        ) -> partiql_catalog::BaseTableExprResult<'c> {
            println!("I'm about to end this plan's entire career");
            Err("this is an error".into())
        }
    }

    /// Catalog metadata for the `dummy_fun_2` table-valued UDF.
    #[derive(Debug)]
    struct DummyFun2 {
        /// Name(s) and overload signatures exposed to the planner.
        call_def: CallDef,
    }

    impl DummyFun2 {
        /// Builds a definition with a single name and a single overload that
        /// declares one positional argument and lowers to a by-name
        /// `dummy_fun_2` call expression.
        fn new() -> Self {
            let overload = CallSpec {
                input: vec![CallSpecArg::Positional],
                output: Box::new(|args| {
                    partiql_logical::ValueExpr::Call(partiql_logical::CallExpr {
                        name: partiql_logical::CallName::ByName("dummy_fun_2".to_string()),
                        arguments: args,
                    })
                }),
            };
            let call_def = CallDef {
                names: vec!["dummy_fun_2"],
                overloads: vec![overload],
            };
            Self { call_def }
        }
    }

    impl partiql_catalog::BaseTableFunctionInfo for DummyFun2 {
        /// Produces the evaluator for `dummy_fun_2` (always [`DummyFun2Eval`]).
        fn plan_eval(&self) -> Box<dyn partiql_catalog::BaseTableExpr> {
            let evaluator = DummyFun2Eval;
            Box::new(evaluator)
        }

        /// Exposes the stored call definition.
        fn call_def(&self) -> &CallDef {
            &self.call_def
        }
    }

    /// Evaluator for `dummy_fun_2`. It prints a marker so the reproduction can
    /// show whether the right join branch ran, then fails.
    #[derive(Debug)]
    struct DummyFun2Eval;
    impl partiql_catalog::BaseTableExpr for DummyFun2Eval {
        /// Prints a marker line and returns an extension error.
        ///
        /// The arguments and session context are intentionally ignored; the
        /// leading underscores silence `unused_variables` warnings.
        fn evaluate<'c>(
            &self,
            _args: &[std::borrow::Cow<'_, Value>],
            _ctx: &'c dyn partiql_catalog::context::SessionContext<'c>,
        ) -> partiql_catalog::BaseTableExprResult<'c> {
            println!("Hello from dummy fun 2!");
            Err("some error in dummy fun 2".into())
        }
    }

    /// Reproduction: a `LEFT JOIN` whose left branch (`dummy_fun_1`) fails
    /// still evaluates the right branch (`dummy_fun_2`).
    ///
    /// The printed output shows whether `dummy_fun_2` ran. The assertion only
    /// pins that evaluation as a whole reports failure.
    #[test]
    fn left_join_evaluates_both_branches_even_if_left_branch_fails() {
        let mut logical_plan = LogicalPlan::new();
        let join_op_id =
            logical_plan.add_operator(partiql_logical::BindingsOp::Join(partiql_logical::Join {
                kind: partiql_logical::JoinKind::Left,
                left: Box::new(partiql_logical::BindingsOp::Scan(partiql_logical::Scan {
                    expr: partiql_logical::ValueExpr::Call(partiql_logical::CallExpr {
                        name: partiql_logical::CallName::ByName("dummy_fun_1".to_string()),
                        arguments: vec![],
                    }),
                    as_key: "_1".to_string(),
                    at_key: None,
                })),
                right: Box::new(partiql_logical::BindingsOp::Scan(partiql_logical::Scan {
                    expr: partiql_logical::ValueExpr::Call(partiql_logical::CallExpr {
                        name: partiql_logical::CallName::ByName("dummy_fun_2".to_string()),
                        arguments: vec![],
                    }),
                    as_key: "_2".to_string(),
                    at_key: None,
                })),
                on: None,
            }));
        let sink_op_id = logical_plan.add_operator(partiql_logical::BindingsOp::Sink);
        logical_plan.add_flow(join_op_id, sink_op_id);
        // I've omitted the `SELECT *` from here, but it doesn't change things

        let bindings = MapBindings::default();
        let mut catalog = PartiqlCatalog::default();
        DummyCatalogExtensions.load(&mut catalog).unwrap();
        let mut planner = plan::EvaluatorPlanner::new(EvaluationMode::Strict, &catalog);
        let mut plan = planner.compile(&logical_plan).expect("Expect no plan error");
        println!("{}", plan.to_dot_graph());
        let sys = SystemContext {
            now: DateTime::from_system_now_utc(),
        };
        let ctx = BasicContext::new(bindings, sys);

        let result = plan.execute_mut(&ctx);
        dbg!(&result);
        // Both UDFs always fail, so evaluation must report an error. The old
        // `let _ = dbg!(..)` discarded the result, so the test passed no
        // matter what execution returned.
        assert!(
            result.is_err(),
            "expected evaluation to fail because dummy_fun_1 always errors"
        );
    }

Paste this into one of the test modules in the partiql-logical-planner crate and run:

$ cargo test -p partiql-logical-planner -- left_join_evaluates_both_branches_even_if_left_branch_fails --nocapture

<snip>

     Running unittests src/lib.rs (target/debug/deps/partiql_logical_planner-ffc8a2604465b618)

running 1 test
digraph {
    0 [ label = "Left JOIN" ]
    1 [ label = "SINK" ]
    0 -> 1 [ label = "0" ]
}

I'm about to end this plan's entire career
Hello from dummy fun 2!
[partiql-logical-planner/src/lib.rs:300:17] plan.execute_mut(&ctx) = Err(
    EvalErr {
        errors: [
            ExtensionResultError(
                "this is an error",
            ),
            ExtensionResultError(
                "some error in dummy fun 2",
            ),
        ],
    },
)
test tests::left_join_evaluates_both_branches_even_if_left_branch_fails ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 9 filtered out; finished in 0.00s

   Doc-tests partiql-logical-planner

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

I don't think this should happen. I would expect plan execution to halt after the first failure instead of barrelling onwards. Worse, if dummy_fun_2()'s arguments depend on the output of dummy_fun_1(), dummy_fun_2() receives arguments it may not expect, because all of those arguments are set to MISSING.

I haven't tried this with other kinds of joins, but I suspect they have similar issues.

— sadderchris, Apr 17 '24 23:04