
[BUG] On SPARK-3.2.1 : java.lang.ClassCastException

Open lagarantie opened this issue 3 years ago • 12 comments

Hi,

I have a random bug when I do a groupBy after a join.

Problem:

val c = a.join(b)

val d = c.groupBy(...).agg(sum(...))
println(d.cache.count()) // => java.lang.ClassCastException

How I worked around the problem on my side:

val c = a.join(b)
  .cache()

println(c.count())

val d = c.groupBy(...).agg(sum(...))
println(d.cache.count()) // works
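
For reference, here is a minimal self-contained sketch of the same pattern; the real tables, join key, and aggregated column are confidential, so the names below are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Placeholder data standing in for the confidential tables a and b
val a = Seq((1, "x"), (2, "y"), (3, "z")).toDF("id", "label")
val b = Seq((1, 10.0), (2, 20.0), (3, 30.0)).toDF("id", "amount")

val c = a.join(b, Seq("id"))

// Aggregation after the join; this is where the ClassCastException appears on GPU
val d = c.groupBy($"label").agg(sum($"amount").as("total"))
println(d.cache.count())

// Workaround: materialize the join result before aggregating
val cCached = a.join(b, Seq("id")).cache()
println(cCached.count())
val d2 = cCached.groupBy($"label").agg(sum($"amount").as("total"))
println(d2.cache.count())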

The full stack trace:

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3029)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2976)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2970)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2970)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1390)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3238)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3179)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3167)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1152)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2651)
	at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:241)
	at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:276)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:81)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:87)
	at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:75)
	at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:62)
	at org.apache.spark.sql.execution.ResultCacheManager.collectResult$1(ResultCacheManager.scala:575)
	at org.apache.spark.sql.execution.ResultCacheManager.computeResult(ResultCacheManager.scala:582)
	at org.apache.spark.sql.execution.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:528)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:527)
	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:424)
	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:403)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:422)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:398)
	at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3210)
	at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3209)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3951)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:239)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3949)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:3209)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:1)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:64)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:66)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:68)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:70)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:72)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:74)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:76)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:78)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw.<init>(command-1409292377896938:80)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw.<init>(command-1409292377896938:82)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw.<init>(command-1409292377896938:84)
	at $line2950148436704909b7ddd289d0721a90163.$read.<init>(command-1409292377896938:86)
	at $line2950148436704909b7ddd289d0721a90163.$read$.<init>(command-1409292377896938:90)
	at $line2950148436704909b7ddd289d0721a90163.$read$.<clinit>(command-1409292377896938)
	at $line2950148436704909b7ddd289d0721a90163.$eval$.$print$lzycompute(<notebook>:7)
	at $line2950148436704909b7ddd289d0721a90163.$eval$.$print(<notebook>:6)
	at $line2950148436704909b7ddd289d0721a90163.$eval.$print(<notebook>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:747)
	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1020)
	at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:568)
	at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:36)
	at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:116)
	at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
	at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:567)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:594)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:564)
	at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:219)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal.$anonfun$repl$1(ScalaDriverLocal.scala:225)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:999)
	at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:952)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:225)
	at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$12(DriverLocal.scala:631)
	at com.databricks.logging.Log4jUsageLoggingShim$.$anonfun$withAttributionContext$1(Log4jUsageLoggingShim.scala:33)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:94)
	at com.databricks.logging.Log4jUsageLoggingShim$.withAttributionContext(Log4jUsageLoggingShim.scala:31)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:205)
	at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:204)
	at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:59)
	at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:240)
	at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:225)
	at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:59)
	at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:608)
	at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:615)
	at scala.util.Try$.apply(Try.scala:213)
	at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:607)
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommandAndGetError(DriverWrapper.scala:526)
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:561)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:431)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:374)
	at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:225)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException 

Thanks for your help

lagarantie avatar Oct 07 '22 20:10 lagarantie

Do you have the stack trace from the Caused by here? It should be below the ClassCastException line you have above.

Was the RAPIDS for Spark plugin enabled here, and does the same code work if you disable the plugin?

tgravescs avatar Oct 07 '22 20:10 tgravescs

Hi @lagarantie, can you provide details about the environment?

  1. plugin version
  2. repro steps for launching (looks like a Databricks Notebook)

Do you see the stack trace of the "Caused by:" exception? That would also help pinpoint the issue.

gerashegalov avatar Oct 07 '22 20:10 gerashegalov

Hi @tgravescs, @gerashegalov, here is more information for you :)

Configuration:

  • Spark 3.2.1, Scala 2.12 on Databricks
  • Rapids 22.08.0
  • Cluster : Standard_NC16as_T4_v3 * 4

The notebook works on CPU without RAPIDS.

The stack trace is not always the same. I don't always get the stack below the ClassCastException, but I found the most complete one:

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3029)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2976)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2970)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2970)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1390)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3238)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3179)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3167)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1152)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2651)
	at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:241)
	at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:276)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:81)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:87)
	at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:75)
	at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:62)
	at org.apache.spark.sql.execution.ResultCacheManager.collectResult$1(ResultCacheManager.scala:575)
	at org.apache.spark.sql.execution.ResultCacheManager.computeResult(ResultCacheManager.scala:582)
	at org.apache.spark.sql.execution.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:528)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:527)
	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:424)
	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:403)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:422)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:398)
	at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3210)
	at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3209)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3951)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:239)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3949)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:3209)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:1)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:64)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:66)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:68)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:70)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:72)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:74)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:76)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:78)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw.<init>(command-1409292377896931:80)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw.<init>(command-1409292377896931:82)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw.<init>(command-1409292377896931:84)
	at $line6556f2073b1543b48de131a32ec3144f227.$read.<init>(command-1409292377896931:86)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$.<init>(command-1409292377896931:90)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$.<clinit>(command-1409292377896931)
	at $line6556f2073b1543b48de131a32ec3144f227.$eval$.$print$lzycompute(<notebook>:7)
	at $line6556f2073b1543b48de131a32ec3144f227.$eval$.$print(<notebook>:6)
	at $line6556f2073b1543b48de131a32ec3144f227.$eval.$print(<notebook>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:747)
	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1020)
	at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:568)
	at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:36)
	at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:116)
	at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
	at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:567)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:594)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:564)
	at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:219)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal.$anonfun$repl$1(ScalaDriverLocal.scala:225)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:999)
	at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:952)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:225)
	at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$12(DriverLocal.scala:631)
	at com.databricks.logging.Log4jUsageLoggingShim$.$anonfun$withAttributionContext$1(Log4jUsageLoggingShim.scala:33)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:94)
	at com.databricks.logging.Log4jUsageLoggingShim$.withAttributionContext(Log4jUsageLoggingShim.scala:31)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:205)
	at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:204)
	at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:59)
	at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:240)
	at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:225)
	at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:59)
	at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:608)
	at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:615)
	at scala.util.Try$.apply(Try.scala:213)
	at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:607)
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommandAndGetError(DriverWrapper.scala:526)
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:561)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:431)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:374)
	at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:225)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: com.nvidia.spark.rapids.GpuScalar cannot be cast to com.nvidia.spark.rapids.GpuColumnVector
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$finalProjectBatch$3(aggregate.scala:517)
	at scala.collection.immutable.List.map(List.scala:297)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$finalProjectBatch$2(aggregate.scala:513)
	at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
	at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.withResource(aggregate.scala:181)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$finalProjectBatch$1(aggregate.scala:512)
	at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
	at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.withResource(aggregate.scala:181)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.finalProjectBatch(aggregate.scala:510)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:262)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:181)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.$anonfun$fetchNextBatch$2(GpuColumnarToRowExec.scala:241)
	at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
	at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.withResource(GpuColumnarToRowExec.scala:187)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.fetchNextBatch(GpuColumnarToRowExec.scala:238)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.loadNextBatch(GpuColumnarToRowExec.scala:215)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.hasNext(GpuColumnarToRowExec.scala:255)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at com.nvidia.spark.rapids.RowToColumnarIterator.hasNext(GpuRowToColumnarExec.scala:605)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.$anonfun$fetchNextBatch$2(GpuColumnarToRowExec.scala:239)
	at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
	at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.withResource(GpuColumnarToRowExec.scala:187)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.fetchNextBatch(GpuColumnarToRowExec.scala:238)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.loadNextBatch(GpuColumnarToRowExec.scala:215)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.hasNext(GpuColumnarToRowExec.scala:255)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:120)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1486)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1413)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1477)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1296)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:391)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:342)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:95)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
22/10/07 18:50:26 ERROR Uncaught throwable from user code: org.apache.spark.SparkException: Job aborted due to stage failure: Task 384 in stage 947.0 failed 4 times, most recent failure: Lost task 384.3 in stage 947.0 (TID 93449) (10.139.64.7 executor 2): java.lang.ClassCastException

I can't give you the notebook, it's confidential data, but I can give you part of the code?

lagarantie avatar Oct 08 '22 10:10 lagarantie

@lagarantie a couple of things would help (if you can provide):

  1. The .explain output of d.cache (d.cache.explain). It should give us the full DAG to look at.

  2. The schema. I'd like to know what a and b look like, which column (and what type) we are joining on, and what we are summing.

  3. Have you tried removing .cache? We are curious whether this is specific to persisting the dataframe, and whether you'd see a normal plan without .cache. Diffing the .explain output of the non-cached equivalent against the cached one (if the non-cached code works) would be great; see the sketch after this list.

  4. What Databricks runtime is this? Is it 10.4?
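
A minimal sketch of how the two plans could be captured for comparison (placeholder data standing in for the confidential DataFrame c):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Placeholder join result standing in for the confidential DataFrame c
val c = Seq(("x", 10.0), ("x", 20.0), ("y", 30.0)).toDF("label", "amount")

// Cached and non-cached variants of the same aggregation
val dCached = c.groupBy("label").agg(sum("amount")).cache()
val dUncached = c.groupBy("label").agg(sum("amount"))

// "formatted" prints the physical plan; diff the two outputs to spot the difference
dCached.explain("formatted")
dUncached.explain("formatted")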

Thanks for the help so far! And please make sure no confidential field names are revealed in the output above.

abellina avatar Oct 10 '22 13:10 abellina

Note: I tried the following example on the Databricks 10.4 ML runtime with the 22.08 plugin, but I don't see the class cast exception:

import org.apache.spark.sql.functions.sum
val df = sc.parallelize(Seq(1,2,3,4,5,6)).toDF
val df2 = sc.parallelize(Seq(1,2,3,4,5,6)).toDF
val c = df.join(df2).cache()
val d = c.groupBy(df.col("value")).agg(sum(df2.col("value")))
println(d.cache.count())

So my guess is that something is going on with one of the types you are joining. I tried a few other aggregate types and used df.value vs df2.value, but it's not happening for me.

abellina avatar Oct 10 '22 18:10 abellina

To be clear, I don't think that the issue has anything to do with caching.

https://github.com/NVIDIA/spark-rapids/blob/7e4324c6f117e9555a8bb427205562f296591003/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala#L517-L522

is the area of the code that is failing.

This code assumes that calling columnarEval will always return a GpuColumnVector. This is not true: there are cases, like with coalesce, where a scalar value could be returned. If we replace this code with a GpuProjectExec.apply, I think it would fix the problem.
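
Roughly, the failing assumption and a defensive alternative look like the sketch below. This is an illustrative sketch only, not the actual plugin code; the internal names and signatures it uses (boundFinalProjections, columnarEval, withResource, GpuScalar.getBase, GpuColumnVector.from) are assumptions about the spark-rapids internals.

// Illustrative sketch only, not the real spark-rapids source.

// What the failing code effectively assumes: every projection evaluates to a column.
val unsafeColumns = boundFinalProjections.map { expr =>
  expr.columnarEval(batch).asInstanceOf[GpuColumnVector]  // throws if a GpuScalar comes back
}

// Defensive variant: if the evaluation produces a scalar (e.g. a constant-folded
// coalesce), expand it into a column of batch.numRows() rows instead of casting.
val safeColumns = boundFinalProjections.map { expr =>
  expr.columnarEval(batch) match {
    case cv: GpuColumnVector => cv
    case s: GpuScalar =>
      withResource(s) { scalar =>
        // hypothetical conversion from a scalar to a full column
        GpuColumnVector.from(scalar.getBase, batch.numRows(), expr.dataType)
      }
  }
}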

revans2 avatar Oct 10 '22 18:10 revans2

Hello,

I work on Databricks 10.4 LTS. I tried to remove the cache on the DataFrame c, but the bug still appears.

You will find attached the DAGs when I cache and don't cache the DataFrame c, plus the schema (the DAG is big, I have multiple transformations before).

@revans2 you may be on the right track!

Thank you for your help !!

DAG_Only_DataFrame_C_And_D_Cache(NoBug).txt DAG_Only_DataFrame_D_Cache(Bug).txt Schema_Dataframe_C.txt

lagarantie avatar Oct 10 '22 18:10 lagarantie

@lagarantie thanks for the files above. I see that DAG_Only_DataFrame_C_And_D_Cache(NoBug).txt and DAG_Only_DataFrame_D_Cache(Bug).txt are the same (no diffs). Is that the case? Or was that an issue pasting it here?

At this stage I am not able to replicate this; I am hoping there is a difference between the NoBug and Bug DAGs that we can use to triage it. That said, I agree with @revans2 that we can make this code handle scalars, but we are still looking to reproduce it locally to understand what is going on here.

abellina avatar Oct 10 '22 19:10 abellina

To be sure, I'm sending you another DAG where there is no bug (I'm sure about the buggy one). The cluster was restarted in between.

To help further, you will also find attached the log4j output with all the errors from the executors.

Thank you again for your help !!!

DAG_Only_DataFrame_C_And_D_Cache(NoBug)_Cluster_Restarted.txt Log4jOutput.txt

lagarantie avatar Oct 10 '22 21:10 lagarantie

@lagarantie I have put up a patch and merged it into the 22.10 release that should fix the issue you are seeing. It fixes all of the places where we computed an expression and assumed that the output could not be a Scalar without any checks to fall back to the CPU beforehand. But we have not been able to reproduce the problem on our end. Are you okay if I close this issue and you can reopen it if we didn't fully fix it? Would you rather wait until you have verified it? Is there a good way to send you a snapshot jar that you can test out? If so feel free to reach out to me at bobby at apache dot org.

revans2 avatar Oct 13 '22 13:10 revans2

Feel free to send an email to [email protected] as well if you have any further questions.

sameerz avatar Oct 13 '22 16:10 sameerz

Hi @revans2,

Thank you very much !

I was not able to reproduce the bug on my side outside the notebook... As you said, before closing, I prefer to test the jar. I'll contact you in private!

lagarantie avatar Oct 13 '22 18:10 lagarantie

@lagarantie have you had a chance to test it? Can I close this issue?

revans2 avatar Oct 18 '22 20:10 revans2

Hi @revans2, I was going to write to you! I have tested it, and I don't have the bug on my side anymore.

Thank you for your efficiency :)

lagarantie avatar Oct 18 '22 21:10 lagarantie