[Bug] Cannot use Delta Lake optimized write
Code of Conduct
- [x] I agree to follow this project's Code of Conduct
Search before asking
- [x] I have searched in the issues and found no similar issues.
Describe the bug
When trying to use Delta Lake's optimized write with Uniffle, a FetchFailedException occurs and the write job fails.
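A minimal reproduction is not included in the report; the sketch below shows the kind of write that should trigger it, run inside the pyspark shell started with the command listed under "Uniffle Engine Configurations" further down. The optimized-write switch is assumed to be the spark.databricks.delta.optimizeWrite.enabled session conf documented for Delta Lake OSS; the table path and data are made up.

from pyspark.sql import functions as F

# Assumption: optimized write is enabled via the session conf documented for
# Delta Lake OSS (the delta.autoOptimize.optimizeWrite table property should
# behave the same way).
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")

df = spark.range(0, 10_000_000).withColumn("part", F.col("id") % 100)

# The optimized write adds an extra shuffle (DeltaOptimizedWriterExec) in front
# of the file write; fetching that shuffle is what fails here.
df.write.format("delta").mode("overwrite").partitionBy("part").save("/tmp/delta_optimized_write_test")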
Affects Version(s)
master/0.11.0
Uniffle Server Log Output
Nothing specific shows up in the server log.
Uniffle Engine Log Output
org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 5 (save at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason:
org.apache.spark.shuffle.FetchFailedException
at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:437)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1239)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:971)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:86)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeTask$1(DeltaFileFormatWriter.scala:450)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.executeTask(DeltaFileFormatWriter.scala:457)
at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeWrite$3(DeltaFileFormatWriter.scala:279)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalArgumentException: port out of range:99999
at java.base/java.net.InetSocketAddress.checkPort(InetSocketAddress.java:152)
at java.base/java.net.InetSocketAddress.createUnresolved(InetSocketAddress.java:263)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:155)
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:131)
at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:173)
at org.apache.spark.network.shuffle.RetryingBlockTransferor.start(RetryingBlockTransferor.java:152)
at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:151)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:374)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.send$1(ShuffleBlockFetcherIterator.scala:1209)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:1201)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:715)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:194)
at org.apache.spark.sql.delta.perf.OptimizedWriterShuffleReader.read(DeltaOptimizedWriterExec.scala:291)
at org.apache.spark.sql.delta.perf.DeltaOptimizedWriterRDD.compute(DeltaOptimizedWriterExec.scala:268)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
... 11 more
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2031)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3054)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeWrite$1(DeltaFileFormatWriter.scala:267)
at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.writeAndCommit(DeltaFileFormatWriter.scala:300)
at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.executeWrite(DeltaFileFormatWriter.scala:236)
at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.write(DeltaFileFormatWriter.scala:216)
at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$2(TransactionalWrite.scala:456)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:414)
at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:372)
at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:165)
at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:247)
at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:243)
at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:165)
at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:236)
at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:233)
at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:165)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.writeFiles(WriteIntoDelta.scala:377)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.writeAndReturnCommitData(WriteIntoDelta.scala:340)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:108)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:241)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:103)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:204)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:358)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:569)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:840)
Uniffle Server Configurations
The server configuration is the one from the docker-compose setup in the repo.
Uniffle Engine Configurations
The Spark job is started with the following command:
docker exec -it rss-spark-master-1 /opt/spark/bin/pyspark \
--master spark://rss-spark-master-1:7077 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager \
--conf spark.rss.coordinator.quorum=rss-coordinator-1:19999,rss-coordinator-2:19999 \
--conf spark.rss.storage.type=MEMORY_LOCALFILE \
--conf spark.speculation=true \
--packages io.delta:delta-spark_2.12:3.3.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
--conf spark.jars.ivy=/tmp/
Additional context
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
The error stack doesn't go through Uniffle at all; the fetch is done by Spark's own ShuffleBlockFetcherIterator.
Indeed, it is hard to see from the error stack itself what's going on here, and that's the actual problem. In the grand scheme of things the stack trace doesn't matter; the point of focus is that Uniffle and Delta expect things to work differently.
There are two gaps:
1. Uniffle uses DUMMY_HOST and DUMMY_PORT for the blocks, making it hard for Delta to determine where the blocks actually are (the "port out of range:99999" in the stack trace is that dummy port).
2. Delta uses its own reader to read the blocks. Even if Uniffle pointed at the right block locations, it still wouldn't work, because Uniffle has a custom reader and writer implementation for shuffle blocks.
There are improvements that need to happen in both Uniffle and Delta; these are a bit tangential to the exact error here.
- Delta needs to allow a custom reader to be plugged in.
- Uniffle needs a way to publish the sizes of partitions (and also of the blocks of very large partitions).
These two are enough to optimize the subsequent stages, in this case the stage that writes files of a certain size.
Actually, the Delta Lake reader is a datasource reader; it shouldn't be reading shuffle data. But from the code it seems Delta Lake implements its own shuffle reader, which is hard to support. If this feature is optional, you could try to avoid using it.
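For reference, this is roughly how the feature could be switched off if dropping it were acceptable, assuming the session conf and table property names from the Delta Lake OSS documentation:

# Assumed knob: session-level switch for optimized write in Delta Lake OSS.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "false")

# Or per table, so other writers are not affected (table path is made up):
spark.sql("""
ALTER TABLE delta.`/tmp/delta_optimized_write_test`
SET TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'false')
""")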
It's a must-have feature and can't be disabled or left on the table.
My proposal is to meet in the middle: Uniffle exposes more data on shuffle blocks and partitions, and Delta Lake exposes a way to configure a custom class for its reader + RDD implementation. Both of these are sufficient to make a decision on the optimization, which is mostly binning, as sketched below.
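To make the binning part concrete, here is a small, self-contained sketch of the decision the optimized write would make if Uniffle published per-partition shuffle sizes; the function, the size numbers, and the 128 MB target are illustrative, not an existing API in either project:

def bin_partitions(partition_sizes, target_bytes):
    """Greedily group shuffle partitions so each output file is roughly target_bytes."""
    bins, current, current_size = [], [], 0
    for pid, size in enumerate(partition_sizes):
        if current and current_size + size > target_bytes:
            bins.append(current)
            current, current_size = [], 0
        current.append(pid)
        current_size += size
    if current:
        bins.append(current)
    return bins

# Illustrative partition sizes in bytes, as a shuffle service could report them.
print(bin_partitions([40_000_000, 90_000_000, 10_000_000, 128_000_000], target_bytes=128_000_000))
# -> [[0], [1, 2], [3]]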
Does Delta Lake have that interface? If you're interested, you could implement it. I can give you some guidance on the Uniffle side.
That's great to hear; we can then make a proposal for Delta Lake.
Let's see what the Delta community has to say: https://github.com/delta-io/delta/issues/4343