ApacheSparkBook
ApacheSparkBook copied to clipboard
第六章《Shuffle 机制》勘误与修改建议
第171页,6.4.1 AppendOnlyMap的原理,
“如果该位置已经被占用,就使用二次探测法来找下一个空闲位置”,源码的实现应该是“线性探测法”不是“二次探测法”。
所以,图6.12中 第一次定位应该是Hash(K6)*2 第二次定位应该是(Hash(K6)+1)*2 第三次定位应该是(Hash(K6)+1+2)*2
如果有第四次应该是(Hash(K6)+1+2+3)*2
这里面乘以2是因为每次key和value一起放入占两个位置,如果是占一个位置,其实就是线性探测法的公式hi=(h(key)+i)%m。 不知道说的对不对,请指正。
@hangjianglaoweng 感谢指出,目测目前的代码实现用了线性探测法。然而,AppendOnlyMap的注释中说使用了二次探测法(This implementation uses quadratic probing with a power-of-2 hash table * size, which is guaranteed to explore all spaces for each key (see * http://en.wikipedia.org/wiki/Quadratic_probing)。想要深究的话,可以给Spark提个issue,看社区到底是想采用二次探测还是线性探测。
利杰你好,我在用spark-core2.13做debug的时候,发现代码里有些流程和书里描述的不太一样 ShuffleWrite框架的设计与实现
- 数组的扩容机制
"如果Array存放不下,则会先扩容,如果还存放不下,就将Array中的record排序后spill到磁盘上"
这里描述的流程与实际代码不符,实际代码是如果存放不下,会直接扩容为原来的二倍,在扩容完之后在把数据存进去,存完以后再判断是否需要溢写,所以这里我认为这里在扩容时会有OOM的风险;并且如果curSize=MAXIMUM_CAPACITY,则会直接抛异常
- Map的扩容机制 与1类似,都是扩容以后再判断是否需要溢写
- 需要map()端combine,需要或者不需要按Key进行排序
“如果不需要按Key进行排序,如图6.7的上图所示,那么只按partitionId进行排序”
实际上是如果不需要按Key排序,那么按照 partitionId + hash(Key) 排序,这里我猜测是为了在合并溢写文件的时候效率更高
@MingRongXi
第一个问题:数组的扩容机制 "如果Array存放不下,则会先扩容,如果还存放不下,就将Array中的record排序后spill到磁盘上" 这里描述的流程与实际代码不符,实际代码是如果存放不下,会直接扩容为原来的二倍,在扩容完之后在把数据存进去,存完以后再判断是否需要溢写,所以这里我认为这里在扩容时会有OOM的风险;并且如果curSize=MAXIMUM_CAPACITY,则会直接抛异常
==> Lijie: “如果Array存放不下,则会先扩容” 指的是“如果存放不下,会扩容为原来的二倍,扩容完后可以存放新的数据”。 “如果还存放不下,就将Array中的record排序后spill到磁盘上" 指的是“如果没有扩容的空间或者存放的数据总大小超过阈值,将Array中的record排序后spill到磁盘上”。我记得Spark会定时估计Array大小,如果超过阈值就spill到磁盘,所以一般不会出现OOM。但由于是定时的粗略估计,而不是精确统计(需要统计所有对象的大小太耗时),所以在存放过程中仍可能存在OOM。当然,还有很多OOM的情况,可以参考我们之前的paper:https://jerrylead.github.io/papers/ISSRE-2015-OOM.pdf 如果你在实际运行中这里出现了OOM情况,可以报告给社区。
第二个问题:Map的扩容机制 与1类似,都是扩容以后再判断是否需要溢写 ==> Lijie: 我将描述简化为“如果Array存放不下,则会先扩容,如果还存放不下,就将Array中的record排序后spill到磁盘上"就是为了避免后续代码更改带来一些细节描述出入。本质来讲,空间不足时先扩容,不能扩容达到阈值就spill。
第三个问题:需要map()端combine,需要或者不需要按Key进行排序 “如果不需要按Key进行排序,如图6.7的上图所示,那么只按partitionId进行排序” 实际上是如果不需要按Key排序,那么按照 partitionId + hash(Key) 排序,这里我猜测是为了在合并溢写文件的时候效率更高
=> Lijie: 书中说“如果不需要按Key进行排序,如图6.7的上图所示,那么只按partitionId进行排序”,确实可以这样做,不需要key排序时,我们实际上没有必要对 key/hash(Key) 排序,比如如果不需要spill,直接在内存中进行combine,然后按照partition id排序后输出数据即可。如果需要spill,那么可以在spill过程中对key/hash(key)排序方便merge。这里英文里面也说"possibly second by key or hash key",也就是说在对partition id排序后,可能进行对key/hash(key)的排序,而不是一定。
@JerryLead 感谢利杰回复,第一点和第二点是我理解的不到位,关于第三点,我看代码确认了下
=> Lijie: 书中说“如果不需要按Key进行排序,如图6.7的上图所示,那么只按partitionId进行排序”,确实可以这样做,不需要key排序时,我们实际上没有必要对 key/hash(Key) 排序,比如如果不需要spill,直接在内存中进行combine,然后按照partition id排序后输出数据即可。如果需要spill,那么可以在spill过程中对key/hash(key)排序方便merge。这里英文里面也说"possibly second by key or hash key",也就是说在对partition id排序后,可能进行对key/hash(key)的排序,而不是一定。
=> MingRongXi:在SortShuffleWriter insert完数据后,会调用writePartitionedMapOutput方法,writePartitionedMapOutput里调用了comparator,而comparator只有在map端不需要排序且不需要聚合的时候才会返回None,如果需要map端聚合,就会使用hash(Key) 或者 Key 排序
@MingRongXi
关于第三点, => Lijie: 书中说“如果不需要按Key进行排序,如图6.7的上图所示,那么只按partitionId进行排序”,确实可以这样做,不需要key排序时,我们实际上没有必要对 key/hash(Key) 排序,比如如果不需要spill,直接在内存中进行combine,然后按照partition id排序后输出数据即可。如果需要spill,那么可以在spill过程中对key/hash(key)排序方便merge。这里英文里面也说"possibly second by key or hash key",也就是说在对partition id排序后,可能进行对key/hash(key)的排序,而不是一定。 => MingRongXi:在SortShuffleWriter insert完数据后,会调用writePartitionedMapOutput方法,writePartitionedMapOutput里调用了comparator,而comparator只有在map端不需要排序且不需要聚合的时候才会返回None,如果需要map端聚合,就会使用hash(Key) 或者 Key 排序 => Lijie: 从设计角度来说,map端如果需要combine但不需要key排序,那么只需区分partition Id ,并能够按照key进行combine即可,这时有两种情况:如果内存能放大的下,不必进行key/hash(key)排序;如果内存放不下需要spill,那么可以在spill前再次对key/hash(key)排序方便merge(如书中AppendOnlyMap的图示)。当然实现时,为了方便或者统一接口,两种情况下可以都对key/hash(key)排序。本书更注重的是设计方法,开阔思路,而非完全遵照Spark现有实现,而且实现是在不断迭代优化,也许后面会有更好的实现方法。
好的,理解了,感谢!
@hangjianglaoweng 感谢指出,目测目前的代码实现用了线性探测法。然而,AppendOnlyMap的注释中说使用了二次探测法(This implementation uses quadratic probing with a power-of-2 hash table * size, which is guaranteed to explore all spaces for each key (see * http://en.wikipedia.org/wiki/Quadratic_probing)。想要深究的话,可以给Spark提个issue,看社区到底是想采用二次探测还是线性探测。
参考 SPARK-4690,代码使用的是二次探测法。 第0次位置P(0)=Hash(K); 第1次探测位置P(1)=Hash(K)+1; 第2次探测位置P(2)=P(1)+2=Hash(K)+3; 递推公式为P(i+1)=P(i)+i; 通项公式为P(i+1)=Hash(K)+(n+1)*n/2; 关键点在于看通项公式的次幂而不是递推公式的次幂,因此是二次探测而不是线性探测。 书中文字和插图示例也存在问题,即第三次定位应为向后递增3个record位置,而不是4个record位置。