Lijie Xu
Lijie Xu
@hangjianglaoweng 感谢指出,目测目前的代码实现用了线性探测法。然而,AppendOnlyMap的注释中说使用了二次探测法(This implementation uses quadratic probing with a power-of-2 hash table * size, which is guaranteed to explore all spaces for each key (see * http://en.wikipedia.org/wiki/Quadratic_probing)。想要深究的话,可以给Spark提个issue,看社区到底是想采用二次探测还是线性探测。
@MingRongXi 第一个问题:数组的扩容机制 "如果Array存放不下,则会先扩容,如果还存放不下,就将Array中的record排序后spill到磁盘上" 这里描述的流程与实际代码不符,实际代码是如果存放不下,会直接扩容为原来的二倍,在扩容完之后在把数据存进去,存完以后再判断是否需要溢写,所以这里我认为这里在扩容时会有OOM的风险;并且如果curSize=MAXIMUM_CAPACITY,则会直接抛异常 ==> Lijie: “如果Array存放不下,则会先扩容” 指的是“如果存放不下,会扩容为原来的二倍,扩容完后可以存放新的数据”。 “如果还存放不下,就将Array中的record排序后spill到磁盘上" 指的是“如果没有扩容的空间或者存放的数据总大小超过阈值,将Array中的record排序后spill到磁盘上”。我记得Spark会定时估计Array大小,如果超过阈值就spill到磁盘,所以一般不会出现OOM。但由于是定时的粗略估计,而不是精确统计(需要统计所有对象的大小太耗时),所以在存放过程中仍可能存在OOM。当然,还有很多OOM的情况,可以参考我们之前的paper:https://jerrylead.github.io/papers/ISSRE-2015-OOM.pdf 如果你在实际运行中这里出现了OOM情况,可以报告给社区。 第二个问题:Map的扩容机制 与1类似,都是扩容以后再判断是否需要溢写 ==> Lijie: 我将描述简化为“如果Array存放不下,则会先扩容,如果还存放不下,就将Array中的record排序后spill到磁盘上"就是为了避免后续代码更改带来一些细节描述出入。本质来讲,空间不足时先扩容,不能扩容达到阈值就spill。 第三个问题:需要map()端combine,需要或者不需要按Key进行排序 “如果不需要按Key进行排序,如图6.7的上图所示,那么只按partitionId进行排序” 实际上是如果不需要按Key排序,那么按照 partitionId + hash(Key) 排序,这里我猜测是为了在合并溢写文件的时候效率更高 => Lijie: 书中说“如果不需要按Key进行排序,如图6.7的上图所示,那么只按partitionId进行排序”,确实可以这样做,不需要key排序时,我们实际上没有必要对 key/hash(Key) 排序,比如如果不需要spill,直接在内存中进行combine,然后按照partition id排序后输出数据即可。如果需要spill,那么可以在spill过程中对key/hash(key)排序方便merge。这里英文里面也说"possibly second...
@MingRongXi 关于第三点, => Lijie: 书中说“如果不需要按Key进行排序,如图6.7的上图所示,那么只按partitionId进行排序”,确实可以这样做,不需要key排序时,我们实际上没有必要对 key/hash(Key) 排序,比如如果不需要spill,直接在内存中进行combine,然后按照partition id排序后输出数据即可。如果需要spill,那么可以在spill过程中对key/hash(key)排序方便merge。这里英文里面也说"possibly second by key or hash key",也就是说在对partition id排序后,可能进行对key/hash(key)的排序,而不是一定。 => MingRongXi:在SortShuffleWriter insert完数据后,会调用writePartitionedMapOutput方法,writePartitionedMapOutput里调用了comparator,而comparator只有在map端不需要排序且不需要聚合的时候才会返回None,如果需要map端聚合,就会使用hash(Key) 或者 Key 排序 => Lijie: 从设计角度来说,map端如果需要combine但不需要key排序,那么只需区分partition Id ,并能够按照key进行combine即可,这时有两种情况:如果内存能放大的下,不必进行key/hash(key)排序;如果内存放不下需要spill,那么可以在spill前再次对key/hash(key)排序方便merge(如书中AppendOnlyMap的图示)。当然实现时,为了方便或者统一接口,两种情况下可以都对key/hash(key)排序。本书更注重的是设计方法,开阔思路,而非完全遵照Spark现有实现,而且实现是在不断迭代优化,也许后面会有更好的实现方法。
1. 见图3.2中的ManyToManyDependency,以及书中对应的解释,举例见P71 cartesian()的描述。 2. 为了避免内存溢出和错误容忍,详见6.2.4节和8.3.2中的(1)。
@ hangjianglaoweng 本书主要介绍基于RDD接口的Join,书中介绍的join类似于SparkSQL中的Shuffle hash join。SparkSQL面向高性能的SQL查询分析,所以会对SQL语句实现做很多优化,Broadcast Hash Join以及Sort Merge Join是其中的两个优化,针对一些特殊的join场景执行效率会高些。关于SparkSQL的内核解析,可以阅读我师弟的著作《SparkSQL内核剖析》。
@eshellman Thanks for your advice. I've added the readme for Thai version. https://github.com/JerryLead/SparkInternals/tree/master/markdown/thai
Narrow指的是完全依赖,parentRDD中每个p中的数据不需要再进行partition后发给childRDD。下面的cartesian(otherRDD)展示了N:N的Narrow Dependency,整个计算过程不需要shuffle。
@ycli12 感谢指出,你的理解是对的,补充一句“在缓存空间用了少于50%,比如20%的时候,框架执行内存还是可以使用80%。但是此时如果需要缓存更多数据,缓存空间由于被框架执行内存占用而不足,只能丢弃需要缓存的数据。“
@ycli12 谢谢提供的信息,后续会改进
1、你说 父RDD的一个或多个分区的数据需要全部流入子RDD的一个或多个分区,就是窄依赖。是不是可以这么理解,只要需要通过partitioner进行分发的,就是宽依赖,否则就是窄依赖。=> 不建议使用“只要需要通过partitioner进行分发的,就是宽依赖”,因为很难去定义这里的partitioner是什么。目前图3.2及其相关描述已经很清晰,不需要再引入其他可能导致歧义的定义。 假如父RDD有3个分区,但每个分区只有一条数据,是不是无论子RDD如何,这都是窄依赖,因为符合你上面说的。假如走了partitioner分发呢??=> 不是,即使在只有一条数据的情况下,如果是ShuffleDependency,那么数据可能进入下游的分区id是不确定的(根据具体key来定),可能第一个分区,也可能是下游的第二个分区,而NarrowDependency情况下,其进入的下游分区是确定的。 2、为什么宽依赖就不能以pipeline的方式处理呢?无非就是中间加个partitioner判断而已。=> 需要经过网络去获取远程数据,获取完整结果并处理后才能进行下一步处理,属于同步操作。 3、假如一个stage里面有很多窄依赖,导致节点负载过重,有没有办法强行在窄依赖之间划分stage呢?flink是可以的 => 可以,但要考虑内存消耗、计算效率等因素。