spark-rapids
                        [BUG] On SPARK-3.2.1 : java.lang.ClassCastException
Hi,
I have a random bug when I do a groupBy after a join.
Problem:
val c = a.join(b)
val d = c.groupBy(...).agg(sum(...))
println(d.cache.count()) // => java.lang.ClassCastException
How I fixed the problem on my side:
val c = a.join(b).cache()
println(c.count())
val d = c.groupBy(...).agg(sum(...))
println(d.cache.count())
The full stack trace:
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3029)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2976)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2970)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2970)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1390)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3238)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3179)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3167)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1152)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2651)
	at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:241)
	at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:276)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:81)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:87)
	at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:75)
	at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:62)
	at org.apache.spark.sql.execution.ResultCacheManager.collectResult$1(ResultCacheManager.scala:575)
	at org.apache.spark.sql.execution.ResultCacheManager.computeResult(ResultCacheManager.scala:582)
	at org.apache.spark.sql.execution.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:528)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:527)
	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:424)
	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:403)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:422)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:398)
	at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3210)
	at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3209)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3951)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:239)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3949)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:3209)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:1)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:64)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:66)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:68)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:70)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:72)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:74)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:76)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw$$iw.<init>(command-1409292377896938:78)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw$$iw.<init>(command-1409292377896938:80)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw$$iw.<init>(command-1409292377896938:82)
	at $line2950148436704909b7ddd289d0721a90163.$read$$iw.<init>(command-1409292377896938:84)
	at $line2950148436704909b7ddd289d0721a90163.$read.<init>(command-1409292377896938:86)
	at $line2950148436704909b7ddd289d0721a90163.$read$.<init>(command-1409292377896938:90)
	at $line2950148436704909b7ddd289d0721a90163.$read$.<clinit>(command-1409292377896938)
	at $line2950148436704909b7ddd289d0721a90163.$eval$.$print$lzycompute(<notebook>:7)
	at $line2950148436704909b7ddd289d0721a90163.$eval$.$print(<notebook>:6)
	at $line2950148436704909b7ddd289d0721a90163.$eval.$print(<notebook>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:747)
	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1020)
	at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:568)
	at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:36)
	at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:116)
	at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
	at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:567)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:594)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:564)
	at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:219)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal.$anonfun$repl$1(ScalaDriverLocal.scala:225)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:999)
	at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:952)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:225)
	at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$12(DriverLocal.scala:631)
	at com.databricks.logging.Log4jUsageLoggingShim$.$anonfun$withAttributionContext$1(Log4jUsageLoggingShim.scala:33)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:94)
	at com.databricks.logging.Log4jUsageLoggingShim$.withAttributionContext(Log4jUsageLoggingShim.scala:31)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:205)
	at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:204)
	at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:59)
	at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:240)
	at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:225)
	at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:59)
	at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:608)
	at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:615)
	at scala.util.Try$.apply(Try.scala:213)
	at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:607)
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommandAndGetError(DriverWrapper.scala:526)
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:561)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:431)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:374)
	at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:225)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException 
Thanks for your help
Do you have the stack trace from the "Caused by" here? It should be below the ClassCastException line you have above.
Was the RAPIDS for Spark plugin enabled here, and if you disable the plugin does the same code work?
Hi @lagarantie, can you provide details about the environment?
- plugin version
- repro steps for launching (looks like a Databricks notebook)
Do you see the stack trace of the "Caused by:" exception? That would also help pinpoint the issue.
Hi @tgravescs, @gerashegalov, here is more information for you :)
Configuration:
- Spark 3.2.1, Scala 2.12 on Databricks
- RAPIDS 22.08.0
- Cluster: 4 × Standard_NC16as_T4_v3
The notebook works on CPU without RAPIDS.
The stack is not always the same; I don't always get the stack below the ClassCastException, but here is the most complete one I found:
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3029)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2976)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2970)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2970)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1390)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1390)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3238)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3179)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3167)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1152)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2651)
	at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:241)
	at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:276)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:81)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:87)
	at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:75)
	at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:62)
	at org.apache.spark.sql.execution.ResultCacheManager.collectResult$1(ResultCacheManager.scala:575)
	at org.apache.spark.sql.execution.ResultCacheManager.computeResult(ResultCacheManager.scala:582)
	at org.apache.spark.sql.execution.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:528)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:527)
	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:424)
	at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:403)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:422)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:398)
	at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3210)
	at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3209)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3951)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:239)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3949)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:3209)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:1)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:64)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:66)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:68)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:70)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:72)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:74)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:76)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw$$iw.<init>(command-1409292377896931:78)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw$$iw.<init>(command-1409292377896931:80)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw$$iw.<init>(command-1409292377896931:82)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$$iw.<init>(command-1409292377896931:84)
	at $line6556f2073b1543b48de131a32ec3144f227.$read.<init>(command-1409292377896931:86)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$.<init>(command-1409292377896931:90)
	at $line6556f2073b1543b48de131a32ec3144f227.$read$.<clinit>(command-1409292377896931)
	at $line6556f2073b1543b48de131a32ec3144f227.$eval$.$print$lzycompute(<notebook>:7)
	at $line6556f2073b1543b48de131a32ec3144f227.$eval$.$print(<notebook>:6)
	at $line6556f2073b1543b48de131a32ec3144f227.$eval.$print(<notebook>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:747)
	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1020)
	at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:568)
	at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:36)
	at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:116)
	at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
	at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:567)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:594)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:564)
	at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:219)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal.$anonfun$repl$1(ScalaDriverLocal.scala:225)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:999)
	at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:952)
	at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:225)
	at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$12(DriverLocal.scala:631)
	at com.databricks.logging.Log4jUsageLoggingShim$.$anonfun$withAttributionContext$1(Log4jUsageLoggingShim.scala:33)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:94)
	at com.databricks.logging.Log4jUsageLoggingShim$.withAttributionContext(Log4jUsageLoggingShim.scala:31)
	at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:205)
	at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:204)
	at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:59)
	at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:240)
	at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:225)
	at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:59)
	at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:608)
	at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:615)
	at scala.util.Try$.apply(Try.scala:213)
	at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:607)
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommandAndGetError(DriverWrapper.scala:526)
	at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:561)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:431)
	at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:374)
	at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:225)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: com.nvidia.spark.rapids.GpuScalar cannot be cast to com.nvidia.spark.rapids.GpuColumnVector
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$finalProjectBatch$3(aggregate.scala:517)
	at scala.collection.immutable.List.map(List.scala:297)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$finalProjectBatch$2(aggregate.scala:513)
	at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
	at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.withResource(aggregate.scala:181)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$finalProjectBatch$1(aggregate.scala:512)
	at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
	at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.withResource(aggregate.scala:181)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.finalProjectBatch(aggregate.scala:510)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:262)
	at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:181)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.$anonfun$fetchNextBatch$2(GpuColumnarToRowExec.scala:241)
	at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
	at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.withResource(GpuColumnarToRowExec.scala:187)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.fetchNextBatch(GpuColumnarToRowExec.scala:238)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.loadNextBatch(GpuColumnarToRowExec.scala:215)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.hasNext(GpuColumnarToRowExec.scala:255)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at com.nvidia.spark.rapids.RowToColumnarIterator.hasNext(GpuRowToColumnarExec.scala:605)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.$anonfun$fetchNextBatch$2(GpuColumnarToRowExec.scala:239)
	at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
	at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.withResource(GpuColumnarToRowExec.scala:187)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.fetchNextBatch(GpuColumnarToRowExec.scala:238)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.loadNextBatch(GpuColumnarToRowExec.scala:215)
	at com.nvidia.spark.rapids.ColumnarToRowIterator.hasNext(GpuColumnarToRowExec.scala:255)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:120)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1486)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1413)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1477)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1296)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:391)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:342)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:95)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
22/10/07 18:50:26 ERROR Uncaught throwable from user code: org.apache.spark.SparkException: Job aborted due to stage failure: Task 384 in stage 947.0 failed 4 times, most recent failure: Lost task 384.3 in stage 947.0 (TID 93449) (10.139.64.7 executor 2): java.lang.ClassCastException
I can't give you the notebook, it's confidential data, but I can give you a part of the code?
@lagarantie a couple of things would help (if you can provide):
- The .explain output of d.cache (d.cache.explain). It should give us the full DAG to look at.
- The schema. I'd like to know what a and b look like, which column (and what type) we are joining on, and what we are summing.
- Have you tried removing .cache? We are curious to know if this is specific to persisting the dataframe, and if without .cache you'd see a normal plan. Diffing the .explain output of the non-cached equivalent (if it works) against the cached one would be great.
- What Databricks runtime is this? Is it 10.4?
Thanks for the help so far! And please make sure no confidential field names are revealed in the output above.
Note I tried the following example on the Databricks 10.4 ML runtime with the 22.08 plugin, but I don't see the ClassCastException:
import org.apache.spark.sql.functions.sum
val df = sc.parallelize(Seq(1,2,3,4,5,6)).toDF
val df2 = sc.parallelize(Seq(1,2,3,4,5,6)).toDF
val c = df.join(df2).cache()
val d = c.groupBy(df.col("value")).agg(sum(df2.col("value")))
println(d.cache.count())
So my guess is that something is going on with one of the types you are joining. I tried a few other aggregate types and used df.value vs df2.value, but it's not happening for me.
To be clear I don't think that the issue has anything to do with caching.
https://github.com/NVIDIA/spark-rapids/blob/7e4324c6f117e9555a8bb427205562f296591003/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala#L517-L522
is the area of the code that is failing.
This code assumes that calling columnarEval will always return a GpuColumnVector. This is not true: there are cases, like with coalesce, where a scalar value could be returned. If we replace this code with a GpuProjectExec.apply, I think it would fix the problem.
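To illustrate the failure mode, here is a minimal plain-Scala sketch (no Spark needed). ExprResult, ScalarResult, and ColumnResult are hypothetical stand-ins for the plugin's expression-result types (GpuScalar / GpuColumnVector), not real plugin API; the "fix" shown is only a rough analogue of what a projection-based approach would do.

```scala
// Hypothetical stand-ins for the plugin's expression-result types.
sealed trait ExprResult
final case class ScalarResult(value: Int) extends ExprResult
final case class ColumnResult(values: Vector[Int]) extends ExprResult

// Mirrors the buggy assumption: every evaluated expression yields a column.
def unsafeToColumn(r: ExprResult): ColumnResult =
  r.asInstanceOf[ColumnResult] // throws ClassCastException when r is a ScalarResult

// A projection-style fix: broadcast a scalar to the batch's row count
// instead of blindly casting.
def toColumn(r: ExprResult, numRows: Int): ColumnResult = r match {
  case c: ColumnResult => c
  case ScalarResult(v) => ColumnResult(Vector.fill(numRows)(v))
}
```

The pattern match makes the scalar case explicit, so an expression that folds to a constant (as coalesce can) no longer blows up the final projection.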
Hello,
I work on Databricks 10.4 LTS. I tried removing the cache on the DataFrame c, but the bug still appears.
You will find attached the DAGs when I cache and don't cache the DataFrame c, and the schema (the DAG is big; I have multiple transformations before).
@revans2, you may be on the right track!
Thank you for your help!!
DAG_Only_DataFrame_C_And_D_Cache(NoBug).txt DAG_Only_DataFrame_D_Cache(Bug).txt Schema_Dataframe_C.txt
@lagarantie thanks for the files above. I see that DAG_Only_DataFrame_C_And_D_Cache(NoBug).txt and DAG_Only_DataFrame_D_Cache(Bug).txt are the same (no diffs). Is that the case? Or was that an issue pasting it here?
At this stage I am not able to replicate this; I am hoping there is a difference between the NoBug and Bug DAGs that we can use to triage this. That said, I agree with @revans2 that we can make this handle scalars, but I think we are looking to repro it locally to understand what is going on here.
To be sure, I am sending you another DAG where there is no bug (I am sure about the bug one). The cluster was restarted in between.
To help further, you will find attached the log4j output with all the errors from the executors.
Thank you again for your help!!!
DAG_Only_DataFrame_C_And_D_Cache(NoBug)_Cluster_Restarted.txt Log4jOutput.txt
@lagarantie I have put up a patch and merged it into the 22.10 release that should fix the issue you are seeing. It fixes all of the places where we computed an expression and assumed that the output could not be a Scalar without any checks to fall back to the CPU beforehand. But we have not been able to reproduce the problem on our end. Are you okay if I close this issue and you can reopen it if we didn't fully fix it? Would you rather wait until you have verified it? Is there a good way to send you a snapshot jar that you can test out? If so feel free to reach out to me at bobby at apache dot org.
Feel free to send an email to [email protected] as well if you have any further questions.
Hi @revans2,
Thank you very much!
I was not able to reproduce the bug on my side outside the notebook... As you said, before closing, I prefer to test the jar. I will contact you in private!
@lagarantie have you had a chance to test it? Can I close this issue?
Hi @revans2, I was going to write to you! I have tested it, and I don't have the bug on my side anymore.
Thank you for your efficiency :)