
Add support for `LATERAL FLATTEN`

Open Vedin opened this issue 7 months ago • 3 comments

It's no separate table function FLATTEN https://docs.snowflake.com/en/sql-reference/functions/flatten. To use it, we need TABLE or LATERAL. These are the steps to implement it:

  1. Support syntax for FLATTEN (done).
  2. Make sure FLATTEN works with JOIN.
  3. Support for LATERAL with correlated subqueries.
  4. Support for LATERAL FLATTEN.
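
To make the goal of steps 3 and 4 concrete, here is a minimal sketch of the semantics LATERAL FLATTEN is expected to have: the table function is evaluated once per left-hand row and its output is joined back to that row. All names (`FlatRow`, `flatten`, `lateral_flatten`) are hypothetical, and the "parsed JSON" is modeled as a plain list of key/value pairs. This is not Embucket or DataFusion code.

```rust
/// One output row: the outer row's column plus one flattened key/value pair.
#[derive(Debug, PartialEq)]
struct FlatRow {
    source: String,
    key: String,
    value: String,
}

/// Hypothetical stand-in for FLATTEN over a JSON object. The input is assumed
/// to be already parsed into key/value pairs, so this only copies them out.
fn flatten(pairs: &[(String, String)]) -> Vec<(String, String)> {
    pairs.to_vec()
}

/// LATERAL semantics as a flat_map: the function runs once per outer row, and
/// every row it produces is combined with that outer row. Outer rows whose
/// input flattens to nothing yield no output, as with an inner join.
fn lateral_flatten(rows: &[(String, Vec<(String, String)>)]) -> Vec<FlatRow> {
    rows.iter()
        .flat_map(|(source, pairs)| {
            flatten(pairs).into_iter().map(move |(key, value)| FlatRow {
                source: source.clone(),
                key,
                value,
            })
        })
        .collect()
}
```

Calling `lateral_flatten` on one row holding a single pair and one row holding no pairs returns a single `FlatRow`; the empty row is dropped.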

Vedin avatar May 13 '25 11:05 Vedin

> It's no separate table function FLATTEN

@Vedin we have https://github.com/Embucket/embucket/blob/main/crates/df-builtins/src/table/flatten.rs Or what do you mean?

ravlio avatar May 13 '25 11:05 ravlio

> > It's no separate table function FLATTEN
>
> @Vedin we have https://github.com/Embucket/embucket/blob/main/crates/df-builtins/src/table/flatten.rs Or what do you mean?

I meant that in Snowflake, you can't use FLATTEN directly. To see a result, you should wrap it in the TABLE function or use it with LATERAL.

Vedin avatar May 13 '25 11:05 Vedin

The cause of the issue is that we don't support a table function as a relation, for both `TableFunction` and `Function`: https://github.com/apache/datafusion/blob/main/datafusion/sql/src/relation/mod.rs#L157

create table test (jsontext text);
insert into test (jsontext) values ('{"key": "value"}');
SELECT
  jsontext
FROM test
INNER JOIN LATERAL flatten(INPUT => PARSE_JSON(jsontext)) d;

Support for it can be implemented like this:

            // NOTE: `lateral` is bound here but not used in this snippet.
            TableFactor::Function {
                lateral,
                name,
                args,
                alias,
            } => {
                let tbl_func_name = self.object_name_to_table_reference(name)?;
                // Arguments are planned against an empty schema.
                let schema = DFSchema::empty();
                // Keep each argument's optional parameter name (e.g. `INPUT`).
                let func_args = args
                    .into_iter()
                    .map(|arg| match arg {
                        FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => {
                            let expr = self.sql_expr_to_logical_expr(expr, &schema, planner_context)?;
                            Ok((expr, None))
                        }
                        FunctionArg::Named { name, arg: FunctionArgExpr::Expr(expr), .. } => {
                            let expr = self.sql_expr_to_logical_expr(expr, &schema, planner_context)?;
                            Ok((expr, Some(name.value.clone())))
                        }
                        _ => plan_err!("Unsupported function argument: {arg:?}"),
                    })
                    .collect::<Result<Vec<(Expr, Option<String>)>>>()?;

                // Look up the table function and scan its provider like a table.
                let provider = self
                    .context_provider
                    .get_table_function_source(tbl_func_name.table(), func_args)?;
                let plan = LogicalPlanBuilder::scan(tbl_func_name, provider, None)?.build()?;
                (plan, alias)
            }
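
The argument-mapping step in the snippet above can be tried in isolation. The following is only a sketch under stated assumptions: `Arg` and `plan_args` are hypothetical, simplified stand-ins for the parser's `FunctionArg` and the planner logic, and expressions are modeled as plain strings so the block compiles without DataFusion.

```rust
/// Hypothetical, simplified stand-in for the parser's function-argument type.
#[derive(Debug, Clone, PartialEq)]
enum Arg {
    /// Positional argument, e.g. `flatten(PARSE_JSON(x))`.
    Unnamed(String),
    /// Named argument, e.g. `flatten(INPUT => PARSE_JSON(x))`.
    Named { name: String, value: String },
    /// Any other argument form, treated as unsupported here.
    Wildcard,
}

/// Map each argument to `(expression, optional parameter name)`.
/// Collecting into `Result` stops at the first unsupported argument.
fn plan_args(args: Vec<Arg>) -> Result<Vec<(String, Option<String>)>, String> {
    args.into_iter()
        .map(|arg| match arg {
            Arg::Unnamed(value) => Ok((value, None)),
            Arg::Named { name, value } => Ok((value, Some(name))),
            other => Err(format!("Unsupported function argument: {other:?}")),
        })
        .collect()
}
```

Keeping the parameter name next to each expression lets the table function distinguish `INPUT => ...` from positional arguments. A single unsupported argument fails the whole call instead of being silently dropped.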

osipovartem avatar Jun 23 '25 14:06 osipovartem