[SPARK-39696][CORE] Ensure Concurrent r/w `TaskMetrics` not throw Exception
What changes were proposed in this pull request?
This PR changes the declaration type of `TaskMetrics#externalAccums` from `s.c.mutable.ArrayBuffer` to `j.u.concurrent.CopyOnWriteArrayList`, to ensure that the error described in SPARK-39696 (`java.util.ConcurrentModificationException: mutation occurred during iteration`) does not occur when `TaskMetrics#externalAccums` is read and written concurrently.
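For illustration, a rough sketch of the shape of the change (simplified, with a hypothetical wrapper class name; not the actual patch):

```scala
import java.util.concurrent.CopyOnWriteArrayList

import scala.jdk.CollectionConverters._

import org.apache.spark.util.AccumulatorV2

// Hypothetical sketch only; field and method names mirror TaskMetrics, but this
// is not the code from the patch.
class ExternalAccumsSketch {
  // Before: private val externalAccums = new scala.collection.mutable.ArrayBuffer[AccumulatorV2[_, _]]
  // After: a thread-safe list whose iterators traverse an immutable snapshot of the
  // backing array, so a concurrent write can never invalidate an in-flight read.
  private val externalAccums = new CopyOnWriteArrayList[AccumulatorV2[_, _]]

  def registerAccumulator(a: AccumulatorV2[_, _]): Unit =
    externalAccums.add(a) // each write copies the backing array

  def externalAccumsSnapshot: Seq[AccumulatorV2[_, _]] =
    externalAccums.asScala.toSeq // safe to call from e.g. the heartbeat thread
}
```

The trade-off is that every write copies the backing array, while reads never need to lock.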
Why are the changes needed?
Bug fix.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Pass GitHub Actions and add a new test case.
Without the change to `TaskMetrics`, the new test case fails regardless of the Scala version, as follows:
Scala 2.12:
mvn clean test -pl core -am -Dtest=none -DwildcardSuites=org.apache.spark.executor.TaskMetricsSuite
Before
TaskMetricsSuite:
- mutating values
- mutating shuffle read metrics values
- mutating shuffle write metrics values
- mutating input metrics values
- mutating output metrics values
- merging multiple shuffle read metrics
- additional accumulables
- SPARK-39696: Concurrent r/w of TaskMetrics should not throw Exception *** FAILED ***
2 did not equal 0 (TaskMetricsSuite.scala:274)
Run completed in 6 seconds, 980 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 7, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***
After
TaskMetricsSuite:
- mutating values
- mutating shuffle read metrics values
- mutating shuffle write metrics values
- mutating input metrics values
- mutating output metrics values
- merging multiple shuffle read metrics
- additional accumulables
- SPARK-39696: Concurrent r/w of TaskMetrics should not throw Exception
Run completed in 7 seconds, 516 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
Scala 2.13
mvn clean test -pl core -am -Dtest=none -DwildcardSuites=org.apache.spark.executor.TaskMetricsSuite -Pscala-2.13
Before
TaskMetricsSuite:
- mutating values
- mutating shuffle read metrics values
- mutating shuffle write metrics values
- mutating input metrics values
- mutating output metrics values
- merging multiple shuffle read metrics
- additional accumulables
- SPARK-39696: Concurrent r/w of TaskMetrics should not throw Exception *** FAILED ***
1339 did not equal 0 (TaskMetricsSuite.scala:275)
Run completed in 6 seconds, 714 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 7, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***
After
Discovery starting.
Discovery completed in 6 seconds, 369 milliseconds.
Run starting. Expected test count is: 8
TaskMetricsSuite:
- mutating values
- mutating shuffle read metrics values
- mutating shuffle write metrics values
- mutating input metrics values
- mutating output metrics values
- merging multiple shuffle read metrics
- additional accumulables
- SPARK-39696: Concurrent r/w of TaskMetrics should not throw Exception
Run completed in 8 seconds, 434 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
The exceptions reported in SPARK-39696 are as follows:
2022-06-21 18:17:49.289Z ERROR [executor-heartbeater] org.apache.spark.util.Utils - Uncaught exception in thread executor-heartbeater
java.util.ConcurrentModificationException: mutation occurred during iteration
at scala.collection.mutable.MutationTracker$.checkMutations(MutationTracker.scala:43) ~[scala-library-2.13.8.jar:?]
at scala.collection.mutable.CheckedIndexedSeqView$CheckedIterator.hasNext(CheckedIndexedSeqView.scala:47) ~[scala-library-2.13.8.jar:?]
at scala.collection.IterableOnceOps.copyToArray(IterableOnce.scala:873) ~[scala-library-2.13.8.jar:?]
at scala.collection.IterableOnceOps.copyToArray$(IterableOnce.scala:869) ~[scala-library-2.13.8.jar:?]
at scala.collection.AbstractIterator.copyToArray(Iterator.scala:1293) ~[scala-library-2.13.8.jar:?]
at scala.collection.IterableOnceOps.copyToArray(IterableOnce.scala:852) ~[scala-library-2.13.8.jar:?]
at scala.collection.IterableOnceOps.copyToArray$(IterableOnce.scala:852) ~[scala-library-2.13.8.jar:?]
at scala.collection.AbstractIterator.copyToArray(Iterator.scala:1293) ~[scala-library-2.13.8.jar:?]
at scala.collection.immutable.VectorStatics$.append1IfSpace(Vector.scala:1959) ~[scala-library-2.13.8.jar:?]
at scala.collection.immutable.Vector1.appendedAll0(Vector.scala:425) ~[scala-library-2.13.8.jar:?]
at scala.collection.immutable.Vector.appendedAll(Vector.scala:203) ~[scala-library-2.13.8.jar:?]
at scala.collection.immutable.Vector.appendedAll(Vector.scala:113) ~[scala-library-2.13.8.jar:?]
at scala.collection.SeqOps.concat(Seq.scala:187) ~[scala-library-2.13.8.jar:?]
at scala.collection.SeqOps.concat$(Seq.scala:187) ~[scala-library-2.13.8.jar:?]
at scala.collection.AbstractSeq.concat(Seq.scala:1161) ~[scala-library-2.13.8.jar:?]
at scala.collection.IterableOps.$plus$plus(Iterable.scala:726) ~[scala-library-2.13.8.jar:?]
at scala.collection.IterableOps.$plus$plus$(Iterable.scala:726) ~[scala-library-2.13.8.jar:?]
at scala.collection.AbstractIterable.$plus$plus(Iterable.scala:926) ~[scala-library-2.13.8.jar:?]
at org.apache.spark.executor.TaskMetrics.accumulators(TaskMetrics.scala:261) ~[spark-core_2.13-3.3.0.jar:3.3.0]
at org.apache.spark.executor.Executor.$anonfun$reportHeartBeat$1(Executor.scala:1042) ~[spark-core_2.13-3.3.0.jar:3.3.0]
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563) ~[scala-library-2.13.8.jar:?]
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561) ~[scala-library-2.13.8.jar:?]
at scala.collection.AbstractIterable.foreach(Iterable.scala:926) ~[scala-library-2.13.8.jar:?]
at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1036) ~[spark-core_2.13-3.3.0.jar:3.3.0]
at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:238) ~[spark-core_2.13-3.3.0.jar:3.3.0]
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) ~[scala-library-2.13.8.jar:?]
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066) ~[spark-core_2.13-3.3.0.jar:3.3.0]
at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46) ~[spark-core_2.13-3.3.0.jar:3.3.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
It seems to be a small probability event
Have not looked in detail, but there are a bunch of other places where `externalAccums` is used directly - are they also susceptible to these issues? If yes, we should look at handling them as well.
You're right. I need to rethink this problem, let me set this PR to draft first, thanks
It seems to be a small probability event
I'm finding the issue to be quite easy to reproduce. I have a 2 node Spark cluster with 3 executors per node (5 CPU/threads per executor). Since Friday I have ~150 DEAD executor instances due to this issue. Thanks for taking on the ticket.
It seems to be a small probability event
@smcmullan-ncirl I wonder if there will be such a high frequency of failures when using Scala 2.12?
I'm a novice with the Spark and Scala source code bases, but it looks to me that the scala.collection.mutable.MutationTracker and scala.collection.mutable.CheckedIndexedSeqView classes only exist in Scala 2.13, so I'm guessing that with a Java 17 JVM and Scala 2.13 I'm running on a more restrictive runtime platform than the majority of Spark users.
Yes. When running the new test suite, `ConcurrentModificationException` only occurs with Scala 2.13. `IndexOutOfBoundsException` or `NPE` may occur with Scala 2.12, but I did not encounter them in the production environment when using Scala 2.12.
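(For context, a minimal, self-contained sketch - not the actual `TaskMetricsSuite` test - of the underlying collection race: one thread keeps appending to a `mutable.ArrayBuffer` while another repeatedly copies it, similar in spirit to what the heartbeat does via `TaskMetrics.accumulators()`. On Scala 2.13 the reader can hit `ConcurrentModificationException` thanks to mutation tracking; on 2.12 the same race can surface as `IndexOutOfBoundsException`/NPE or go unnoticed.)

```scala
import java.util.concurrent.atomic.AtomicReference

import scala.collection.mutable.ArrayBuffer

object ArrayBufferRaceSketch {
  def main(args: Array[String]): Unit = {
    val accums = new ArrayBuffer[Long]                 // plays the role of externalAccums
    val failure = new AtomicReference[Throwable](null)

    // "Task" thread: keeps registering new entries.
    val writer = new Thread(() => (1L to 1000000L).foreach(i => accums += i))

    // "Heartbeat" thread: repeatedly builds a new collection from the buffer.
    val reader = new Thread(() => {
      while (failure.get() == null && writer.isAlive) {
        try { Seq.empty[Long] ++ accums }              // copy may observe a mid-append buffer
        catch { case t: Throwable => failure.set(t) }
      }
    })

    writer.start(); reader.start()
    writer.join(); reader.join()
    Option(failure.get()).foreach(t => println(s"Reader failed with: $t"))
  }
}
```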
@mridulm Compared with analyzing each scenario and using read-write locks, I think it may be simpler to change `externalAccums` to use a thread-safe data structure, for example `CopyOnWriteArrayList`. Do you have any better suggestions?
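(For illustration only, a tiny sketch, unrelated to the actual patch, of why `CopyOnWriteArrayList` tolerates this access pattern: its iterators traverse an immutable snapshot of the backing array, so concurrent `add()` calls never invalidate an in-flight read.)

```scala
import java.util.concurrent.CopyOnWriteArrayList

import scala.jdk.CollectionConverters._

object CowListSketch {
  def main(args: Array[String]): Unit = {
    val accums = new CopyOnWriteArrayList[Long]()

    val writer = new Thread(() => (1L to 100000L).foreach(i => accums.add(i)))
    val reader = new Thread(() => {
      while (writer.isAlive) {
        // Each traversal works on a snapshot of the backing array, so the concurrent
        // add() calls above can never throw ConcurrentModificationException here,
        // at the cost of copying the array on every write.
        val snapshot = Seq.empty[Long] ++ accums.asScala
        assert(snapshot == snapshot.sorted) // we only ever observe fully applied appends
      }
    })

    writer.start(); reader.start()
    writer.join(); reader.join()
    println(s"final size = ${accums.size()}")
  }
}
```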
I agree, given the use case, `CopyOnWriteArrayList` might be a good option @LuciferYang.
+CC @JoshRosen - in case you have thoughts on this.
@mridulm, thanks for the ping.
`CopyOnWriteArrayList` supports single-threaded writes and concurrent reads.
It looks like `externalAccums` is only written from `registerAccumulator`, which is only called from `TaskContext.registerAccumulator`, which is only called during `AccumulatorV2` deserialization, which should only occur when deserializing the task binary at the beginning of `Task` execution.
In the SPARK-39696 JIRA, it looks like the read is occurring in the executor's `reportHeartBeat` function at https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L1042
As far as I know, `taskRunner.task` will be `null` initially and will be populated once the task is deserialized at https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L507-L508
Assuming my above description is correct, I don't understand how concurrent reads and writes can occur: in normal operation I would expect that writes only occur during task binary deserialization and that reads from the heartbeat thread can only occur after deserialization has completed.
Perhaps accumulators are being deserialized and registered while the task is running (not just during task setup)? For example, maybe a case class is defined on the driver and its closure accidentally closes over something that contains an AccumulatorV2, causing us to deserialize AccumulatorV2 instances when deserializing data values.
In the simple case, we'd have one writer thread (the task's main runner thread) racing with the heartbeat thread. In more complex cases, I can imagine scenarios where a single task could consist of multiple threads (e.g. for PySpark) and deserialization might happen in multiple of them in case of spill (e.g. during RDD aggregation). If this is the case then it might explain the race condition.
If that's true, though, then I'm wondering whether we're registering different instances of the same accumulator multiple times in the same task (which seems like it would be a performance and correctness bug).
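(To make that hypothetical concrete, here is a sketch of the kind of accidental capture being described. The program and names are invented, not taken from any reported application, and whether it actually triggers the race depends on when the shuffled records are deserialized relative to the heartbeat.)

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LongAccumulator

object AccidentalCaptureSketch {
  // A record type that (accidentally) carries an accumulator reference, so the
  // accumulator travels inside the data rather than only inside the task closure.
  case class Enriched(value: Long, stats: LongAccumulator)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("capture-sketch").getOrCreate()
    val sc = spark.sparkContext
    val acc = sc.longAccumulator("rows")

    val counts = sc.parallelize(1L to 100000L)
      .map(v => (v % 16, Enriched(v, acc)))  // the accumulator becomes part of each record
      .groupByKey()                          // shuffle: records (and the accumulators inside
                                             // them) are serialized, then deserialized while
                                             // the reduce-side task is already running; if
                                             // AccumulatorV2 deserialization registers with the
                                             // running TaskContext, that is a mid-task write
      .mapValues(group => { group.foreach(e => e.stats.add(1)); group.size })

    println(counts.collect().toSeq)
    println(s"accumulated = ${acc.value}")
    spark.stop()
  }
}
```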
@JoshRosen
Yes, your analysis is very accurate. From the current stack, I can only infer that the following two methods may be racing (but I haven't found any conclusive evidence), so I added read-write locks to these two methods in the initial PR:
https://github.com/apache/spark/blob/66b1f79b72855af35351ff995492f2c13872dac5/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L257-L261
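(Roughly what that read-write-lock variant might look like - a simplified sketch, not the code from the initial PR:)

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.util.AccumulatorV2

class LockedAccumsSketch {
  private val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
  private val lock = new ReentrantReadWriteLock()

  // Writer path: accumulator registration during (de)serialization.
  def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
    lock.writeLock().lock()
    try externalAccums += a
    finally lock.writeLock().unlock()
  }

  // Reader path: e.g. the heartbeat thread collecting all accumulators.
  def accumulators(): Seq[AccumulatorV2[_, _]] = {
    lock.readLock().lock()
    try externalAccums.toList // materialize a copy while holding the read lock
    finally lock.readLock().unlock()
  }
}
```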
I didn't expect there to be a correctness bug here; if that is really possible, I think the current clues are not enough to troubleshoot the problem.
@smcmullan-ncirl Could you provide more details to help investigate the issue further? I want to know what the writing thread is. It seems that only you can reliably reproduce this problem now :)
Thanks for the detailed analysis @JoshRosen - I agree with your analysis. I saw two cases where this could be happening:
- test code or user code calling a Spark-internal API,
- some nontrivial code flow, where either deserialization is happening lazily (and I would love to see a reproducible test case for this), or some other nontrivial interaction, like the ones you detailed.
Agree, we should get more details on this before changing the type/other fixes.
Agree +1, we need more details
Thanks for all the analysis and effort. I think what @JoshRosen is describing in the last paragraph of his analysis is exactly what my application is doing.
I have a custom ForeachWriter implementation class acting as a data sink on the write stream, and I'm passing an instance of a statistics-gathering class as an argument to the ForeachWriterImpl constructor. This statistics class has several Maps keyed by statistic name, with a Spark accumulator as the value.
I think I have some defects in my application where the deserialization of this statistics class instance on the executor is re-registering the accumulator in the Spark context, as described in the analysis above.
I will try to reorganize my code and see if I can stop the issue from happening. If you need more details, I can probably code up a sample application to show how my production application is constructed.
@smcmullan-ncirl Any new progress?
Hi, yes and no. I produced this toy example of what I thought my production code is doing: https://github.com/smcmullan-ncirl/RateSourceSparkStreamTest
However, it seems to work fine and has not reproduced the error or caused any executors to crash. Maybe the code will give some clues as to whether I'm using some bad practice which exposes the issue in the production code.
I'm still trying to analyze the production code to see where there may be differences from the toy application. I'll report back soon if I find any, or if I'm able to extend the toy application to reproduce the error.
Hi, so I was able to reproduce the issue with the example application below.
It shows a custom foreach sink which updates a set of accumulators. The Map containing the accumulators is passed to the foreach sink and is thus serialized/deserialized.
I've integrated Codahale/DropWizard metrics for reporting metrics based on the accumulators and enabled the metrics sink like this:
```properties
driver.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=5
*.sink.console.unit=seconds
```
You may notice in the example application code below that the overridden getMetrics() method resets the accumulator by calling its reset() method.
I wonder whether this causes the situation that @JoshRosen wrote about above?
```scala
import com.codahale.metrics.{Gauge, Metric, MetricRegistry, MetricSet}
import org.apache.spark.metrics.source.StatsSource
import org.apache.spark.sql.{ForeachWriter, SparkSession}
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.slf4j.LoggerFactory

import java.util

object RateSourceSparkStreamTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("RateSourceStreamTest")

    val spark = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()

    val stats = new PartitionStats(spark.sparkContext)
    val statsMap = stats.getStatsMap

    val metricRegistry = new MetricRegistry
    metricRegistry.registerAll(stats)
    SparkEnv.get.metricsSystem.registerSource(
      StatsSource("RateSourceSparkStreamTest", metricRegistry)
    )

    import spark.implicits._

    spark
      .readStream
      .format("rate")
      .option("numPartitions", 120)
      .option("rowsPerSecond", 12000)
      .load()
      .map(row => row.mkString("##"))
      .writeStream
      .foreach(new ForeachWriterImpl(statsMap))
      .start()

    spark.streams.awaitAnyTermination()
    spark.close
  }
}

class ForeachWriterImpl(statsMap: Map[PartitionStats.Value, LongAccumulator]) extends ForeachWriter[String] {
  private final val LOGGER = LoggerFactory.getLogger(this.getClass)

  override def open(partitionId: Long, epochId: Long): Boolean = {
    LOGGER.info(s"Open partition $partitionId, epoch $epochId")
    PartitionStats.incMetric(statsMap, PartitionStats.partitionsOpened, 1)
    true
  }

  override def process(value: String): Unit = {
    LOGGER.info(s"Process value: $value")
    PartitionStats.incMetric(statsMap, PartitionStats.partitionsProcessed, 1)
  }

  override def close(errorOrNull: Throwable): Unit = {
    LOGGER.info(s"Close partition: $errorOrNull")
    PartitionStats.incMetric(statsMap, PartitionStats.partitionsClosed, 1)
  }
}

object PartitionStats extends Enumeration {
  private final val LOGGER = LoggerFactory.getLogger(this.getClass)

  final val partitionsOpened = Value("partitionsOpened")
  final val partitionsProcessed = Value("partitionsProcessed")
  final val partitionsClosed = Value("partitionsClosed")

  def incMetric(statsMap: Map[PartitionStats.Value, LongAccumulator], stat: PartitionStats.Value, count: Long): Unit = {
    statsMap.get(stat) match {
      case Some(acc) => acc.add(count)
      case _ => LOGGER.error(s"Cannot increment accumulator for $stat")
    }
  }
}

class PartitionStats(sparkContext: SparkContext) extends MetricSet {
  private final val statsMap: Map[PartitionStats.Value, LongAccumulator] =
    PartitionStats.values.unsorted.map(elem => elem -> sparkContext.longAccumulator(elem.toString)).toMap

  def getStatsMap: Map[PartitionStats.Value, LongAccumulator] = statsMap

  override def getMetrics: util.Map[String, Metric] = {
    val metricsMap: Map[String, Metric] = statsMap.map(
      e =>
        (
          e._1.toString,
          new Gauge[Long]() {
            override def getValue: Long = {
              val metricValue = e._2.value
              e._2.reset() // this is possibly the problem!!!!
              metricValue
            }
          }
        )
    )

    import scala.jdk.CollectionConverters._
    metricsMap.asJava
  }
}

package org.apache.spark.metrics.source {
  case class StatsSource(srcName: String, registry: MetricRegistry) extends Source {
    override def sourceName: String = srcName
    override def metricRegistry: MetricRegistry = registry
  }
}
```
@LuciferYang - does the above make any sense? Do you need any more information from me?
Hi! Any updates on this?
I don't have any new ideas at present
No better idea for now; let me close it first.
Hi - our executors crashed almost every 30 min while using HDFS as checkpoint (Kafka source). FYI - using SSHFS as checkpoint would crash the executors randomly, say once or twice per day...
In our opinion, this is too dangerous in production, as it would cause streaming jitter and thus unnecessary backpressure on our data pipeline...
We intend to fall back to Spark 3.3.1 with Scala 2.12 (instead of Scala 2.13).
Is there a Spark LTS release that is stable for production use?
We intend to fallback to Spark-3.3.1 Scala-2.12 (instead of Scala 2.13)
@tamama Does using Scala 2.12 avoid this issue?
Yes - falling back to Scala 2.12 avoids this issue. For 2 hours under heavy stress load, there has been no issue (so far...) with HDFS checkpointing.
Our UAT setup is RHEL 8.6, 3 nodes, Spark 3.3.1 (hadoop3 + scala-2.12).
@LuciferYang FYI ^
Although I'm not quite sure, this sounds more like an issue of Scala 2.13.8 itself.
- We confirm that we have not detected any data-race exception for 18+ hrs.
- Will let you know if this can last for this entire trading week ...
- If yes, I would mark this as fixed on our side - with Scala-2.12 fallback as resolution.
There has been no issue with Scala 2.12 at all for the entire week.
@LuciferYang FYI
- We have done a stress-load test from our side using Spark 3.3.1 (Scala 2.12)
- We have not detected any data-race condition for 140 hours
- Fallback from Scala 2.13 to Scala 2.12 is a fix for us at the moment, before our Spark community has a fix for this :)
We are also seeing this failure on Spark 3.3.1 with Scala 2.13 on Ubuntu 22.04.
I used one of the Spark applications seeing this issue to further debug when we are deserializing `AccumulatorV2`, which would make us susceptible to the race. And I found an example that does not seem to involve any client code. Here is my understanding:
If the `Task` is a `ShuffleMapTask`, it will deserialize the rddAndDep as part of `runTask`:
https://github.com/apache/spark/blob/v3.3.1/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala#L85
which is called after the `Task` deserialization happens. Combine this with a `ShuffleDependency`
https://github.com/apache/spark/blob/v3.3.1/core/src/main/scala/org/apache/spark/Dependency.scala#L85
with a `ShuffleWriteProcessor` that comes from
https://github.com/apache/spark/blob/v3.3.1/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L411-L422
and our `ShuffleDependency` will contain a reference to the `metrics: Map[String, SQLMetric]` map, which will cause the `SQLMetric`s to be registered while the task is already running, and we have our race condition.
I would be willing to work on a fix for this, but would probably need some guidance/discussion on what the correct fix actually is.