Convert `ArrayAgg` to UDAF
Is your feature request related to a problem or challenge?
Similar to other issues in #8708
Remember to include a test in roundtrip_expr_api.
Describe the solution you'd like
No response
Describe alternatives you've considered
No response
Additional context
No response
Will work on this one. I think it might also involve moving nth_value, since they share some code.
@jayzhan211 I pushed a work in progress here: https://github.com/apache/datafusion/pull/11029. It still fails some test cases in sqllogictests:
External error: query failed: DataFusion error: This feature is not implemented: Aggregate can not be used as a sliding accumulator because `retract_batch` is not implemented: NTH_VALUE(aggregate_test_100.c4,Int64(3)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING
[SQL] SELECT
NTH_VALUE(c4, 3) OVER(ORDER BY c9 ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as nth_value1,
NTH_VALUE(c4, 2) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as nth_value2
FROM aggregate_test_100
ORDER BY c9
LIMIT 5
at test_files/window.slt:1205
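The first failure says the aggregate cannot be used as a sliding accumulator. Why a bounded frame such as ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING needs retract_batch can be sketched with a toy model. The trait, SumAcc, NthAcc and sliding_eval below are hypothetical and are not DataFusion's Accumulator API; they only show that once rows leave the frame an accumulator must be able to remove them, while a frame that only grows never needs retraction.

```rust
// Hypothetical, simplified model of sliding-window evaluation.
// This is NOT DataFusion's `Accumulator` trait; it only mirrors the idea
// that a bounded frame must both add and remove rows as it moves.
trait SlidingAccumulator {
    /// Add rows entering the frame.
    fn update_batch(&mut self, values: &[i64]);

    /// Remove rows leaving the frame; unsupported unless overridden.
    fn retract_batch(&mut self, _values: &[i64]) -> Result<(), String> {
        Err("retract_batch is not implemented".to_string())
    }

    fn evaluate(&self) -> Option<i64>;
}

/// A running sum can undo an update by subtracting.
struct SumAcc {
    sum: i64,
}

impl SlidingAccumulator for SumAcc {
    fn update_batch(&mut self, values: &[i64]) {
        self.sum += values.iter().sum::<i64>();
    }
    fn retract_batch(&mut self, values: &[i64]) -> Result<(), String> {
        self.sum -= values.iter().sum::<i64>();
        Ok(())
    }
    fn evaluate(&self) -> Option<i64> {
        Some(self.sum)
    }
}

/// A naive nth-value accumulator that only appends, so it cannot retract.
struct NthAcc {
    n: usize,
    seen: Vec<i64>,
}

impl SlidingAccumulator for NthAcc {
    fn update_batch(&mut self, values: &[i64]) {
        self.seen.extend_from_slice(values);
    }
    fn evaluate(&self) -> Option<i64> {
        // n is 1-based, as in NTH_VALUE(expr, n).
        self.n.checked_sub(1).and_then(|i| self.seen.get(i).copied())
    }
}

/// Evaluate `acc` over `ROWS BETWEEN preceding PRECEDING AND following FOLLOWING`.
fn sliding_eval(
    acc: &mut dyn SlidingAccumulator,
    data: &[i64],
    preceding: usize,
    following: usize,
) -> Result<Vec<Option<i64>>, String> {
    let mut out = Vec::with_capacity(data.len());
    // The current frame is always data[lo..hi].
    let (mut lo, mut hi) = (0, 0);
    for i in 0..data.len() {
        let new_lo = i.saturating_sub(preceding);
        let new_hi = (i + following + 1).min(data.len());
        // Rows entering the frame.
        acc.update_batch(&data[hi..new_hi]);
        if new_lo > lo {
            // Rows fell out of the frame: only possible with retraction.
            acc.retract_batch(&data[lo..new_lo])?;
        }
        lo = new_lo;
        hi = new_hi;
        out.push(acc.evaluate());
    }
    Ok(out)
}

fn main() {
    let data: [i64; 5] = [1, 2, 3, 4, 5];
    let mut sum = SumAcc { sum: 0 };
    println!("{:?}", sliding_eval(&mut sum, &data, 1, 1));
    let mut nth = NthAcc { n: 2, seen: Vec::new() };
    println!("{:?}", sliding_eval(&mut nth, &data, 1, 1));
}
```

With a frame that only grows (a large preceding value), NthAcc works without retraction; it is the bounded frame that forces retract_batch.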
External error: query failed: DataFusion error: External error: Arrow error: Invalid argument error: column types must match schema types, expected List(Field { name: "item", data_type: Struct([Field { name: "sn@1", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) but found List(Field { name: "item", data_type: Struct([Field { name: "sn@0", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) at column index 2
[SQL] SELECT ARRAY_AGG(e.rate ORDER BY e.sn)
FROM sales_global AS s
JOIN exchange_rates AS e
ON s.currency = e.currency_from AND
e.currency_to = 'USD' AND
s.ts >= e.ts
GROUP BY s.sn
ORDER BY s.sn;
at test_files/group_by.slt:3181
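In the second failure, the expected and found types differ only in the name of the struct field inside the list: sn@1 versus sn@0, which looks like a column name printed together with its input index. A simplified model (these DataType and Field types are stand-ins, not arrow-rs's) shows that a strict type comparison treats nested field names as part of the type, so that difference alone is enough to fail. That the state field name comes from the ORDER BY expression's display form is an assumption made for this sketch.

```rust
// Simplified stand-ins for Arrow types; NOT arrow-rs's `DataType`/`Field`.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Struct(Vec<Field>),
    List(Box<Field>),
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
}

fn field(name: &str, data_type: DataType) -> Field {
    Field { name: name.to_string(), data_type, nullable: true }
}

/// Hypothetical: the ordering state is a list of structs whose field
/// name is taken from the ORDER BY expression's display form.
fn ordering_state_type(sort_expr_name: &str) -> DataType {
    DataType::List(Box::new(field(
        "item",
        DataType::Struct(vec![field(sort_expr_name, DataType::Int32)]),
    )))
}

/// Strict check: field names are part of the type, as in the error above.
fn check_column(expected: &DataType, found: &DataType) -> Result<(), String> {
    if expected == found {
        Ok(())
    } else {
        Err(format!("column types must match schema types, expected {expected:?} but found {found:?}"))
    }
}

/// Structural comparison that ignores field names.
fn same_shape(a: &DataType, b: &DataType) -> bool {
    match (a, b) {
        (DataType::Int32, DataType::Int32) => true,
        (DataType::List(x), DataType::List(y)) => {
            x.nullable == y.nullable && same_shape(&x.data_type, &y.data_type)
        }
        (DataType::Struct(xs), DataType::Struct(ys)) => {
            xs.len() == ys.len()
                && xs.iter().zip(ys).all(|(x, y)| {
                    x.nullable == y.nullable && same_shape(&x.data_type, &y.data_type)
                })
        }
        _ => false,
    }
}

fn main() {
    // The only difference is the column index baked into the field name.
    let expected = ordering_state_type("sn@1");
    let found = ordering_state_type("sn@0");
    println!("same shape: {}", same_shape(&expected, &found));
    println!("strict check ok: {}", check_column(&expected, &found).is_ok());
}
```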
The first one seems to be because the new NTH_VALUE UDAF is picked over the builtin window function with the same name. Is this expected? What is the correct course of action to resolve it?
The second one looks a bit weird to me; I'm not sure if I messed something up or am hitting some other issue.
@jayzhan211 If you have time to provide some pointers that would be highly appreciated :)
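The first hypothesis, that the NTH_VALUE UDAF is picked over the builtin window function with the same name, can be illustrated with a toy resolver. Everything here is hypothetical (Registry, Resolved and the udaf_first switch are not DataFusion names, and this is not how its planner is structured); it only shows that if aggregate UDFs are consulted before builtin window functions, registering an nth_value UDAF changes what NTH_VALUE(...) OVER (...) resolves to.

```rust
use std::collections::HashSet;

// Hypothetical toy resolver; it does NOT reproduce DataFusion's planner.
#[derive(Debug, PartialEq)]
enum Resolved {
    Udaf(String),
    BuiltinWindow(String),
}

struct Registry {
    udafs: HashSet<String>,
    builtin_windows: HashSet<String>,
}

impl Registry {
    /// Look up a function name, consulting UDAFs first if `udaf_first` is set.
    fn resolve(&self, name: &str, udaf_first: bool) -> Option<Resolved> {
        // Names are matched case-insensitively in this sketch.
        let key = name.to_lowercase();
        let as_udaf = self.udafs.contains(&key).then(|| Resolved::Udaf(key.clone()));
        let as_window = self
            .builtin_windows
            .contains(&key)
            .then(|| Resolved::BuiltinWindow(key.clone()));
        if udaf_first {
            as_udaf.or(as_window)
        } else {
            as_window.or(as_udaf)
        }
    }
}

fn main() {
    let registry = Registry {
        udafs: ["nth_value", "array_agg"].iter().map(|s| s.to_string()).collect(),
        builtin_windows: ["nth_value", "row_number"].iter().map(|s| s.to_string()).collect(),
    };
    // Same name, different target depending on lookup order.
    println!("{:?}", registry.resolve("NTH_VALUE", true));
    println!("{:?}", registry.resolve("NTH_VALUE", false));
}
```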
I suggest we convert 1. ArrayAgg, 2. DistinctArrayAgg, 3. OrderSensitiveArrayAgg and 4. NthValue separately.
OrderSensitiveArrayAgg and NthValue are quite complex.
ArrayAgg and NthValue expect a nullable parameter, so we need to add it to AggregateFunctionExpr; that way we can pass it to StateFieldsArgs for state_fields and set the nullable flag of the field accordingly.
We can get nullable with
https://github.com/apache/datafusion/blob/58d23c5c050f43aa7b867d4f0be7298d8d6cad83/datafusion/physical-expr/src/aggregate/build_in.rs#L71
https://github.com/apache/datafusion/blob/58d23c5c050f43aa7b867d4f0be7298d8d6cad83/datafusion/physical-expr-common/src/aggregate/mod.rs#L275C1-L289C6
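A minimal sketch of the proposal, assuming the new flag controls the nullability of the list's item field in ArrayAgg's state. StateFieldsArgs here is a hypothetical, cut-down struct with a hypothetical input_nullable field, not DataFusion's definition, and DataType/Field are stand-ins for Arrow's types.

```rust
// Hypothetical, simplified types; NOT DataFusion's `StateFieldsArgs` or
// arrow-rs's `Field`. They only illustrate threading input nullability
// through to the state fields.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    List(Box<Field>),
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
}

/// Hypothetical arguments for computing state fields, with the proposed flag.
struct StateFieldsArgs<'a> {
    name: &'a str,
    input_type: &'a DataType,
    /// Whether the (single) input expression can produce nulls.
    input_nullable: bool,
}

/// ArrayAgg-style state: one list column whose `item` nullability
/// follows the input instead of being hard-coded.
fn array_agg_state_fields(args: &StateFieldsArgs) -> Vec<Field> {
    let item = Field {
        name: "item".to_string(),
        data_type: args.input_type.clone(),
        nullable: args.input_nullable,
    };
    vec![Field {
        name: args.name.to_string(),
        data_type: DataType::List(Box::new(item)),
        nullable: true,
    }]
}

fn main() {
    let int64 = DataType::Int64;
    let args = StateFieldsArgs { name: "array_agg(t.a)", input_type: &int64, input_nullable: false };
    println!("{:?}", array_agg_state_fields(&args));
}
```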
I suggest we convert ArrayAgg, DistinctArrayAgg, OrderSensitiveArrayAgg and NthValue separately.
At least leave OrderSensitiveArrayAgg and NthValue for another PR, since they involve ordering and window functions, so they are quite complex.
Sounds good. If we only convert ArrayAgg, how does one handle multiple expressions using the same name? Should it not be registered? Or should I just leave the existing ArrayAgg code as is? The accumulator is private, so it cannot easily be reused; or is it acceptable to make it public in this intermediate state?
ArrayAgg and Nth expect to have parameter nullable, we need to add it to AggregateFunctionExpr so we can add it to StateFieldsArgs for state_fields and change the nullable for field.
Makes sense. Why do we only provide a single value for input_type (https://github.com/apache/datafusion/blob/39.0.0/datafusion/physical-expr-common/src/aggregate/mod.rs#L245)? Can aggregates not have multiple inputs? Should the nullable field be just input_nullable: bool, or should it be inputs_nullable: Vec<bool>?
ArrayAgg and Nth expect to have parameter nullable, we need to add it to AggregateFunctionExpr so we can add it to StateFieldsArgs for state_fields and change the nullable for field.
Make sense. How come we only provide a single value for input_type (https://github.com/apache/datafusion/blob/39.0.0/datafusion/physical-expr-common/src/aggregate/mod.rs#L245) can aggregates not have multiple inputs? Should the nullable field be just input_nullable: bool or should it be inputs_nullable: Vec<bool>?
We have a single input because we have not yet met any function that needs multiple inputs. If there is a function that expects multiple inputs, we can extend it to a Vec.
I suggest we convert ArrayAgg, DistinctArrayAgg, OrderSensitiveArrayAgg and NthValue separately. At least leave OrderSensitiveArrayAgg and NthValue in another PR, since they have ordering and window function so it is quite complex
Sounds good. If only converting ArrayAgg how do one handle that there are multiple expressions using the same name? Should it not be registered? Or should I just leave the existing ArrayAgg code as is? The accumulator is private so it can not easily be reused, or is it acceptable to make it public in this intermediate state?
We could check the distinct and ordering flags here to decide whether to use the builtin or the UDAF:
https://github.com/apache/datafusion/blob/18042fd69138e19613844580408a71a200ea6caa/datafusion/core/src/physical_planner.rs#L1825-L1909
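The suggested check could look roughly like this. The enum and the function are hypothetical and stand in for the branching in the linked physical_planner.rs code: plain ARRAY_AGG goes to the new UDAF, while DISTINCT and ORDER BY keep using the builtin implementations until they are converted. How DISTINCT combined with ORDER BY should be handled is not discussed here, so the sketch simply returns an error for it.

```rust
// Hypothetical dispatch on the DISTINCT and ORDER BY flags.
// The variant names are illustrative, not DataFusion identifiers.
#[derive(Debug, PartialEq)]
enum ArrayAggImpl {
    /// Plain ARRAY_AGG(x): handled by the new UDAF.
    Udaf,
    /// ARRAY_AGG(DISTINCT x): still the builtin for now.
    BuiltinDistinct,
    /// ARRAY_AGG(x ORDER BY y): still the builtin for now.
    BuiltinOrderSensitive,
}

fn choose_array_agg(distinct: bool, has_order_by: bool) -> Result<ArrayAggImpl, String> {
    match (distinct, has_order_by) {
        (false, false) => Ok(ArrayAggImpl::Udaf),
        (true, false) => Ok(ArrayAggImpl::BuiltinDistinct),
        (false, true) => Ok(ArrayAggImpl::BuiltinOrderSensitive),
        // Not covered by the discussion; this sketch just rejects it.
        (true, true) => Err("DISTINCT together with ORDER BY is not handled in this sketch".to_string()),
    }
}

fn main() {
    for (distinct, ordered) in [(false, false), (true, false), (false, true), (true, true)] {
        println!("distinct={distinct} ordered={ordered} -> {:?}", choose_array_agg(distinct, ordered));
    }
}
```

Because the UDAF only handles the plain case, the builtin variants can stay registered under the same name until they are converted.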
The accumulator is private so it can not easily be reused, or is it acceptable to make it public in this intermediate state?
We can move it to physical_expr_common first; after all the related functions are done, we can move it back.
We have single input because we have not meet any function that need multiple input yet. If there is any function that expect multiple input, we can extend it to Vec
What about covariance (https://github.com/apache/datafusion/blob/main/datafusion/functions-aggregate/src/covariance.rs#L43), which takes 2 arguments?
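For aggregates with several inputs such as covariance, one option is to keep one nullability flag per input, parallel to the input types; this is the inputs_nullable: Vec<bool> shape asked about earlier. The struct below is a hypothetical illustration, not DataFusion's StateFieldsArgs, and DataType is a stand-in; a single-input aggregate is then just the length-1 case.

```rust
// Hypothetical argument struct for aggregates with N inputs; NOT
// DataFusion's actual `StateFieldsArgs`.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Float64,
    Int32,
}

#[derive(Debug)]
struct StateFieldsArgs {
    name: String,
    input_types: Vec<DataType>,
    inputs_nullable: Vec<bool>,
}

impl StateFieldsArgs {
    fn new(name: &str, input_types: Vec<DataType>, inputs_nullable: Vec<bool>) -> Result<Self, String> {
        // One flag per input, so the two vectors must line up.
        if input_types.len() != inputs_nullable.len() {
            return Err(format!(
                "{name}: {} input types but {} nullability flags",
                input_types.len(),
                inputs_nullable.len()
            ));
        }
        Ok(Self { name: name.to_string(), input_types, inputs_nullable })
    }

    fn arity(&self) -> usize {
        self.input_types.len()
    }

    /// Nullability of the i-th input, or None if out of range.
    fn input_nullable(&self, i: usize) -> Option<bool> {
        self.inputs_nullable.get(i).copied()
    }
}

fn main() {
    // A single-input aggregate is just the length-1 case.
    let array_agg = StateFieldsArgs::new("array_agg", vec![DataType::Int32], vec![true]).unwrap();
    // Covariance takes two arguments, each with its own nullability.
    let covar = StateFieldsArgs::new("covar", vec![DataType::Float64, DataType::Float64], vec![false, true]).unwrap();
    for args in [&array_agg, &covar] {
        println!("{} arity={} first_nullable={:?}", args.name, args.arity(), args.input_nullable(0));
    }
}
```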
@jayzhan211 I created a PR that only does ArrayAgg here: https://github.com/apache/datafusion/pull/11045. I will look into adding nullable next.
array_agg is known to produce a non-null result. If we make it a UDAF, then to avoid a regression (if we choose to), we need a way for a UDAF to mark its output as non-null: https://github.com/apache/datafusion/issues/11274
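One way such a marker could work is a trait method with a conservative default that a UDAF overrides. The trait and the nullability_regresses helper below are hypothetical, not DataFusion's AggregateUDFImpl; the actual design is what the linked issue is about. The sketch only shows why a UDAF that keeps the default would report a nullable output where the builtin array_agg did not.

```rust
// Hypothetical trait; NOT DataFusion's `AggregateUDFImpl`.
trait AggregateUdf {
    fn name(&self) -> &str;
    /// Conservative default: assume the output may be null.
    fn is_nullable(&self) -> bool {
        true
    }
}

/// ArrayAgg as a UDAF that opts out of the nullable default.
struct ArrayAgg;
impl AggregateUdf for ArrayAgg {
    fn name(&self) -> &str {
        "array_agg"
    }
    fn is_nullable(&self) -> bool {
        false
    }
}

/// The same UDAF without the override inherits `true`.
struct ArrayAggNoOverride;
impl AggregateUdf for ArrayAggNoOverride {
    fn name(&self) -> &str {
        "array_agg"
    }
}

/// "Regression" here means: the builtin reported a non-null output,
/// but the replacement UDAF reports a nullable one.
fn nullability_regresses(builtin_nullable: bool, udaf: &dyn AggregateUdf) -> bool {
    !builtin_nullable && udaf.is_nullable()
}

fn main() {
    // Per the comment above, the builtin array_agg output is non-null.
    let builtin_nullable = false;
    let udafs: [&dyn AggregateUdf; 2] = [&ArrayAgg, &ArrayAggNoOverride];
    for udaf in udafs {
        println!("{}: regresses = {}", udaf.name(), nullability_regresses(builtin_nullable, udaf));
    }
}
```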
@eejbyfeldt Do you plan to work on array_agg?
@jayzhan211 I will not be able to work on it for the next two weeks since I'm on vacation, so someone else should feel free to pick it up/take it over before then.
@jayzhan211 Should this be closed? It seems like it was resolved with #11448.
Yes, all the functions have been converted.