
[FLINK-21949][table] Support ARRAY_AGG aggregate function

Open. Jiabao-Sun opened this pull request • 7 comments

What is the purpose of the change


Some NoSQL databases, such as MongoDB and Elasticsearch, support nested data types, so aggregating multiple rows into an ARRAY<ROW> is a common requirement.

Brief change log

Introduce the built-in function ARRAY_AGG([ ALL | DISTINCT ] expression), which returns an array that concatenates the input values, or NULL if there are no input rows. NULL values are ignored. Use DISTINCT to keep only one instance of each unique value.

SELECT ARRAY_AGG(f1)
  FROM tmp
 GROUP BY f0
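To make these semantics concrete, here is a minimal in-memory sketch in Java, assuming one group's input values arrive as a java.util.List. ArrayAggSemantics and arrayAgg are hypothetical names for illustration, not Flink API. The sketch models the stated rules: NULL inputs are skipped, DISTINCT keeps one instance of each value, and a group without input rows yields NULL. Treating a group whose inputs are all NULL like an empty group, and keeping DISTINCT values in first-arrival order, are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical model of ARRAY_AGG semantics; not Flink's implementation.
final class ArrayAggSemantics {

    private ArrayAggSemantics() {}

    /** Models ARRAY_AGG(expr) (distinct = false) or ARRAY_AGG(DISTINCT expr) over one group. */
    static <T> List<T> arrayAgg(List<T> groupValues, boolean distinct) {
        // DISTINCT: a LinkedHashSet keeps the first occurrence of each value in arrival order
        // (the ordering is an assumption of this sketch).
        Collection<T> collected = distinct ? new LinkedHashSet<T>() : new ArrayList<T>();
        for (T value : groupValues) {
            if (value != null) { // NULL inputs are ignored
                collected.add(value);
            }
        }
        // Nothing collected: return NULL instead of an empty array
        // (all-NULL groups treated like empty groups is an assumption).
        return collected.isEmpty() ? null : new ArrayList<>(collected);
    }

    public static void main(String[] args) {
        System.out.println(arrayAgg(Arrays.asList(1, null, 2, 1), false)); // [1, 2, 1]
        System.out.println(arrayAgg(Arrays.asList(1, null, 2, 1), true));  // [1, 2]
        System.out.println(arrayAgg(List.of(), false));                     // null
    }
}
```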


Note that we have made some simplifications compared with Calcite's SqlLibraryOperators.ARRAY_AGG.

-- calcite
ARRAY_AGG([ ALL | DISTINCT ] value [ RESPECT NULLS | IGNORE NULLS ] [ ORDER BY orderItem [, orderItem ]* ] )
-- flink
ARRAY_AGG([ ALL | DISTINCT ] expression)

The differences from Calcite are as follows:

  1. Null values are ignored.
  2. The ORDER BY clause within the function is not supported, because the complete input row cannot be accessed within the function implementation.
  3. The function returns NULL when there are no input rows, whereas the Calcite definition returns an empty array. This behavior follows BigQuery and PostgreSQL:
  • https://cloud.google.com/bigquery/docs/reference/standard-sql/aggregate_functions#array_agg
  • https://www.postgresql.org/docs/8.4/functions-aggregate.html
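Difference 3 can be sketched as two hypothetical result functions (the names are illustrative, not Flink or Calcite API), assuming the input list already has NULLs removed. The nullToEmpty helper, also hypothetical, shows what a caller that prefers the empty-array convention would have to do with the NULL result.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of the empty-group difference; not Flink or Calcite code.
final class EmptyGroupContrast {

    private EmptyGroupContrast() {}

    /** Calcite definition: a group with no input rows produces an empty array. */
    static <T> List<T> calciteResult(List<T> nonNullValues) {
        return new ArrayList<>(nonNullValues);
    }

    /** Definition in this PR (BigQuery/PostgreSQL style): no input rows produce NULL. */
    static <T> List<T> flinkResult(List<T> nonNullValues) {
        return nonNullValues.isEmpty() ? null : new ArrayList<>(nonNullValues);
    }

    /** A consumer that wants the empty-array convention normalizes NULL itself. */
    static <T> List<T> nullToEmpty(List<T> result) {
        return result == null ? Collections.emptyList() : result;
    }
}
```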

Verifying this change

ITCases and unit tests are added.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs)

Jiabao-Sun, Sep 13 '23

CI report:

  • d898519a24a365ddde33c5e6811f4df99bbe9aef Azure: SUCCESS
Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

flinkbot, Sep 13 '23

Hi @wuchong, could you help review this when you have time?

Jiabao-Sun, Oct 10 '23

Thanks @snuyanzin for the review. Sorry, I was a little puzzled, because I did not use a HashMap in ArrayAggFunction. Could you help pinpoint exactly where it is?

Jiabao-Sun, Dec 07 '23

Do you mean using equalityEvaluator to compare when merging and retracting lists?

Jiabao-Sun, Dec 07 '23

Yes, sorry, I was not clear enough.

"to compare when merging and retracting lists?"

Yes.

snuyanzin, Dec 07 '23

@flinkbot run azure

Jiabao-Sun, Dec 18 '23

Hi @snuyanzin, could you help take a look again when you have time? Thanks.

Jiabao-Sun, Dec 19 '23

https://github.com/apache/flink/pull/23411#pullrequestreview-1770150649

It took me a while to understand this myself, but I think it's actually OK to depend on Object#equals/hashCode in AggregateFunctions. @snuyanzin, what you said is correct for ArrayDistinctFunction, because:

  1. It is a scalar function, so it can be chained with other functions without an exchange before it.
  2. It gets data from two inputs (e.g. the previous operator and a literal): the haystack and the needle may come from different operators, which may produce data in different formats, e.g. GenericRowData and BinaryRowData.
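The second point can be illustrated with two hypothetical row classes, ObjectRow and BytesRow, standing in loosely for GenericRowData and BinaryRowData. They are not Flink classes, and the toy byte encoding is an assumption for illustration only. As a simplifying assumption, each class's equals recognizes only its own type, so the same logical row compares unequal once the needle and the haystack use different formats.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Hypothetical object-based row, loosely analogous to GenericRowData (not a Flink class). */
final class ObjectRow {
    final Object[] fields;

    ObjectRow(Object... fields) {
        this.fields = fields;
    }

    @Override
    public boolean equals(Object o) {
        // Only another ObjectRow can ever be equal.
        return o instanceof ObjectRow && Arrays.equals(fields, ((ObjectRow) o).fields);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(fields);
    }
}

/** Hypothetical serialized row, loosely analogous to BinaryRowData (not a Flink class). */
final class BytesRow {
    final byte[] bytes;

    private BytesRow(byte[] bytes) {
        this.bytes = bytes;
    }

    /** Toy encoding for illustration only; a real binary row format is very different. */
    static BytesRow encode(ObjectRow row) {
        return new BytesRow(Arrays.toString(row.fields).getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public boolean equals(Object o) {
        // Only another BytesRow can ever be equal.
        return o instanceof BytesRow && Arrays.equals(bytes, ((BytesRow) o).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }
}

final class MixedFormatDemo {
    public static void main(String[] args) {
        ObjectRow needle = new ObjectRow(1, "a");
        List<Object> haystack = List.of(BytesRow.encode(new ObjectRow(1, "a")));

        System.out.println(needle.equals(new ObjectRow(1, "a")));        // true: same format
        System.out.println(haystack.contains(BytesRow.encode(needle))); // true: both encoded
        System.out.println(haystack.contains(needle));                  // false: formats differ
    }
}
```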

In AggregateFunctions, all records will always arrive as BinaryRowData (and similar binary formats), so their equals/hashCode should work just fine. (It may be a different story once we support STRUCTURED_TYPE#equals/hashCode, but then we will need to revisit most of the operators anyway, because all MapViews would work incorrectly.) Other aggregate functions already rely on this; take a look at e.g. JsonArrayAggFunction or FirstValueWithRetractAggFunction.

I haven't seen the previous version, @Jiabao-Sun, but I believe we can remove the equalityHandler and make ArrayAggFunction look similar to JsonArrayAggFunction.
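As a rough sketch of what relying on Object#equals means for an aggregate, here is a plain-Java accumulator whose methods are named after the accumulate/retract/merge/getValue conventions of Flink's AggregateFunction, but with no Flink dependency. ArrayAggSketch and Acc are hypothetical names; this is not the PR's ArrayAggFunction or JsonArrayAggFunction. Retraction and merging go through List#remove(Object), which uses equals. Keeping a list of pending retractions for values that have not been seen yet is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical ARRAY_AGG-style accumulator; not Flink's ArrayAggFunction.
final class ArrayAggSketch<T> {

    /** Accumulator: collected values plus retractions that arrived before their value. */
    static final class Acc<E> {
        final List<E> values = new ArrayList<>();
        final List<E> pendingRetractions = new ArrayList<>(); // assumption of this sketch
    }

    Acc<T> createAccumulator() {
        return new Acc<>();
    }

    void accumulate(Acc<T> acc, T value) {
        if (value == null) {
            return; // NULL inputs are ignored
        }
        // An equal value with a pending retraction cancels out instead of being added.
        if (!acc.pendingRetractions.remove(value)) {
            acc.values.add(value);
        }
    }

    void retract(Acc<T> acc, T value) {
        if (value == null) {
            return;
        }
        // List#remove(Object) relies on equals(): drop one equal element if present,
        // otherwise remember the retraction so that a later merge can cancel it.
        if (!acc.values.remove(value)) {
            acc.pendingRetractions.add(value);
        }
    }

    void merge(Acc<T> acc, Iterable<Acc<T>> others) {
        for (Acc<T> other : others) {
            for (T value : other.values) {
                accumulate(acc, value);
            }
            for (T value : other.pendingRetractions) {
                retract(acc, value);
            }
        }
    }

    List<T> getValue(Acc<T> acc) {
        // No remaining values: NULL rather than an empty array.
        return acc.values.isEmpty() ? null : new ArrayList<>(acc.values);
    }
}
```

Because every value compared in this sketch comes from a single representation, plain equals is enough and no separate equality evaluator is involved.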

dawidwys, Jan 15 '24

Thanks a lot for the explanation, @dawidwys, you're right.

snuyanzin, Jan 15 '24

Thanks @snuyanzin @dawidwys for the review. Could you help review it again?

Jiabao-Sun, Jan 16 '24

Thanks for addressing the comments; in general it looks OK from my side. There is one little thing: since it is based on the Calcite parser, it allows ORDER BY inside the function call. At the same time, this is currently not supported at the Flink level. I'm not sure whether we can redefine this behavior, but at least it would make sense to mention in the docs that it is not supported.

snuyanzin, Jan 17 '24


Yes, ORDER BY allows sorting by any field of the input rows, but it is currently difficult to access the complete input rows for sorting within the function implementation. Therefore, the ORDER BY clause is not supported yet. I have added an explanation to the documentation.
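The limitation can be sketched as follows: to honor ORDER BY key, the function would have to receive the sort key alongside each aggregated value, which is exactly the part of the input row it does not see today. OrderedArrayAggSketch is a hypothetical class, not part of the PR; sorting NULL keys last and breaking ties by arrival order (via the stable List#sort) are assumptions of this sketch.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of ARRAY_AGG(value ORDER BY key); not part of the PR.
final class OrderedArrayAggSketch<K extends Comparable<K>, V> {

    /** Each entry pairs the sort key (from elsewhere in the row) with the aggregated value. */
    private final List<Map.Entry<K, V>> entries = new ArrayList<>();

    /** The extra sortKey argument is what the function would need but does not receive today. */
    void accumulate(K sortKey, V value) {
        if (value != null) { // NULL values are still ignored
            entries.add(new AbstractMap.SimpleImmutableEntry<>(sortKey, value));
        }
    }

    List<V> getValue() {
        if (entries.isEmpty()) {
            return null;
        }
        // NULL keys sort last (assumption); List#sort is stable, so ties keep arrival order.
        Comparator<K> byKey = Comparator.nullsLast(Comparator.<K>naturalOrder());
        List<Map.Entry<K, V>> sorted = new ArrayList<>(entries);
        sorted.sort(Map.Entry.<K, V>comparingByKey(byKey));
        List<V> result = new ArrayList<>(sorted.size());
        for (Map.Entry<K, V> entry : sorted) {
            result.add(entry.getValue());
        }
        return result;
    }
}
```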

@snuyanzin, please help take a look again when you have time.

Jiabao-Sun, Jan 18 '24

@flinkbot run azure

Jiabao-Sun, Jan 18 '24

@flinkbot run azure

Jiabao-Sun, Jan 18 '24

Hi @dawidwys, please help review it again when you have time. Thanks a lot.

Jiabao-Sun, Jan 22 '24

Sorry it is taking so long on my side; I was on vacation in the meantime. I'll try to check it on Monday. Nevertheless, if you're comfortable with the PR, @snuyanzin, feel free to merge it without waiting for my review.

dawidwys, Feb 02 '24

I think we could wait until Monday or even longer, since there is currently a feature freeze and we need to wait for the release branch to be cut.

snuyanzin, Feb 02 '24

Thanks for the update, @Jiabao-Sun. The implementation looks good now. I want to go through the tests again, but I need a bit more time. I hope this is fine, since we need to wait for the branch cut anyway.

dawidwys, Feb 06 '24