datafusion icon indicating copy to clipboard operation
datafusion copied to clipboard

Support array aggregate sum function

Open jayzhan211 opened this issue 2 years ago • 19 comments

Which issue does this PR close?

Ref #7213 . Ref #7214 .

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

Note

  • array_aggregate not support somethings like SELECT list_aggregate([2, 4, 8, 42], 'string_agg', '|'), so I don't close #7213 .

  • array_sum is not included in this PR

jayzhan211 avatar Aug 09 '23 09:08 jayzhan211

@izveigor @alamb Ready for review, thanks 👍

jayzhan211 avatar Aug 10 '23 03:08 jayzhan211

Thank you @jayzhan211 -- I didn't even know this (array aggregate functions) was a thing!

I think we should consider using the existing aggregate / accumulators rather than reimplementing aggregates for arrays and left some detailed suggestions. Let me know what you think

I also prefer to reuse existing functions, this might not be trivial but I will try to figure it out.

jayzhan211 avatar Aug 11 '23 13:08 jayzhan211

I also prefer to reuse existing functions, this might not be trivial but I will try to figure it out.

I can probably find time to help figure out how this might be able to work next week if that would help

alamb avatar Aug 12 '23 11:08 alamb

One of the problems I met is I would get NullArray from batch (RecordBatch) if I did not set the target partition to 1. https://github.com/apache/arrow-datafusion/blob/00627785718d9d98998021bf44585f32c33af3ea/datafusion/core/src/physical_plan/aggregates/no_grouping.rs#L112-L114

CoalescePartitionsExec is the one that sent the batch with NullArray. The default partition seems to be 4. I would need to set set datafusion.execution.target_partitions = 1; in array.slt. I'm not yet fully understand why it that, not confident on whether this is the correct fix or workaround.

Exact batch I get with default partition count

  1. loop_batch: RecordBatch { schema: Schema { fields: [Field { name: "column1", data_type: List(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [ListArray [ PrimitiveArray<Int64> [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, ], PrimitiveArray<Int64> [ 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, ], PrimitiveArray<Int64> [ 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, ], PrimitiveArray<Int64> [ 31, 32, 33, 34, 35, 26, 37, 38, 39, 40, ], ]], row_count: 4 }
  2. loop_batch: RecordBatch { schema: Schema { fields: [Field { name: "SUM(arrays_values_without_nulls.column1)[sum]", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<Int64> [ null, ]], row_count: 1 }
  3. loop_batch: RecordBatch { schema: Schema { fields: [Field { name: "SUM(arrays_values_without_nulls.column1)[sum]", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<Int64> [ null, ]], row_count: 1 }
  4. loop_batch: RecordBatch { schema: Schema { fields: [Field { name: "SUM(arrays_values_without_nulls.column1)[sum]", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<Int64> [ null, ]], row_count: 1 }

Only the first one is expected, the other 3 are null.

The query I was testing with select array_aggregate(column1, 'sum') from arrays_values_without_nulls;

jayzhan211 avatar Aug 12 '23 23:08 jayzhan211

Another big issue, we might need Array for aggregate function Sum so we can process array and return ArrayRef. I try to process and return ScalarValue, but ScalarValue::List is not a good replacement for ArrayRef. We might need ColumnarValue that can accept either ScalarValue or ArrayRef for fn sum_batch and SumAccumulator.

https://github.com/apache/arrow-datafusion/blob/00627785718d9d98998021bf44585f32c33af3ea/datafusion/physical-expr/src/aggregate/sum.rs#L269-L300

Unfortunately, we might need to change the signature in Accumulator from ScalarValue to ColumnarValue. Do you think this signature change is a good decision for accepting the Aggregate function to process Array?

https://github.com/apache/arrow-datafusion/blob/00627785718d9d98998021bf44585f32c33af3ea/datafusion/expr/src/accumulator.rs#L49-L63

jayzhan211 avatar Aug 13 '23 06:08 jayzhan211

It seems that ColumnarValue for Accumulator works! Just need to confirm this is a reasonable change.

An approach without set datafusion.execution.target_partitions = 1; has not been solved.

jayzhan211 avatar Aug 13 '23 07:08 jayzhan211

Unfortunately, we might need to change the signature in Accumulator from ScalarValue to ColumnarValue. Do you think this signature change is a good decision for accepting the Aggregate function to process Array?

I wonder if you can call Accumulator::update_batch instead? It is already possible to go from ColumnarValue --> Array via https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.ColumnarValue.html#method.into_array

alamb avatar Aug 13 '23 11:08 alamb

I miss the latest comment, let me try out that.

jayzhan211 avatar Aug 13 '23 15:08 jayzhan211

I am sorry @jayzhan211 -- I don't think I will have time to help work on this PR for the next few days. I will try to find time this wekeend or next week but I am too busy now with TopK and copy related work to help here too. Sorry I am spread too thin 😢

alamb avatar Aug 16 '23 19:08 alamb

It is significantly simpler after #7352 :)

jayzhan211 avatar Oct 18 '23 11:10 jayzhan211

I have this on my review list, and I hope to start working that list down tomorrow

alamb avatar Oct 18 '23 20:10 alamb

@jayzhan211 I wonder if you could start looking at the LIst / Array code and find ways to make it simpler before we begin implementing new features --

I feel like the code is at a place where doing anything is challenging to get a timely review (because the code is so complicated). Also, when making changes like https://github.com/apache/arrow-datafusion/pull/7629 results in several regressions, that is a sign to me that the existing code needs to be simplified / improved before we can add significant new features

alamb avatar Oct 27 '23 20:10 alamb

@jayzhan211 I wonder if you could start looking at the LIst / Array code and find ways to make it simpler before we begin implementing new features --

I feel like the code is at a place where doing anything is challenging to get a timely review (because the code is so complicated). Also, when making changes like https://github.com/apache/arrow-datafusion/pull/7629 results in several regressions, that is a sign to me that the existing code needs to be simplified / improved before we can add significant new features

Ok

jayzhan211 avatar Oct 27 '23 23:10 jayzhan211

Following up my discussion on #7214 what's the status of this? Are there blockers that need to be removed / other PR that need to be worked on before this one?

edmondop avatar Nov 13 '23 02:11 edmondop

Following up my discussion on #7214 what's the status of this? Are there blockers that need to be removed / other PR that need to be worked on before this one?

I plan to merge https://github.com/apache/arrow-datafusion/pull/8141 first, then maybe review https://github.com/apache/arrow-datafusion/pull/7242#discussion_r1369254508.

jayzhan211 avatar Nov 13 '23 11:11 jayzhan211

@jayzhan211 I wonder if you could start looking at the LIst / Array code and find ways to make it simpler before we begin implementing new features --

I feel like the code is at a place where doing anything is challenging to get a timely review (because the code is so complicated). Also, when making changes like #7629 results in several regressions, that is a sign to me that the existing code needs to be simplified / improved before we can add significant new features

@alamb I think ArrayAggregate is so different that cleanup existing array function does not help so much. Maybe we can move on this again? Let me know if there is any issue we should focus on before moving on ArrayAggregate.

jayzhan211 avatar Nov 19 '23 08:11 jayzhan211

@jayzhan211 I wonder if you could start looking at the LIst / Array code and find ways to make it simpler before we begin implementing new features -- I feel like the code is at a place where doing anything is challenging to get a timely review (because the code is so complicated). Also, when making changes like #7629 results in several regressions, that is a sign to me that the existing code needs to be simplified / improved before we can add significant new features

@alamb I think ArrayAggregate is so different that cleanup existing array function does not help so much. Maybe we can move on this again? Let me know if there is any issue we should focus on before moving on ArrayAggregate.

My concern is that aggregate functions are some of the most important features in DataFusion as they are widely used and their performance is very important perspective. I am very concerned that any change in how the aggregates operate will cause issues downstream (either functionality or performance) as well as make it harder to maintain.

For this case especially I am worried about adding features for their own sake without anyone who is waiting for it

So one question I have is if anyone is waiting on this feature, and if so perhaps they can help implement / test it.

alamb avatar Nov 20 '23 14:11 alamb

I am trying to go through old PRs and make sure we don't lose any -- this one has not had much activity and has accumulated conflicts. Marking as draft so it isn't on the review queue. Please feel free to reopen / mark as ready for review if it is

alamb avatar Feb 02 '24 21:02 alamb

Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.

github-actions[bot] avatar May 04 '24 01:05 github-actions[bot]