Liwei Lin

Results 61 comments of Liwei Lin

@keepsimplefocus 我们是做社交和效果广告的业务,所以 Spark Streaming 在我们的实践中: - 从业务应用来讲 - 各种维度、各种指标的数据统计(非精准、允许有少许误差) - 各种维度、各种指标的数据统计(精准) - 实时反作弊与计费(可重入、精准) - 与其它数据源进行实时数据关联、数据清洗、特征抽取 - 实验相关内容的实时控制与反馈 - 从技术栈来讲 - 原有 Storm 能做的业务已经迁移了很多到 Spark Streaming - 原有 MR 能做的业务已经迁移了很多到 Spark...

@keepsimplefocus 是的,我们也有。 之前跟你们差不多,我们是借助 HBase 进行去重,加点优化一个是批量,另一个是一个 key 保存多个 uuid 的 value 列表、进一步批量化一下。 前段时间我们在改为 Spark Streaming 原生的 mapWithState() 实现,就是把见过的值保存到 state 里,state 靠 Spark Streaming 来做 snapshot 和 replay。但是还在调优、暂时还没到线上、应该很快可以到线上了。

@keepsimplefocus 重启后不丢失进度这个靠 Spark Streaming 的 checkpoint 机制(是指 DStreamGraph、JobGenerator 等的状态的 checkpoint,详见[《4.2》](https://github.com/lw-lin/CoolplaySpark/blob/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97/4.2%20Driver%20%E7%AB%AF%E9%95%BF%E6%97%B6%E5%AE%B9%E9%94%99%E8%AF%A6%E8%A7%A3.md))就很好了——注意这个 checkpoint 是 Streaming 对计算进度的 checkpoint,不是 DStream 或 RDD 的内容的 checkpoint。Application 失效前计算到什么进度、恢复后从什么进度开始重计算,都是很准确的。 之前 checkpoint 限制较多是指? 对于 Kafka 来讲,建议使用 [Direct](http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers) 而不是 [Receiver-based](http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-1-receiver-based-approach)...

@keepsimplefocus 是的。End-to-end exactly-once 还是需要下游数据接收端也有些去重处理逻辑,现在我们都是自己写的,利用 HBase 的 checkAndPut() / MySQL 里是另加了 batch-id 等。不过 2.0 的 Structured Streaming 里确实添加了 end-to-end exactly-once 的框架级支持,参见来自 databricks 的 slide: ![image](https://cloud.githubusercontent.com/assets/15843379/14421978/f18bd2da-0006-11e6-8b11-6cd8a5127ecd.png)

@keepsimplefocus 你在哪个公司?感觉你们 streaming 也研究和应用的挺深入的,没有在你 github 页找到公司信息。

@keepsimplefocus @zzcclp 另一个与异步 ck 有些相关的问题(已经在 1.5 中 fix 了),FYI: [[SPARK-6222][Streaming] Dont delete checkpoint data when doing pre-batch-start checkpoint](https://github.com/apache/spark/commit/645cf3fcc21987417b2946bdeeeb60af3edf667e)

@tsface `block input-x-xxxxx not found` 这个错误,就是数据已经找不到了: - 在 `MEMORY_ONLY` 时,如果数据过多,内存中就会发生数据替换,被替换出的数据直接被丢掉,所以在后面计算时需要这部分数据的时候,就找不到了; - 在 `MEMORY_AND_DISK` 时,数据过多,替换出的数据会 flush 到硬盘上,所以比 `MEMORY_ONLY` 时报错几率小很多;但 Executor 失效时(比如其它原因内存溢出后被 Yarn kill 掉、或网络太忙导致心跳发不出去被 driver 认为丢失了等),也会导致在 Memory 或在 Disk 上的数据没有了,虽然几率已经小很多了,但也还是会报这个错误的。 通常解决方法是几种:...

@tsface 上面提到的 try-catch 代码: ``` scala val inputDStream = ssc.fileStream("") inputDStream.foreachRDD(rdd => { try { // do something } catch { case e => e.printStackTrace() } }) ```

@tsface 现在有几个 receiver?几个 Executor、每个 Executor 几个 core? block interval 是多大?batch interval(即 batch duration) 呢?每个 batch 处理多少 records?

@tsface 好的,try-catch 只是个应急手段,看起来还是建议调整下 block interval 和每个 executor 的 core 数~