
[FEA] Support function array_distinct

Open · viadea opened this issue 3 years ago · 3 comments

I wish we could support the function array_distinct.

Eg:

from pyspark.sql.functions import *
df = spark.createDataFrame([(["a", "b", "a"], ["b", "c"]), (["a", "a"], ["b", "c"]), (["aa"], ["b", "c"])], ['x', 'y'])
df.write.format("parquet").mode("overwrite").save("/tmp/testparquet")
df = spark.read.parquet("/tmp/testparquet")
df.select(array_distinct(df.x).alias("distinct")).collect()

Not-supported message:

    ! <ArrayDistinct> array_distinct(x#58) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.ArrayDistinct

viadea · Apr 12 '22 20:04

I think CUDF already supports this through dropListDuplicates

https://github.com/rapidsai/cudf/blob/ac27757092e9ba2bc0656b6a7dfbc79ce8b5e76a/java/src/main/java/ai/rapids/cudf/ColumnView.java#L2375-L2386

We should be able to implement this without any issues, as long as dropListDuplicates supports the types.
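
For reference, a minimal sketch (in Scala, assuming the cudf Java bindings are on the classpath and a GPU is available) of what that call looks like on a LIST<STRING> column shaped like the x column from the example above. The fromLists builder and type helpers are as I recall them from the linked ColumnView.java / HostColumnVector.java, so treat the exact signatures as assumptions:

import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector}

// Build a LIST<STRING> column like the x column from the example above.
val listOfStrings = new HostColumnVector.ListType(true,
  new HostColumnVector.BasicType(true, DType.STRING))
val input = ColumnVector.fromLists(listOfStrings,
  java.util.Arrays.asList("a", "b", "a"),
  java.util.Arrays.asList("a", "a"),
  java.util.Arrays.asList("aa"))

// Per-row de-duplication; both columns should be closed when done
// (withResource in the plugin, or try-with-resources from Java).
val deduped = input.dropListDuplicates()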

revans2 · Apr 14 '22 13:04

I am interested in taking this. Could anyone point me in the right direction for which file (collectionOperations?) this would live in and maybe a comparable Gpu* case class (GpuArrayRemove?)?

Edit: Okay, I see ArrayDistinct in the CPU version of collectionOperations, so I think I'm on the right path.

phish3y · Apr 10 '24 01:04

@phish3y happy to have you start to work on this.

https://github.com/apache/spark/blob/0d7c07047a628bd42eb53eb49935f5e3f81ea1a1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L4036

is the CPU implementation that we want to target. It looks like they have special-case equality for NaNs and nulls, but I am not sure it handles -0.0 vs 0.0 properly. We probably need to do some explicit testing on different versions of Spark.
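
As a starting point for that testing, a quick probe one could run in spark-shell on each Spark version of interest (the results are intentionally not listed here, since whether they differ across versions is exactly the question):

// Does -0.0 dedup against 0.0?
spark.sql("SELECT array_distinct(array(0.0D, -0.0D, 0.0D))").show(false)
// Are repeated NaNs collapsed to one entry?
spark.sql("SELECT array_distinct(array(CAST('NaN' AS DOUBLE), CAST('NaN' AS DOUBLE)))").show(false)
// Are repeated nulls collapsed to one entry?
spark.sql("SELECT array_distinct(array(CAST(NULL AS DOUBLE), NULL, 1.0D))").show(false)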

The other thing to be careful of is that Spark appears to purposely keep the order of the values in the array the same, removing only the duplicates that come later. I am not sure if we need to replicate this behavior or not. It would be ideal if we could, but I don't think it is critical, because it only started happening after a bug fix: https://github.com/apache/spark/pull/33993
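
A similarly small check for the ordering question (again without asserting the answer here, since it may depend on whether the Spark version has that fix):

// If first-occurrence order is preserved, this should come back as [3, 1, 2].
spark.sql("SELECT array_distinct(array(3, 1, 3, 2, 1))").show(false)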

As for how you might implement this, I would suggest that you start with

https://github.com/rapidsai/cudf/blob/e727814c00ce0ae13febfeb44ca3d2db66f7f2e9/cpp/include/cudf/lists/stream_compaction.hpp#L87

using the Java API for it: https://github.com/rapidsai/cudf/blob/e727814c00ce0ae13febfeb44ca3d2db66f7f2e9/java/src/main/java/ai/rapids/cudf/ColumnView.java#L2513

Then we can see which data types work well out of the box and whether we have to add some special-case processing to make it work.
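
To make the shape of that concrete, here is a rough sketch of what the expression could look like. The GpuUnaryExpression base class and its doColumnar hook are assumptions based on how other unary expressions in the plugin are written, and none of the NaN / -0.0 / ordering handling discussed above is addressed here:

import ai.rapids.cudf.ColumnVector
import com.nvidia.spark.rapids.{GpuColumnVector, GpuUnaryExpression}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.DataType

case class GpuArrayDistinct(child: Expression) extends GpuUnaryExpression {
  // Same array type in and out, matching the CPU ArrayDistinct.
  override def dataType: DataType = child.dataType
  override def nullable: Boolean = child.nullable

  override def doColumnar(input: GpuColumnVector): ColumnVector = {
    // cudf removes duplicate entries within each list row; whether its
    // ordering and NaN / -0.0 semantics match Spark still has to be verified.
    input.getBase.dropListDuplicates()
  }
}

The remaining work would then be registering it with the plugin's expression overrides and adding integration tests covering the types dropListDuplicates actually supports.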

revans2 · Apr 10 '24 14:04