[Bug] Executing `spark.sql("add jar path/xxx")` in the Spark engine raises `ClassNotFound with classloader: org.apache.spark.util.MutableURLClassLoader`
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Search before asking
- [X] I have searched in the issues and found no similar issues.
Describe the bug
Based on #2471 and #2475: executing `spark.sql("add jar path/xxx")` in the Spark engine, then using a class from that jar, fails with `ClassNotFound with classloader: org.apache.spark.util.MutableURLClassLoader`.
My Spark code (executed through the engine's Scala mode):
spark.sql("add jar file:/xxxxx.jar")
import org.apache.spark.sql.{SaveMode, SparkSession}
import com.datastax.spark.connector.cql.CassandraConnectorConf
import org.apache.spark.sql.cassandra._
spark.setCassandraConf(CassandraConnectorConf.KeepAliveMillisParam.option(10000))
val writeDf = spark.read.parquet("xxxx")
writeDf.printSchema()
val cassandraMap = Map("table" -> "xxxx", "keyspace" -> "xxxx")
writeDf.write.format("org.apache.spark.sql.cassandra").options(cassandraMap).mode(SaveMode.Append).save()
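For contrast, the connector jar is more commonly supplied when the application launches rather than via a runtime ADD JAR. A minimal sketch using the standard spark.jars setting follows; the jar path is a placeholder, and nothing in this thread confirms whether launch-time jars avoid the failure:
import org.apache.spark.sql.SparkSession

// Triage sketch (not from the issue): put the connector jar on the driver and
// executor classpaths at launch via spark.jars, instead of a runtime ADD JAR.
// In a Kyuubi deployment this corresponds to setting spark.jars in the engine's
// Spark configuration. The jar path below is a placeholder.
val spark = SparkSession.builder()
  .appName("cassandra-write-triage")
  .config("spark.jars", "file:/path/to/connector.jar")
  .getOrCreate()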
Affects Version(s)
master
Kyuubi Server Log Output
No response
Kyuubi Engine Log Output
writeDf.write.format("org.apache.spark.sql.cassandra").options(cassandraMap).mode(SaveMode.Append).save()
org.apache.spark.SparkException: Writing job aborted.
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:388)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336)
at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:218)
at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:370)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
... 57 elided
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: ClassNotFound with classloader: org.apache.spark.util.MutableURLClassLoader@1c852c0f
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:357)
... 85 more
at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
at org.apache.kyuubi.engine.spark.operation.ExecuteScala.$anonfun$runInternal$1(ExecuteScala.scala:84)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.kyuubi.engine.spark.operation.SparkOperation.withLocalProperties(SparkOperation.scala:88)
at org.apache.kyuubi.engine.spark.operation.ExecuteScala.runInternal(ExecuteScala.scala:60)
at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:133)
at org.apache.kyuubi.session.AbstractSession.runOperation(AbstractSession.scala:94)
at org.apache.kyuubi.engine.spark.session.SparkSessionImpl.runOperation(SparkSessionImpl.scala:66)
at org.apache.kyuubi.session.AbstractSession.$anonfun$executeStatement$1(AbstractSession.scala:124)
at org.apache.kyuubi.session.AbstractSession.withAcquireRelease(AbstractSession.scala:75)
at org.apache.kyuubi.session.AbstractSession.executeStatement(AbstractSession.scala:121)
at org.apache.kyuubi.service.AbstractBackendService.executeStatement(AbstractBackendService.scala:66)
at org.apache.kyuubi.service.TFrontendService.ExecuteStatement(TFrontendService.scala:232)
at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1557)
at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1542)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Kyuubi Server Configurations
No response
Kyuubi Engine Configurations
No response
Additional context
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
cc @iodone, can you take a look at this?
Yes, I'm following up on this.
I tried to reproduce the problem with the following example:
With Beeline:
SET kyuubi.operation.language=scala;
spark.sql("add jar file:/Users/work/tmp/jar-test/target/spark_to_cassandra_jars-1.0-SNAPSHOT.jar");
import org.apache.spark.sql.{SaveMode, SparkSession};
import com.datastax.spark.connector.cql.CassandraConnectorConf;
import org.apache.spark.sql.cassandra._;
spark.setCassandraConf(CassandraConnectorConf.KeepAliveMillisParam.option(10000));
val writeDf = spark.read.parquet("/Users/work/tmp/test.parquet");
val x = writeDf.limit(10);
result.set(x);
val cassandraMap = Map("table" -> "test", "keyspace" -> "store");
val df = spark.read.format("org.apache.spark.sql.cassandra").options(cassandraMap).load;
val y = df.limit(10);
result.set(y);
writeDf.write.format("org.apache.spark.sql.cassandra").options(cassandraMap).mode(SaveMode.Append).save();
I tested Cassandra reads and writes; everything works well.
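Since the failure does not reproduce here, a small diagnostic sketch like the one below may help pinpoint whether the class fails to resolve on the driver or only on the executors. The provider class name `org.apache.spark.sql.cassandra.DefaultSource` is an assumption, and the jar path is a placeholder:
// Diagnostic sketch, runnable in the engine's Scala mode where `spark` is in scope.
spark.sql("add jar file:/path/to/connector.jar")

val providerClass = "org.apache.spark.sql.cassandra.DefaultSource" // assumed provider name

// Driver-side resolution: should succeed if ADD JAR reached the driver's classloader.
Class.forName(providerClass, false, Thread.currentThread().getContextClassLoader)

// Executor-side resolution: forces each task to look the class up with the
// executor's context classloader, which is where the reported ClassNotFound surfaces.
spark.sparkContext.parallelize(1 to 2, 2).foreach { _ =>
  Class.forName(providerClass, false, Thread.currentThread().getContextClassLoader)
}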

cc @FishManHome