[FLINK-21949][table] Support ARRAY_AGG aggregate function
What is the purpose of the change
Some NoSQL databases, such as MongoDB and Elasticsearch, support nested data types, so aggregating multiple rows into an ARRAY<ROW> is a common requirement.
Brief change log
Introduce the built-in function ARRAY_AGG([ ALL | DISTINCT ] expression), which returns an array that concatenates the values of the input rows, or NULL if there are no input rows. NULL values are ignored. Use DISTINCT to keep only one instance of each value.
SELECT ARRAY_AGG(f1)
FROM tmp
GROUP BY f0
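To make the semantics above concrete, here is a minimal, framework-free Java sketch of what ARRAY_AGG computes for the values of one group (e.g. all f1 values that share one f0). It is not Flink's ArrayAggFunction: the class and method names are hypothetical, and returning null when every input is NULL (so nothing remains to aggregate) is an assumption extrapolated from the "NULL if there are no input rows" rule.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical model of ARRAY_AGG([ALL | DISTINCT] expression) over one group's
 * values. Not Flink's ArrayAggFunction; for illustration only.
 */
final class ArrayAggSemantics {

    private ArrayAggSemantics() {}

    static <T> List<T> arrayAgg(List<T> values, boolean distinct) {
        List<T> result = new ArrayList<>();
        Set<T> seen = new HashSet<>();
        for (T value : values) {
            if (value == null) {
                continue; // NULL values are ignored
            }
            if (distinct && !seen.add(value)) {
                continue; // DISTINCT keeps only the first instance of each value
            }
            result.add(value);
        }
        // Assumption: an empty result (no rows, or only NULLs) is reported as null.
        return result.isEmpty() ? null : result;
    }
}
```

For example, the group values [1, NULL, 2, 1] yield [1, 2, 1] with ALL and [1, 2] with DISTINCT.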
Note that we have made some simplifications compared with Calcite's SqlLibraryOperators.ARRAY_AGG.
-- calcite
ARRAY_AGG([ ALL | DISTINCT ] value [ RESPECT NULLS | IGNORE NULLS ] [ ORDER BY orderItem [, orderItem ]* ] )
-- flink
ARRAY_AGG([ ALL | DISTINCT ] expression)
The differences from Calcite are as follows:
- Null values are ignored.
- An ORDER BY clause within the function is not supported, because the complete input row cannot be accessed within the function implementation.
- The function returns NULL when there are no input rows, whereas the Calcite definition returns an empty array. This behavior follows BigQuery and PostgreSQL.
- https://cloud.google.com/bigquery/docs/reference/standard-sql/aggregate_functions#array_agg
- https://www.postgresql.org/docs/8.4/functions-aggregate.html
Verifying this change
ITCases and unit tests are added.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)
Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs)
CI report:
- d898519a24a365ddde33c5e6811f4df99bbe9aef Azure: SUCCESS
Hi @wuchong, could you help review this when you have time?
Thanks @snuyanzin for the review.
Sorry, I was a little puzzled because I did not use HashMap in ArrayAggFunction.
Could you help pinpoint exactly where it is?
Do you mean using equalityEvaluator to compare when merging and retracting lists?
Yes, sorry, I was not clear enough.
@flinkbot run azure
Hi @snuyanzin, could you help take a look again when you have time? Thanks.
https://github.com/apache/flink/pull/23411#pullrequestreview-1770150649
It took me a while to understand this myself, but I think it's actually OK to depend on Object#equals/hashCode in AggregateFunctions.
@snuyanzin, what you said is correct for ArrayDistinctFunction, because:
- it is a scalar function, so it can be chained with other functions without an exchange before it;
- it gets data from two inputs (the previous operator and, e.g., a literal), so the haystack and the needle may come from different operators that produce data in different formats, e.g. GenericRowData and BinaryRowData.
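The format argument can be illustrated with a small sketch. GenericRow and BinaryRow below are hypothetical, simplified stand-ins for GenericRowData and BinaryRowData, not the real Flink classes: the same logical row (1, "a") in the two representations is not equal under Object#equals, so a naive equals-based lookup misses it, while comparing after converting both sides to one format finds it. Normalizing to the binary form is just one way to get consistent comparisons and is not necessarily how Flink's equality evaluator works.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Hypothetical object-based row (stand-in for GenericRowData) with fields (id, name). */
final class GenericRow {
    final int id;
    final String name;

    GenericRow(int id, String name) {
        this.id = id;
        this.name = name;
    }

    /** Serializes into a hypothetical binary layout: 4-byte id followed by the UTF-8 name. */
    BinaryRow toBinary() {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        return new BinaryRow(ByteBuffer.allocate(4 + nameBytes.length).putInt(id).put(nameBytes).array());
    }

    @Override
    public boolean equals(Object o) {
        // Only another GenericRow can be equal; a BinaryRow never is.
        if (!(o instanceof GenericRow)) {
            return false;
        }
        GenericRow other = (GenericRow) o;
        return id == other.id && name.equals(other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, name);
    }
}

/** Hypothetical serialized row (stand-in for BinaryRowData); equality is byte-wise. */
final class BinaryRow {
    final byte[] bytes;

    BinaryRow(byte[] bytes) {
        this.bytes = bytes;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof BinaryRow && Arrays.equals(bytes, ((BinaryRow) o).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }
}

final class MixedFormatLookup {
    private MixedFormatLookup() {}

    /** Relies on Object#equals only, so it misses matches across formats. */
    static boolean containsNaive(List<?> haystack, Object needle) {
        return haystack.contains(needle);
    }

    /** Converts both sides to BinaryRow before comparing, so logically equal rows match. */
    static boolean containsNormalized(List<?> haystack, Object needle) {
        BinaryRow target = toBinary(needle);
        for (Object element : haystack) {
            if (toBinary(element).equals(target)) {
                return true;
            }
        }
        return false;
    }

    private static BinaryRow toBinary(Object row) {
        return row instanceof GenericRow ? ((GenericRow) row).toBinary() : (BinaryRow) row;
    }
}
```

When every value reaching an operator already arrives in one format, as argued for aggregate inputs, the naive check and the normalized check agree.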
In AggregateFunctions we will always have all records as BinaryRowData (and alike), and their equals/hashCode should work just fine. (It may be a different story once we support STRUCTURED_TYPE#equals/hashCode, but then we will need to revisit most of the operators anyway, because all MapViews would work incorrectly.) This is what we already do in other aggregate functions; take a look at, e.g., JsonArrayAggFunction or FirstValueWithRetractAggFunction.
I haven't seen the previous version, @Jiabao-Sun, but I believe we can remove the equalityHandler and make ArrayAggFunction look similar to JsonArrayAggFunction.
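Following the suggestion to rely on Object#equals, a list-based accumulator with retraction might look roughly like the sketch below. It is plain Java with hypothetical names, not Flink's ListView-based ArrayAggFunction or JsonArrayAggFunction, and the separate retract list for retractions that arrive before the matching accumulation is an assumption about how out-of-order input could be handled.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical accumulator for ARRAY_AGG(ALL expression) with retraction and merge.
 * It compares values only via Object#equals (through List#remove(Object)), assuming
 * all values reaching the aggregate share one internal format.
 */
final class ArrayAggAccumulatorSketch<T> {
    // Values currently in the aggregate, in arrival order.
    private final List<T> list = new ArrayList<>();
    // Assumption: retractions that arrived before their matching accumulation.
    private final List<T> retractList = new ArrayList<>();

    void accumulate(T value) {
        if (value == null) {
            return; // NULL values are ignored
        }
        // A pending retraction for an equal value cancels this accumulation.
        if (!retractList.remove(value)) {
            list.add(value);
        }
    }

    void retract(T value) {
        if (value == null) {
            return;
        }
        // remove(Object) drops the first element that is equal to value.
        if (!list.remove(value)) {
            retractList.add(value);
        }
    }

    void merge(ArrayAggAccumulatorSketch<T> other) {
        for (T value : other.list) {
            accumulate(value);
        }
        for (T value : other.retractList) {
            retract(value);
        }
    }

    /** Returns null when nothing is left, mirroring the "no input rows" rule. */
    List<T> getValue() {
        return list.isEmpty() ? null : new ArrayList<>(list);
    }
}
```

Because List#remove(Object) uses equals, retraction only finds its target if equal values share one representation, which is exactly the assumption discussed above for aggregate inputs.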
Thanks a lot for the explanation, @dawidwys, you're right.
Thanks @snuyanzin @dawidwys for the review. Could you help review it again?
Thanks for addressing the comments; in general it looks OK from my side.
I guess there is one little thing: since it is based on the Calcite parser, it allows ORDER BY inside the function call...
At the same time, this is currently not supported at the Flink level. I'm not sure whether we can redefine this behavior, but at least it would make sense to mention in the docs that it is not supported.
Yes, ORDER BY allows sorting by any field of the input rows, but currently it is difficult to obtain the complete input rows for sorting in the function implementation. Therefore, the ORDER BY clause is not supported yet.
I have added an explanation in the documentation.
@snuyanzin, please help take a look again when you have time.
@flinkbot run azure
@flinkbot run azure
Hi @dawidwys, please help review it again when you have time. Thanks a lot.
Sorry that it is taking so long on my side; I was on vacation in the meantime. I'll try to check it on Monday. Nevertheless, if you're comfortable with the PR, @snuyanzin, feel free to merge it without waiting for my review.
I think we can wait until Monday or even longer, since there is currently a feature freeze and we need to wait for the release branch to be cut.
Thanks for the update, @Jiabao-Sun. The implementation looks good now. I want to go through the tests again, but I need a bit more time. I hope this is fine, since we need to wait for the branch cut anyway.