
[FEA] Rework GpuSubstringIndex to use cudf::slice_strings

abellina opened this issue 2 years ago · 4 comments

When GpuSubstringIndex was first implemented, cudf didn't have support for something like this. We filed https://github.com/rapidsai/cudf/issues/5158 and it was implemented in cuDF a long time ago, but we haven't gone back to GpuSubstringIndex to use the new API :(

The current implementation in the plugin relies on a regular expression built from delim and count, but it doesn't work for multi-character delimiters, for example, and we throw in that case (which is good). That said, it would be great to move to the cuDF version of this.

Note that we need to make sure we have unit and integration tests for this, especially ones showing that we can support multi-character delimiters after the change to cudf::slice_strings.
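To illustrate, a minimal spark-shell example (hypothetical input data) of the multi-character delimiter case the new tests should cover; the commented results are the CPU substring_index semantics that the current regex-based GPU path cannot handle:

// run in spark-shell: substring_index with a two-character delimiter "::"
val df = Seq("a::b::c", "x::y").toDF("value")
df.selectExpr("value", "substring_index(value, '::', 2)").show()
// expected (CPU) results:
//   "a::b::c" -> "a::b"  (everything before the 2nd occurrence of "::")
//   "x::y"    -> "x::y"  (fewer than 2 delimiters, so the whole string)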

abellina commented on Jul 19 '23

A Spark UT related to substring_index failed:

'Rapids - string substring_index function' offload to RAPIDS
- Rapids - string substring_index function *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 705.0 failed 1 times, most recent failure: Lost task 1.0 in stage 705.0 (TID 1411) (spark-haoyang executor driver): java.lang.AssertionError:  value at 0 is null
	at ai.rapids.cudf.HostColumnVectorCore.assertsForGet(HostColumnVectorCore.java:230)
	at ai.rapids.cudf.HostColumnVectorCore.getUTF8(HostColumnVectorCore.java:364)
	at com.nvidia.spark.rapids.RapidsHostColumnVectorCore.getUTF8String(RapidsHostColumnVectorCore.java:183)
	at org.apache.spark.sql.vectorized.ColumnarBatchRow.getUTF8String(ColumnarBatchRow.java:126)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:365)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
  at scala.Option.foreach(Option.scala:407)
  ...
  Cause: java.lang.AssertionError: value at 0 is null
  at ai.rapids.cudf.HostColumnVectorCore.assertsForGet(HostColumnVectorCore.java:230)
  at ai.rapids.cudf.HostColumnVectorCore.getUTF8(HostColumnVectorCore.java:364)
  at com.nvidia.spark.rapids.RapidsHostColumnVectorCore.getUTF8String(RapidsHostColumnVectorCore.java:183)
  at org.apache.spark.sql.vectorized.ColumnarBatchRow.getUTF8String(ColumnarBatchRow.java:126)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:365)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)

It comes from the following case:

SubstringIndex(Literal(""), Literal("."), Literal(-2))

Spark-shell reproduction:

scala> Seq("").toDF.write.mode("overwrite").parquet("TEMP")
24/05/30 07:47:38 WARN GpuOverrides:
*Exec <DataWritingCommandExec> will run on GPU
  *Output <InsertIntoHadoopFsRelationCommand> will run on GPU
  ! <LocalTableScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.LocalTableScanExec
    @Expression <AttributeReference> value#1 could run on GPU


scala> val df = spark.read.parquet("TEMP")
df: org.apache.spark.sql.DataFrame = [value: string]

scala> df.selectExpr("value", "substring_index(value, '.', -2)").show()
24/05/30 07:48:21 WARN GpuOverrides:
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> substring_index(value#6, ., -2) AS substring_index(value, ., -2)#15 will run on GPU
      *Expression <SubstringIndex> substring_index(value#6, ., -2) will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

+-----+-----------------------------+
|value|substring_index(value, ., -2)|
+-----+-----------------------------+
|     |                         null|
+-----+-----------------------------+


scala>

scala> spark.conf.set("spark.rapids.sql.enabled", "false")

scala> df.selectExpr("value", "substring_index(value, '.', -2)").show()
+-----+-----------------------------+
|value|substring_index(value, ., -2)|
+-----+-----------------------------+
|     |                             |
+-----+-----------------------------+

Since GpuSubstringIndex needs reworking anyway, I think we can fix this Spark UT as part of that rework rather than by patching the existing regex-based implementation.
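For reference (Spark's CPU SubstringIndex evaluates via UTF8String.subStringIndex), calling that method directly in spark-shell shows the expected result for the failing case, an empty string rather than null:

import org.apache.spark.unsafe.types.UTF8String

val r = UTF8String.fromString("").subStringIndex(UTF8String.fromString("."), -2)
r == null            // false: the CPU result is not null
r.toString.isEmpty   // true: it is the empty string, matching the CPU output above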

thirtiseven commented on May 27 '24

cudf::strings::slice_strings has been deprecated by https://github.com/rapidsai/cudf/pull/13373. To fix this issue, we can either re-enable cudf::strings::slice_strings or enhance the current regex-dependent approach.

Feng-Jiang28 commented on Jun 14 '24

https://github.com/rapidsai/cudf/pull/13373#issuecomment-2168287320

revans2 commented on Jun 14 '24

cudf::strings::find()/rfind() would have to be called multiple times to locate the right position, so I now prefer to put the logic into a new kernel, substringIndex.
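As a rough Scala sketch (purely illustrative, not the actual kernel code), this is the per-row logic such a substringIndex kernel would compute; each additional |count| needs another forward or backward search, which is why fusing the whole scan into one kernel is attractive compared with repeated column-wide find()/rfind() passes:

// Illustrative per-row substring_index logic; a GPU kernel would do this per string.
// Edge cases such as overlapping delimiter occurrences are glossed over.
def substringIndex(str: String, delim: String, count: Int): String = {
  if (str.isEmpty || delim.isEmpty || count == 0) return ""
  if (count > 0) {
    var idx = -1
    var remaining = count
    while (remaining > 0) {                 // one forward search per occurrence
      idx = str.indexOf(delim, idx + 1)
      if (idx < 0) return str               // fewer than count delimiters: whole string
      remaining -= 1
    }
    str.substring(0, idx)                   // everything before the count-th occurrence
  } else {
    var idx = str.length
    var remaining = -count
    while (remaining > 0) {                 // one backward search per occurrence
      idx = str.lastIndexOf(delim, idx - 1)
      if (idx < 0) return str
      remaining -= 1
    }
    str.substring(idx + delim.length)       // everything after the count-th from the end
  }
}

substringIndex("www.apache.org", ".", 2)   // "www.apache"
substringIndex("www.apache.org", ".", -2)  // "apache.org"
substringIndex("", ".", -2)                // "" (matches the CPU behavior shown above)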

Feng-Jiang28 commented on Jun 28 '24

Closed by https://github.com/NVIDIA/spark-rapids/pull/11149 and https://github.com/NVIDIA/spark-rapids-jni/pull/2205.

Feng-Jiang28 commented on Jul 29 '24