CoolplaySpark icon indicating copy to clipboard operation
CoolplaySpark copied to clipboard

driver端异常恢复, 如何确保exactly once语义的呢?

Open jingwangfei opened this issue 5 years ago • 1 comments

嗨, 大佬, 我有一个问题. 当一个jobSet, 有部分job已经执行成功, 此时, driver端异常退出. 那么, 恢复后, 针对这个jobSet, 还会执行那些已经成功的job吗? 如果不执行, 那么在源码中, 是如何体现的? 如果执行, 那么肯定就不遵守exactly once语义了, 那么我们平时说的streaming的exactly once语义, 又是如何理解呢?

jingwangfei avatar Dec 27 '18 03:12 jingwangfei

我刚看了源码, 在这种情况下, 针对该jobSet, 还会执行那些已经成功的job.

恢复时, 会调用restart()方法, 从ck中读取信息重启jobGenerator. 那些在driver端异常退出前, 未被执行完成的jobSet, 会重新进行调度.

private def restart() {   
// 一个time时间点, 对应一个jobSet.
// driver down批次, ck时间和当前重启时间之间的批次
val downTimes = ......

// 在失败之前未经处理的批次
val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)

// 未处理批次 + down批次, 并按照时间排序, 越早的越靠前
timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime }.distinct.sorted(Time.ordering)

// 调度
  timesToReschedule.foreach { time =>
      jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
    }
}

以上, 仅仅是个人意见, 欢迎一起讨论 & 指导. 但是, 还有一个问题, 我们通常所说的 ''streaming遵守exactly once语义''是什么意思呢?

jingwangfei avatar Dec 27 '18 07:12 jingwangfei