CoolplaySpark icon indicating copy to clipboard operation
CoolplaySpark copied to clipboard

《1.1 DStream, DStreamGraph 详解.md》讨论区

Open lw-lin opened this issue 8 years ago • 36 comments

这里是 《1.1 DStream, DStreamGraph 详解.md》 讨论区。

如需要贴代码,请复制以下内容并修改:

public static final thisIsJavaCode;
val thisIsScalaCode

谢谢!

lw-lin avatar Dec 05 '15 12:12 lw-lin

@lw-lin 请问 Spark Streaming 在你们的实践中主要用来做哪些事情?

weibin0516 avatar Mar 28 '16 13:03 weibin0516

@keepsimplefocus

我们是做社交和效果广告的业务,所以 Spark Streaming 在我们的实践中:

  • 从业务应用来讲
    • 各种维度、各种指标的数据统计(非精准、允许有少许误差)
    • 各种维度、各种指标的数据统计(精准)
    • 实时反作弊与计费(可重入、精准)
    • 与其它数据源进行实时数据关联、数据清洗、特征抽取
    • 实验相关内容的实时控制与反馈
  • 从技术栈来讲
    • 原有 Storm 能做的业务已经迁移了很多到 Spark Streaming
    • 原有 MR 能做的业务已经迁移了很多到 Spark Streaming

抱歉不能讲的很详细 :-)

lw-lin avatar Mar 29 '16 05:03 lw-lin

多谢~

weibin0516 avatar Mar 29 '16 06:03 weibin0516

@lw-lin 再请问下,在你们的业务中应该也有需要结合比较长一段历史数据进行实时统计的吧。你们是如何做的呢?比如要实时算一天的 uuid 排重数。我们这边这种操作都是需要借助外部的东西,比如 redis 之类的

weibin0516 avatar Apr 06 '16 09:04 weibin0516

@keepsimplefocus

是的,我们也有。

之前跟你们差不多,我们是借助 HBase 进行去重,加点优化一个是批量,另一个是一个 key 保存多个 uuid 的 value 列表、进一步批量化一下。

前段时间我们在改为 Spark Streaming 原生的 mapWithState() 实现,就是把见过的值保存到 state 里,state 靠 Spark Streaming 来做 snapshot 和 replay。但是还在调优、暂时还没到线上、应该很快可以到线上了。

lw-lin avatar Apr 07 '16 02:04 lw-lin

@lw-lin 好的 多谢 我们在 redis 的使用了也做了批量和 pipeline 的优化。我也尝试使用你们现在的思路试试~

weibin0516 avatar Apr 07 '16 07:04 weibin0516

@lw-lin 再请教一个问题: 你们是怎么保证 Application 在 failed 之后计算不会多也不会少,且重启后也不丢失计算进度? 我们的实践中,数据源是 kafka,之前由于 checkpoint 限制较多就没启用 checkpoint,自己维护 offsets。但自己维护 offsets 始终存在丢失数据的可能,所以现在考虑启用 checkpoint + WAL

weibin0516 avatar Apr 09 '16 23:04 weibin0516

@keepsimplefocus

重启后不丢失进度这个靠 Spark Streaming 的 checkpoint 机制(是指 DStreamGraph、JobGenerator 等的状态的 checkpoint,详见《4.2》)就很好了——注意这个 checkpoint 是 Streaming 对计算进度的 checkpoint,不是 DStream 或 RDD 的内容的 checkpoint。Application 失效前计算到什么进度、恢复后从什么进度开始重计算,都是很准确的。

之前 checkpoint 限制较多是指?

对于 Kafka 来讲,建议使用 Direct 而不是 Receiver-based 的方式读取,offsets 都由 Spark Streaming 放到自己的 checkpoint 中而不是 zk 中。Direct 读取的方式,就不需要 WAL 了,而是需要重计算时根据随时更新到 checkpoint 中的 offsets 重新回 Kafka 那里去取。

最后,对外写出需要简单的原子性保证,如 HBase 的 checkAndPut()。没有这个原子性的,就每次写出时加个 batch id,在下游获取结果的时候根据 batch id 进行一下结果的去重吧。

lw-lin avatar Apr 10 '16 04:04 lw-lin

嗯,direct 方式确实不需要 WAL,每个 batch 开始的时候才去拉数据。我说的 checkpoint 限制是指,在修改 app 后 checkpoint 因为反序列化失败是失效的。这样我修改了 app 或想要修改配置,使用 checkpoint 都不能恢复。 不过,在将spark.streaming.concurrentJobs设置为多个的时候,还是需要启用 WAL,不然同时执行未完成的 job 还是可能会丢

weibin0516 avatar Apr 10 '16 08:04 weibin0516

@lw-lin 我仔细想了下,使用 checkpoint (甚至加上 WAL)理论上也是无法保证 exactly once 的,因为在 job 丢给 jobExecutor 异步执行之后,会马上执行 checkpoint 操作,这时 checkpoint 数据里,刚刚的 job 其实是未完成的。假设,在下一个 batch 开始之前刚刚的 job 完成了,但下一个 batch 在 checkpoint 之前 application 挂掉了,那么本来已经完成的 job 对应的 time 在 driver 从 checkpoint 恢复后还是未完成的。这样就会重复处理了

weibin0516 avatar Apr 11 '16 06:04 weibin0516

@keepsimplefocus 是的。End-to-end exactly-once 还是需要下游数据接收端也有些去重处理逻辑,现在我们都是自己写的,利用 HBase 的 checkAndPut() / MySQL 里是另加了 batch-id 等。不过 2.0 的 Structured Streaming 里确实添加了 end-to-end exactly-once 的框架级支持,参见来自 databricks 的 slide:

image

lw-lin avatar Apr 11 '16 09:04 lw-lin

@keepsimplefocus 你在哪个公司?感觉你们 streaming 也研究和应用的挺深入的,没有在你 github 页找到公司信息。

lw-lin avatar Apr 11 '16 09:04 lw-lin

@lw-lin 我是猎豹移动的 :-)

weibin0516 avatar Apr 11 '16 09:04 weibin0516

@keepsimplefocus 自己维护偏移量,会存在丢失数据的可能么?

luckuan avatar Apr 12 '16 07:04 luckuan

@luckuan 要看怎么用,如果spark.streaming.concurrentJobs为1,则不会丢数据,但存在有很小一部分数据重复消费的可能;如果spark.streaming.concurrentJobs大于1,是会丢数据的

另,凡是处理数据的完成操作和确认数据已处理这两个操作不是一个原子操作,那么一定是不能保证加好一次的语义的

weibin0516 avatar Apr 12 '16 07:04 weibin0516

膜拜~~~

luckuan avatar Apr 12 '16 09:04 luckuan

@keepsimplefocus 能解释下为什么spark.streaming.concurrentJobs大于1时会丢数据吗?

zzcclp avatar Apr 12 '16 09:04 zzcclp

@zzcclp 我周末写篇文章说明下这个问题吧,到时发链接给你~

weibin0516 avatar Apr 12 '16 09:04 weibin0516

@keepsimplefocus 多谢,能不能把你blog先给我,学习下哈

zzcclp avatar Apr 12 '16 09:04 zzcclp

http://www.jianshu.com/users/001d44710e2e/latest_articles

在 2016-04-12 17:17:58,"Zhichao Zhang" [email protected] 写道:

@keepsimplefocus 多谢,能不能把你blog先给我,学习下哈

— You are receiving this because you were mentioned. Reply to this email directly or view it on GitHub

weibin0516 avatar Apr 12 '16 09:04 weibin0516

@keepsimplefocus 原来你就是“牛肉圆粉不加葱”啊,看过你的文章,赞

zzcclp avatar Apr 12 '16 09:04 zzcclp

@keepsimplefocus 关于spark.streaming.concurrentJobs大于1时会丢数据的问题,你什么时候能写个blog呢?

zzcclp avatar Apr 26 '16 02:04 zzcclp

@zzcclp 抱歉,我周末补上,大致说一下,主要原因是因为 job 都是放到线程池里异步提交执行的,这样会导致写入 checkpoint 的 job 状态和 job 真正的状况可能会不一致~

weibin0516 avatar Apr 26 '16 03:04 weibin0516

@keepsimplefocus 如果每个batch都能完成任务,这种情况就不会出现?当batch出现滞后,就会导致这种情况呢?

zzcclp avatar Apr 26 '16 03:04 zzcclp

@zzcclp 写了文章解释了下:http://www.jianshu.com/p/27f91de7417d,之前说的可能会引起数据丢失还是跟我们自己的实现方式有关,我们是自己保留每个 batch 消费到的 offsets,所以如果 spark.streaming.concurrentJobs 大于1,那么就可能会有多个 batch 的 job 一起运行,可能会晚一点的batch 反而更早运行完了,这个时候把这个晚一点的 batch 写到我们的 zookeeper 中当做上次消费到的 offsets,如果此时挂掉了,那么在下次重启的时候就会从该 offsets 处重启,那么在挂掉时,那个更早一点的 batch 对应的 job 其实没有执行完成,这个时候就会有一部分数据丢失了。这是我们之前方案的不足之处,之所以采取这样的方案的原因也是我们的业务允许绩效的数据误差。不过要在spark.streaming.concurrentJobs 大于1时保证整个业务的 exactly once,其实还要视具体情况而定,在我们的业务中代价会比较大。

weibin0516 avatar May 03 '16 14:05 weibin0516

大概清楚了,谢谢你的解释,不过就算是不自己管理offset,采用cp,也是有可能出现你说的情况吧?归根结底,还是能及时处理,才能避免这种情况

zzcclp avatar May 03 '16 16:05 zzcclp

@keepsimplefocus @zzcclp 另一个与异步 ck 有些相关的问题(已经在 1.5 中 fix 了),FYI: [SPARK-6222][Streaming] Dont delete checkpoint data when doing pre-batch-start checkpoint

lw-lin avatar May 04 '16 00:05 lw-lin

@lw-lin 那个pr是增加了在结束job后再删除cp数据的,但是如果出现后面的batch比前面的batch早处理完,似乎也有可能出现 @keepsimplefocus 说的情况吧?

zzcclp avatar May 04 '16 01:05 zzcclp

@lw-lin 博主 End-to-end exactly-once 是什么意思,能解析一下吗?

TopSpoofer avatar May 31 '16 14:05 TopSpoofer

@lw-lin @keepsimplefocus 看到上面你们讨论关于“需要结合比较长一段历史数据进行实时统计”的需求在spark streaming中的mapWithState实现,请问这个方案现在在使用中吗?

另外我有几个问题想请教下 1.如果需求中有比较多的这种去重需求,那对于一天的统计,mapWithState中累积的数据就会随时间线性增加,在一天结束时这个量相当于和做了一个每天定时的批处理数据量是相当的,这对于stateful stream的checkpoint操作是否有比较大的压力?

2.由于mapWithState操作是只对增量数据进行操作,因此每个batch中并不会遍历所有key,这虽然减少运算量,但相比updateStateByKey就不能及时remove掉不需要的数据(比如一天结束时的clean操作),想到过timeout方法,但这样就需要设置超时大于1天,内存和checkpoint冗余量会比较大,这个如何解决?

3.一旦生成"静态的RDD DAG 模板",是不是用户就无法通过在driver侧设置变量等方式改变某个batch时的执行流程了?比如我想每N个batch才执行一次output,我试过在foreachRDD里面使用driver的变量来判断是否要执行输出操作,但我发现只有在实际输出需要执行的那个batch,job才会真正启动计算,期间的数据计算都会堆积在此时才会触发,这种做法是否合理?

这几个问题困扰了好久,希望能解答下,谢谢。

permanentstar avatar Oct 13 '16 02:10 permanentstar