CoolplaySpark
CoolplaySpark copied to clipboard
《0.1 Spark Streaming 实现思路与模块概述.md》讨论区
这里是 《0.1 Spark Streaming 实现思路与模块概述.md》 讨论区。
如需要贴代码,请复制以下内容并修改:
public static final thisIsJavaCode;
val thisIsScalaCode
谢谢!
Spark Streaming 的长时容错特性,能够提供不重、不丢,exactly-once 的处理语义。 不丢 可以通过记录标签及wal做到,怎么做到不重及exactly-once呢,此点本文没有说。
@lwwcl1314
考虑三点:
- 给定 HDFS 上的输入文件 f,那么 MapReduce 无论失败几次、重做几次,最终的结果 r 是一致的;
- Spark Streaming 就是保证了每条源头数据都唯一的划分到一个块数据里,每个块数据都唯一的划分到一个 batch 里;
- 然后对于一个 batch,失败几次就对着源头数据再重做几次(就像 MapReduce 对着 f 多次重做一样),就可以保证本 batch 的结果与本 batch 的源头数据是一一对应的。
不重复,是因为每条源头数据都唯一划分一个 batch 里;不丢 + 不重复,就等于 exactly-once 了。
Storm 的 Trident 层,也是通过类似划分 batch 的方式做到的 exactly-once —— 不过那个实现有点不太自然。。。
我感觉 http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/ 这个文件讲的还是很好的,呵呵。
cloudera 的这篇文章是针对 Apache Kafka 的情况作了具体的讲解,也推荐大家都看看。
赞 @lwwcl1314 ~
里面的图是什么画得呢?
我用spark streaming,但时间长了之后,经常会自己就退出了,是为什么呢
https://github.com/jacksu/utils4s/tree/master/sparkstreaming-demo
@jacksu 基本都是用 Visio 画的,:-)
@luphappy 这个需要具体看一下 driver 端打印的日志,退出一般是报 Exception 了,可以根据具体的 Exception 来看;有可能还需要到报错的 executor 端去看日志。
可否贴一下报错信息?
大神,你这一系列文章可以转载吗?转载会附上原文链接与作者名。
@winwill2012 OK 的。附上原文链接与作者名,然后把链接 post 到这里就可以了。
想问一下,热备和冷备都是针对块数据来的,那些还没成为块数据的缓存中的细小数据怎么处理?
@zwzm85 思考的很细致,赞。确实这里是有些问题的,这些细小数据就没有保障了;最主要的原因还是在于上游不能支持重放。
您好,请问,我的spark-streaming程序以yarn-client的方式运行了一段时间后,就退出了,但driver还在,yarn日志如下: 15/09/19 14:32:34 ERROR util.Utils: Uncaught exception in thread Thread-1 org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid AMRMToken from appattempt_1437371132890_10529_000002 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53) at org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104) at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.finishApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:94) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy18.finishApplicationMaster(Unknown Source) at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.unregisterApplicationMaster(AMRMClientImpl.java:378) at org.apache.spark.deploy.yarn.YarnRMClient.unregister(YarnRMClient.scala:86) at org.apache.spark.deploy.yarn.ApplicationMaster.unregister(ApplicationMaster.scala:184) at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:123) at org.apache.spark.util.SparkShutdownHook.run(Utils.scala:2308) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Utils.scala:2278) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2278) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2278) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(Utils.scala:2278) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2278) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2278) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.util.SparkShutdownHookManager.runAll(Utils.scala:2278) at org.apache.spark.util.SparkShutdownHookManager$$anon$6.run(Utils.scala:2260) at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54) Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1437371132890_10529_000002 at org.apache.hadoop.ipc.Client.call(Client.java:1468) at org.apache.hadoop.ipc.Client.call(Client.java:1399) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) at com.sun.proxy.$Proxy17.finishApplicationMaster(Unknown Source) at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.finishApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:91) ... 23 more
这个是什么原因造成的呢
@luphappy
你好,请问你的 Yarn 是什么版本?有可能跟这个有关: YARN-3103: AMRMClientImpl does not update AMRM token properly
另外,用 yarn cluster 方式部署试试看;通常 Streaming 的程序不会部署为 yarn client。
问个Spark-Core的问题,只是我自己一直没搞明白。看源码的时候生成FinalStage的时候,那个ActiveJob是啥? 话说一个Stage应该就是对应一个job吧,然后一个Stage可以含有多个RDD,但是对于FinalStage而言,应该就只有一个job吧。 而非FinalStage可以对应多个RDD,那么非FinalStage是不是可以对应多个job呢? 求解... 看源码看晕了T_T~~~~
@lw-lin
对于job如何提交的,如何运行的,你可以看看这篇博客 http://www.cnblogs.com/luckuan/p/5250258.html 。 一个job可以生成多个stage,每个stage中间会有一个shuffle过程。@pzz2011
@luckuan @lw-lin 1.5.0 version spark-1.5.0\spark-1.5.0\core\src\main\scala\org\apache\spark\scheduler\DAGScheduler.scala
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]
// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]
private[scheduler] val activeJobs = new HashSet[ActiveJob]
感觉还是这几个Collection没搞懂jobIdToActiveJob shuffleToMapStage stageIdToStage jobIdToStageIds 没搞懂它们和Stage啊 job啊task的关系....
@pzz2011 可以看看这篇文章 [Spark源码剖析] DAGScheduler划分stage http://www.jianshu.com/p/50c5c1032206
我想问下WAL 默认配置是false的, 这个你们目前线上是开启的吗? 如果在保存WAL的时候程序强制退出,做了checkpoint ,这个没有保存在WAL的数据是不是丢失了。
在保存数据的时候exactly-once 这个怎么做到,比如写到非事务的存储里,写了一半挂掉了。
还有个问题,对于输出数据到hdfs中,如何做到一个小时对应partition个文件。 设置 batchDuration 为一个小时,感觉这种做法不靠谱, 能够append 到之前的文件这种做法吗?
@wangwenting
(1.a) 我们线上还是分应用,对于一点不能丢的都是从可重放的数据源读数据(如 HDFS,Kafka)等,可以丢一点的就开 WAL 或 RAM_2 双机热备; (1.b) 是的,不可重放的数据源不论是 WAL 还是 RAM_2 双机热备都有丢数据的风险,保险的还是靠可重 放数据源。
(2) exactly-once 主要是指 Streaming 内部的处理,end-to-end 的 exactly-once 还需要上游数据源支持(可重放)、下游数据接收支持(事务)。如果写到非事务的存储里,则保证 at-lease-once;其实多加一列 batch-id,其实也能保证 exactly-once,比如 batch-id =10, count = 2000 这样的输出多写几次,最终取出来还是 count = 2000 这样一条。
(3) 这个需求还不好直接支持,可以尝试间接:(a) 自己起一个合并任务,每小时把 p × n 个文件合并为 p 个;或者 (b) 自己写个输出方式为 append 到已有文件,只不过这种要注意设置 concurrentJobs = 1,以防同一个小时内多个 batch 同时 append 到同一个 partition 对应的文件。
希望有帮助!
非常感谢,很有帮助,前期准备就用kafka direct 方式读,在线的就foreachRDD 的方式写入mysql 离线的就kafka direct 方式读 写 hdfs中,spark目前好像不支持小文件combine, 只能自己合并了。
在streaming checkpoing 的情况下用broadcast 会出现raise Exception("Broadcast variable '%s' not loaded!" % bid) 不知道你可预见过
请问: 一个inputStream对应一个Receiver对吧,一个Receiver会分配到一个Executor上,那么这个Receiver接收到的数据都会放在这个Executor上吧,这样会不会造成数据倾斜呢?
@wangwenting
~~没有遇到过。以下代码是 checkpoint + broadcast,起停了几次都是正常运行的,请参考:……~~
Update: 代码已删除,原来的代码在 local 运行有效、但放到 Yarn 上跑确实有问题。
正确的代码请见 Streaming 官方 Programming Guide。
@romantic123
是的,如果整个 app 只有 1 个 receiver,那数据就会只收到 1 个或 2 个 Executor 上(看数据冗余是配了总共 1 份还是 2 份),数据会倾斜很多。
这种情况下一般是起多个 receiver,分别消费数据的不同部分,而且 Spark Streaming 会保证 receiver 均分到不同的 Executor 上,这样数据就可以均分到不同的 Executor 上了,同时计算时又会有很好的 data locality。
Accumulators and Broadcast Variables Accumulators and Broadcast variables cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use Accumulators or Broadcast variables as well, you’ll have to create lazily instantiated singleton instances for Accumulators and Broadcast variables so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example.
when I use lazy --no problem def getWordBlacklist(sparkContext): if ('wordBlacklist' not in globals()): globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"]) return globals()['wordBlacklist']
when I use bc = sc.broadcast(["a", "b", "c"]) in main func and use in foreachRDD has error
File "/home/bfd_hz/spark/python/lib/pyspark.zip/pyspark/broadcast.py", line 39, in _from_id raise Exception("Broadcast variable '%s' not loaded!" % bid) Exception: (Exception("Broadcast variable '0' not loaded!",)
@wangwenting
之前贴的代码确实有问题,从你上面的解释里也学习到了 accumulators & broadcast 在 Streaming 里的正确用法,感谢感谢!
@lw-lin 2.6.0,应该是那个原因