Spark StorageLevel Source Code Analysis
Spark storage
The fully qualified class name is org.apache.spark.storage.StorageLevel.
The class
The primary constructor of the class defines five fields:
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1
  ) extends Externalizable {

  // Stubbed out here; both are implemented further down.
  override def readExternal(in: ObjectInput): Unit = ???
  override def writeExternal(out: ObjectOutput): Unit = ???
}
Not everything needs to be serialized here (or, put differently, after the object is restored some of its state can be rebuilt rather than read back), so instead of Serializable the class implements Externalizable, whose readExternal and writeExternal methods are invoked during deserialization and serialization respectively, giving full control over what gets written.
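As a minimal sketch of the Externalizable contract (plain JDK usage, not Spark code; the Point class is made up for illustration): only what writeExternal emits ends up in the stream, and a public no-arg constructor is required so that readExternal has an instance to fill in.

import java.io.{Externalizable, ObjectInput, ObjectOutput}

// Hypothetical example class: only x and y are written out; readExternal
// restores them into a fresh instance created via the no-arg constructor.
class Point(var x: Int, var y: Int) extends Externalizable {
  def this() = this(0, 0) // required by the Externalizable contract

  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeInt(x)
    out.writeInt(y)
  }

  override def readExternal(in: ObjectInput): Unit = {
    x = in.readInt()
    y = in.readInt()
  }
}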
Next, the auxiliary constructors:
def this() = this(false, true, false, false)

// TODO: Also add fields for caching priority, dataset ID, and flushing.
private def this(flags: Int, replication: Int) {
  this((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication)
}
One of them is public and one private. Where the private one gets called will come up later; what it does is rebuild the four boolean flags from the bits of an integer.
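As a small sketch of that decoding (plain Scala, independent of Spark): with flags = 12, which is binary 1100, the disk and memory bits are set and the other two are not.

// Decode flags = 12 (binary 1100) the way the private constructor does:
// bit values are disk = 8, memory = 4, off-heap = 2, deserialized = 1.
val flags = 12
val useDisk = (flags & 8) != 0      // true
val useMemory = (flags & 4) != 0    // true
val useOffHeap = (flags & 2) != 0   // false
val deserialized = (flags & 1) != 0 // false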
Next come the accessors, an assertion, and hashCode:
def useDisk: Boolean = _useDisk
def useMemory: Boolean = _useMemory
def useOffHeap: Boolean = _useOffHeap
def deserialized: Boolean = _deserialized
def replication: Int = _replication
assert(replication < 40, "Replication restricted to be less than 40 for calculating hash codes")
override def hashCode(): Int = toInt * 41 + replication
The assertion makes it explicit that replication must stay below 40 so that hash codes remain unique. useOffHeap mainly serves to avoid JVM GC pressure and to save JVM heap memory; note that an off-heap level supports neither useDisk, useMemory, nor deserialized storage, and its replication must be 1. The source:
if (useOffHeap) {
  require(!useDisk, "Off-heap storage level does not support using disk")
  require(!useMemory, "Off-heap storage level does not support using heap memory")
  require(!deserialized, "Off-heap storage level does not support deserialized storage")
  require(replication == 1, "Off-heap storage level does not support multiple replication")
}
There is an interesting method in here, toInt:
def toInt: Int = {
  var ret = 0
  if (_useDisk) {
    ret |= 8
  }
  if (_useMemory) {
    ret |= 4
  }
  if (_useOffHeap) {
    ret |= 2
  }
  if (_deserialized) {
    ret |= 1
  }
  ret
}
This method packs the four flags into a bitmask (disk = 8, memory = 4, off-heap = 2, deserialized = 1), which is what hashCode and the serialization code below rely on. If only _useDisk is set, the result is binary 1000:
object A extends App {
  val _useDisk = true
  val _useMemory = false
  val _useOffHeap = false
  val _deserialized = false

  def toInt: Int = {
    var ret = 0
    if (_useDisk) {
      ret |= 8
    }
    if (_useMemory) {
      ret |= 4
    }
    if (_useOffHeap) {
      ret |= 2
    }
    if (_deserialized) {
      ret |= 1
    }
    ret
  }

  println(Integer.toBinaryString(toInt)) // 1000, binary for 8
}
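With toInt understood, the hashCode definition and the replication < 40 assertion from earlier also fall into place: toInt * 41 + replication cannot collide for distinct levels as long as replication stays below 41. A quick sketch using two constants defined later in the companion object:

import org.apache.spark.storage.StorageLevel

// DISK_ONLY has toInt = 8 (binary 1000), so its hashCode is 8 * 41 + 1 = 329;
// DISK_ONLY_2 differs only in replication, giving 8 * 41 + 2 = 330.
println(StorageLevel.DISK_ONLY.hashCode())   // 329
println(StorageLevel.DISK_ONLY_2.hashCode()) // 330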
The clone and equals methods are also easy to follow:
override def clone(): StorageLevel = {
  new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
}

override def equals(other: Any): Boolean = other match {
  case s: StorageLevel =>
    s.useDisk == useDisk &&
      s.useMemory == useMemory &&
      s.useOffHeap == useOffHeap &&
      s.deserialized == deserialized &&
      s.replication == replication
  case _ =>
    false
}
The validity check:
def isValid: Boolean = (useMemory || useDisk || useOffHeap) && (replication > 0)
That is, a level is valid as long as replication is greater than 0 and at least one of memory, disk, or off-heap storage is enabled.
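A quick illustration with two constants from the companion object shown later: NONE enables no storage at all, so it is not valid, while DISK_ONLY is.

import org.apache.spark.storage.StorageLevel

println(StorageLevel.NONE.isValid)      // false: no memory, disk, or off-heap storage
println(StorageLevel.DISK_ONLY.isValid) // true: disk with replication 1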
Next, the two interface methods that were left unimplemented above.
First writeExternal, which is called during serialization:
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
  out.writeByte(toInt)
  out.writeByte(_replication)
}
A quick look at Utils.tryOrIOException first:
def tryOrIOException(block: => Unit) {
  try {
    block
  } catch {
    case e: IOException => throw e
    case NonFatal(t) => throw new IOException(t)
  }
}
It wraps a block of code so that callers do not have to catch IOException at every call site: IOExceptions pass through unchanged and any other non-fatal exception is re-wrapped as an IOException.
writeExternal calls toInt to capture the current useDisk, useMemory, useOffHeap, and deserialized flags in a single byte; if, say, only useDisk is true, the byte 8 (binary 1000) is written. replication is written as a second byte, so two bytes capture the whole state of the object, which is an admirably compact design.
Now the deserialization side:
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
  val flags = in.readByte()
  _useDisk = (flags & 8) != 0
  _useMemory = (flags & 4) != 0
  _useOffHeap = (flags & 2) != 0
  _deserialized = (flags & 1) != 0
  _replication = in.readByte()
}
If 8 was written during serialization, flags is 8 here: 8 & 8 is 8 (binary 1000), which is non-zero, so _useDisk becomes true, while the & with 4, 2, and 1 each yield 0 and the remaining flags stay false. Deserialization therefore restores exactly the values that were written.
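To watch the two halves cooperate, here is a hedged round-trip sketch: an ObjectOutputStream hands the level to writeExternal, and an ObjectInputStream restores it through readExternal (and then readResolve, which appears just below).

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.storage.StorageLevel

// Serialize DISK_ONLY_2 (one byte 8 for the flags, one byte 2 for replication)
// and read it back; equals compares all five fields.
val buffer = new ByteArrayOutputStream()
val out = new ObjectOutputStream(buffer)
out.writeObject(StorageLevel.DISK_ONLY_2) // invokes writeExternal
out.close()

val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
val restored = in.readObject().asInstanceOf[StorageLevel] // invokes readExternal
println(restored == StorageLevel.DISK_ONLY_2) // true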
Finally, readResolve, toString, and description:
// Swap in the cached, canonical instance after deserialization.
@throws(classOf[IOException])
private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)

override def toString: String = {
  s"StorageLevel($useDisk, $useMemory, $useOffHeap, $deserialized, $replication)"
}

def description: String = {
  var result = ""
  result += (if (useDisk) "Disk " else "")
  result += (if (useMemory) "Memory " else "")
  result += (if (useOffHeap) "ExternalBlockStore " else "")
  result += (if (deserialized) "Deserialized " else "Serialized ")
  result += s"${replication}x Replicated"
  result
}
Not much to explain here; description produces human-readable strings such as "Memory Deserialized 1x Replicated".
The companion object
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
DISK_ONLY, for instance, is built as new StorageLevel(true, false, false, false, 1): useDisk is true, the remaining flags are false, and replication takes its default value of 1, which is why it can be abbreviated to new StorageLevel(true, false, false, false).
Likewise, the variants with replication set to 2 carry a _2 suffix, e.g. DISK_ONLY_2, and so on.
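A quick sketch of how a couple of these constants print, going by the toString and description methods above:

import org.apache.spark.storage.StorageLevel

println(StorageLevel.DISK_ONLY_2)             // StorageLevel(true, false, false, false, 2)
println(StorageLevel.DISK_ONLY_2.description) // Disk Serialized 2x Replicated
println(StorageLevel.MEMORY_ONLY.description) // Memory Deserialized 1x Replicated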
The fromString method simply pattern-matches a name onto one of these constants:
@DeveloperApi
def fromString(s: String): StorageLevel = s match {
  case "NONE" => NONE
  case "DISK_ONLY" => DISK_ONLY
  case "DISK_ONLY_2" => DISK_ONLY_2
  case "MEMORY_ONLY" => MEMORY_ONLY
  case "MEMORY_ONLY_2" => MEMORY_ONLY_2
  case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
  case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2
  case "MEMORY_AND_DISK" => MEMORY_AND_DISK
  case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2
  case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
  case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
  case "OFF_HEAP" => OFF_HEAP
  case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s")
}
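Usage is a one-liner; unknown names fall through to the IllegalArgumentException:

import org.apache.spark.storage.StorageLevel

val level = StorageLevel.fromString("MEMORY_AND_DISK_SER")
println(level eq StorageLevel.MEMORY_AND_DISK_SER) // true: the constant itself is returned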
Next come a private[spark] cache and helper: the cache is a thread-safe ConcurrentHashMap, and getCachedStorageLevel uses the atomic putIfAbsent to insert or look up a level. putIfAbsent behaves like put except that it only inserts when the key is absent; if the key is already present, the existing mapping is kept, so the subsequent get always returns the first instance stored for that level.
private[spark] val storageLevelCache = new ConcurrentHashMap[StorageLevel, StorageLevel]()

private[spark] def getCachedStorageLevel(level: StorageLevel): StorageLevel = {
  storageLevelCache.putIfAbsent(level, level)
  storageLevelCache.get(level)
}
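The practical effect is that logically equal levels are de-duplicated: apply (shown next) and readResolve always hand back the same cached instance, so even reference equality holds.

import org.apache.spark.storage.StorageLevel

// Two equal levels built through the public apply resolve to one cached object.
val a = StorageLevel(useDisk = true, useMemory = true, deserialized = true, replication = 2)
val b = StorageLevel(useDisk = true, useMemory = true, deserialized = true, replication = 2)
println(a eq b) // true: both come out of storageLevelCache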
Finally, the apply factory methods:
/**
 * :: DeveloperApi ::
 * Create a new StorageLevel object without setting useOffHeap.
 * (The doc comment above is wrong: this overload returns a new object and requires
 * all five attributes, including useOffHeap.)
 */
@DeveloperApi
def apply(
    useDisk: Boolean,
    useMemory: Boolean,
    useOffHeap: Boolean,
    deserialized: Boolean,
    replication: Int): StorageLevel = {
  getCachedStorageLevel(
    new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication))
}
/**
 * :: DeveloperApi ::
 * Create a new StorageLevel object.
 * (Here useOffHeap is fixed to false, so the caller does not pass it.)
 */
@DeveloperApi
def apply(
    useDisk: Boolean,
    useMemory: Boolean,
    deserialized: Boolean,
    replication: Int = 1): StorageLevel = {
  getCachedStorageLevel(new StorageLevel(useDisk, useMemory, false, deserialized, replication))
}
/**
 * :: DeveloperApi ::
 * Create a new StorageLevel object from its integer representation.
 * (flags and replication go straight to the private auxiliary constructor shown earlier.)
 */
@DeveloperApi
def apply(flags: Int, replication: Int): StorageLevel = {
  getCachedStorageLevel(new StorageLevel(flags, replication))
}
/**
 * :: DeveloperApi ::
 * Read StorageLevel object from ObjectInput stream.
 * (This is the deserialization path: a fresh instance is filled in by readExternal
 * and then exchanged for the cached one.)
 */
@DeveloperApi
def apply(in: ObjectInput): StorageLevel = {
  val obj = new StorageLevel()
  obj.readExternal(in)
  getCachedStorageLevel(obj)
}
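A hedged sketch exercising three of these overloads; the integer form uses flags = 12, i.e. disk + memory:

import org.apache.spark.storage.StorageLevel

// 1. Full five-argument form: every flag is explicit.
val full = StorageLevel(useDisk = true, useMemory = true, useOffHeap = false,
  deserialized = false, replication = 2)

// 2. Four-argument form: useOffHeap is fixed to false internally.
val noOffHeap = StorageLevel(useDisk = true, useMemory = true, deserialized = false, replication = 2)

// 3. Integer form: flags = 12 is binary 1100, i.e. disk + memory.
val fromFlags = StorageLevel(12, 2)

println(full == noOffHeap && noOffHeap == fromFlags) // true: all equal MEMORY_AND_DISK_SER_2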
So much for the Scala source; as noted inline, a couple of the original doc comments are misleading, hence the annotations added above. There is also a Java-facing API:
package org.apache.spark.api.java;

import org.apache.spark.storage.StorageLevel;

/**
 * Expose some commonly useful storage level constants.
 */
public class StorageLevels {
  public static final StorageLevel NONE = create(false, false, false, false, 1);
  public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1);
  public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2);
  public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1);
  public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2);
  public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1);
  public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, false, 2);
  public static final StorageLevel MEMORY_AND_DISK = create(true, true, false, true, 1);
  public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, false, true, 2);
  public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, false, 1);
  public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, false, 2);
  public static final StorageLevel OFF_HEAP = create(false, false, true, false, 1);

  /**
   * Create a new StorageLevel object.
   * @param useDisk saved to disk, if true
   * @param useMemory saved to memory, if true
   * @param deserialized saved as deserialized objects, if true
   * @param replication replication factor
   */
  @Deprecated
  public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized,
      int replication) {
    return StorageLevel.apply(useDisk, useMemory, false, deserialized, replication);
  }

  /**
   * Create a new StorageLevel object.
   * @param useDisk saved to disk, if true
   * @param useMemory saved to memory, if true
   * @param useOffHeap saved to Tachyon, if true
   * @param deserialized saved as deserialized objects, if true
   * @param replication replication factor
   */
  public static StorageLevel create(
      boolean useDisk,
      boolean useMemory,
      boolean useOffHeap,
      boolean deserialized,
      int replication) {
    return StorageLevel.apply(useDisk, useMemory, useOffHeap, deserialized, replication);
  }
}
This one needs little explanation either. Finally, the official guidance on choosing a storage level.
Usage
| Storage level | Meaning |
|---|---|
| MEMORY_ONLY | Store the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions are simply not cached and are recomputed on the fly when needed. This is the default level. |
| MEMORY_AND_DISK | Store the RDD as deserialized Java objects in the JVM. Partitions that do not fit in memory are spilled to disk and read from there when needed. |
| MEMORY_ONLY_SER | Store the RDD as serialized Java objects (one byte array per partition). Generally more space-efficient than deserialized objects, especially with a fast serializer, but more CPU-intensive to read. |
| MEMORY_AND_DISK_SER | Like MEMORY_ONLY_SER, but partitions that do not fit in memory are spilled to disk instead of being recomputed when needed. |
| DISK_ONLY | Store the RDD partitions only on disk. |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | Same as the levels above, but each partition is replicated on two cluster nodes, i.e. replication = 2. |
| OFF_HEAP (experimental) | Store the RDD in serialized form in Tachyon. Compared with MEMORY_ONLY_SER, OFF_HEAP reduces garbage-collection overhead and lets executors be smaller and share a pool of memory, which is attractive in highly concurrent environments. Because the RDD lives in Tachyon, an executor crash does not lose the cached data. In this mode the memory in Tachyon is discardable, so Tachyon does not try to rebuild blocks it evicts. |
Recommendation: prefer the default MEMORY_ONLY; if the data does not fit in memory, switch to MEMORY_ONLY_SER to save space; and only fall back on disk-backed levels as a last resort, for example when the dataset is very large or recomputation is expensive.
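In practice the level is handed to RDD.persist (cache() is simply persist with MEMORY_ONLY). A minimal sketch, with the app name and local master chosen purely for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Cache an RDD serialized in memory to save space; cache() would be
// equivalent to persist(StorageLevel.MEMORY_ONLY).
val sc = new SparkContext(new SparkConf().setAppName("storage-level-demo").setMaster("local[*]"))
val rdd = sc.parallelize(1 to 1000000).map(_ * 2)
rdd.persist(StorageLevel.MEMORY_ONLY_SER)

println(rdd.count()) // first action computes and caches the partitions
println(rdd.count()) // second action reads the serialized in-memory copy
sc.stop()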