ApacheSparkBook
ApacheSparkBook copied to clipboard
page48,关于宽窄依赖的划分
1、你说 父RDD的一个或多个分区的数据需要全部流入子RDD的一个或多个分区,就是窄依赖。是不是可以这么理解,只要需要通过partitioner进行分发的,就是宽依赖,否则就是窄依赖。 假如父RDD有3个分区,但每个分区只有一条数据,是不是无论子RDD如何,这都是窄依赖,因为符合你上面说的。假如走了partitioner分发呢?? 2、为什么宽依赖就不能以pipeline的方式处理呢?无非就是中间加个partitioner判断而已。 3、假如一个stage里面有很多窄依赖,导致节点负载过重,有没有办法强行在窄依赖之间划分stage呢?flink是可以的
1、你说 父RDD的一个或多个分区的数据需要全部流入子RDD的一个或多个分区,就是窄依赖。是不是可以这么理解,只要需要通过partitioner进行分发的,就是宽依赖,否则就是窄依赖。=> 不建议使用“只要需要通过partitioner进行分发的,就是宽依赖”,因为很难去定义这里的partitioner是什么。目前图3.2及其相关描述已经很清晰,不需要再引入其他可能导致歧义的定义。
假如父RDD有3个分区,但每个分区只有一条数据,是不是无论子RDD如何,这都是窄依赖,因为符合你上面说的。假如走了partitioner分发呢??=> 不是,即使在只有一条数据的情况下,如果是ShuffleDependency,那么数据可能进入下游的分区id是不确定的(根据具体key来定),可能第一个分区,也可能是下游的第二个分区,而NarrowDependency情况下,其进入的下游分区是确定的。
2、为什么宽依赖就不能以pipeline的方式处理呢?无非就是中间加个partitioner判断而已。=> 需要经过网络去获取远程数据,获取完整结果并处理后才能进行下一步处理,属于同步操作。
3、假如一个stage里面有很多窄依赖,导致节点负载过重,有没有办法强行在窄依赖之间划分stage呢?flink是可以的 => 可以,但要考虑内存消耗、计算效率等因素。
1、你说 父RDD的一个或多个分区的数据需要全部流入子RDD的一个或多个分区,就是窄依赖。是不是可以这么理解,只要需要通过partitioner进行分发的,就是宽依赖,否则就是窄依赖。=> 不建议使用“只要需要通过partitioner进行分发的,就是宽依赖”,因为很难去定义这里的partitioner是什么。目前图3.2及其相关描述已经很清晰,不需要再引入其他可能导致歧义的定义。 $$$跨节点的窄依赖之间传数据应该不通过partitioner是吧?跨节点窄依赖之间数据的网络传输,也有缓存的概念吗?
假如父RDD有3个分区,但每个分区只有一条数据,是不是无论子RDD如何,这都是窄依赖,因为符合你上面说的。假如走了partitioner分发呢??=> 不是,即使在只有一条数据的情况下,如果是ShuffleDependency,那么数据可能进入下游的分区id是不确定的(根据具体key来定),可能第一个分区,也可能是下游的第二个分区,而NarrowDependency情况下,其进入的下游分区是确定的。 分区id确定、不确定,这重要吗?就像一个分叉路口,走到路口再决定要去哪,这个好像也没多大影响 2、为什么宽依赖就不能以pipeline的方式处理呢?无非就是中间加个partitioner判断而已。=> 需要经过网络去获取远程数据,获取完整结果并处理后才能进行下一步处理,属于同步操作。 $$$有些明白了,宽窄依赖的区别,本质就是是否等父RDD完全计算完之后才能进行下一步。因为要等父RDD全部算完,所以不能以pipeline的方式进行。 3、假如一个stage里面有很多窄依赖,导致节点负载过重,有没有办法强行在窄依赖之间划分stage呢?flink是可以的 => 可以,但要考虑内存消耗、计算效率等因素。 $$$假如现在有一个stage里面有很多窄依赖,导致节点负载过重,应该怎么解决这个问题呢? (1)使用coleasce(shuffle=true)强行shuffle一下
跟您再确认个问题: 1、窄依赖是不是就不需要走partitioner了? 2、我感觉宽依赖也可以以pipeline的方式并行计算,比如groupByKey,父RDD的分区内的数据经过partitioner确定分区id后,直接分发到子RDD对应的分区中不就行了,为啥要等父RDD各个分区分别算完,再发送到子RDD聚合呢?
@zdkzdk 如果“走partitioner”的意思是“根据partitioner提供的信息进行shuffle”的话,那么窄依赖不需要根据partitioner提供的信息进行shuffle,只需根据partitioner提供的信息获取数据进行处理(一般是map()等操作)。
“我感觉宽依赖也可以以pipeline的方式并行计算,比如groupByKey,父RDD的分区内的数据经过partitioner确定分区id后,直接分发到子RDD对应的分区中不就行了,为啥要等父RDD各个分区分别算完,再发送到子RDD聚合呢?”
=> 是直接发送,同步在reduce端,reduce需要收集所有的records,进行聚合后才能输出。
@zdkzdk 如果“走partitioner”的意思是“根据partitioner提供的信息进行shuffle”的话,那么窄依赖不需要根据partitioner提供的信息进行shuffle,只需根据partitioner提供的信息获取数据进行处理(一般是map()等操作)。
1、map操作为啥需要partitioner进行处理。 2、窄依赖间跨节点的map操作,也是需要partitioner进行分发是吗
2、为什么宽依赖就不能以pipeline的方式处理呢?无非就是中间加个partitioner判断而已。=> 需要经过网络去获取远程数据,获取完整结果并处理后才能进行下一步处理,属于同步操作。 $$$有些明白了,宽窄依赖的区别,本质就是是否等父RDD完全计算完之后才能进行下一步。因为要等父RDD全部算完,所以不能以pipeline的方式进行。
$$$还有个疑问,有些操作也不需要等前面RDD算完才能进行下一步吧,比如groupByKey,完全可以并行操作啊
我再仔细回答一下这个我能听明白的问题:为什么宽依赖就不能以pipeline的方式处理呢?
=> 可以看看第4章的第4.1节,里面讨论了stage和task的划分,其中的“想法二”介绍了当存在宽依赖(ShuffleDependency)时,如果仍然尽可能串联更多算子(即pipeline思想)所带来的各种问题(task过大问题、重复计算问题、并行计算问题等)。另外,由于宽依赖是部分依赖,上游算子的输出数据流向多个地方,而且流向哪里是动态确定的,如果强行进行pipeline,那么不管采用pull还是push方式在算子间传递数据都是困难的(可以自己尝试一下能否实现一个高效的pipeline方法)。
由于以上原因,最终需要将宽依赖(ShuffleDependency)前后的计算拆分为不同的stage。这时上游stage的task和下游stage的task都不是一个task,不能通过pipeline来进行处理。pipeline强调的是在同一个task中可以连续执行多个算子,而且算子间的数据不需要写入磁盘,并不是指task间的数据发送与接收。
=> 对于窄依赖来说,上下游RDD的每个partition之间关系是完全确定的,可以使用4.4节的pipeline方法进行pipeline,即只需迭代读取上游算子的输出,下游算子一条条进行处理即可。当然对于复杂的窄依赖(ManyToManyDepedency),采用pipeline处理时也可能会出现上游算子的重复计算问题,因此复杂的ManyToManyDepedency窄依赖在实际Spark的算子中很少存在。
我再仔细回答一下这个我能听明白的问题:为什么宽依赖就不能以pipeline的方式处理呢?
=> 可以看看第4章的第4.1节,里面讨论了stage和task的划分,其中的“想法二”介绍了当存在宽依赖(ShuffleDependency)时,如果仍然尽可能串联更多算子(即pipeline思想)所带来的各种问题(task过大问题、重复计算问题、并行计算问题等)。另外,由于宽依赖是部分依赖,上游算子的输出数据流向多个地方,而且流向哪里是动态确定的,如果强行进行pipeline,那么不管采用pull还是push方式在算子间传递数据都是困难的(可以自己尝试一下能否实现一个高效的pipeline方法)。
由于以上原因,最终需要将宽依赖(ShuffleDependency)前后的计算拆分为不同的stage。这时上游stage的task和下游stage的task都不是一个task,不能通过pipeline来进行处理。pipeline强调的是在同一个task中可以连续执行多个算子,而且算子间的数据不需要写入磁盘,并不是指task间的数据发送与接收。
=> 对于窄依赖来说,上下游RDD的每个partition之间关系是完全确定的,可以使用4.4节的pipeline方法进行pipeline,即只需迭代读取上游算子的输出,下游算子一条条进行处理即可。当然对于复杂的窄依赖(ManyToManyDepedency),采用pipeline处理时也可能会出现上游算子的重复计算问题,因此复杂的ManyToManyDepedency窄依赖在实际Spark的算子中很少存在。
$$$非常感谢,突然之间看懂那个图了,如果要并行计算的话,确实要重复计算。