CoolplaySpark icon indicating copy to clipboard operation
CoolplaySpark copied to clipboard

《3.1 Receiver 分发详解.md》讨论区

Open lw-lin opened this issue 8 years ago • 20 comments

这里是 《3.1 Receiver 分发详解.md》 讨论区。

如需要贴代码,请复制以下内容并修改:

public static final thisIsJavaCode;
val thisIsScalaCode

谢谢!

lw-lin avatar Dec 05 '15 14:12 lw-lin

其中,在 Receiver y 失效时,以前的 Spark Streaming 有可能会在 executor 1 上重启 Recever y,而 1.5.0 以来,将在 executor 2 上重启 Receiver y ,应该是executor 3吧(根据图形)

ouyangshourui avatar Dec 31 '15 05:12 ouyangshourui

@ouyangshourui 是的,确实应该是 executor 3。可否提一个 Pull Request 修正下这个 typo?感谢!

lw-lin avatar Jan 07 '16 01:01 lw-lin

@ouyangshourui Merged your PR into master. Good job & thanks !

lw-lin avatar Jan 12 '16 01:01 lw-lin

在 driver 端构造一个 RDD 实例,这个 RDD 分为 x 个 partition 没看懂这里的RDD分为x个partition? 求解答 谢谢

pzz2011 avatar Jan 26 '16 12:01 pzz2011

@pzz2011 这里是说,driver 端在构造 RDD 的时候,指定其并行度为 x -- 即指定 x 个 partition -- 将来在 executor 端,1 个 partition 会有 1 个并行度来负责处理。 因为 Receiver 有 x 个,而我们希望每个 Receiver 有 1 个并行度来处理,所以最开始在分发 Receiver 的 RDD 里,即将 RDD 的 partition 指定为 x 个了。

lw-lin avatar Jan 27 '16 01:01 lw-lin

@proflin 请教一个问题,Spark Stream面向的流计算应用,似乎根据应用的不同,各个batch内的数据之间的依赖完全取决于业务需求,这种依赖性完全不同与图计算这种应用。 感觉从传统体系结构上相对Spark Stream进行优化似乎比较难。不知道你怎么看。 还在学习中..

pzz2011 avatar Feb 22 '16 07:02 pzz2011

@pzz2011 没太明白你的问题。是说想让 Streaming 的程序各个 Batch 之间产生依赖?比如后一个 Batch 依赖前一个 Batch?

lw-lin avatar Feb 22 '16 08:02 lw-lin

@proflin 简单的说就是,我觉得针对spark stream,如果从cache局部性的角度区优化,似乎比较难

pzz2011 avatar Feb 22 '16 11:02 pzz2011

@pzz2011 这个不难啊,因为 Spark Streaming 的 cache(), persist() 数据存储完全是来自 Spark Core 的,而且对计算任务的调度和下发也是 Spark Core 的 DAGScheduler 来完成的,会朝着 data locality 最好的计算方式去调度。

lw-lin avatar Feb 22 '16 13:02 lw-lin

“会朝着 data locality 最好的计算方式去调度。” 哦?话说这部分代码是? 我之前留言中考虑了各个batch之间的依赖关系,那个问题是这样的。

我之前看到图计算方面的论文,因为不同顶点之间存在数据依赖,巨量的顶点不可能只分布到单个节点的内存中,具体到numa架构中,这些顶点不可能只分布于单个socket的所拥有的内存中,即要对顶点分区存储。

然后现在考虑spark stream,不同业务场景使用的方法必然不同,可能有的业务对于时间属性比较看重,比如基于时间序列分析的流计算(瞎扯的),根本不强调时间序列的流计算,和线下历史数据交互分析的,等等。那么这些场景中,使用spark stream时。这里有的数据之间有依赖,有的其实是没有什么依赖关系的。 (其实我描述的这种依赖粒度比较大的)

那么问题来了,两者有没有什么类似的地方呢?

话说spark stream这里dstream之间的依赖和我上面描述的依赖其实不是一个东西,二者存在一些交集就是了。

然后您说的“会朝着 data locality 最好的计算方式去调度。”这个应该指的是根据DAG来进行的。当然我挺懵的现在...T_T

pzz2011 avatar Feb 22 '16 13:02 pzz2011

@pzz2011

OK, 大致明白你的意思了。这个问题在 Spark 生态系统里,跟 Spark Streaming 部分关系不大,主要还是跟 Spark Core 部分关系比较大。

比如假设我们有 A={a1, a2, a3}, B={b1, b2}, C ={c1, c2, c3, c4, c5} 三种类型的数据,现在有 3 个节点,对计算最好的分布如下:节点 1:[a1, b1, c1];节点 2:[a2, b2, c2, c3];节点 3:[a3, c4, c5],那么怎么达到这种数据分布呢?在 Spark Core 里,可以用 A.cogroup(B, C, partitioner),其中 partitioner 是自定义的方法,用于对不同的数据进行自定义分区,cogroup() 解释可以参考 这里.

希望有帮助 :)

lw-lin avatar Feb 22 '16 14:02 lw-lin

@proflin 谢谢! 还有一个问题想问,spark 中scala写的代码中,一堆的ClassTag之类,看得实在难受,有什么办法解决吗

pzz2011 avatar Feb 24 '16 10:02 pzz2011

@pzz2011 哪里有 ClassTag?贴些示例代码?

lw-lin avatar Feb 24 '16 12:02 lw-lin

def runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
   Array[U] = {
  val result = new Array[U](partitions.size)
  runJob[T,U](rdd,func,partitions,allowLocal,(index,res) => results(index) = res)
  result
}

pzz2011 avatar Feb 25 '16 01:02 pzz2011

@proflin 忘记@了

pzz2011 avatar Feb 26 '16 01:02 pzz2011

@pzz2011 这些 ClassTag 都是源码里的东西,也是合理的使用,不应该去掉,还是适应着看吧。

lw-lin avatar Feb 26 '16 05:02 lw-lin

@lw-lin 从源码来看,一个 ReceiverInputDStream#getReceivers 返回一个 Receiver;在最终生成的receiverRDD应该也只是 partition 数为1的 RDD。为何最终会有多个 receiver 分发到 executos 呢?

weibin0516 avatar Mar 14 '16 14:03 weibin0516

@keepsimplefocus 确实是这样。1 ReceiverInputDStream ←→ 1 Receiver ←→ 1 分发 RDD(partition 数为 1)。 如果想要分发 n 个 Receiver, 只要把 n 个 ReceiverInputDStream 给 union 到一起就好了:ssc.union(Seq[ReceiverInputDStream]),就像在 Spark Core 模块里 union 多个 RDD 一样。

lw-lin avatar Mar 15 '16 00:03 lw-lin

@pzz2011 ClassTag只是为了给JVM的类型擦除实现擦屁股,目的是为了把类型信息传进去,因此,只要你心中有类型,那些ClassTag直接无视就好了吧。

AntikaSmith avatar Mar 29 '17 10:03 AntikaSmith

请教: 2.1.1 版本中 ReceiverInputDStream 子类只包含 KafkaInputDStream PluggableInputDStream RawInputDStream SocketInputDStream 四个了 有一些应该是删除了 文中是否应该做响应修改? 谢谢

darionyaphet avatar Jun 19 '17 03:06 darionyaphet