Support array aggregate sum function
Which issue does this PR close?
Ref #7213. Ref #7214.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?
Note
- array_aggregate does not support things like `SELECT list_aggregate([2, 4, 8, 42], 'string_agg', '|')`, so I am not closing #7213.
- array_sum is not included in this PR.
@izveigor @alamb Ready for review, thanks 👍
Thank you @jayzhan211 -- I didn't even know this (array aggregate functions) was a thing!
I think we should consider using the existing aggregates / accumulators rather than reimplementing aggregates for arrays, and I left some detailed suggestions. Let me know what you think.
I also prefer to reuse existing functions, this might not be trivial but I will try to figure it out.
I can probably find time to help figure out how this might be able to work next week if that would help
One of the problems I hit is that I get a NullArray from the batch (RecordBatch) if I do not set the target partition count to 1. https://github.com/apache/arrow-datafusion/blob/00627785718d9d98998021bf44585f32c33af3ea/datafusion/core/src/physical_plan/aggregates/no_grouping.rs#L112-L114
CoalescePartitionsExec is the one that sends the batch with the NullArray. The default partition count seems to be 4, so I would need to add `set datafusion.execution.target_partitions = 1;` in array.slt. I don't yet fully understand why that happens, so I'm not confident whether this is the correct fix or just a workaround.
The exact batches I get with the default partition count:
- loop_batch: RecordBatch { schema: Schema { fields: [Field { name: "column1", data_type: List(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [ListArray [ PrimitiveArray<Int64> [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, ], PrimitiveArray<Int64> [ 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, ], PrimitiveArray<Int64> [ 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, ], PrimitiveArray<Int64> [ 31, 32, 33, 34, 35, 26, 37, 38, 39, 40, ], ]], row_count: 4 }
- loop_batch: RecordBatch { schema: Schema { fields: [Field { name: "SUM(arrays_values_without_nulls.column1)[sum]", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<Int64> [ null, ]], row_count: 1 }
- loop_batch: RecordBatch { schema: Schema { fields: [Field { name: "SUM(arrays_values_without_nulls.column1)[sum]", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<Int64> [ null, ]], row_count: 1 }
- loop_batch: RecordBatch { schema: Schema { fields: [Field { name: "SUM(arrays_values_without_nulls.column1)[sum]", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<Int64> [ null, ]], row_count: 1 }
Only the first one is expected; the other three are null.
The query I was testing: `select array_aggregate(column1, 'sum') from arrays_values_without_nulls;`
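The partition behavior described above can be sketched in plain Rust (stand-in types for illustration only, not DataFusion's actual code): with four partitions, three partitions see no rows and emit a null partial sum, and the final merge step must treat a null partial as "no input" rather than letting it null out the combined result.

```rust
// Stand-in sketch (not DataFusion's actual merge code): a null
// partial sum (`None` here) means "this partition saw no rows" and
// must not poison the combined total.
fn merge_partial_sums(partials: &[Option<i64>]) -> Option<i64> {
    partials.iter().fold(None, |acc, p| match (acc, *p) {
        (None, x) => x,
        (x, None) => x,
        (Some(a), Some(b)) => Some(a + b),
    })
}

fn main() {
    // e.g. one real partial sum plus three null partials from the
    // empty partitions, as in the batches shown above.
    let partials = [Some(42i64), None, None, None];
    assert_eq!(merge_partial_sums(&partials), Some(42));
}
```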
Another big issue: we might need the aggregate function Sum to accept an Array so we can process arrays and return an ArrayRef.
I tried processing and returning a ScalarValue, but ScalarValue::List is not a good replacement for ArrayRef. We might need a ColumnarValue that can hold either a ScalarValue or an ArrayRef for `fn sum_batch` and SumAccumulator.
https://github.com/apache/arrow-datafusion/blob/00627785718d9d98998021bf44585f32c33af3ea/datafusion/physical-expr/src/aggregate/sum.rs#L269-L300
Unfortunately, we might need to change the signature in Accumulator from ScalarValue to ColumnarValue. Do you think this signature change is a good decision for letting aggregate functions process Arrays?
https://github.com/apache/arrow-datafusion/blob/00627785718d9d98998021bf44585f32c33af3ea/datafusion/expr/src/accumulator.rs#L49-L63
It seems that ColumnarValue for Accumulator works! I just need to confirm this is a reasonable change.
I have not yet found an approach that works without `set datafusion.execution.target_partitions = 1;`.
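A minimal stand-in model of the proposed change (the real DataFusion types are ScalarValue, ArrayRef, and the Accumulator trait; everything below is a simplified stand-in for illustration): if the value passed to the accumulator can be either a single scalar or a whole array, the same accumulator can consume array input directly.

```rust
// Simplified model of the proposal: a ColumnarValue-style input that
// is either one scalar or a whole array. The real ScalarValue /
// ArrayRef / Accumulator types live in DataFusion; these are
// stand-ins only.
#[derive(Debug, Clone, PartialEq)]
enum ColumnarValue {
    Scalar(i64),     // stand-in for ScalarValue
    Array(Vec<i64>), // stand-in for ArrayRef
}

#[derive(Default)]
struct SumAccumulator {
    sum: i64,
}

impl SumAccumulator {
    // Accepting a ColumnarValue instead of only a scalar lets the
    // accumulator consume a whole array of values at once.
    fn update(&mut self, value: &ColumnarValue) {
        match value {
            ColumnarValue::Scalar(v) => self.sum += v,
            ColumnarValue::Array(vs) => self.sum += vs.iter().sum::<i64>(),
        }
    }

    fn evaluate(&self) -> i64 {
        self.sum
    }
}

fn main() {
    let mut acc = SumAccumulator::default();
    acc.update(&ColumnarValue::Scalar(2));
    acc.update(&ColumnarValue::Array(vec![4, 8, 42]));
    assert_eq!(acc.evaluate(), 56);
}
```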
> Unfortunately, we might need to change the signature in Accumulator from ScalarValue to ColumnarValue. Do you think this signature change is a good decision for letting aggregate functions process Arrays?
I wonder if you can call Accumulator::update_batch instead? It is already possible to go from ColumnarValue --> Array via https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.ColumnarValue.html#method.into_array
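The suggestion above can be modeled without any signature change (again with stand-in types; in DataFusion the conversion is `ColumnarValue::into_array` and the consumer is `Accumulator::update_batch`): normalize the value to an array first, then feed the existing batch-oriented API.

```rust
// Stand-in model of the suggestion: instead of changing the
// accumulator's signature, convert a ColumnarValue to an array up
// front and reuse the existing batch-oriented update path.
enum ColumnarValue {
    Scalar(i64),     // stand-in for ScalarValue
    Array(Vec<i64>), // stand-in for ArrayRef
}

impl ColumnarValue {
    // Mirrors the idea behind ColumnarValue::into_array: a scalar
    // becomes an array of `num_rows` copies; an array passes through.
    fn into_array(self, num_rows: usize) -> Vec<i64> {
        match self {
            ColumnarValue::Scalar(v) => vec![v; num_rows],
            ColumnarValue::Array(vs) => vs,
        }
    }
}

// Stand-in for Accumulator::update_batch: the existing API already
// consumes arrays, so no signature change is needed.
fn update_batch(sum: &mut i64, values: &[i64]) {
    *sum += values.iter().sum::<i64>();
}

fn main() {
    let mut sum = 0i64;
    update_batch(&mut sum, &ColumnarValue::Array(vec![1, 2, 3]).into_array(3));
    update_batch(&mut sum, &ColumnarValue::Scalar(5).into_array(2));
    assert_eq!(sum, 16); // 1+2+3 plus 5 repeated twice
}
```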
I missed the latest comment; let me try that out.
I am sorry @jayzhan211 -- I don't think I will have time to help work on this PR for the next few days. I will try to find time this weekend or next week, but I am too busy now with TopK and copy-related work to help here too. Sorry I am spread too thin 😢
It is significantly simpler after #7352 :)
I have this on my review list, and I hope to start working that list down tomorrow
@jayzhan211 I wonder if you could start looking at the List / Array code and find ways to make it simpler before we begin implementing new features --
I feel like the code is at a place where getting a timely review of any change is challenging (because the code is so complicated). Also, when a change like https://github.com/apache/arrow-datafusion/pull/7629 results in several regressions, that is a sign to me that the existing code needs to be simplified / improved before we can add significant new features.
Ok
Following up my discussion on #7214 what's the status of this? Are there blockers that need to be removed / other PR that need to be worked on before this one?
I plan to merge https://github.com/apache/arrow-datafusion/pull/8141 first, then maybe review https://github.com/apache/arrow-datafusion/pull/7242#discussion_r1369254508.
> @jayzhan211 I wonder if you could start looking at the List / Array code and find ways to make it simpler before we begin implementing new features -- I feel like the code is at a place where getting a timely review of any change is challenging (because the code is so complicated). Also, when a change like #7629 results in several regressions, that is a sign to me that the existing code needs to be simplified / improved before we can add significant new features.
@alamb I think ArrayAggregate is different enough that cleaning up the existing array functions does not help much here. Maybe we can move forward on this again? Let me know if there is any issue we should focus on before moving on to ArrayAggregate.
My concern is that aggregate functions are some of the most important features in DataFusion: they are widely used and their performance matters a great deal. I am very concerned that any change to how the aggregates operate will cause issues downstream (in either functionality or performance) as well as make the code harder to maintain.
For this case especially, I am worried about adding features for their own sake without anyone who is waiting for them.
So one question I have is whether anyone is waiting on this feature, and if so, perhaps they can help implement / test it.
I am trying to go through old PRs and make sure we don't lose any -- this one has not had much activity and has accumulated conflicts. Marking it as a draft so it isn't on the review queue. Please feel free to reopen / mark it as ready for review when it is.
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.