
[Bug] Cannot use Delta Lake optimized write

Open · santosh-d3vpl3x opened this issue 9 months ago · 7 comments

Code of Conduct

Search before asking

  • [x] I have searched in the issues and found no similar issues.

Describe the bug

When trying to use Delta Lake's optimized write together with Uniffle, a FetchFailedException occurs.
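
A minimal sketch of the failing write, assuming optimized write is enabled via Delta's session conf (the exact repro code is not in the original report; the table path is illustrative, and spark is the session from the pyspark command shown under Engine Configurations below):

# Minimal repro sketch (assumed, not from the original report).
# Verify the conf name against your Delta version.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")

df = spark.range(0, 10_000_000)
# Optimized write inserts an extra shuffle (DeltaOptimizedWriterExec) before
# the file write; fetching that shuffle under Uniffle's RssShuffleManager fails.
df.write.format("delta").mode("overwrite").save("/tmp/delta/optimized_write_repro")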

Affects Version(s)

master/0.11.0

Uniffle Server Log Output

Nothing specific shows up in the server log.

Uniffle Engine Log Output

org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 5 (save at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason:
org.apache.spark.shuffle.FetchFailedException
	at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:437)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1239)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:971)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:86)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeTask$1(DeltaFileFormatWriter.scala:450)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.executeTask(DeltaFileFormatWriter.scala:457)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeWrite$3(DeltaFileFormatWriter.scala:279)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalArgumentException: port out of range:99999
	at java.base/java.net.InetSocketAddress.checkPort(InetSocketAddress.java:152)
	at java.base/java.net.InetSocketAddress.createUnresolved(InetSocketAddress.java:263)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:155)
	at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:131)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:173)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.start(RetryingBlockTransferor.java:152)
	at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:151)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:374)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.send$1(ShuffleBlockFetcherIterator.scala:1209)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:1201)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:715)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:194)
	at org.apache.spark.sql.delta.perf.OptimizedWriterShuffleReader.read(DeltaOptimizedWriterExec.scala:291)
	at org.apache.spark.sql.delta.perf.DeltaOptimizedWriterRDD.compute(DeltaOptimizedWriterExec.scala:268)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	... 11 more

	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2031)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3054)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeWrite$1(DeltaFileFormatWriter.scala:267)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.writeAndCommit(DeltaFileFormatWriter.scala:300)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.executeWrite(DeltaFileFormatWriter.scala:236)
	at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.write(DeltaFileFormatWriter.scala:216)
	at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$2(TransactionalWrite.scala:456)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:414)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:372)
	at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:165)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:247)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:243)
	at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:165)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:236)
	at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:233)
	at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:165)
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.writeFiles(WriteIntoDelta.scala:377)
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.writeAndReturnCommitData(WriteIntoDelta.scala:340)
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:108)
	at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:241)
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:103)
	at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:204)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:358)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)

Uniffle Server Configurations

The server configuration is the one from the docker-compose setup in the repo.

Uniffle Engine Configurations

The Spark job is started with the following command:

docker exec -it rss-spark-master-1 /opt/spark/bin/pyspark \
  --master spark://rss-spark-master-1:7077 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager \
  --conf spark.rss.coordinator.quorum=rss-coordinator-1:19999,rss-coordinator-2:19999 \
  --conf spark.rss.storage.type=MEMORY_LOCALFILE \
  --conf spark.speculation=true \
  --packages io.delta:delta-spark_2.12:3.3.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
  --conf spark.jars.ivy=/tmp/

Additional context

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

santosh-d3vpl3x avatar Mar 15 '25 21:03 santosh-d3vpl3x

The error stack doesn't contain any Uniffle code.

jerqi avatar Mar 26 '25 11:03 jerqi

Indeed, it is hard to see from the error stack itself what's going on here, and that is the actual problem. In the grand scheme of things, the stack trace doesn't matter; the point of focus is really that Uniffle and Delta expect things to work differently.

There are two gaps:

  1. Uniffle uses DUMMY_HOST and DUMMY_PORT for the blocks, making it hard for Delta to determine where the blocks actually are.
  2. Delta uses its own reader to read the blocks. Even if Uniffle pointed at the right block locations, it still wouldn't work, because Uniffle has custom reader and writer implementations for shuffle blocks.
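
To make gap 1 concrete, assuming the placeholder port Uniffle reports is the 99999 seen in the trace: the fetch fails before any network I/O happens, because Spark validates the port while building the unresolved address. A Python sketch of the same check:

# Illustrative sketch only: mimics the JVM-side InetSocketAddress.checkPort
# validation that produces "port out of range:99999" in the trace above.
# DUMMY_HOST/DUMMY_PORT stand in for the placeholders Uniffle writes into
# the MapStatus block locations; the exact values here are assumptions.
DUMMY_HOST, DUMMY_PORT = "dummy_host", 99999

def create_unresolved(host: str, port: int):
    if port < 0 or port > 0xFFFF:  # valid TCP ports are 0..65535
        raise ValueError(f"port out of range:{port}")
    return (host, port)

create_unresolved(DUMMY_HOST, DUMMY_PORT)  # raises ValueError: port out of range:99999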

There are improvements that need to happen in both Uniffle and Delta; these are a bit tangential to the exact stack trace above.

  1. Delta needs to allow for a custom reader to be plugged in.
  2. Uniffle needs a way to publish the sizes of partitions (and of the blocks within very large partitions).

These two are enough to optimize the subsequent stages, in this case the stage that writes files of a certain target size.

santosh-d3vpl3x avatar Mar 26 '25 12:03 santosh-d3vpl3x

Actually, the Delta Lake reader is a datasource reader; it shouldn't read shuffle data. But from the code it seems that Delta Lake implements its own shuffle reader, which is hard to support. If this feature is optional, you can try to avoid using it.
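
A sketch of how to avoid it, assuming Delta's standard conf and table property for optimized write (verify the exact names against your Delta version; the table path is the illustrative one from the repro sketch above):

# Sketch: disabling optimized write so the extra shuffle (and the
# OptimizedWriterShuffleReader seen in the trace) is never used.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "false")
# or per table:
spark.sql("""
    ALTER TABLE delta.`/tmp/delta/optimized_write_repro`
    SET TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'false')
""")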

jerqi avatar Mar 26 '25 12:03 jerqi

It's a must-have feature and can't just be disabled, or rather left on the table.

My proposal is to meet in the middle: Uniffle will expose more data on shuffle blocks and partitions, and Delta Lake will expose a way to configure a custom class for its reader + RDD implementation, as sketched below. Both of these are sufficient to make a decision on the optimization, which is mostly binning.
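
Purely as a hypothetical sketch of the Delta side (neither the config key nor the classes below exist today), the plug-in point could follow the usual reflection pattern Spark already uses for things like spark.shuffle.manager:

# Hypothetical sketch only: neither the config key nor the reader classes
# below exist in Delta or Uniffle. It just illustrates the familiar
# "conf names a class, loaded by reflection" plug-in pattern.
import importlib

HYPOTHETICAL_KEY = "spark.databricks.delta.optimizeWrite.shuffleReader.class"

class DefaultOptimizedWriterReader:
    """Stand-in for Delta's built-in OptimizedWriterShuffleReader."""

def load_reader(conf: dict):
    """Instantiate the configured reader class, falling back to the default."""
    fqcn = conf.get(HYPOTHETICAL_KEY)
    if fqcn is None:
        return DefaultOptimizedWriterReader()
    module_name, _, cls_name = fqcn.rpartition(".")
    return getattr(importlib.import_module(module_name), cls_name)()

reader = load_reader({})  # falls back to the built-in reader
# A Uniffle integration would set the conf to its own (hypothetical) class, e.g.
# load_reader({HYPOTHETICAL_KEY: "org.apache.uniffle.UniffleOptimizedWriterReader"})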

santosh-d3vpl3x avatar Mar 26 '25 13:03 santosh-d3vpl3x

Does Delta Lake have such an interface? If you are interested, you can implement it. I can give you some guidance on the Uniffle side.

jerqi avatar Mar 27 '25 02:03 jerqi

That's great to hear. We can then make a proposal for Delta Lake.

santosh-d3vpl3x avatar Mar 27 '25 10:03 santosh-d3vpl3x

Let's see what the Delta community has to say: https://github.com/delta-io/delta/issues/4343

santosh-d3vpl3x avatar Mar 29 '25 13:03 santosh-d3vpl3x