CoolplaySpark icon indicating copy to clipboard operation
CoolplaySpark copied to clipboard

《2.2 JobGenerator 详解.md》讨论区

Open lw-lin opened this issue 9 years ago • 9 comments

这里是 《2.2 JobGenerator 详解.md》 讨论区。

如需要贴代码,请复制以下内容并修改:

public static final thisIsJavaCode;
val thisIsScalaCode

谢谢!

lw-lin avatar Dec 05 '15 12:12 lw-lin

@lw-lin , @keepsimplefocus , 请教个问题,我在使用cp时,从cp恢复后,发现总是会重复消费停止前最后一个batch的数据,看到JobGenerator.restart时发现:

val checkpointTime = ssc.initialCheckpoint.checkpointTime
val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
val downTimes = checkpointTime.until(restartTime, batchDuration)
logInfo("Batches during down time (" + downTimes.size + " batches): "
      + downTimes.mkString(", "))

从cp恢复的第一个时间是停止前checkpointTime的时间开始,但是这个checkpointTime是已经执行过的batch啊?为什么要这么做呢?

zzcclp avatar May 19 '16 01:05 zzcclp

博主,这里的checkpoint只是记录了batch开始的信息, 是不是这个batch完成后还有一个回写确认这个batch的jobs已经完成的机制?

TopSpoofer avatar May 23 '16 23:05 TopSpoofer

@TopSpoofer , 没有这个机制。实验证明,从cp恢复后,会重复执行一次stop gracefully前的最后一个batch

zzcclp avatar May 24 '16 00:05 zzcclp

@zzcclp

不是的, “Checkpoint 发起的间隔默认的是和 batchDuration 一致;即每次 batch 发起、提交了需要运行的 job 后就做 Checkpoint,另外在 job 完成了更新任务状态的时候再次做一下 Checkpoint。”

JobGenerator 类在异步提交jobset后做了一次cp JobScheduler 在jobset成功执行完成后,也做了一次cp

下面是源码

// 来自 JobGenerator
private def generateJobs(time: Time) {
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)                 // 【步骤 (1)】
    graph.generateJobs(time)                                                 // 【步骤 (2)】
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) // 【步骤 (3)】
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))    // 【步骤 (4)】
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))       // 【步骤 (5)】
}
// 来自 JobScheduler
private def clearMetadata(time: Time) {
  ssc.graph.clearMetadata(time)

  if (shouldCheckpoint) {
    // 【一个 batch 做完,需要 clean 元数据时】
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
  }
  ...
}

TopSpoofer avatar May 27 '16 00:05 TopSpoofer

@TopSpoofer , 我可能误解你的意思了,我是基于我的问题来回答的。正如你所说,在生成jobset及结束后是都会cp,结束后的cp还会清理一些之前的cp数据,但正是因为这个机制,导致在stop gracefully后,ssc.initialCheckpoint.checkpointTime这个time是已经执行结束的时间,然而从cp恢复后,开始执行的时间却是从这个ssc.initialCheckpoint.checkpointTime开始,导致ssc.initialCheckpoint.checkpointTime这个时间的数据会重复。

这个问题已经和社区的人沟通过,确实是个bug,等我项目上线运行OK后,我会提交issue及pr。

zzcclp avatar May 27 '16 02:05 zzcclp

@zzcclp 原来是这样啊,good good good

TopSpoofer avatar May 27 '16 09:05 TopSpoofer

关于GenerateJobs里步骤1和步骤3的解释【第 1 步中 ReceiverTracker 只是对 batch 的源头数据 meta 信息进行了 batch 的分配,本步骤是按照 batch 时间来向 ReceiverTracker 查询得到划分到本 batch 的块数据 meta 信息】不是很懂。

InputInfoTracker的注释是This class manages all the input streams as well as their input data statistics. The information will be exposed through StreamingListener for monitoring.还有,StreamInputInfo注解为@DeveloperApi。

我的理解是步骤1所做的是获取到数据。步骤3所做的是给这些数据加上属性,提供给开发者查询展示监控。

smileyingping avatar Jul 11 '17 03:07 smileyingping

ForEachDStream

override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

这边有一个问题,在streaming的job没有提交运行前,就已经递归将foreachDstream的parentRDD都计算完了,也就是说最终提交给jobExecutor的job已经是最终RDD了吗?

chucheng92 avatar Feb 25 '18 13:02 chucheng92

@Lemonjing 我的理解是没有,调用parent.getOrCompute只会递归的生成RDD,但是这个时候实际还是没有计算的,只是把RDD之间的dag关系给建立起来了。

就像example中的sparkPi例子:

  val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>  
      val x = random * 2 - 1        
     val y = random * 2 - 1
      if (x*x + y*y <= 1) 1 else 0
  }.reduce(_ + _) 

上面的parallelize,map都会生成对应的RDD,但是触发计算是因为reduce这个action算子,如果没有reduce是不会进行计算的。

hansonzh avatar Mar 20 '18 11:03 hansonzh