UDTF support (user-defined functions returning a table)
In my understanding, a table function returns multiple rows. For now, we only have UDFs, which return a scalar value.
I don't think it should return multiple columns; structures are usually used for this.
We have the following cases:
1. select table_fun(1, 5);
generate_series(Int64(1),Int64(5))
------------------------------------
1
2
3
4
5
(5 rows)
Projection: #generate_series(Int64(1),Int64(5)) +
TableUDFs: generate_series(Int64(1), Int64(5))+
EmptyRelation
This is the easiest scenario: the function just returns a vector of values (see the sketch after the case list).
2. select table_fun(1, col) from (select 2 col union all select 3 col) t;
generate_series(Int64(1),t.col)
---------------------------------
1
2
3
1
2
(5 rows)
Projection: #generate_series(Int64(1),t.col) +
TableUDFs: generate_series(Int64(1), #t.col)+
Projection: #t.col, alias=t +
Union +
Projection: Int64(2) AS col +
EmptyRelation +
Projection: Int64(3) AS col +
EmptyRelation
Here the function returns a batch: one section of generated rows for each input row.
3. select col, table_fun(1, col) from (select 2 col union all select 3 col) t;
col | generate_series(Int64(1),t.col)
-----+---------------------------------
3 | 1
3 | 2
3 | 3
2 | 1
2 | 2
(5 rows)
Projection: #t.col, #generate_series(Int64(1),t.col)+
TableUDFs: generate_series(Int64(1), #t.col) +
Projection: #t.col, alias=t +
Union +
Projection: Int64(2) AS col +
EmptyRelation +
Projection: Int64(3) AS col +
EmptyRelation
This is the most difficult case. We have to transform the data flow because, as you can see from the result, col has to be duplicated for each row that table_fun produces from that input row (see the sketch after the case list).
4. select * from table_fun(1, 5);
generate_series(Int64(1),Int64(5))
------------------------------------
1
2
3
4
5
(5 rows)
Projection: #generate_series(Int64(1),Int64(5)) +
TableUDFs: generate_series(Int64(1), Int64(5))+
EmptyRelation
The result is the same as in the first case; however, the plan is built differently here because the function appears in the FROM clause.
5. select * from table_fun(1, 5) t(n);
n
---
1
2
3
4
5
(5 rows)
Projection: #t.n +
Projection: #generate_series(Int64(1),Int64(5)) AS n, alias=t+
TableUDFs: generate_series(Int64(1), Int64(5)) +
EmptyRelation
It looks the same as the previous case, but the plan differs slightly to support the alias (the table_fun node does not support aliases, so we have to add a projection).
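To make cases 1 and 3 above concrete, here is a minimal sketch of the return shape and the column duplication it implies. The function returns one flat array of generated values plus one section size per input row; expand_passthrough is a hypothetical helper for illustration, not code from this PR:

fn expand_passthrough(col: &[i64], section_sizes: &[usize]) -> Vec<i64> {
    // Repeat each pass-through value once per generated row in its section.
    col.iter()
        .zip(section_sizes.iter())
        .flat_map(|(v, n)| std::iter::repeat(*v).take(*n))
        .collect()
}

fn main() {
    // Case 1: generate_series(1, 5) yields values [1,2,3,4,5], sizes [5].
    // Case 3: input rows col = [3, 2] yield flat values [1,2,3,1,2] with
    // section_sizes [3, 2]; col must then expand to line up with them.
    let col = vec![3i64, 2];
    let section_sizes = vec![3usize, 2];
    assert_eq!(expand_passthrough(&col, &section_sizes), vec![3, 3, 3, 2, 2]);
}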
Regarding the signature, I decided to use a single flat vector plus a vector of section sizes instead of a vec of vecs, for better performance. If we used Vec<ColumnarValue>, it would require a lot of memory for a request producing millions of rows.
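Judging from the generate_series implementation later in this thread, the signature presumably looks roughly like the following (a sketch; the exact type names in the PR may differ):

use std::sync::Arc;

use datafusion::arrow::array::ArrayRef;
use datafusion::error::Result;

// Sketch only: argument arrays in; one flat output array plus the number of
// output rows produced for each input row (the "section sizes") out.
pub type TableFunctionImplementation =
    Arc<dyn Fn(&[ArrayRef]) -> Result<(ArrayRef, Vec<usize>)> + Send + Sync>;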
BTW, from clippy:
error: unneeded `return` statement
--> datafusion/core/src/physical_plan/functions.rs:752:9
|
752 | return Ok(ColumnarValue::Array(result));
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: remove `return`: `Ok(ColumnarValue::Array(result))`
|
= note: `-D clippy::needless-return` implied by `-D warnings`
= help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_return
error: could not compile `datafusion` due to previous error
warning: build failed, waiting for other jobs to finish...
error: called `.nth(0)` on a `std::iter::Iterator`, when `.next()` is equivalent
--> datafusion/core/src/execution/context.rs:3527:32
|
3527 | let start_number = start_arr.into_iter().nth(0).unwrap().unwrap_or(0);
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: try calling `.next()` instead of `.nth(0)`: `start_arr.into_iter().next()`
|
= note: `-D clippy::iter-nth-zero` implied by `-D warnings`
= help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#iter_nth_zero
error: called `.nth(0)` on a `std::iter::Iterator`, when `.next()` is equivalent
--> datafusion/core/src/execution/context.rs:3533:30
|
3533 | let end_number = end_arr.into_iter().nth(0).unwrap().unwrap_or(0) + 1;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^ help: try calling `.next()` instead of `.nth(0)`: `end_arr.into_iter().next()`
|
= help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#iter_nth_zero
error: build failed
Hmm, is TableFunction an expression 🤔?
Refer to https://docs.snowflake.com/en/sql-reference/functions-table.html
The SQL usually looks like:
select doi.date as "Date", record_temperatures.city, record_temperatures.temperature
from dates_of_interest as doi,
table(record_high_temperatures_for_date(doi.date)) as record_temperatures;
It shouldn't be an expression, right?
Hmmmm... I have some concerns about this PR.
If we treat a UDTF as an expression, does that mean it can only produce one column?
As I mentioned before (https://github.com/apache/arrow-datafusion/pull/2177#issuecomment-1094208748), it's more like a table, so that we can select * from it and get any number of columns.
I'm confused, would you please explain it to me? @alamb @gandronchik
I had the same question. I'm not sure I understand how this is different from a scalar function. It seems like a table function should produce RecordBatches and effectively compile down to an ExecutionPlan.
It seems like a table function should produce RecordBatches and effectively compile down to an ExecutionPlan.
I agree it should definitely produce RecordBatch.
What about Result<Vec<ColumnarValue>>? I already almost implemented it this way :)
That's essentially a RecordBatch :)
You could have
pub type TableFunctionImplementation =
    Arc<dyn Fn(&[ColumnarValue]) -> Result<Vec<ColumnarValue>> + Send + Sync>;

// This is a terrible name but this would be analogous to ReturnTypeFunction/StateTypeFunction
pub type TableSchemaFunction =
    Arc<dyn Fn(&[DataType]) -> Result<SchemaRef> + Send + Sync>;
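As an aside, the "essentially a RecordBatch" point can be made concrete with the Arrow API: a set of equal-length columns plus a schema is exactly what a RecordBatch is. A small sketch (editorial illustration, not code from the PR):

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Utf8, false),
    ]));
    // A Vec of column arrays is all a table function would need to return;
    // RecordBatch::try_new just attaches the schema to those columns.
    let columns: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![1, 2])),
        Arc::new(StringArray::from(vec!["foo", "bar"])),
    ];
    let batch = RecordBatch::try_new(schema, columns).unwrap();
    assert_eq!(batch.num_rows(), 2);
    assert_eq!(batch.num_columns(), 2);
}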
@alamb @thinkharderdev @doki23 I met the same problem in #2343.
If we treat it as an Expr, we need to turn it into a PhysicalExpr, but:

/// Evaluate an expression against a RecordBatch
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;

pub enum ColumnarValue {
    /// Array of values
    Array(ArrayRef),
    /// A single value
    Scalar(ScalarValue),
}

Because evaluate returns a ColumnarValue, we cannot return the result as a table, am I right?
Should I implement a TablePhysicalExpr using
fn evaluate(&self, batch: &RecordBatch) -> Result<Vec<ColumnarValue>>;
instead?
I left some thoughts in
https://github.com/apache/arrow-datafusion/issues/2343#issuecomment-1110222756
I plan to give this a more careful review tomorrow
I don't think it should return multiple columns; structures are usually used for this.
I cannot agree. The result of a table function represents a temporary table. Since it's a table, it shouldn't only have one column. Of course, one column of a struct type can solve the problem, but it's different: we cannot directly execute ORDER BY or other queries on it unless we extract the structure's fields.
@gandronchik thank you for the explanation in this PR's description. It helps, though I will admit I still don't fully understand what is going on.
I agree with @doki23 -- I expect a table function to logically return a table (that is, something with both rows and columns).
Regarding the signature, I decided to use a single flat vector plus a vector of section sizes instead of a vec of vecs, for better performance. If we used Vec<ColumnarValue>, it would require a lot of memory for a request producing millions of rows.
The way the rest of DataFusion avoids buffering all the intermediate results in memory at once is with Streams, but that requires interacting with Rust's async ecosystem, which is non-trivial.
If you wanted a streaming solution, that would mean the signature might look something like the following (maybe)
Arc<dyn Fn(Box<dyn SendableRecordBatchStream>) -> Result<Box<dyn SendableRecordBatchStream>> + Send + Sync>;
Looks like I got the title wrong. I implemented a function that returns many rows; it is probably not a table function. If I rename it, will it be fine?
Regarding the function signature, I think my solution is a compromise. Which would you prefer: Result<(ArrayRef, Vec<usize>)> + Send + Sync>, Result<Vec<ColumnarValue>> + Send + Sync>, or Result<Box<dyn SendableRecordBatchStream>> + Send + Sync>?
I think adding UDTFs (aka user-defined table functions) that produce a two-dimensional table output (aka Vec<RecordBatch> or a SendableRecordBatchStream) would be a valuable addition to DataFusion.
I think Spark calls these "table value functions":
https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-syntax-qry-select-tvf.html
Postgres calls them table functions:
https://www.postgresql.org/docs/7.3/xfunc-tablefunctions.html
However, this PR does not implement table functions as far as I can see. I still don't fully understand the use case for the code in this PR (a function that returns a single column of values), and I don't know of any other system that implements such functions. Thus I feel this PR adds a feature that is not widely usable for DataFusion users as a whole, and so I don't feel I can approve it.
If others (users or maintainers) have a perspective on this issue, I would love to hear it too. If there is broader support for this feature, I won't oppose merging it.
Marking as draft until we figure out what to do with this.
@alamb Hello! Sorry for the long delay in responding.
I am sorry for such a big PR with such a bad description.
Now let me try to explain what is happening here. Honestly, I made a mistake with the naming: what I implemented is Set Returning Functions (https://www.postgresql.org/docs/current/functions-srf.html).
As far as I know, DataFusion is oriented toward PostgreSQL behavior, so the functionality I provide here is Postgres functionality.
We already use it in Cube.js. We implemented several functions:
- generate_series (https://www.postgresql.org/docs/current/functions-srf.html)
- generate_subscripts (https://www.postgresql.org/docs/current/functions-srf.html)
- unnest (https://www.postgresql.org/docs/current/functions-array.html)
Please look at my PR more closely. I am ready to improve it, rename some structures, etc.
Below, I provide the implementation of the generate_series function (a real Postgres function):
macro_rules! generate_series_udtf {
    ($ARGS:expr, $TYPE:ident, $PRIMITIVE_TYPE:ident) => {{
        let mut section_sizes: Vec<usize> = Vec::new();
        // Try to downcast the first argument; on success the macro returns
        // early, otherwise control falls through to the caller's error path.
        if let Some(l_arr) = $ARGS[0].as_any().downcast_ref::<PrimitiveArray<$TYPE>>() {
            // `downcast_primitive_arg!` is a helper macro from this PR.
            let r_arr = downcast_primitive_arg!($ARGS[1], "right", $TYPE);
            // The step defaults to 1 when the optional third argument is absent.
            let step_arr = PrimitiveArray::<$TYPE>::from_value(1 as $PRIMITIVE_TYPE, 1);
            let step_arr = if $ARGS.len() > 2 {
                downcast_primitive_arg!($ARGS[2], "step", $TYPE)
            } else {
                &step_arr
            };

            let mut builder = PrimitiveBuilder::<$TYPE>::new(1);
            for (i, (start, end)) in l_arr.iter().zip(r_arr.iter()).enumerate() {
                let step = if step_arr.len() > i {
                    step_arr.value(i)
                } else {
                    step_arr.value(0)
                };
                // NULL bounds are not handled and will panic here.
                let start = start.unwrap();
                let end = end.unwrap();

                // Emit start, start + step, ... while <= end, and record how
                // many rows this input row produced (its section size).
                let mut section_size: i64 = 0;
                if start <= end && step > 0 as $PRIMITIVE_TYPE {
                    let mut current = start;
                    while current <= end {
                        builder.append_value(current).unwrap();
                        section_size += 1;
                        current += step;
                    }
                }
                section_sizes.push(section_size as usize);
            }

            return Ok((Arc::new(builder.finish()) as ArrayRef, section_sizes));
        }
    }};
}

pub fn create_generate_series_udtf() -> TableUDF {
    let fun = make_table_function(move |args: &[ArrayRef]| {
        assert!(args.len() == 2 || args.len() == 3);
        // Each macro invocation returns early on a successful downcast;
        // if neither type matches, fall through to the error below.
        if args[0].as_any().downcast_ref::<Int64Array>().is_some() {
            generate_series_udtf!(args, Int64Type, i64)
        } else if args[0].as_any().downcast_ref::<Float64Array>().is_some() {
            generate_series_udtf!(args, Float64Type, f64)
        }
        Err(DataFusionError::Execution("Unsupported type".to_string()))
    });

    // The output type mirrors the first argument's type (Int64 by default).
    let return_type: ReturnTypeFunction = Arc::new(move |tp| {
        if !tp.is_empty() {
            Ok(Arc::new(tp[0].clone()))
        } else {
            Ok(Arc::new(DataType::Int64))
        }
    });

    TableUDF::new(
        "generate_series",
        &Signature::one_of(
            vec![
                TypeSignature::Exact(vec![DataType::Int64, DataType::Int64]),
                TypeSignature::Exact(vec![DataType::Int64, DataType::Int64, DataType::Int64]),
                TypeSignature::Exact(vec![DataType::Float64, DataType::Float64]),
                TypeSignature::Exact(vec![
                    DataType::Float64,
                    DataType::Float64,
                    DataType::Float64,
                ]),
            ],
            Volatility::Immutable,
        ),
        &return_type,
        &fun,
    )
}
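For completeness, a hypothetical usage sketch. The registration method name is an assumption (this PR was never merged, so the actual API it adds may differ); it simply mirrors how register_udf works for scalar functions:

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let mut ctx = SessionContext::new();
    // Hypothetical: assumes the PR adds a `register_udtf` method analogous
    // to `register_udf`; the real name and signature may differ.
    ctx.register_udtf(create_generate_series_udtf());
    let df = ctx.sql("SELECT generate_series(1, 5)").await?;
    df.show().await?;
    Ok(())
}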
Thanks @gandronchik -- I will try and find time to re-review this PR over the next few days in light of the information above.
@alamb Hello! Have you had already time to check the PR?
Hi @gandronchik, sadly I have not had a chance. I apologize for my lack of bandwidth, but it is hard to find sufficient contiguous time to review such a large PR when I don't have the background context.
My core problem is that I don't understand (despite your admirable attempts to clarify) what this PR is trying to implement, so it is very hard to evaluate whether the code implements what is desired (because I don't understand what is desired).
For example, all the examples of "set returning functions" in the Postgres links you shared appear to use those functions as elements of the FROM clause. For example:
select * from unnest(ARRAY[1,2], ARRAY['foo','bar','baz']) as x(a,b)
So I am struggling to understand the examples in the PR's description that show these functions used in combination with a column 🤔
select table_fun(1, col) from (select 2 col union all select 3 col) t;
So what would you think about implementing more general user-defined table functions (that can return RecordBatches / streams, as we discussed above)? I think others would also likely use such functionality, and it seems like it would satisfy the use cases from Cube.js (?)
@alamb Hello! I think it will be easier to understand what I implemented here if you check how the generate_series function works in Postgres. Just try running the following queries:
1. select generate_series(1, 5);
2. select generate_series(1, n) from (select 2 n union all select 3 n) x;
3. select n, generate_series(1, n) from (select 2 n union all select 3 n) x;
4. select col from generate_series(1, 5) fun(col);
Before these changes, DataFusion had only UDFs (which return exactly one row for each input row) and UDAFs (which return one row for any number of input rows). My changes make it possible to return multiple rows for each input row.
This PR is more than 6 months old, so I am closing it for now to clean up the PR list. Please reopen if this is a mistake and you plan to keep working on it.