Lijie Xu

Results 41 comments of Lijie Xu

@zdkzdk 如果“走partitioner”的意思是“根据partitioner提供的信息进行shuffle”的话,那么窄依赖不需要根据partitioner提供的信息进行shuffle,只需根据partitioner提供的信息获取数据进行处理(一般是map()等操作)。

“我感觉宽依赖也可以以pipeline的方式并行计算,比如groupByKey,父RDD的分区内的数据经过partitioner确定分区id后,直接分发到子RDD对应的分区中不就行了,为啥要等父RDD各个分区分别算完,再发送到子RDD聚合呢?” => 是直接发送,同步在reduce端,reduce需要收集所有的records,进行聚合后才能输出。

我再仔细回答一下这个我能听明白的问题:为什么宽依赖就不能以pipeline的方式处理呢? => 可以看看第4章的第4.1节,里面讨论了stage和task的划分,其中的“想法二”介绍了当存在宽依赖(ShuffleDependency)时,如果仍然尽可能串联更多算子(即pipeline思想)所带来的各种问题(task过大问题、重复计算问题、并行计算问题等)。另外,由于宽依赖是部分依赖,上游算子的输出数据流向多个地方,而且流向哪里是动态确定的,如果强行进行pipeline,那么不管采用pull还是push方式在算子间传递数据都是困难的(可以自己尝试一下能否实现一个高效的pipeline方法)。 由于以上原因,最终需要将宽依赖(ShuffleDependency)前后的计算拆分为不同的stage。这时上游stage的task和下游stage的task都不是一个task,不能通过pipeline来进行处理。pipeline强调的是在同一个task中可以连续执行多个算子,而且算子间的数据不需要写入磁盘,并不是指task间的数据发送与接收。 => 对于窄依赖来说,上下游RDD的每个partition之间关系是完全确定的,可以使用4.4节的pipeline方法进行pipeline,即只需迭代读取上游算子的输出,下游算子一条条进行处理即可。当然对于复杂的窄依赖(ManyToManyDepedency),采用pipeline处理时也可能会出现上游算子的重复计算问题,因此复杂的ManyToManyDepedency窄依赖在实际Spark的算子中很少存在。

1. 对于`rdd2 = rdd1.groupByKey(partitioner1)`,当`partitioner1 == rdd1采用的partitioner`,也就是你说的`分区器的类型和分区都要相同`时,groupByKey()不进行shuffle,而只进行mapPartitions()操作,详见`combineByKeyWithClassTag`代码。除此以外,groupByKey()将进行shuffle。 “如果左边的分区数比右边大,子RDD的一个分区依赖父RDD多个分区的全部“,这种情况属于NarrowDependency(见图3.2)不进行shuffle,不过这种情况目前只会出现在用户自定义的算子中,Spark代码中的groupByKey()算子没有对这种情况进行特殊检查和处理,因为进行检查的代价太高(检查保证相同key的records会被放到同一个partition中),所以目前groupByKey()的逻辑就是partitioner相同不进行shuffle,除此以外都进行shuffle。 2. 可以换,比如下面代码将HashPartitioner换为RangePartitioner,不管什么partitioner,只要能将相同key的records放在同一分区即可。 ```scala package basic.transfromation import org.apache.spark.sql.SparkSession import org.apache.spark.{HashPartitioner, RangePartitioner} object GroupByKeyExample { def main(args: Array[String]): Unit = { val spark = SparkSession...

@ycli12 谢谢提出的问题,ManyToManyDependency确实和ShuffleDependency很像,但ManyToManyDependency属于NarrowDependency,因为ManyToManyDependency不涉及分区,也不需要运行Shuffle write/read流程,具体例子见cartesian()。当然如果强行套用Shuffle框架,将RDD中所有数据都放到一个分区,然后执行Shuffle过程,那么child RDD可能不能收到想要的数据,比如如果将图3.2中的第二图中的分区1和2都改为分区1,那么RDD2中第二个p得不到任何数据。所以虽然ManyToManyDependency和ShuffleDependency样子相似,但两者是不同的计算逻辑。

@ycli12 理论上会出现重复计算,如果父RDD已经被cache就不用重复计算。

@ fouradam 抱歉,后续会将代码上传

@fouradam 代码已上传,配图由于版权问题暂不上传。书中的大部分图可以看清,如果看不清可以留言,我和编辑协商后再上传那些图。

``` xml 4.0.0 cn.ac.iscas SparkBook 1.0-SNAPSHOT 2018 2.11.8 1.8 2.4.3 org.scala-lang scala-library ${scala.version} org.scala-lang scala-compiler ${scala.version} org.scala-lang scala-reflect ${scala.version} org.scala-lang scalap ${scala.version} junit junit 4.12 test org.specs specs 1.2.5 test...

这里没有说清楚,抱歉。 应该是 10,000 个 records,这里说 9 个只是画图方便,有空我会再改下,谢谢指出。