
User Defined Table Function (UDTF) support

Open gandronchik opened this issue 3 years ago • 20 comments

UDTF support (user-defined functions returning a table)

In my understanding, a table function returns multiple rows. For now, we only have UDFs, which return a scalar value.

I don't think it should return multiple columns; structures are usually used for that.

We have the following cases:

1. select table_fun(1, 5);

 generate_series(Int64(1),Int64(5))
------------------------------------
                                  1
                                  2
                                  3
                                  4
                                  5
(5 rows)
 Projection: #generate_series(Int64(1),Int64(5)) +
   TableUDFs: generate_series(Int64(1), Int64(5))+
     EmptyRelation

This is the easiest scenario. The function just returns a vec of values.

2. select table_fun(1, col) from (select 2 col union all select 3 col) t;

 generate_series(Int64(1),t.col)
---------------------------------
                               1
                               2
                               3
                               1
                               2
(5 rows)
Projection: #generate_series(Int64(1),t.col)  +
   TableUDFs: generate_series(Int64(1), #t.col)+
     Projection: #t.col, alias=t               +
       Union                                   +
         Projection: Int64(2) AS col           +
           EmptyRelation                       +
         Projection: Int64(3) AS col           +
           EmptyRelation

The function returns a batch.

3. select col, table_fun(1, col) from (select 2 col union all select 3 col) t;

col | generate_series(Int64(1),t.col)
-----+---------------------------------
   3 |                               1
   3 |                               2
   3 |                               3
   2 |                               1
   2 |                               2
(5 rows)
Projection: #t.col, #generate_series(Int64(1),t.col)+
   TableUDFs: generate_series(Int64(1), #t.col)      +
     Projection: #t.col, alias=t                     +
       Union                                         +
         Projection: Int64(2) AS col                 +
           EmptyRelation                             +
         Projection: Int64(3) AS col                 +
           EmptyRelation

This is the most difficult case. Here we have to transform the data flow because, as you can see from the result, we have to duplicate col for each row of the table_fun result.
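The duplication step can be sketched in plain Rust (toy types, no Arrow; the function name `repeat_by_sections` is illustrative, not part of the PR): given the input column and the per-row section sizes produced by the table function, each input value is repeated once per generated row.

```rust
// Toy sketch of the case-3 transform: repeat each input value of `col`
// once for every row the table function generated from it.
// `section_sizes[i]` is how many rows the function produced for input row i.
fn repeat_by_sections(col: &[i64], section_sizes: &[usize]) -> Vec<i64> {
    col.iter()
        .zip(section_sizes.iter())
        .flat_map(|(&v, &n)| std::iter::repeat(v).take(n))
        .collect()
}

fn main() {
    // Inputs col = [3, 2]; generate_series(1, col) yields 3 rows, then 2 rows.
    let repeated = repeat_by_sections(&[3, 2], &[3, 2]);
    assert_eq!(repeated, vec![3, 3, 3, 2, 2]);
}
```

This matches the result table above: col 3 is repeated three times and col 2 twice, lining up with the five rows generate_series produced.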

4. select * from table_fun(1, 5);

 generate_series(Int64(1),Int64(5))
------------------------------------
                                  1
                                  2
                                  3
                                  4
                                  5
(5 rows)
Projection: #generate_series(Int64(1),Int64(5)) +
   TableUDFs: generate_series(Int64(1), Int64(5))+
     EmptyRelation

In this case, the result is the same as in the first case. However, we have a different plan structure here.

5. select * from table_fun(1, 5) t(n);

 n
---
 1
 2
 3
 4
 5
(5 rows)
Projection: #t.n                                               +
   Projection: #generate_series(Int64(1),Int64(5)) AS n, alias=t+
     TableUDFs: generate_series(Int64(1), Int64(5))             +
       EmptyRelation

It looks the same as the previous case; however, the plan is slightly different here to support the alias (the table_fun node does not support aliases, so we have to add a projection).

Regarding the signature, I decided to use a single vector plus a vector of section sizes instead of a vec of vecs, for better performance. If we used Vec<ColumnarValue>, this would require a lot of memory in the case of a request producing millions of rows.
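As a toy illustration of this layout (plain Rust, no Arrow types; `generate_series_sections` is a hypothetical name, not the PR's API): all output values live in one contiguous vector, and a second vector records how many of them belong to each input row.

```rust
// Sketch of the "flat values + section sizes" layout: instead of a
// Vec<Vec<i64>> (one inner allocation per input row), keep one flat
// vector of values plus the length of each row's section.
fn generate_series_sections(starts: &[i64], ends: &[i64]) -> (Vec<i64>, Vec<usize>) {
    let mut values = Vec::new();
    let mut section_sizes = Vec::with_capacity(starts.len());
    for (&start, &end) in starts.iter().zip(ends.iter()) {
        let before = values.len();
        let mut cur = start;
        while cur <= end {
            values.push(cur);
            cur += 1;
        }
        // Section size = how many values this input row produced.
        section_sizes.push(values.len() - before);
    }
    (values, section_sizes)
}

fn main() {
    // generate_series(1, col) for col = [2, 3], as in case 2 above.
    let (values, sizes) = generate_series_sections(&[1, 1], &[2, 3]);
    assert_eq!(values, vec![1, 2, 1, 2, 3]);
    assert_eq!(sizes, vec![2, 3]);
}
```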

gandronchik avatar Apr 08 '22 12:04 gandronchik

BTW, from clippy:

error: unneeded `return` statement
   --> datafusion/core/src/physical_plan/functions.rs:752:9
    |
752 |         return Ok(ColumnarValue::Array(result));
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: remove `return`: `Ok(ColumnarValue::Array(result))`
    |
    = note: `-D clippy::needless-return` implied by `-D warnings`
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_return

error: could not compile `datafusion` due to previous error
warning: build failed, waiting for other jobs to finish...
error: called `.nth(0)` on a `std::iter::Iterator`, when `.next()` is equivalent
    --> datafusion/core/src/execution/context.rs:3527:32
     |
3527 |             let start_number = start_arr.into_iter().nth(0).unwrap().unwrap_or(0);
     |                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: try calling `.next()` instead of `.nth(0)`: `start_arr.into_iter().next()`
     |
     = note: `-D clippy::iter-nth-zero` implied by `-D warnings`
     = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#iter_nth_zero

error: called `.nth(0)` on a `std::iter::Iterator`, when `.next()` is equivalent
    --> datafusion/core/src/execution/context.rs:3533:30
     |
3533 |             let end_number = end_arr.into_iter().nth(0).unwrap().unwrap_or(0) + 1;
     |                              ^^^^^^^^^^^^^^^^^^^^^^^^^^ help: try calling `.next()` instead of `.nth(0)`: `end_arr.into_iter().next()`
     |
     = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#iter_nth_zero

error: build failed

xudong963 avatar Apr 09 '22 12:04 xudong963

Hmm, is TableFunction an expression 🤔? Refer to https://docs.snowflake.com/en/sql-reference/functions-table.html. The SQL usually looks like

select doi.date as "Date", record_temperatures.city, record_temperatures.temperature
    from dates_of_interest as doi,
         table(record_high_temperatures_for_date(doi.date)) as record_temperatures;

It shouldn't be an expression, right?

doki23 avatar Apr 10 '22 07:04 doki23

Hmmmm... I have some questions about this PR. If we treat a UDTF as an expression, does that mean it can only produce one column? As I mentioned before (https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1094208748), it's more like a table, so we can select * from it and get any number of columns. I'm confused; would you please explain it to me? @alamb @gandronchik

doki23 avatar Apr 14 '22 08:04 doki23

Hmmmm... I have some questions about this PR. If we treat a UDTF as an expression, does that mean it can only produce one column? As I mentioned before (#2177 (comment)), it's more like a table, so we can select * from it and get any number of columns. I'm confused; would you please explain it to me? @alamb @gandronchik

I had the same question. I'm not sure I understand how this is different from a scalar function. It seems like a table function should produce RecordBatches and effectively compile down to an ExecutionPlan.

thinkharderdev avatar Apr 15 '22 10:04 thinkharderdev

It seems like a table function should produce RecordBatches and effectively compile down to an ExecutionPlan.

I agree it should definitely produce RecordBatch

alamb avatar Apr 15 '22 14:04 alamb

What about Result<Vec<ColumnarValue>>? I have almost implemented it this way already :)

It seems like a table function should produce RecordBatches and effectively compile down to an ExecutionPlan.

I agree it should definitely produce RecordBatch

gandronchik avatar Apr 15 '22 18:04 gandronchik

What about Result<Vec<ColumnarValue>>? I have almost implemented it this way already :)

It seems like a table function should produce RecordBatches and effectively compile down to an ExecutionPlan.

I agree it should definitely produce RecordBatch

That's essentially a RecordBatch :)

You could have

pub type TableFunctionImplementation =
    Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>> + Send + Sync>;

// This is a terrible name but this would be analogous to ReturnTypeFunction/StateTypeFunction
pub type TableSchemaFunction = 
    Arc<dyn Fn(&[DataType]) -> Result<SchemaRef> + Send + Sync>; 

thinkharderdev avatar Apr 15 '22 20:04 thinkharderdev

@alamb @thinkharderdev @doki23 I met the same problem in #2343

If we treat it as an Expr, we need to change it to a PhysicalExpr, but

/// Evaluate an expression against a RecordBatch
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
pub enum ColumnarValue {
    /// Array of values
    Array(ArrayRef),
    /// A single value
    Scalar(ScalarValue),
}

Because it returns a ColumnarValue, we cannot return the result as a table. Am I right?

Should I implement a TablePhysicalExpr using

  fn evaluate(&self, batch: &RecordBatch) -> Result<Vec<ColumnarValue>>;

Ted-Jiang avatar Apr 26 '22 13:04 Ted-Jiang

@alamb @thinkharderdev @doki23 I met the same problem in https://github.com/apache/arrow-datafusion/issues/2343

I left some thoughts in

https://github.com/apache/arrow-datafusion/issues/2343#issuecomment-1110222756

alamb avatar Apr 26 '22 20:04 alamb

I plan to give this a more careful review tomorrow

alamb avatar Apr 27 '22 18:04 alamb

I don't think it should return multiple columns; structures are usually used for that.

I cannot agree. The result of a table function represents a temporary table. Since it's a table, it shouldn't have only one column. Of course, one column of a structure type can solve the problem, but it's different: we cannot directly execute ORDER BY or other queries on it unless we extract the structure.

doki23 avatar Apr 29 '22 11:04 doki23

@gandronchik thank you for the explanation in this PR's description. It helps, though I will admit I still don't fully understand what is going on.

I agree with @doki23 -- I expect a table function to logically return a table (that is, something with both rows and columns)

Regarding signature, I decided to use a single vector and vector with sizes of sections instead of vec of vecs to have better performance. If we use Vec<ColumnarValue>, this will require a lot of memory in case of a request for millions of rows.

The way the rest of DataFusion avoids buffering all the intermediate results in memory at once is with Streams, but that requires interacting with Rust's async ecosystem, which is non-trivial.

If you wanted a streaming solution, the signature might look something like the following (maybe):

Arc<dyn Fn(Box<dyn SendableRecordBatchStream>) -> Result<Box<dyn SendableRecordBatchStream>> + Send + Sync>;
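To make the buffering difference concrete, here is a toy, non-async sketch (plain Rust iterators standing in for SendableRecordBatchStream; all names are illustrative, not DataFusion's API): batches are produced lazily, so only one batch needs to exist in memory at a time.

```rust
// Toy stand-in for a streaming table function: a boxed iterator of
// Vec<i64> "batches" plays the role of SendableRecordBatchStream.
type BatchIter = Box<dyn Iterator<Item = Vec<i64>>>;

// Lazily generate the series `start..=end` in batches of `batch_size`;
// no batch is materialized until the consumer asks for it.
fn series_stream(start: i64, end: i64, batch_size: i64) -> BatchIter {
    let mut cur = start;
    Box::new(std::iter::from_fn(move || {
        if cur > end {
            return None;
        }
        let upper = (cur + batch_size - 1).min(end);
        let batch: Vec<i64> = (cur..=upper).collect();
        cur = upper + 1;
        Some(batch)
    }))
}

fn main() {
    let batches: Vec<Vec<i64>> = series_stream(1, 5, 2).collect();
    assert_eq!(batches, vec![vec![1, 2], vec![3, 4], vec![5]]);
}
```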

alamb avatar Apr 29 '22 18:04 alamb

@gandronchik thank you for the explanation in this PR's description. It helps, though I will admit I still don't fully understand what is going on.

I agree with @doki23 -- I expect a table function to logically return a table (that is, something with both rows and columns)

Regarding signature, I decided to use a single vector and vector with sizes of sections instead of vec of vecs to have better performance. If we use Vec<ColumnarValue>, this will require a lot of memory in case of a request for millions of rows.

The way the rest of DataFusion avoids buffering all the intermediate results in memory at once is with Streams, but that requires interacting with Rust's async ecosystem, which is non-trivial.

If you wanted a streaming solution, the signature might look something like the following (maybe):

Arc<dyn Fn(Box<dyn SendableRecordBatchStream>) -> Result<Box<dyn SendableRecordBatchStream>> + Send + Sync>;

Looks like I got the title wrong. I implemented a function that returns many rows; probably it is not a table function. If I rename it, will that be fine?

Regarding the function signature, I think my solution is a compromise between a vec and streaming. Actually, I don't think such a function will return that many rows. However, of course, I will rewrite it if you want. So which solution do we choose: the current Result<(ArrayRef, Vec<usize>)> + Send + Sync>, Result<Vec<ColumnarValue>> + Send + Sync>, or Result<Box<dyn SendableRecordBatchStream>> + Send + Sync>?

gandronchik avatar May 05 '22 12:05 gandronchik

I think adding UDTFs (aka user-defined table functions) that produce a two-dimensional table output (aka Vec<RecordBatch> or a SendableRecordBatchStream) would be a valuable addition to DataFusion.

I think Spark calls these "table value functions":

https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-syntax-qry-select-tvf.html

Postgres calls them table functions:

https://www.postgresql.org/docs/7.3/xfunc-tablefunctions.html

However, this PR does not implement table functions as far as I can see. I still don't fully understand the use case for the code in this PR, a function that returns a single column of values, and I don't know of any other system that implements such functions. Thus I feel that this PR adds a feature that is not widely usable for DataFusion users as a whole, so I don't feel I can approve it.

If others (users or maintainers) have a perspective on this issue, I would love to hear them too. If there is broader support for this feature, I won't oppose merging it.

alamb avatar May 24 '22 13:05 alamb

marking as draft until we figure out what to do with this

alamb avatar Jun 07 '22 17:06 alamb

@alamb Hello! Sorry for the late response.

I am sorry for such a big PR with such a bad description.

Now I will try to explain what is happening here. Honestly, I made a mistake with the naming. What I supported is Set Returning Functions (https://www.postgresql.org/docs/current/functions-srf.html).

As far as I know, DataFusion is oriented toward PostgreSQL behavior. So the functionality I provide here is Postgres functionality.

We already use it in Cube.js. We implemented several functions:

  • generate_series (https://www.postgresql.org/docs/current/functions-srf.html)
  • generate_subscripts (https://www.postgresql.org/docs/current/functions-srf.html)
  • unnest (https://www.postgresql.org/docs/current/functions-array.html)

Please look at my PR more closely. I am ready to improve it, rename some structures, etc.

Below, I provide the implementation of the generate_series function (a real Postgres function):

macro_rules! generate_series_udtf {
    ($ARGS:expr, $TYPE: ident, $PRIMITIVE_TYPE: ident) => {{
        let mut section_sizes: Vec<usize> = Vec::new();
        let l_arr = &$ARGS[0].as_any().downcast_ref::<PrimitiveArray<$TYPE>>();
        if l_arr.is_some() {
            let l_arr = l_arr.unwrap();
            let r_arr = downcast_primitive_arg!($ARGS[1], "right", $TYPE);
            let step_arr = PrimitiveArray::<$TYPE>::from_value(1 as $PRIMITIVE_TYPE, 1);
            let step_arr = if $ARGS.len() > 2 {
                downcast_primitive_arg!($ARGS[2], "step", $TYPE)
            } else {
                &step_arr
            };

            let mut builder = PrimitiveBuilder::<$TYPE>::new(1);
            for (i, (start, end)) in l_arr.iter().zip(r_arr.iter()).enumerate() {
                let step = if step_arr.len() > i {
                    step_arr.value(i)
                } else {
                    step_arr.value(0)
                };

                let start = start.unwrap();
                let end = end.unwrap();
                let mut section_size: i64 = 0;
                if start <= end && step > 0 as $PRIMITIVE_TYPE {
                    let mut current = start;
                    loop {
                        if current > end {
                            break;
                        }
                        builder.append_value(current).unwrap();

                        section_size += 1;
                        current += step;
                    }
                }
                section_sizes.push(section_size as usize);
            }

            return Ok((Arc::new(builder.finish()) as ArrayRef, section_sizes));
        }
    }};
}

pub fn create_generate_series_udtf() -> TableUDF {
    let fun = make_table_function(move |args: &[ArrayRef]| {
        assert!(args.len() == 2 || args.len() == 3);

        if args[0].as_any().downcast_ref::<Int64Array>().is_some() {
            generate_series_udtf!(args, Int64Type, i64)
        } else if args[0].as_any().downcast_ref::<Float64Array>().is_some() {
            generate_series_udtf!(args, Float64Type, f64)
        }

        Err(DataFusionError::Execution("Unsupported type".to_string()))
    });

    let return_type: ReturnTypeFunction = Arc::new(move |tp| {
        if tp.len() > 0 {
            Ok(Arc::new(tp[0].clone()))
        } else {
            Ok(Arc::new(DataType::Int64))
        }
    });

    TableUDF::new(
        "generate_series",
        &Signature::one_of(
            vec![
                TypeSignature::Exact(vec![DataType::Int64, DataType::Int64]),
                TypeSignature::Exact(vec![DataType::Int64, DataType::Int64, DataType::Int64]),
                TypeSignature::Exact(vec![DataType::Float64, DataType::Float64]),
                TypeSignature::Exact(vec![
                    DataType::Float64,
                    DataType::Float64,
                    DataType::Float64,
                ]),
            ],
            Volatility::Immutable,
        ),
        &return_type,
        &fun,
    )
}

gandronchik avatar Jun 10 '22 09:06 gandronchik

Thanks @gandronchik -- I will try and find time to re-review this PR over the next few days in light of the information above.

alamb avatar Jun 12 '22 09:06 alamb

Thanks @gandronchik -- I will try and find time to re-review this PR over the next few days in light of the information above.

@alamb Hello! Have you had time to check the PR yet?

gandronchik avatar Jun 26 '22 15:06 gandronchik

@alamb Hello! Have you had time to check the PR yet?

Hi @gandronchik, sadly I have not had a chance. I apologize for my lack of bandwidth, but it is hard to find enough contiguous time to review such large PRs when I don't have the background context.

My core problem is that I don't understand (despite your admirable attempts to clarify) what this PR is trying to implement, so it is very hard to evaluate the code to see if it is implementing what is desired (because I don't understand what is desired).

For example, all the examples of "set returning functions" in the Postgres links you shared appear to use those functions as elements of the FROM clause. For example,

select * from unnest(ARRAY[1,2], ARRAY['foo','bar','baz']) as x(a,b) →

So I am struggling to understand examples you share in the PR's description that show using these functions in combination with a column 🤔

select table_fun(1, col) from (select 2 col union all select 3 col) t;

So what would you think about implementing more general user-defined table functions (that can return RecordBatches / streams, as we discussed above)? I think others would also likely use such functionality, and it seems like it would satisfy the use cases from Cube.js (?)

alamb avatar Jun 28 '22 19:06 alamb

@alamb Hello! I think it will be easier to understand what I implemented here if you check how the generate_series function works in Postgres. Just try running the following queries:

1. select generate_series(1, 5);
2. select generate_series(1, n) from (select 2 n union all select 3 n) x;
3. select n, generate_series(1, n) from (select 2 n union all select 3 n) x;
4. select col from generate_series(1, 5) fun(col);

Before these changes, DataFusion had only UDFs (which return one row for each input row) and UDAFs (which return one row for any number of input rows). My changes allow returning multiple rows for each input row.

gandronchik avatar Jun 29 '22 09:06 gandronchik

This PR is more than 6 months old, so I am closing it for now to clean up the PR list. Please reopen if this is a mistake and you plan to work on it more.

alamb avatar Jan 14 '23 11:01 alamb