Zhichao Zhang
Zhichao Zhang
@keepsimplefocus 能解释下为什么spark.streaming.concurrentJobs大于1时会丢数据吗?
@keepsimplefocus 多谢,能不能把你blog先给我,学习下哈
@keepsimplefocus 原来你就是“牛肉圆粉不加葱”啊,看过你的文章,赞
@keepsimplefocus 关于spark.streaming.concurrentJobs大于1时会丢数据的问题,你什么时候能写个blog呢?
@keepsimplefocus 如果每个batch都能完成任务,这种情况就不会出现?当batch出现滞后,就会导致这种情况呢?
大概清楚了,谢谢你的解释,不过就算是不自己管理offset,采用cp,也是有可能出现你说的情况吧?归根结底,还是能及时处理,才能避免这种情况
@lw-lin 那个pr是增加了在结束job后再删除cp数据的,但是如果出现后面的batch比前面的batch早处理完,似乎也有可能出现 @keepsimplefocus 说的情况吧?
@lw-lin , @keepsimplefocus , 请教个问题,我在使用cp时,从cp恢复后,发现总是会重复消费停止前最后一个batch的数据,看到JobGenerator.restart时发现: ``` scala val checkpointTime = ssc.initialCheckpoint.checkpointTime val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds)) val downTimes = checkpointTime.until(restartTime, batchDuration) logInfo("Batches during down time (" + downTimes.size + "...
@TopSpoofer , 没有这个机制。实验证明,从cp恢复后,会重复执行一次stop gracefully前的最后一个batch
@TopSpoofer , 我可能误解你的意思了,我是基于我的问题来回答的。正如你所说,在生成jobset及结束后是都会cp,结束后的cp还会清理一些之前的cp数据,但正是因为这个机制,导致在stop gracefully后,ssc.initialCheckpoint.checkpointTime这个time是已经执行结束的时间,然而从cp恢复后,开始执行的时间却是从这个ssc.initialCheckpoint.checkpointTime开始,导致ssc.initialCheckpoint.checkpointTime这个时间的数据会重复。 这个问题已经和社区的人沟通过,确实是个bug,等我项目上线运行OK后,我会提交issue及pr。