azure-sdk-for-java
azure-sdk-for-java copied to clipboard
[BUG] Random cast error while writting to CosmosDB with pyspark
Describe the bug I have some pyspark code that loads a csv file and appens the data into a CosmosDB container. Sometime I get an error, I can rerun the code and it can work even if the data or the cluster did not change.
Before I was using Databricks Runtime 10.4 TLS with an other version of cosmosdb library into the cluster. And I never had this issue. The issue started to occure only when I've updated databricks runtime to 12.2 TLS + azure-cosmos-spark_3-3_2-12:4.30.0
Exception or Stack Trace
An error occurred while calling o76.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 87 in stage 9.0 failed 4 times, most recent failure: Lost task 87.3 in stage 9.0 (TID 197) (10.34.208.176 executor 14): java.lang.ClassCastException: com.azure.cosmos.spark.CosmosClientMetadataCachesSnapshots cannot be cast to com.azure.cosmos.spark.CosmosClientMetadataCachesSnapshots
at com.azure.cosmos.spark.CosmosWriterBase.<init>(CosmosWriterBase.scala:39)
at com.azure.cosmos.spark.CosmosWriter.<init>(CosmosWriter.scala:33)
at com.azure.cosmos.spark.ItemsDataWriteFactory.createWriter(ItemsDataWriteFactory.scala:51)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:432)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:179)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:142)
at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:126)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:142)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:97)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:904)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1740)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:907)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:761)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3440)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3362)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3351)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3351)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1460)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1460)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1460)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3651)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3589)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3577)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1209)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1197)
at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2758)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2741)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:428)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:404)
at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:290)
at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:383)
at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:382)
at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:290)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.$anonfun$result$1(V2CommandExec.scala:47)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:47)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:45)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:54)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$3(QueryExecution.scala:256)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:165)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:256)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$9(SQLExecution.scala:258)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:448)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:203)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1073)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:131)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:398)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:255)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:238)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:251)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:244)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:106)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:339)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:335)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:244)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:395)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:244)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:198)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:189)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:305)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:964)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:346)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:258)
at com.databricks.service.DataFrameWriteCommand.run(DataFrameWriteCommand.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.$anonfun$sideEffectResult$1(commands.scala:82)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:80)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:79)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:91)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$3(QueryExecution.scala:256)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:165)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:256)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$9(SQLExecution.scala:258)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:448)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:203)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1073)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:131)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:398)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:255)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:238)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:251)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:244)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:106)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:339)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:335)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:244)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:395)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:244)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:198)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:189)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:305)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:310)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:307)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$4(QueryExecution.scala:543)
at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:751)
at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:543)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:560)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:553)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at com.databricks.service.SparkServiceImpl$.$anonfun$executePlan$2(SparkServiceImpl.scala:121)
at org.apache.spark.internal.Logging.logInfo(Logging.scala:61)
at org.apache.spark.internal.Logging.logInfo$(Logging.scala:60)
at com.databricks.service.SparkServiceImpl$.logInfo(SparkServiceImpl.scala:58)
at com.databricks.service.SparkServiceImpl$.$anonfun$executePlan$1(SparkServiceImpl.scala:121)
at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:560)
at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:657)
at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:678)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:414)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:158)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:412)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:409)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:457)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:442)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:652)
at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:569)
at com.databricks.spark.util.PublicDBLogging.recordOperationWithResultTags(DatabricksSparkUsageLogger.scala:27)
at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:560)
at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:528)
at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:27)
at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:68)
at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:150)
at com.databricks.spark.util.UsageLogger.recordOperation(UsageLogger.scala:72)
at com.databricks.spark.util.UsageLogger.recordOperation$(UsageLogger.scala:59)
at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:109)
at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:433)
at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:412)
at com.databricks.service.SparkServiceImpl$.super$recordOperation(SparkServiceImpl.scala:89)
at com.databricks.service.SparkServiceImpl$.$anonfun$recordOperation$4(SparkServiceImpl.scala:99)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:414)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:158)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:412)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:409)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:457)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:442)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:72)
at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:172)
at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:495)
at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:607)
at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:616)
at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:495)
at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:493)
at com.databricks.service.SparkServiceImpl$.withAttributionTags(SparkServiceImpl.scala:58)
at com.databricks.service.SparkServiceImpl$.recordOperation(SparkServiceImpl.scala:99)
at com.databricks.service.SparkServiceImpl$.executePlan(SparkServiceImpl.scala:119)
at com.databricks.service.SparkServiceRPCHandler.execute0(SparkServiceRPCHandler.scala:697)
at com.databricks.service.SparkServiceRPCHandler.$anonfun$executeRPC0$1(SparkServiceRPCHandler.scala:500)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.service.SparkServiceRPCHandler.executeRPC0(SparkServiceRPCHandler.scala:372)
at com.databricks.service.SparkServiceRPCHandler$$anon$2.call(SparkServiceRPCHandler.scala:323)
at com.databricks.service.SparkServiceRPCHandler$$anon$2.call(SparkServiceRPCHandler.scala:309)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at com.databricks.service.SparkServiceRPCHandler.$anonfun$executeRPC$1(SparkServiceRPCHandler.scala:359)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.service.SparkServiceRPCHandler.executeRPC(SparkServiceRPCHandler.scala:336)
at com.databricks.service.SparkServiceRPCServlet.doPost(SparkServiceRPCServer.scala:167)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:523)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:590)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:550)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:190)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:516)
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassCastException: com.azure.cosmos.spark.CosmosClientMetadataCachesSnapshots cannot be cast to com.azure.cosmos.spark.CosmosClientMetadataCachesSnapshots
at com.azure.cosmos.spark.CosmosWriterBase.<init>(CosmosWriterBase.scala:39)
at com.azure.cosmos.spark.CosmosWriter.<init>(CosmosWriter.scala:33)
at com.azure.cosmos.spark.ItemsDataWriteFactory.createWriter(ItemsDataWriteFactory.scala:51)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:432)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:179)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:142)
at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:126)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:142)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:97)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:904)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1740)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:907)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:761)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Code Snippet
cfg = {
"spark.cosmos.accountEndpoint": f"https://{cosmosdb_name}.documents.azure.com:443/",
"spark.cosmos.accountKey": cosmosdb.primary_master_key,
"spark.cosmos.database": database_name,
"spark.cosmos.container": table_name,
}
spark.conf.set(
"spark.cosmos.throughputControl.globalControl.database", database_name
)
spark.conf.set("spark.cosmos.throughputControl.enabled", "true")
spark.conf.set(
"spark.cosmos.throughputControl.name", "SourceContainerThroughputControl"
)
spark.conf.set("spark.cosmos.throughputControl.targetThroughputThreshold", "0.95")
data = (
(
spark.read.format("csv")
.options(header="True", inferSchema="True", delimiter=";")
.load(spark_file_path)
)
.withColumnRenamed("my_col", "id")
.na.drop(subset=["id"])
)
data = data.withColumn("id", data["id"].cast("string"))
data = data.dropDuplicates(["id"])
data.write.format("cosmos.oltp").options(**cfg).mode("APPEND").save()
Expected behavior No errors or a retry since it does not fail all the time. The data are made of strings, doubles and nans.
Setup:
- OS: WSL 1 or Azure Machine Learning Compute Instances/Clusters (any size), windows 10.
- Library/Libraries: azure-cosmos-spark_3-3_2-12:4.30.0
- Java version: openjdk 19.0.1 2022-10-18 OpenJDK Runtime Environment (build 19.0.1+10-21) OpenJDK 64-Bit Server VM (build 19.0.1+10-21, mixed mode, sharing)
- App Server/Environment: Databricks cluster 12.2 LTS
- Spark Config: spark.sql.catalog.cosmosCatalog com.azure.cosmos.spark.CosmosCatalog spark.jars.packages com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12:4.30.0