[FLINK-31664] Implement ARRAY_INTERSECT function
-
What is the purpose of the change
This is an implementation of ARRAY_INTERSECT.
-
Brief change log
ARRAY_INTERSECT for Table API and SQL.
Returns an array of the elements in the intersection of array1 and array2, with duplicates.
Syntax:
array_intersect(array1, array2)
Arguments:
array1, array2: the ARRAYs to be intersected.
Returns:
An ARRAY. If either input array is null, the function returns null.
Examples:
> SELECT array_intersect(array(1, 1, 3), array(1, 3, 5));
[1,1,3]
-
Verifying this change
This change added tests in CollectionFunctionsITCase.
-
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)
-
Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs)
after discussion with @dawidwys here https://github.com/apache/flink/pull/23171#issuecomment-1956501651
CI report:
- 07d3f0727c68780467cb707c6bdffc37507062b6 Azure: SUCCESS
Hi @dawidwys, will you help review this?
Before I review the code let's settle on the behaviour first.
@MartijnVisser What is your opinion on how the function should behave? Especially in the context of https://github.com/apache/flink/pull/23173#discussion_r1491044219 and handling duplicates.
What should be the output of: [1, 1, 1, 2] INTERSECT [1, 1, 2]
?
- [1,2] - Spark/Databricks/Presto
- [1,1,2] - Snowflake
- [1, 1, 1, 2] - (as far as I can tell the current behaviour of the PR)
- Snowflake has multi-set semantics.
If one array has N copies of a value, and the other array has M copies of the same value, then the number of copies in the returned array is the smaller of N or M. For example, if N is 4 and M is 2, then the returned value contains 2 copies.
- Databricks deduplicates result (Spark I presume has the same behaviour)
An ARRAY of matching type to array1 with no duplicates and elements contained in both array1 and array2.
- Presto does the same as Spark:
Returns an array of the elements in the intersection of x and y, without duplicates.
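The three candidate behaviours listed above can be sketched side by side in Python (the function names are hypothetical, for illustration only):

```python
from collections import Counter

a, b = [1, 1, 1, 2], [1, 1, 2]

def intersect_dedup(x, y):
    # Set semantics (Spark/Databricks/Presto): deduplicate,
    # keep first-seen order from x.
    ys, seen, out = set(y), set(), []
    for e in x:
        if e in ys and e not in seen:
            seen.add(e)
            out.append(e)
    return out

def intersect_multiset(x, y):
    # Multi-set semantics (Snowflake): min(N, M) copies of each value.
    counts = Counter(x) & Counter(y)  # & keeps the minimum count per key
    out = []
    for e in x:
        if counts[e] > 0:
            out.append(e)
            counts[e] -= 1
    return out

def intersect_keep_left(x, y):
    # Keep every copy from the left array that appears anywhere in the
    # right one (as far as I can tell, the PR's original behaviour).
    ys = set(y)
    return [e for e in x if e in ys]

print(intersect_dedup(a, b))      # [1, 2]
print(intersect_multiset(a, b))   # [1, 1, 2]
print(intersect_keep_left(a, b))  # [1, 1, 1, 2]
```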
One more idea: some vendors allow calculating intersections over an arbitrary number of arrays, e.g. ClickHouse [1].
[1] https://clickhouse.com/docs/en/sql-reference/functions/array-functions#arrayintersectarr
From my side, it is not a good idea, because array_intersect(array_intersect(array1, array2), array3) does the same thing; an n-ary variant is just syntactic sugar. Also, array_union/array_except have already been supported and merged, and both take two arguments, so we may want to stay aligned. What is your opinion @dawidwys @MartijnVisser?
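The "syntactic sugar" point above can be sketched in Python: an n-ary intersect is just a fold of the binary one over the argument list. The function names below are hypothetical illustrations, not Flink APIs, and the binary function here uses deduplicating set semantics for the example.

```python
from functools import reduce

def array_intersect(x, y):
    # Binary intersect with deduplication: elements of x that also occur
    # in y, each reported once, in x's encounter order.
    ys, seen, out = set(y), set(), []
    for e in x:
        if e in ys and e not in seen:
            seen.add(e)
            out.append(e)
    return out

def array_intersect_n(*arrays):
    # An n-ary variant is just a left fold over the binary function.
    return reduce(array_intersect, arrays)

print(array_intersect_n([1, 2, 3, 4], [1, 3, 5], [3, 4]))  # [3]
```

This is why a two-argument ARRAY_INTERSECT loses no expressiveness: nesting (or folding) recovers the ClickHouse-style n-ary form.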
hi @snuyanzin @dawidwys what is your opinion?
What is your opinion on how the function should behave?
I've taken a look at how INTERSECT is defined in the SQL standard. Based on https://stackoverflow.com/questions/59060599/does-intersect-operator-exist-in-the-sql-standar, https://www.postgresql.org/docs/current/queries-union.html, the fact that Calcite differentiates between INTERSECT and INTERSECT ALL leads me to believe that the default behavior of INTERSECT is to remove duplicates.
So the result of [1, 1, 1, 2] INTERSECT [1, 1, 2] should be [1, 2] in my understanding. I think that Spark/Databricks/Presto are performing the correct behavior.
BigQuery and Redshift don't support ARRAY_INTERSECT. ksqlDB follows the same behavior as Spark/Databricks/Presto per https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/scalar-functions/#array_intersect.
Hi @MartijnVisser, INTERSECT is different from array_intersect. INTERSECT is a set-operation RelNode, like UNION/UNION ALL, and is indeed part of the SQL standard, while array_intersect/array_union are just functions, which don't have any standard.
@liuyongvs I disagree: I think that we're looking at what the definition of INTERSECT in general is, not from a functional or implementation perspective, but more if there's a definition of what INTERSECT should do. I don't think it's a good idea to have INTERSECT in Flink that doesn't return duplicates, and then have an ARRAY_INTERSECT that does return duplicates. That's not consistent. If both INTERSECT and ARRAY_INTERSECT don't return duplicates, that is a consistent behavior.
So IMHO:
INTERSECT and ARRAY_INTERSECT --> Removes duplicates
If there's a need to have duplicates included:
INTERSECT ALL and ARRAY_INTERSECT_ALL --> Keep duplicates, have consistent behavior with INTERSECT ALL
@MartijnVisser @dawidwys @snuyanzin I agree with you. That is to say, the semantics align with Spark's: no duplicate elements. Consequently, I believe we also need to keep array_union/array_except consistent. As of now, array_union, which I developed, has semantics and behavior aligned with Spark, meaning no duplicates are included. However, array_except (https://issues.apache.org/jira/browse/FLINK-31663) does include duplicates, so its semantics do not align and it requires modification. Furthermore, array_except was merged in 1.20, and since 1.20 is currently only a snapshot version and not officially released, there is no concern about causing compatibility issues by changing the behavior, so it should indeed be corrected. What are your thoughts?
hi @MartijnVisser
Furthermore, array_except was merged in version 1.20, and since 1.20 is currently only a snapshot version and not officially released, there’s no concern of causing compatibility issues due to changing behaviors, so it should indeed be corrected. What are your thoughts?
I think you're right. @dawidwys @snuyanzin WDYT?
hi @dawidwys @snuyanzin WDYT?
Maybe it is an unpopular opinion, however I tend to think that INTERSECT vs INTERSECT ALL (and likewise for the other set/bag operations) is defined for rows and can hardly be applied to collections. I failed to find a standard approach for collections like arrays (I mean in the SQL Standard).
Probably that is one of the reasons we see a number of vendors handling this differently. On one side, we could say that we should follow the same approach as for rows. The problem I see here is that by default we would remove duplicates; however, what should we do if we want to keep them? There is no well-known vendor providing both array_intersect and array_intersect_all, or offering keep/remove duplicates as a parameter.
On the other side, if we keep duplicates then we will still be able to cover both cases:
for the case with duplicates we can do
array_intersect(array1, array2)
and for the case without
array_distinct(array_intersect(array1, array2))
Yes, array_union looks like an exception here; however, if we compare against vendors, it is a global exception just because there happens to be a nice synonym used for another function, array_concat, and as you can see it again doesn't follow the approach for rows, since there is no array_union_all. Thus if we want to concatenate arrays without duplicates we use array_union, and for the case with duplicates we could use array_concat. The problem is that not every function has such a workaround.
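The asymmetry argued above can be shown concretely: a duplicate-keeping result can always be reduced to the deduplicated one, but not the other way around. A Python sketch (hypothetical helper names, multi-set semantics as in Snowflake):

```python
from collections import Counter

def array_intersect_multiset(x, y):
    # Multi-set semantics: min(N, M) copies of each common value,
    # in x's encounter order.
    counts, out = Counter(x) & Counter(y), []
    for e in x:
        if counts[e] > 0:
            out.append(e)
            counts[e] -= 1
    return out

def array_distinct(x):
    # Remove duplicates, keeping first-seen order.
    seen, out = set(), []
    for e in x:
        if e not in seen:
            seen.add(e)
            out.append(e)
    return out

with_dups = array_intersect_multiset([1, 1, 1, 2], [1, 1, 2])
print(with_dups)                   # [1, 1, 2] -- the duplicate-keeping case
print(array_distinct(with_dups))   # [1, 2]    -- the deduplicated case
# Going the other way is impossible: once deduplicated, counts are lost.
```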
I failed to find standard approach for collections like arrays (I mean in SQL Standard).
Same. I don't think it's documented anywhere. So how do we come to a conclusion (given that the 1.20 release date is coming to us rather quickly)?
Based on what I mentioned above, how about following multi-set semantics like Snowflake by default? Then it will allow covering more cases (with and without duplicates).
array_union/array_concat would remain an exception, since it is one across lots of vendors anyway.
@snuyanzin @MartijnVisser Why do we necessarily have to align our semantics with Snowflake?
I found that Spark/Presto/Doris/StarRocks/MaxCompute all follow the duplicate-removing semantics. I personally still think that aligning behavior with the majority of vendors might be better, as it would not cause too much confusion for users when they switch to a different engine. Unless the behavior is very unreasonable, we shouldn't need to change the semantics. What do you think?
https://prestodb.io/docs/current/functions/array.html#array_intersect https://doris.apache.org/docs/1.2/sql-manual/sql-functions/array-functions/array_intersect/ https://www.alibabacloud.com/help/en/maxcompute/user-guide/array-intersect https://github.com/StarRocks/starrocks/blob/fb36000ee72588c8b4db196292c2acb40f306109/be/src/exprs/array_functions.cpp#L1435
@snuyanzin @MartijnVisser Why do we necessarily have to align our semantics with Snowflake?
As mentioned above, the main reason is that multi-set semantics (Snowflake) allows handling both the case with duplicates, e.g.
SELECT array_count(array_intersect(array('A', 'B', 'B'), array('B', 'B', 'B'))); -- returns 2
and the case without:
SELECT array_count(array_distinct(array_intersect(array('A', 'B', 'B'), array('B', 'B', 'B')))); -- returns 1
And it seems Spark and the others couldn't calculate the number of duplicates with their semantics; I would consider that the main drawback of their approach. Please correct me if I'm wrong @liuyongvs.
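The two SQL examples above can be checked with Counter arithmetic in Python (a sketch of the semantics only; `array_count` here is the Snowflake-style function from the example, not a Flink built-in):

```python
from collections import Counter

a, b = ['A', 'B', 'B'], ['B', 'B', 'B']

# Multi-set intersection: Counter's & operator keeps the minimum
# count per value, so 'A' (0 copies in b) drops out and 'B' keeps
# min(2, 3) = 2 copies.
multiset = list((Counter(a) & Counter(b)).elements())
print(len(multiset))  # 2 -- the array_count(array_intersect(...)) case

# After deduplication the count information is gone.
distinct = set(multiset)
print(len(distinct))  # 1 -- the array_count(array_distinct(...)) case
```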
@snuyanzin Indeed, while this might only result in a difference in function behavior, it would generally be best to align with the practices of the majority of other engines. If in the future there arises a demand to preserve duplicates, or if other engines also adopt this behavior, we could introduce a switch within the function to support the option to either preserve or remove duplicates. What do you think?
So far I see this kind of trade-off for multi-set semantics (with duplicates):
pros
- It covers more cases, as mentioned above
- array_except is already implemented with this semantics, and it would make sense to follow the same way (P.S. earlier I wrote why I do not consider array_union)
cons
- Snowflake is the only well-known vendor that supports it
It would be great to see other opinions
This is indeed a tricky one. I also spent a significant amount of time here. Every vendor does it differently. I also looked at programming languages such as PHP and C#. For example, C# removes duplicates.
In general, having an INTERSECT on array types sounds weird. Intersection is a concept from set theory: https://en.wikipedia.org/wiki/Intersection_(set_theory)
But arrays are not sets. Since we also deduplicate unions (another term from set theory), I would vote for deduplicate behavior.
+1 for deduplicate
Conclusion: Since there are no objections, then we will support it with deduplication semantics.
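A minimal Python sketch of the agreed deduplicating semantics, combining the conclusion here with the NULL rule from the PR description. This is an illustration only, not Flink's actual implementation (which also has to handle type coercion and nested types):

```python
def array_intersect(array1, array2):
    """Deduplicating intersection: elements of array1 that also occur
    in array2, each reported once, in array1's encounter order.
    Returns None if either input is None (SQL NULL propagation)."""
    if array1 is None or array2 is None:
        return None
    in_b, seen, out = set(array2), set(), []
    for e in array1:
        if e in in_b and e not in seen:
            seen.add(e)
            out.append(e)
    return out

print(array_intersect([1, 1, 1, 2], [1, 1, 2]))  # [1, 2]
print(array_intersect([1, 1, 3], [1, 3, 5]))     # [1, 3]
print(array_intersect(None, [1, 2]))             # None
```

Note that under these semantics the example in the PR description, array_intersect(array(1, 1, 3), array(1, 3, 5)), would return [1, 3] rather than the [1, 1, 3] originally documented.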
Rebased to fix conflicts. @twalthr @dawidwys @snuyanzin will you help review this PR?
@snuyanzin I've addressed your review comments, thanks very much.
@liuyongvs I think you need to rebase in order to get the CI to pass
@MartijnVisser I have rebased to fix the CI.
CI passed. @snuyanzin @dawidwys @MartijnVisser will you have a look again?