[FEA] Rework GpuSubstringIndex to use cudf::slice_strings
When GpuSubstringIndex was first written, cuDF didn't have support for something like this. We filed https://github.com/rapidsai/cudf/issues/5158 and it was implemented in cuDF a long time ago, but we haven't gone back to GpuSubstringIndex to use the new API :(
The current implementation in the plugin relies on a regular expression that takes the delim and count into account, but it doesn't work for multi-character delimiters, for example, and we throw in that case (which is good). That said, it would be great to move to the cuDF version of this.
Note that we need to make sure we have unit and integration tests for this, especially showing that we can support multi-character delimiters after the change to cudf::slice_strings.
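As a CPU-side oracle for those tests, the expected semantics can be sketched in plain Scala. This is a hypothetical helper, not plugin code; it mirrors Spark's documented substring_index behavior, including multi-character delimiters:
// Hypothetical reference helper (not plugin code), usable as a CPU-side oracle in tests.
//   count > 0: everything to the left of the count-th delim, counting from the left
//   count < 0: everything to the right of the |count|-th delim, counting from the right
//   not enough delimiters: the whole string; count == 0 (and, in this sketch, an empty delim): ""
def substringIndexRef(str: String, delim: String, count: Int): String = {
  if (count == 0 || delim.isEmpty) return ""
  if (count > 0) {
    var idx = -1
    var n = count
    while (n > 0) {
      idx = str.indexOf(delim, idx + 1)
      if (idx < 0) return str   // fewer than `count` occurrences
      n -= 1
    }
    str.substring(0, idx)
  } else {
    var idx = str.length
    var n = -count
    while (n > 0) {
      idx = str.lastIndexOf(delim, idx - 1)
      if (idx < 0) return str   // fewer than `-count` occurrences
      n -= 1
    }
    str.substring(idx + delim.length)
  }
}
// Multi-character delimiter examples:
//   substringIndexRef("a--b--c", "--", 1)  == "a"
//   substringIndexRef("a--b--c", "--", -1) == "c"
//   substringIndexRef("", ".", -2)         == ""   (the failing case described below)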
A Spark UT related to substring_index failed:
'Rapids - string substring_index function' offload to RAPIDS
- Rapids - string substring_index function *** FAILED ***
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 705.0 failed 1 times, most recent failure: Lost task 1.0 in stage 705.0 (TID 1411) (spark-haoyang executor driver): java.lang.AssertionError: value at 0 is null
at ai.rapids.cudf.HostColumnVectorCore.assertsForGet(HostColumnVectorCore.java:230)
at ai.rapids.cudf.HostColumnVectorCore.getUTF8(HostColumnVectorCore.java:364)
at com.nvidia.spark.rapids.RapidsHostColumnVectorCore.getUTF8String(RapidsHostColumnVectorCore.java:183)
at org.apache.spark.sql.vectorized.ColumnarBatchRow.getUTF8String(ColumnarBatchRow.java:126)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:365)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
at scala.Option.foreach(Option.scala:407)
...
Cause: java.lang.AssertionError: value at 0 is null
at ai.rapids.cudf.HostColumnVectorCore.assertsForGet(HostColumnVectorCore.java:230)
at ai.rapids.cudf.HostColumnVectorCore.getUTF8(HostColumnVectorCore.java:364)
at com.nvidia.spark.rapids.RapidsHostColumnVectorCore.getUTF8String(RapidsHostColumnVectorCore.java:183)
at org.apache.spark.sql.vectorized.ColumnarBatchRow.getUTF8String(ColumnarBatchRow.java:126)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:365)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
It comes from the following case:
SubstringIndex(Literal(""), Literal("."), Literal(-2))
Spark-shell reproduction:
scala> Seq("").toDF.write.mode("overwrite").parquet("TEMP")
24/05/30 07:47:38 WARN GpuOverrides:
*Exec <DataWritingCommandExec> will run on GPU
*Output <InsertIntoHadoopFsRelationCommand> will run on GPU
! <LocalTableScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.LocalTableScanExec
@Expression <AttributeReference> value#1 could run on GPU
scala> val df = spark.read.parquet("TEMP")
df: org.apache.spark.sql.DataFrame = [value: string]
scala> df.selectExpr("value", "substring_index(value, '.', -2)").show()
24/05/30 07:48:21 WARN GpuOverrides:
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
@Partitioning <SinglePartition$> could run on GPU
*Exec <ProjectExec> will run on GPU
*Expression <Alias> substring_index(value#6, ., -2) AS substring_index(value, ., -2)#15 will run on GPU
*Expression <SubstringIndex> substring_index(value#6, ., -2) will run on GPU
*Exec <FileSourceScanExec> will run on GPU
+-----+-----------------------------+
|value|substring_index(value, ., -2)|
+-----+-----------------------------+
| | null|
+-----+-----------------------------+
scala>
scala> spark.conf.set("spark.rapids.sql.enabled", "false")
scala> df.selectExpr("value", "substring_index(value, '.', -2)").show()
+-----+-----------------------------+
|value|substring_index(value, ., -2)|
+-----+-----------------------------+
| | |
+-----+-----------------------------+
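A simple consistency check along these lines could become a test. The following is just a spark-shell sketch (continuing the session above, assuming the same TEMP parquet), not the plugin's actual test format; gpuRows/cpuRows are names introduced only for illustration:
// Sketch: the GPU result should match the CPU result ("" rather than null).
spark.conf.set("spark.rapids.sql.enabled", "true")
val gpuRows = df.selectExpr("value", "substring_index(value, '.', -2)").collect()
spark.conf.set("spark.rapids.sql.enabled", "false")
val cpuRows = df.selectExpr("value", "substring_index(value, '.', -2)").collect()
spark.conf.set("spark.rapids.sql.enabled", "true")
assert(gpuRows.toSeq == cpuRows.toSeq, "GPU and CPU substring_index results differ")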
Since GpuSubstringIndex needs reworking anyway, I think we can fix this Spark UT as part of the rework rather than patching the original implementation.
cudf::strings::slice_strings was deprecated in https://github.com/rapidsai/cudf/pull/13373.
To fix this issue, we can either re-enable cudf::strings::slice_strings or enhance the current regex-dependent approach.
https://github.com/rapidsai/cudf/pull/13373#issuecomment-2168287320
cudf::strings::find()/rfind() would have to be called multiple times in order to find the right position. For now I prefer to put the logic in a new kernel, substringIndex.
Closed by https://github.com/NVIDIA/spark-rapids/pull/11149 and https://github.com/NVIDIA/spark-rapids-jni/pull/2205.