
[FLINK-31664] Implement ARRAY_INTERSECT function

Open liuyongvs opened this issue 11 months ago • 15 comments

  • What is the purpose of the change
    This is an implementation of ARRAY_INTERSECT.

  • Brief change log
    ARRAY_INTERSECT for Table API and SQL.

Returns an array of the elements in the intersection of array1 and array2, with duplicates.

Syntax:
array_intersect(array1, array2)

Arguments:
array: An ARRAY to be handled.

Returns:
An ARRAY. If either array is null, the function returns null.
Examples:

> SELECT array_intersect(array(1, 1, 3), array(1, 3, 5));
 [1,1,3]
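The behaviour proposed here keeps array1's duplicates. A minimal Python sketch of that semantics (illustrative only, not the Flink implementation; the function name is hypothetical):

```python
def array_intersect_keep_dups(arr1, arr2):
    """Keep every element of arr1 that also occurs in arr2 (duplicates preserved)."""
    if arr1 is None or arr2 is None:
        return None
    members = set(arr2)
    return [x for x in arr1 if x in members]

print(array_intersect_keep_dups([1, 1, 3], [1, 3, 5]))  # [1, 1, 3]
```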

  • Verifying this change This change added tests in CollectionFunctionsITCase.

  • Does this pull request potentially affect one of the following parts:
      • Dependencies (does it add or upgrade a dependency): (no)
      • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
      • The serializers: (no)
      • The runtime per-record code paths (performance sensitive): (no)
      • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
      • The S3 file system connector: (no)

  • Documentation
      • Does this pull request introduce a new feature? (yes)
      • If yes, how is the feature documented? (docs)

liuyongvs avatar Mar 19 '24 08:03 liuyongvs

after discussion with @dawidwys here https://github.com/apache/flink/pull/23171#issuecomment-1956501651

liuyongvs avatar Mar 19 '24 08:03 liuyongvs

CI report:

  • 07d3f0727c68780467cb707c6bdffc37507062b6 Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Mar 19 '24 08:03 flinkbot

hi @dawidwys will you help review this?

liuyongvs avatar Mar 19 '24 12:03 liuyongvs

Before I review the code let's settle on the behaviour first.

@MartijnVisser What is your opinion on how the function should behave? Especially in the context of https://github.com/apache/flink/pull/23173#discussion_r1491044219 and handling duplicates.

What should be the output of: [1, 1, 1, 2] INTERSECT [1, 1, 2]?

  1. [1,2] - Spark/Databricks/Presto
  2. [1,1,2] - Snowflake
  3. [1, 1, 1, 2] - (as far as I can tell the current behaviour of the PR)

  • Snowflake's docs say:

    If one array has N copies of a value, and the other array has M copies of the same value, then the number of copies in the returned array is the smaller of N or M. For example, if N is 4 and M is 2, then the returned value contains 2 copies.

  • Spark/Databricks docs say:

    An ARRAY of matching type to array1 with no duplicates and elements contained in both array1 and array2.

  • Presto does the same as Spark:

    Returns an array of the elements in the intersection of x and y, without duplicates.
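The Snowflake min(N, M) rule quoted above can be sketched in Python using multiset (bag) intersection; this is an illustration of the semantics, not the Flink implementation:

```python
from collections import Counter

def array_intersect_multiset(arr1, arr2):
    # Snowflake-style multiset semantics: a value with N copies in arr1 and
    # M copies in arr2 appears min(N, M) times in the result.
    remaining = Counter(arr1) & Counter(arr2)  # '&' keeps the minimum count per key
    out = []
    for x in arr1:
        if remaining[x] > 0:
            out.append(x)
            remaining[x] -= 1
    return out

print(array_intersect_multiset([1, 1, 1, 2], [1, 1, 2]))  # [1, 1, 2]
```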

dawidwys avatar Mar 22 '24 11:03 dawidwys

One more idea: some vendors allow calculating intersections of an arbitrary number of arrays, e.g. ClickHouse [1]. [1] https://clickhouse.com/docs/en/sql-reference/functions/array-functions#arrayintersectarr

snuyanzin avatar Mar 22 '24 11:03 snuyanzin

https://clickhouse.com/docs/en/sql-reference/functions/array-functions#arrayintersectarr

From my side, it is not a good idea, because array_intersect(array_intersect(array1, array2), array3) does the same thing; the n-ary form is just syntactic sugar. Also, array_union/array_except have already been supported and merged, and both take two arguments, so we may want to stay aligned. What is your opinion @dawidwys @MartijnVisser?
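The nesting argument can be sketched in Python: an n-ary intersect is just a fold of the two-argument function (the helper names here are hypothetical, and the two-argument variant shown keeps duplicates purely for illustration):

```python
from functools import reduce

def intersect2(arr1, arr2):
    # Hypothetical two-argument intersect: elements of arr1 also present in arr2.
    members = set(arr2)
    return [x for x in arr1 if x in members]

def intersect_n(*arrays):
    # n-ary intersect expressed as repeated two-argument calls, i.e.
    # array_intersect(array_intersect(array1, array2), array3)
    return reduce(intersect2, arrays)

print(intersect_n([1, 2, 3], [2, 3, 4], [3, 4, 5]))  # [3]
```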

liuyongvs avatar Mar 25 '24 06:03 liuyongvs

hi @snuyanzin @dawidwys what is your opinion?

liuyongvs avatar Mar 27 '24 11:03 liuyongvs

What is your opinion on how the function should behave?

I've taken a look at how INTERSECT is defined in the SQL standard. Based on https://stackoverflow.com/questions/59060599/does-intersect-operator-exist-in-the-sql-standar, https://www.postgresql.org/docs/current/queries-union.html, and the fact that Calcite differentiates between INTERSECT and INTERSECT ALL, I believe the default behavior of INTERSECT is to remove duplicates.

So the result of [1, 1, 1, 2] INTERSECT [1, 1, 2] should be [1, 2] in my understanding. I think that Spark/Databricks/Presto implement the correct behavior.

BigQuery and Redshift don't support ARRAY_INTERSECT. ksqlDB follows the same behavior as Spark/Databricks/Presto per https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/scalar-functions/#array_intersect.

MartijnVisser avatar Mar 27 '24 13:03 MartijnVisser

Hi @MartijnVisser, INTERSECT is different from array_intersect. INTERSECT is a set operation (a RelNode), like UNION/UNION ALL, and is indeed part of the SQL standard, while array_intersect and array_union are just functions, which don't have any standard.

liuyongvs avatar Mar 28 '24 01:03 liuyongvs

@liuyongvs I disagree: I think that we're looking at what the definition of INTERSECT in general is, not from a functional or implementation perspective, but more if there's a definition of what INTERSECT should do. I don't think it's a good idea to have INTERSECT in Flink that doesn't return duplicates, and then have an ARRAY_INTERSECT that does return duplicates. That's not consistent. If both INTERSECT and ARRAY_INTERSECT don't return duplicates, that is a consistent behavior.

So IMHO:

INTERSECT and ARRAY_INTERSECT --> Removes duplicates

If there's a need to have duplicates included:

INTERSECT ALL and ARRAY_INTERSECT_ALL --> Keep duplicates, have consistent behavior with INTERSECT ALL

MartijnVisser avatar Mar 28 '24 13:03 MartijnVisser

@MartijnVisser @dawidwys @snuyanzin I agree with you. That is to say, the semantics align with Spark's: no duplicate elements. Consequently, I believe we also need to keep array_union/array_except consistent. As of now, array_union (which I developed) already matches Spark's semantics, meaning no duplicates are included. However, array_except (https://issues.apache.org/jira/browse/FLINK-31663) does include duplicates, so its semantics do not align and it requires modification.

Furthermore, array_except was merged for 1.20, and since 1.20 is currently only a snapshot version and not officially released, changing its behavior will not cause compatibility issues, so it should indeed be corrected. What are your thoughts?

liuyongvs avatar Mar 29 '24 02:03 liuyongvs

hi @MartijnVisser

liuyongvs avatar Apr 07 '24 08:04 liuyongvs

Furthermore, array_except was merged in version 1.20, and since 1.20 is currently only a snapshot version and not officially released, there’s no concern of causing compatibility issues due to changing behaviors, so it should indeed be corrected. What are your thoughts?

I think you're right. @dawidwys @snuyanzin WDYT?

MartijnVisser avatar Apr 17 '24 09:04 MartijnVisser

hi @dawidwys @snuyanzin WDYT?

liuyongvs avatar May 10 '24 11:05 liuyongvs

Maybe it is an unpopular opinion, however I tend to think that INTERSECT vs INTERSECT ALL (and the same for the other set operations) is defined for rows, with set and bag semantics, and can hardly be applied to collections. I failed to find a standard approach for collections like arrays (I mean in the SQL Standard); probably that is one of the reasons we see a number of vendors handling this differently.

On one side we could say that we should follow the same approach as for rows. The problem I see here is that by default we would remove duplicates, but what should we do if we want to keep them? There is no well-known vendor providing both array_intersect and array_intersect_all, or a keep/remove-duplicates parameter. On the other side, if we keep duplicates then we can still cover both cases: for the case with duplicates we can do

array_intersect(array1, array2)

and for the case without

array_distinct(array_intersect(array1, array2))

Yes, array_union looks like an exception here; however, if we compare against vendors, it is a global exception only because there is a nice synonym, array_concat, used for the duplicate-keeping variant. As you can see, it again doesn't follow the row approach, since there is nothing like array_union_all. Thus if we want to concatenate arrays without duplicates we use array_union, and for the case with duplicates we can use array_concat. The problem is that not every function has such a workaround.

snuyanzin avatar May 10 '24 19:05 snuyanzin

I failed to find standard approach for collections like arrays (I mean in SQL Standard).

Same. I don't think it's documented anywhere. So how do we come to a conclusion (given that the 1.20 release date is approaching rather quickly)?

MartijnVisser avatar May 21 '24 11:05 MartijnVisser

Based on what I mentioned above, how about following multi-set semantics like Snowflake by default? That will allow us to cover more cases (with and without duplicates).

array_union/array_concat will remain an exception, since it looks like an exception across lots of the vendors.

snuyanzin avatar May 21 '24 12:05 snuyanzin

@snuyanzin @MartijnVisser Why do we necessarily have to align our semantics with Snowflake?

I found that Spark/Presto/Doris/StarRocks/MaxCompute all follow the deduplicating semantics. I personally still think that aligning behavior with the majority of vendors might be better, as it would not cause too much confusion for users when they switch to a different engine. Unless the behavior is very unreasonable, we should not need to change the semantics. What do you think?

https://prestodb.io/docs/current/functions/array.html#array_intersect https://doris.apache.org/docs/1.2/sql-manual/sql-functions/array-functions/array_intersect/ https://www.alibabacloud.com/help/en/maxcompute/user-guide/array-intersect https://github.com/StarRocks/starrocks/blob/fb36000ee72588c8b4db196292c2acb40f306109/be/src/exprs/array_functions.cpp#L1435

liuyongvs avatar May 22 '24 01:05 liuyongvs

@snuyanzin @MartijnVisser Why do we necessarily have to align our semantics with Snowflake?

As mentioned above, the main reason is that multi-set semantics (Snowflake) allows handling cases with duplicates, e.g.

SELECT array_count(array_intersect(array('A', 'B', 'B'), array('B', 'B', 'B'))); -- returns 2

and without

SELECT array_count(array_distinct(array_intersect(array('A', 'B', 'B'), array('B', 'B', 'B')))); -- returns 1

And it seems Spark and the others can't calculate the number of duplicates with their semantics; I would consider that the main drawback of their approach. Please correct me if I'm wrong @liuyongvs
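The information-loss argument can be sketched in Python: from a multiset-style result you can still recover the deduplicated answer, but not the other way around (both helper functions are hypothetical illustrations, not Flink code):

```python
from collections import Counter

def intersect_multiset(a, b):
    # Snowflake-style: each value appears min(count in a, count in b) times.
    remaining = Counter(a) & Counter(b)
    out = []
    for x in a:
        if remaining[x] > 0:
            out.append(x)
            remaining[x] -= 1
    return out

def intersect_dedup(a, b):
    # Spark/Presto-style: distinct elements of a that also occur in b.
    members, seen, out = set(b), set(), []
    for x in a:
        if x in members and x not in seen:
            seen.add(x)
            out.append(x)
    return out

# Multiset semantics keeps the duplicate count...
assert len(intersect_multiset(['A', 'B', 'B'], ['B', 'B', 'B'])) == 2
# ...and the deduplicated answer is still recoverable (cf. array_distinct):
assert len(set(intersect_multiset(['A', 'B', 'B'], ['B', 'B', 'B']))) == 1
# Dedup semantics cannot recover the count afterwards:
assert len(intersect_dedup(['A', 'B', 'B'], ['B', 'B', 'B'])) == 1
```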

snuyanzin avatar May 22 '24 05:05 snuyanzin

@snuyanzin Indeed, while this might only result in a difference in function behavior, it would generally be best to align with the practices of the majority of other engines. If in the future there arises a demand to preserve duplicates, or if other engines also adopt this behavior, we could introduce a switch within the function to support the option to either preserve or remove duplicates. What do you think?

liuyongvs avatar May 22 '24 06:05 liuyongvs

so far I see this kind of trade-off

multi-set semantics (with duplicates)

pros

  1. It covers more cases as mentioned above
  2. array_except is already implemented with this semantic, and it would make sense to follow the same way (P.S. earlier I wrote why I do not consider array_union)

cons

  1. Snowflake is the only well-known vendor that supports it

It would be great to see other opinions

snuyanzin avatar May 22 '24 07:05 snuyanzin

This is indeed a tricky one. I also spent a significant amount of time here. Every vendor does it differently. I also looked at programming languages such as PHP and C#. For example, C# removes duplicates.

In general, having an INTERSECT on array types sounds weird. Intersection is a concept from set theory: https://en.wikipedia.org/wiki/Intersection_(set_theory)

But arrays are not sets. Since we also deduplicate unions (another term from set theory), I would vote for deduplicate behavior.

twalthr avatar May 22 '24 12:05 twalthr

+1 for deduplicate

liuyongvs avatar May 22 '24 12:05 liuyongvs

Conclusion: Since there are no objections, then we will support it with deduplication semantics.
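For reference, the agreed deduplication semantics can be sketched in Python, assuming the result preserves array1's order and that a null input yields null (as in the earlier description); this is an illustration, not the actual Flink implementation:

```python
def array_intersect(arr1, arr2):
    # Agreed semantics: distinct elements of arr1 that also occur in arr2,
    # in arr1's order; null in, null out.
    if arr1 is None or arr2 is None:
        return None
    members, seen, out = set(arr2), set(), []
    for x in arr1:
        if x in members and x not in seen:
            seen.add(x)
            out.append(x)
    return out

print(array_intersect([1, 1, 1, 2], [1, 1, 2]))  # [1, 2]
```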

liuyongvs avatar May 23 '24 03:05 liuyongvs

Rebased to fix conflicts. @twalthr @dawidwys @snuyanzin will you help review this PR?

liuyongvs avatar May 23 '24 03:05 liuyongvs

@snuyanzin fixed per your review, thanks very much

liuyongvs avatar Jun 05 '24 02:06 liuyongvs

@liuyongvs I think you need to rebase in order to get the CI to pass

MartijnVisser avatar Jun 05 '24 07:06 MartijnVisser

@MartijnVisser I have rebased to fix the CI

liuyongvs avatar Jun 05 '24 08:06 liuyongvs

CI passed. @snuyanzin @dawidwys @MartijnVisser will you have a look again?

liuyongvs avatar Jun 06 '24 01:06 liuyongvs