
[SPARK-39696][CORE] Ensure Concurrent r/w `TaskMetrics` not throw Exception

Open LuciferYang opened this issue 2 years ago • 17 comments

What changes were proposed in this pull request?

This PR changes the declaration type of `TaskMetrics#externalAccums` from `s.c.mutable.ArrayBuffer` to `j.u.concurrent.CopyOnWriteArrayList`, so that the error described in SPARK-39696 (`java.util.ConcurrentModificationException: mutation occurred during iteration`) no longer occurs when `TaskMetrics#externalAccums` is read and written concurrently.
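
For context, here is a minimal sketch of what such a change looks like inside TaskMetrics (field and method names follow the existing Spark code, but this is an illustrative fragment of the class body rather than the exact patch; it assumes Scala 2.13's scala.jdk.CollectionConverters):

import java.util.concurrent.CopyOnWriteArrayList
import scala.jdk.CollectionConverters._

// Before: unsafe when the heartbeat thread iterates while the task thread appends.
// private val externalAccums = new scala.collection.mutable.ArrayBuffer[AccumulatorV2[_, _]]

// After (sketch): appends copy the backing array and iterators work on a snapshot,
// so a concurrent reader never observes a mutation mid-iteration.
private val _externalAccums = new CopyOnWriteArrayList[AccumulatorV2[_, _]]

private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
  _externalAccums.add(a)
}

private[spark] def externalAccums: Seq[AccumulatorV2[_, _]] = _externalAccums.asScala.toSeq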

Why are the changes needed?

Bug fix.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Pass GitHub Actions and add a new test case.
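
For reference, a plausible shape for such a test (a hypothetical sketch, not the exact code added to TaskMetricsSuite; it relies on private[spark] members such as TaskMetrics.empty and registerAccumulator, so it only compiles inside Spark's own test packages):

import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.LongAccumulator

// One thread keeps registering accumulators while another repeatedly materializes
// tm.accumulators(); the suite would then assert that the reader saw zero exceptions,
// which is consistent with the "N did not equal 0" failures shown below.
def countReadFailures(): Int = {
  val tm = TaskMetrics.empty
  val errors = new AtomicInteger(0)
  val writer = new Thread(() =>
    (1 to 10000).foreach(_ => tm.registerAccumulator(new LongAccumulator)))
  val reader = new Thread(() =>
    while (writer.isAlive) {
      try tm.accumulators() catch { case _: Throwable => errors.incrementAndGet() }
    })
  writer.start(); reader.start()
  writer.join(); reader.join()
  errors.get()
}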

Without the changes to TaskMetrics, the new test case fails on both Scala versions, as follows:

Scala 2.12:

mvn clean test -pl core -am -Dtest=none -DwildcardSuites=org.apache.spark.executor.TaskMetricsSuite 

Before

TaskMetricsSuite:
- mutating values
- mutating shuffle read metrics values
- mutating shuffle write metrics values
- mutating input metrics values
- mutating output metrics values
- merging multiple shuffle read metrics
- additional accumulables
- SPARK-39696: Concurrent r/w of TaskMetrics should not throw Exception *** FAILED ***
  2 did not equal 0 (TaskMetricsSuite.scala:274)
Run completed in 6 seconds, 980 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 7, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***

After

TaskMetricsSuite:
- mutating values
- mutating shuffle read metrics values
- mutating shuffle write metrics values
- mutating input metrics values
- mutating output metrics values
- merging multiple shuffle read metrics
- additional accumulables
- SPARK-39696: Concurrent r/w of TaskMetrics should not throw Exception
Run completed in 7 seconds, 516 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
All tests passed.

Scala 2.13:

mvn clean test -pl core -am -Dtest=none -DwildcardSuites=org.apache.spark.executor.TaskMetricsSuite  -Pscala-2.13

Before

TaskMetricsSuite:
- mutating values
- mutating shuffle read metrics values
- mutating shuffle write metrics values
- mutating input metrics values
- mutating output metrics values
- merging multiple shuffle read metrics
- additional accumulables
- SPARK-39696: Concurrent r/w of TaskMetrics should not throw Exception *** FAILED ***
  1339 did not equal 0 (TaskMetricsSuite.scala:275)
Run completed in 6 seconds, 714 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 7, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***

After

Discovery starting.
Discovery completed in 6 seconds, 369 milliseconds.
Run starting. Expected test count is: 8
TaskMetricsSuite:
- mutating values
- mutating shuffle read metrics values
- mutating shuffle write metrics values
- mutating input metrics values
- mutating output metrics values
- merging multiple shuffle read metrics
- additional accumulables
- SPARK-39696: Concurrent r/w of TaskMetrics should not throw Exception
Run completed in 8 seconds, 434 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
All tests passed.

LuciferYang avatar Jul 16 '22 18:07 LuciferYang

The exceptions reported in SPARK-39696 are as follows:

2022-06-21 18:17:49.289Z ERROR [executor-heartbeater] org.apache.spark.util.Utils - Uncaught exception in thread executor-heartbeater
java.util.ConcurrentModificationException: mutation occurred during iteration
        at scala.collection.mutable.MutationTracker$.checkMutations(MutationTracker.scala:43) ~[scala-library-2.13.8.jar:?]
        at scala.collection.mutable.CheckedIndexedSeqView$CheckedIterator.hasNext(CheckedIndexedSeqView.scala:47) ~[scala-library-2.13.8.jar:?]
        at scala.collection.IterableOnceOps.copyToArray(IterableOnce.scala:873) ~[scala-library-2.13.8.jar:?]
        at scala.collection.IterableOnceOps.copyToArray$(IterableOnce.scala:869) ~[scala-library-2.13.8.jar:?]
        at scala.collection.AbstractIterator.copyToArray(Iterator.scala:1293) ~[scala-library-2.13.8.jar:?]
        at scala.collection.IterableOnceOps.copyToArray(IterableOnce.scala:852) ~[scala-library-2.13.8.jar:?]
        at scala.collection.IterableOnceOps.copyToArray$(IterableOnce.scala:852) ~[scala-library-2.13.8.jar:?]
        at scala.collection.AbstractIterator.copyToArray(Iterator.scala:1293) ~[scala-library-2.13.8.jar:?]
        at scala.collection.immutable.VectorStatics$.append1IfSpace(Vector.scala:1959) ~[scala-library-2.13.8.jar:?]
        at scala.collection.immutable.Vector1.appendedAll0(Vector.scala:425) ~[scala-library-2.13.8.jar:?]
        at scala.collection.immutable.Vector.appendedAll(Vector.scala:203) ~[scala-library-2.13.8.jar:?]
        at scala.collection.immutable.Vector.appendedAll(Vector.scala:113) ~[scala-library-2.13.8.jar:?]
        at scala.collection.SeqOps.concat(Seq.scala:187) ~[scala-library-2.13.8.jar:?]
        at scala.collection.SeqOps.concat$(Seq.scala:187) ~[scala-library-2.13.8.jar:?]
        at scala.collection.AbstractSeq.concat(Seq.scala:1161) ~[scala-library-2.13.8.jar:?]
        at scala.collection.IterableOps.$plus$plus(Iterable.scala:726) ~[scala-library-2.13.8.jar:?]
        at scala.collection.IterableOps.$plus$plus$(Iterable.scala:726) ~[scala-library-2.13.8.jar:?]
        at scala.collection.AbstractIterable.$plus$plus(Iterable.scala:926) ~[scala-library-2.13.8.jar:?]
        at org.apache.spark.executor.TaskMetrics.accumulators(TaskMetrics.scala:261) ~[spark-core_2.13-3.3.0.jar:3.3.0]
        at org.apache.spark.executor.Executor.$anonfun$reportHeartBeat$1(Executor.scala:1042) ~[spark-core_2.13-3.3.0.jar:3.3.0]
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563) ~[scala-library-2.13.8.jar:?]
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561) ~[scala-library-2.13.8.jar:?]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:926) ~[scala-library-2.13.8.jar:?]
        at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1036) ~[spark-core_2.13-3.3.0.jar:3.3.0]
        at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:238) ~[spark-core_2.13-3.3.0.jar:3.3.0]
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) ~[scala-library-2.13.8.jar:?]
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066) ~[spark-core_2.13-3.3.0.jar:3.3.0]
        at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46) ~[spark-core_2.13-3.3.0.jar:3.3.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
        at java.lang.Thread.run(Thread.java:833) ~[?:?] 

It seems to be a low-probability event

LuciferYang avatar Jul 16 '22 18:07 LuciferYang

Have not looked in detail, but there are a bunch of other places where externalAccums is used directly - are they also susceptible to these issues? If yes, we should look at handling them as well?

mridulm avatar Jul 18 '22 03:07 mridulm

Have not looked in detail, but there are a bunch of other places where externalAccums is used directly - are they also susceptible to these issues? If yes, we should look at handling them as well?

You're right. I need to rethink this problem; let me set this PR to draft first. Thanks.

LuciferYang avatar Jul 18 '22 03:07 LuciferYang

It seems to be a low-probability event

I'm finding the issue to be quite easy to reproduce. I have a 2 node Spark cluster with 3 executors per node (5 CPU/threads per executor). Since Friday I have ~150 DEAD executor instances due to this issue. Thanks for taking on the ticket.

smcmullan-ncirl avatar Jul 18 '22 09:07 smcmullan-ncirl

It seems to be a low-probability event

@smcmullan-ncirl I wonder if there will be such a high frequency of failures when using Scala 2.12?

LuciferYang avatar Jul 18 '22 09:07 LuciferYang

It seems to be a low-probability event

@smcmullan-ncirl I wonder if there will be such a high frequency of failures when using Scala 2.12?

I'm a novice with both the Spark and Scala source code bases, but it looks to me that the scala.collection.mutable.MutationTracker and scala.collection.mutable.CheckedIndexedSeqView classes only exist in Scala 2.13, so I'm guessing that with a Java 17 JVM and Scala 2.13 I'm running on a more restrictive runtime platform than the majority of Spark users.

smcmullan-ncirl avatar Jul 18 '22 09:07 smcmullan-ncirl

Yes. When running the new test suite, ConcurrentModificationException only occurs with Scala 2.13. IndexOutOfBoundsException or NPE may occur with Scala 2.12, but I did not encounter either in the production environment when using Scala 2.12.
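
To illustrate the version-dependent failure modes outside Spark, here is a small standalone demo (hypothetical code, not Spark's test) that races ArrayBuffer appends against concatenation, the same read pattern as internalAccums ++ externalAccums inside TaskMetrics.accumulators:

import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable.ArrayBuffer

object ArrayBufferRaceDemo {
  def main(args: Array[String]): Unit = {
    val buf = ArrayBuffer[Int]()
    val error = new AtomicReference[Throwable](null)

    val writer = new Thread(() => (1 to 1000000).foreach(buf += _))
    val reader = new Thread(() => {
      while (writer.isAlive && error.get == null) {
        try Seq(0) ++ buf // iterates buf, like the heartbeat thread reading accumulators
        catch { case t: Throwable => error.set(t) }
      }
    })
    writer.start(); reader.start()
    writer.join(); reader.join()

    // On Scala 2.13 the checked ArrayBuffer iterator typically reports
    // ConcurrentModificationException; on Scala 2.12 the same race may surface as
    // IndexOutOfBoundsException/NPE or pass silently on a given run.
    println(Option(error.get).fold("no error observed on this run")(_.toString))
  }
}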

LuciferYang avatar Jul 18 '22 09:07 LuciferYang

@mridulm Compared with analyzing each scenario and using read-write locks, I think it may be simpler to change externalAccums to use a thread-safe data structure, for example CopyOnWriteArrayList. Do you have any better suggestions?

LuciferYang avatar Jul 19 '22 13:07 LuciferYang

I agree; given the use case, CopyOnWriteArrayList might be a good option, @LuciferYang. +CC @JoshRosen - in case you have thoughts on this.

mridulm avatar Jul 20 '22 05:07 mridulm

@mridulm, thanks for the ping.

CopyOnWriteArrayList supports single-threaded writes and concurrent reads.

It looks like externalAccums is only written from registerAccumulator, which is only called from TaskContext.registerAccumulator, which is only called during AccumulatorV2 deserialization, which should only occur when deserializing the task binary at the beginning of Task execution.

In the SPARK-39696 JIRA, it looks like the read is occurring in the executor's reportHeartBeat function at https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L1042

As far as I know, taskRunner.task will be null initially and will be populated once the task is deserialized at https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L507-L508

Assuming my above description is correct, I don't understand how concurrent reads and writes can occur: in normal operation I would expect that writes only occur during task binary deserialization and that reads from the heartbeat thread can only occur after deserialization has completed.

Perhaps accumulators are being deserialized and registered while the task is running (not just during task setup)? For example, maybe a case class is defined on the driver and its closure accidentally closes over something that contains an AccumulatorV2, causing us to deserialize AccumulatorV2 instances when deserializing data values. In the simple case, we'd have one writer thread (the task's main runner thread) racing with the heartbeat thread. In more complex cases, I can imagine scenarios where a single task could consist of multiple threads (e.g. for PySpark) and deserialization might happen in multiple of them in case of spill (e.g. during RDD aggregation). If this is the case then it might explain the race condition. If that's true, though, then I'm wondering whether we're registering different instances of the same accumulator multiple times in the same task (which seems like it would be a performance and correctness bug).
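
For readers following the thread, the registration-on-deserialization path described above looks roughly like this (paraphrased and simplified from AccumulatorV2's readObject; a fragment for illustration, not standalone-compilable code):

// Inside AccumulatorV2 (simplified): an accumulator deserialized on an executor
// attaches itself to the running task, which appends it to TaskMetrics.externalAccums.
private def readObject(in: ObjectInputStream): Unit = {
  in.defaultReadObject()
  if (atDriverSide) {
    atDriverSide = false
    val taskContext = TaskContext.get()
    if (taskContext != null) {
      // -> TaskContextImpl.registerAccumulator -> TaskMetrics.registerAccumulator
      //    -> externalAccums += a   (the write side of the race)
      taskContext.registerAccumulator(this)
    }
  } else {
    atDriverSide = true
  }
}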

JoshRosen avatar Jul 20 '22 07:07 JoshRosen

@JoshRosen

Yes, your analysis is very accurate. From the current stack, I can only infer that the following two methods may be racing (but I haven't found any conclusive evidence), so I added read-write locks to these two methods in the initial PR.

https://github.com/apache/spark/blob/66b1f79b72855af35351ff995492f2c13872dac5/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L257-L261

I didn't expect there would be a correctness bug here. If this is really possible, I think the current clues are not enough to troubleshoot the problem.

@smcmullan-ncirl Could you provide more details to further investigate the issue? I want to know what the writing thread is. It seems that only you can reliably reproduce this problem now :)

LuciferYang avatar Jul 21 '22 04:07 LuciferYang

Thanks for the detailed analysis @JoshRosen - I agree with your analysis. I saw two cases where this could be happening -

  • test code or user code calling spark-internal api,
  • Some nontrivial code flow, where either deserialization is happening lazily (and I would love to see a reproducible testcase for this), or some other nontrivial interaction - like the ones you detailed.

Agree, we should get more details on this before changing the type/other fixes.

mridulm avatar Jul 26 '22 03:07 mridulm

Thanks for the detailed analysis @JoshRosen - I agree with your analysis. I saw two cases where this could be happening -

  • test code or user code calling spark-internal api,
  • Some nontrivial code flow, where either deserialization is happening lazily (and I would love to see a reproducible testcase for this), or some other nontrivial interaction - like the ones you detailed.

Agree, we should get more details on this before changing the type/other fixes.

Agree +1, we need more details

LuciferYang avatar Jul 26 '22 04:07 LuciferYang

Thanks for all the analysis and effort. I think what @JoshRosen is describing in the last paragraph of his analysis is exactly what my application is doing.

I have a custom ForeachWriter implementation class acting as a data sink on the write stream and I'm passing an instance of a statistics gathering class as an argument to the ForeachWriterImpl constructor. This statistics class has several Maps which have keys as statistic name and the value is a Spark accumulator.

I think I have some defects in my application where the de-serialization of this statistics class instance on the executor is re-registering the accumulator in the Spark context as described in the analysis above.

I will try to reorganize my code and see if I can stop the issue happening. If you need to see more details I can probably code up a sample application to show the construction of my production application

smcmullan-ncirl avatar Jul 26 '22 15:07 smcmullan-ncirl

@smcmullan-ncirl Any new progress?

LuciferYang avatar Aug 03 '22 07:08 LuciferYang

Hi, yes and no. I produced this toy example of what I thought my production code is doing: https://github.com/smcmullan-ncirl/RateSourceSparkStreamTest

However it seems to work fine and has not reproduced the error or caused any executors to crash. Maybe the code will give some clues as to whether I'm using some bad practice which exposes the issue in the production code.

I'm still trying to analyze the production code to see where there may be differences with the toy application. I'll report back soon if I find any or if I'm able to extend the toy application to reproduce the error

smcmullan-ncirl avatar Aug 03 '22 10:08 smcmullan-ncirl

Hi, so I was able to reproduce the issue with the example application below.

It shows a custom ForeachWriter sink which updates a set of accumulators. The Map containing the accumulators is passed to the ForeachWriter sink and is thus serialized/deserialized.

I've integrated Codahale/DropWizard metrics for reporting metrics based on the accumulators and enabled the metrics sink like this:

driver.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=5
*.sink.console.unit=seconds

You may notice in the example application code below that the overridden getMetrics() method resets each accumulator by calling its reset() method.

I wonder whether this causes the situation that @JoshRosen wrote about above?

import com.codahale.metrics.{Gauge, Metric, MetricRegistry, MetricSet}
import org.apache.spark.metrics.source.StatsSource
import org.apache.spark.sql.{ForeachWriter, SparkSession}
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.slf4j.LoggerFactory

import java.util

object RateSourceSparkStreamTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("RateSourceStreamTest")

    val spark = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()

    val stats = new PartitionStats(spark.sparkContext)
    val statsMap = stats.getStatsMap

    val metricRegistry = new MetricRegistry
    metricRegistry.registerAll(stats)

    SparkEnv.get.metricsSystem.registerSource(
      StatsSource("RateSourceSparkStreamTest", metricRegistry)
    )

    import spark.implicits._

    spark
      .readStream
      .format("rate")
      .option("numPartitions", 120)
      .option("rowsPerSecond", 12000)
      .load()
      .map(row => row.mkString("##"))
      .writeStream
      .foreach(new ForeachWriterImpl(statsMap))
      .start()

    spark.streams.awaitAnyTermination()

    spark.close
  }
}

class ForeachWriterImpl(statsMap: Map[PartitionStats.Value, LongAccumulator]) extends ForeachWriter[String] {
  private final val LOGGER = LoggerFactory.getLogger(this.getClass)

  override def open(partitionId: Long, epochId: Long): Boolean = {
    LOGGER.info(s"Open partition $partitionId, epoch $epochId")
    PartitionStats.incMetric(statsMap, PartitionStats.partitionsOpened, 1)
    true
  }

  override def process(value: String): Unit = {
    LOGGER.info(s"Process value: $value")
    PartitionStats.incMetric(statsMap, PartitionStats.partitionsProcessed, 1)
  }

  override def close(errorOrNull: Throwable): Unit = {
    LOGGER.info(s"Close partition: $errorOrNull")
    PartitionStats.incMetric(statsMap, PartitionStats.partitionsClosed, 1)
  }
}

object PartitionStats extends Enumeration {
  private final val LOGGER = LoggerFactory.getLogger(this.getClass)

  final val partitionsOpened = Value("partitionsOpened")
  final val partitionsProcessed = Value("partitionsProcessed")
  final val partitionsClosed = Value("partitionsClosed")

  def incMetric(statsMap: Map[PartitionStats.Value, LongAccumulator], stat: PartitionStats.Value, count: Long): Unit = {
    statsMap.get(stat) match {
      case Some(acc) => acc.add(count)
      case _ => LOGGER.error(s"Cannot increment accumulator for $stat")
    }
  }
}

class PartitionStats(sparkContext: SparkContext) extends MetricSet {
  private final val statsMap: Map[PartitionStats.Value, LongAccumulator] =
    PartitionStats.values.unsorted.map(elem => elem -> sparkContext.longAccumulator(elem.toString)).toMap

  def getStatsMap: Map[PartitionStats.Value, LongAccumulator] = statsMap

  override def getMetrics: util.Map[String, Metric] = {
    val metricsMap: Map[String, Metric] = statsMap.map(
      e =>
        (
          e._1.toString,
          new Gauge[Long]() {
            override def getValue: Long = {
              val metricValue = e._2.value

              e._2.reset() // this is possibly the problem!!!!

              metricValue
            }
          }
        )
    )

    import scala.jdk.CollectionConverters._
    metricsMap.asJava
  }
}

package org.apache.spark.metrics.source {
  case class StatsSource(srcName: String, registry: MetricRegistry) extends Source {
    override def sourceName: String = srcName

    override def metricRegistry: MetricRegistry = registry
  }
}

smcmullan-ncirl avatar Aug 08 '22 12:08 smcmullan-ncirl

@LuciferYang - does the above make any sense? Do you need any more information from me?

smcmullan-ncirl avatar Aug 23 '22 17:08 smcmullan-ncirl

Hi! Any updates on this?

MaksGS09 avatar Nov 02 '22 13:11 MaksGS09

I don't have any new ideas at present

LuciferYang avatar Nov 08 '22 11:11 LuciferYang

No better idea; closing it for now.

LuciferYang avatar Nov 16 '22 15:11 LuciferYang

The exceptions reported in SPARK-39696 are as follows:

(ConcurrentModificationException stack trace in TaskMetrics.accumulators / Executor.reportHeartBeat, quoted above)

It seems to be a low-probability event

Hi - our executors crashed almost every 30 minutes while using HDFS as the checkpoint location (Kafka source). FYI - using SSHFS as the checkpoint location would crash the executors randomly, say once or twice per day...

In our opinion, this is too dangerous in production, as it causes streaming jitter and thus unnecessary backpressure on our data pipeline.

We intend to fall back to Spark 3.3.1 with Scala 2.12 (instead of Scala 2.13)

Is there a Spark LTS release that is stable for production use?

tamama avatar Mar 28 '23 13:03 tamama

We intend to fall back to Spark 3.3.1 with Scala 2.12 (instead of Scala 2.13)

@tamama Does using Scala 2.12 avoid this issue?

LuciferYang avatar Mar 28 '23 14:03 LuciferYang

We intend to fall back to Spark 3.3.1 with Scala 2.12 (instead of Scala 2.13)

@tamama Does using Scala 2.12 avoid this issue?

Yes - falling back to Scala 2.12 avoids this issue. After 2 hours under heavy stress load, there has been no issue so far with HDFS checkpointing.

Our UAT setup is RHEL/8.6, 3-nodes, Spark 3.3.1 (hadoop3 + scala-2.12)

tamama avatar Mar 28 '23 16:03 tamama

@LuciferYang FYI ^

tamama avatar Mar 28 '23 16:03 tamama

We intend to fall back to Spark 3.3.1 with Scala 2.12 (instead of Scala 2.13)

@tamama Does using Scala 2.12 avoid this issue?

Yes - falling back to Scala 2.12 avoids this issue. After 2 hours under heavy stress load, there has been no issue so far with HDFS checkpointing.

Our UAT setup is RHEL/8.6, 3-nodes, Spark 3.3.1 (hadoop3 + scala-2.12)

Although I'm not quite sure, this sounds more like an issue of Scala 2.13.8 itself.

LuciferYang avatar Mar 29 '23 03:03 LuciferYang

We intend to fall back to Spark 3.3.1 with Scala 2.12 (instead of Scala 2.13)

@tamama Does using Scala 2.12 avoid this issue?

Yes - falling back to Scala 2.12 avoids this issue. After 2 hours under heavy stress load, there has been no issue so far with HDFS checkpointing. Our UAT setup is RHEL/8.6, 3-nodes, Spark 3.3.1 (hadoop3 + scala-2.12)

Although I'm not quite sure, this sounds more like an issue of Scala 2.13.8 itself.

  • We confirm that we have not detected any data-race exception for 18+ hrs.
  • Will let you know if this can last for this entire trading week ...
  • If yes, I would mark this as fixed on our side - with Scala-2.12 fallback as resolution.

tamama avatar Mar 29 '23 09:03 tamama

There has been no issue with Scala 2.12 at all for the entire week.

tamama avatar Apr 01 '23 16:04 tamama

@LuciferYang FYI

  • We have done a stress-load test from our side using Spark 3.3.1 (Scala 2.12)
  • We have not detected any data-race condition for 140 hours
  • Falling back from Scala 2.13 to Scala 2.12 is a fix for us at the moment, until the Spark community has a fix for this :)

tamama avatar Apr 03 '23 08:04 tamama

We are also seeing this failure on Spark 3.3.1 with Scala 2.13 on Ubuntu 22.04.

I used one of the Spark applications hitting this issue to further debug where we deserialize AccumulatorV2 instances, which would make us susceptible to the race. I found an example that does not seem to involve any client code. Here is my understanding:

If the Task is a ShuffleMapTask, it will deserialize the rddAndDep as part of runTask:

https://github.com/apache/spark/blob/v3.3.1/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala#L85

which is called after the Task deserialization happens. Combine this with a ShuffleDependency

https://github.com/apache/spark/blob/v3.3.1/core/src/main/scala/org/apache/spark/Dependency.scala#L85

with a ShuffleWriteProcessor that comes from

https://github.com/apache/spark/blob/v3.3.1/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L411-L422

and our ShuffleDependency will contain a reference to the map metrics: Map[String, SQLMetric], which causes the SQLMetrics to be registered while the task is already running, and we have our race condition.
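
Putting the links above together, the suspected path looks roughly like this (paraphrased and simplified from the linked Spark 3.3.1 sources, for illustration only, not compilable on its own):

// ShuffleMapTask.runTask (core): runs after the Task itself has been deserialized,
// i.e. while the heartbeat thread may already be reading this task's metrics.
val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
  ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

// ShuffleExchangeExec.createShuffleWriteProcessor (sql/core): the returned processor
// closes over metrics: Map[String, SQLMetric], so those SQLMetrics (AccumulatorV2s)
// ride along inside the ShuffleDependency and are registered with the running task
// when rddAndDep is deserialized above.
def createShuffleWriteProcessor(metrics: Map[String, SQLMetric]): ShuffleWriteProcessor = {
  new ShuffleWriteProcessor {
    override protected def createMetricsReporter(
        context: TaskContext): ShuffleWriteMetricsReporter = {
      new SQLShuffleWriteMetricsReporter(context.taskMetrics().shuffleWriteMetrics, metrics)
    }
  }
}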

I would be willing to work on a fix for this, but would probably need some guidance/discussion about what the correct fix actually is.

eejbyfeldt avatar Apr 03 '23 11:04 eejbyfeldt