
A source-code analysis of Spark's StorageLevel

Open cjuexuan opened this issue 9 years ago • 0 comments

Storage in Spark

The fully qualified class name is org.apache.spark.storage.StorageLevel.

The class

First, the primary constructor defines five fields:

class StorageLevel private(
                              private var _useDisk: Boolean,
                              private var _useMemory: Boolean,
                              private var _useOffHeap: Boolean,
                              private var _deserialized: Boolean,
                              private var _replication: Int = 1
                            ) extends Externalizable{
  override def readExternal(in: ObjectInput): Unit = ???

  override def writeExternal(out: ObjectOutput): Unit = ???
}

Not everything needs to go onto the wire; or rather, after an object is restored, some internal child objects can simply be recreated instead of being serialized along with it. So instead of Serializable, the class implements Externalizable, whose readExternal and writeExternal methods are invoked during deserialization and serialization respectively, giving the class full control of its byte representation.
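As a standalone sketch of this pattern (plain Java, not Spark's classes; `Flags` and `roundTrip` are illustrative names), Externalizable looks like this:

```java
import java.io.*;

// A minimal sketch (not Spark code) of the Externalizable pattern StorageLevel
// uses: the class itself decides exactly which bytes represent its state.
class Flags implements Externalizable {
    private boolean useDisk;

    public Flags() {}                          // Externalizable requires a public no-arg constructor
    public Flags(boolean useDisk) { this.useDisk = useDisk; }

    public boolean useDisk() { return useDisk; }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeBoolean(useDisk);             // we choose what goes on the stream
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        useDisk = in.readBoolean();            // and how state is restored from it
    }
}

public class ExternalizableDemo {
    // Serialize and immediately deserialize, returning the restored copy.
    public static Flags roundTrip(Flags f) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(f);
            }
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (Flags) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Only the booleans we explicitly write are on the stream; anything else in the object is rebuilt by the no-arg constructor.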

Next, the auxiliary constructors:

  def this() = this(false, true, false, false)

  // TODO: Also add fields for caching priority, dataset ID, and flushing.
  private def this(flags: Int, replication: Int) {
    this((flags & 8) != 0, (flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication)
  }

One is private and one is public. We will come back to how the private one is used a little later; consider it a teaser for now.

Next come the accessor methods:

  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication

  assert(replication < 40, "Replication restricted to be less than 40 for calculating hash codes")

  override def hashCode(): Int = toInt * 41 + replication

The assertion tells us explicitly that replication must be less than 40 (so that hash codes remain unique). useOffHeap exists mainly to reduce JVM GC pressure and save JVM heap memory; note that when useOffHeap is set, useDisk, useMemory, and deserialized are all unsupported, and replication must equal 1. The source:

  if (useOffHeap) {
    require(!useDisk, "Off-heap storage level does not support using disk")
    require(!useMemory, "Off-heap storage level does not support using heap memory")
    require(!deserialized, "Off-heap storage level does not support deserialized storage")
    require(replication == 1, "Off-heap storage level does not support multiple replication")
  }
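Why 40? toInt (shown below) lies in [0, 15] and hashCode is toInt * 41 + replication, so as long as replication stays below the multiplier 41, no two (flags, replication) pairs can collide. A quick standalone check (plain Java, not Spark code; `HashUniqueness` is an illustrative name):

```java
import java.util.HashSet;
import java.util.Set;

// Verifies that hashCode = toInt * 41 + replication is injective for
// toInt in [0, 15] and replication in [1, 39]: replication can never
// bridge the gap of 41 between adjacent toInt values.
public class HashUniqueness {
    public static int hash(int toInt, int replication) {
        return toInt * 41 + replication;
    }

    public static boolean allDistinct() {
        Set<Integer> seen = new HashSet<>();
        for (int toInt = 0; toInt <= 15; toInt++) {
            for (int replication = 1; replication < 40; replication++) {
                if (!seen.add(hash(toInt, replication))) return false;  // collision
            }
        }
        return true;
    }
}
```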

There is an interesting method here, toInt:


def toInt: Int = {
    var ret = 0
    if (_useDisk) {
      ret |= 8
    }
    if (_useMemory) {
      ret |= 4
    }
    if (_useOffHeap) {
      ret |= 2
    }
    if (_deserialized) {
      ret |= 1
    }
    ret
  }

This method packs the four boolean flags into the bits of a single integer, which makes storage configurations cheap to compare. If only _useDisk is set, ret ends up as 8, i.e. binary 1000:


object A extends App{
  val _useDisk = true
  val _useMemory = false
  val _useOffHeap = false
  val _deserialized = false

  def toInt: Int = {
    var ret = 0
    if (_useDisk) {
      ret |= 8
    }
    if (_useMemory) {
      ret |= 4
    }
    if (_useOffHeap) {
      ret |= 2
    }
    if (_deserialized) {
      ret |= 1
    }
    ret
  }
  println(Integer.toBinaryString(toInt)) // 1000, binary for 8

}

The clone and equals methods are then easy to understand:

  override def clone(): StorageLevel = {
    new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
  }

  override def equals(other: Any): Boolean = other match {
    case s: StorageLevel =>
      s.useDisk == useDisk &&
      s.useMemory == useMemory &&
      s.useOffHeap == useOffHeap &&
      s.deserialized == deserialized &&
      s.replication == replication
    case _ =>
      false
  }

The validity check is:

def isValid: Boolean = (useMemory || useDisk || useOffHeap) && (replication > 0)

In other words, a level is valid as long as replication is greater than 0 and at least one of memory, disk, or off-heap is in use.

Now for the two interface methods left unimplemented earlier. First, writeExternal, called during serialization:

  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
    out.writeByte(toInt)
    out.writeByte(_replication)
  }

First, a look at Utils.tryOrIOException:

  def tryOrIOException(block: => Unit) {
    try {
      block
    } catch {
      case e: IOException => throw e
      case NonFatal(t) => throw new IOException(t)
    }
  }
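A Java analogue of this helper can be sketched as follows (illustrative code, not Spark's; catching Exception here approximates Scala's NonFatal):

```java
import java.io.IOException;

// Illustrative analogue of Utils.tryOrIOException: run a block, let
// IOExceptions pass through unchanged, and wrap any other failure.
public class TryOrIO {
    @FunctionalInterface
    public interface Block { void run() throws Exception; }

    public static void tryOrIOException(Block block) throws IOException {
        try {
            block.run();
        } catch (IOException e) {
            throw e;                   // already an IOException: rethrow as-is
        } catch (Exception t) {
            throw new IOException(t);  // anything else: wrap as IOException
        }
    }
}
```

Callers get a uniform IOException contract without writing the try/catch themselves.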

This wrapper spares every caller from catching IOException itself by wrapping the whole code block. writeExternal calls toInt to record the current useDisk/useMemory/useOffHeap/deserialized state in a single byte; for example, if only useDisk is true, it writes 8, binary 1000, a nicely economical design. It then writes replication as a second byte, so two bytes capture the whole object state. Now the deserialization side:

  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
    val flags = in.readByte()
    _useDisk = (flags & 8) != 0
    _useMemory = (flags & 4) != 0
    _useOffHeap = (flags & 2) != 0
    _deserialized = (flags & 1) != 0
    _replication = in.readByte()
  }
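The mask-based encode/decode pair can be exercised outside Spark; the following is an illustrative re-implementation (`LevelCodec` is not a Spark class):

```java
// Illustrative re-implementation of StorageLevel's bit packing (not Spark code).
public class LevelCodec {
    // Pack the four flags into one int, mirroring toInt.
    public static int toInt(boolean useDisk, boolean useMemory,
                            boolean useOffHeap, boolean deserialized) {
        int ret = 0;
        if (useDisk)      ret |= 8;
        if (useMemory)    ret |= 4;
        if (useOffHeap)   ret |= 2;
        if (deserialized) ret |= 1;
        return ret;
    }

    // Unpack, mirroring readExternal's mask tests.
    public static boolean[] fromInt(int flags) {
        return new boolean[] {
            (flags & 8) != 0,  // useDisk
            (flags & 4) != 0,  // useMemory
            (flags & 2) != 0,  // useOffHeap
            (flags & 1) != 0,  // deserialized
        };
    }
}
```

Encoding DISK_ONLY's flags gives 8, and decoding 8 sets only the useDisk bit, so the round trip is lossless.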

If 8 was written during serialization, flags reads back as 8. 8 & 8 is 8 (binary 1000), which is non-zero, so _useDisk becomes true; the other masks yield 0, so the remaining flags stay false, and deserialization recovers the original state exactly. Finally, readResolve (which funnels deserialized instances through the companion object's cache) along with description and toString:

  @throws(classOf[IOException])
  private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)

  override def toString: String = {
    s"StorageLevel($useDisk, $useMemory, $useOffHeap, $deserialized, $replication)"
  }
  def description: String = {
    var result = ""
    result += (if (useDisk) "Disk " else "")
    result += (if (useMemory) "Memory " else "")
    result += (if (useOffHeap) "ExternalBlockStore " else "")
    result += (if (deserialized) "Deserialized " else "Serialized ")
    result += s"${replication}x Replicated"
    result
  }

Nothing much to say here; these read directly.

The companion object


object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)


For DISK_ONLY, the constructor call is new StorageLevel(true, false, false, false, 1): useDisk is true, the other flags are false, and replication takes its default of 1, so the call is abbreviated to new StorageLevel(true, false, false, false). Likewise DISK_ONLY_2 is the same level with replication set to 2, and so on.

The fromString method pattern-matches a name onto one of these levels:

  @DeveloperApi
  def fromString(s: String): StorageLevel = s match {
    case "NONE" => NONE
    case "DISK_ONLY" => DISK_ONLY
    case "DISK_ONLY_2" => DISK_ONLY_2
    case "MEMORY_ONLY" => MEMORY_ONLY
    case "MEMORY_ONLY_2" => MEMORY_ONLY_2
    case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
    case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2
    case "MEMORY_AND_DISK" => MEMORY_AND_DISK
    case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2
    case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
    case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
    case "OFF_HEAP" => OFF_HEAP
    case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s")
  }

Next, two private members. One builds a cache on the thread-safe ConcurrentHashMap; the other uses the atomic putIfAbsent to register or look up a level. putIfAbsent behaves like put, except that it only inserts when the key is absent; if the key is already present, the existing value is kept.


  private[spark] val storageLevelCache = new ConcurrentHashMap[StorageLevel, StorageLevel]()

  private[spark] def getCachedStorageLevel(level: StorageLevel): StorageLevel = {
    storageLevelCache.putIfAbsent(level, level)
    storageLevelCache.get(level)
  }
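The canonicalizing effect of putIfAbsent can be sketched with strings standing in for StorageLevel (`Interner` is an illustrative name, not Spark code):

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative canonicalizing cache, mirroring getCachedStorageLevel:
// the first instance registered for a given key is returned ever after.
public class Interner {
    private static final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();

    public static String intern(String s) {
        cache.putIfAbsent(s, s);  // only the first equal instance is stored
        return cache.get(s);      // later callers all receive that first instance
    }
}
```

This is why readResolve routes through getCachedStorageLevel: equal levels collapse to one shared instance after deserialization.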

Finally, the apply methods:



  /**
   * :: DeveloperApi ::
   * Create a new StorageLevel object without setting useOffHeap.
   (Note: the Scaladoc above is wrong; this overload returns a new object and requires setting every attribute, including useOffHeap.)
   */
  @DeveloperApi
  def apply(
      useDisk: Boolean,
      useMemory: Boolean,
      useOffHeap: Boolean,
      deserialized: Boolean,
      replication: Int): StorageLevel = {
    getCachedStorageLevel(
      new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication))
  }

  /**
   * :: DeveloperApi ::
   * Create a new StorageLevel object.
   (This overload fixes useOffHeap to false; the caller does not pass it.)
   */
  @DeveloperApi
  def apply(
      useDisk: Boolean,
      useMemory: Boolean,
      deserialized: Boolean,
      replication: Int = 1): StorageLevel = {
    getCachedStorageLevel(new StorageLevel(useDisk, useMemory, false, deserialized, replication))
  }

  /**
   * :: DeveloperApi ::
   * Create a new StorageLevel object from its integer representation.
   (Builds a level from flags and replication, invoking the private auxiliary constructor shown earlier.)
   */
  @DeveloperApi
  def apply(flags: Int, replication: Int): StorageLevel = {
    getCachedStorageLevel(new StorageLevel(flags, replication))
  }

  /**
   * :: DeveloperApi ::
   * Read StorageLevel object from ObjectInput stream.
   (The deserialization entry point: constructs an empty level and restores it via readExternal.)
   */
  @DeveloperApi
  def apply(in: ObjectInput): StorageLevel = {
    val obj = new StorageLevel()
    obj.readExternal(in)
    getCachedStorageLevel(obj)
  }

That covers the Scala source. As noted, some of the original Scaladoc comments are wrong, so I annotated the corrections inline above. The Java API:

package org.apache.spark.api.java;

import org.apache.spark.storage.StorageLevel;

/**
 * Expose some commonly useful storage level constants.
 */
public class StorageLevels {
  public static final StorageLevel NONE = create(false, false, false, false, 1);
  public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1);
  public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2);
  public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1);
  public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2);
  public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1);
  public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, false, 2);
  public static final StorageLevel MEMORY_AND_DISK = create(true, true, false, true, 1);
  public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, false, true, 2);
  public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, false, 1);
  public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, false, 2);
  public static final StorageLevel OFF_HEAP = create(false, false, true, false, 1);

  /**
   * Create a new StorageLevel object.
   * @param useDisk saved to disk, if true
   * @param useMemory saved to memory, if true
   * @param deserialized saved as deserialized objects, if true
   * @param replication replication factor
   */
  @Deprecated
  public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized,
      int replication) {
    return StorageLevel.apply(useDisk, useMemory, false, deserialized, replication);
  }

  /**
   * Create a new StorageLevel object.
   * @param useDisk saved to disk, if true
   * @param useMemory saved to memory, if true
   * @param useOffHeap saved to Tachyon, if true
   * @param deserialized saved as deserialized objects, if true
   * @param replication replication factor
   */
  public static StorageLevel create(
    boolean useDisk,
    boolean useMemory,
    boolean useOffHeap,
    boolean deserialized,
    int replication) {
    return StorageLevel.apply(useDisk, useMemory, useOffHeap, deserialized, replication);
  }
}

This class is also straightforward. Finally, the official usage guidance:

Usage

| Storage level | Meaning |
| --- | --- |
| MEMORY_ONLY | Store the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly when needed. This is the default level. |
| MEMORY_AND_DISK | Store the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, the partitions that do not fit are stored on disk and read from there when needed. |
| MEMORY_ONLY_SER | Store the RDD as serialized objects, one byte array per partition. Generally more space-efficient than deserialized objects, especially with a fast serializer, but more CPU-intensive to read. |
| MEMORY_AND_DISK_SER | Similar to MEMORY_ONLY_SER, but partitions that do not fit in memory spill to disk instead of being recomputed each time they are needed. |
| DISK_ONLY | Store the RDD partitions only on disk. |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | Same as the levels above, but each partition is replicated on two cluster nodes, i.e. replication = 2. |
| OFF_HEAP (experimental) | Store the RDD in serialized form in Tachyon. Compared with MEMORY_ONLY_SER, OFF_HEAP reduces garbage-collection overhead and lets executors be smaller and share a pool of memory, which is attractive in highly concurrent environments. Because the RDD resides in Tachyon, an executor crash does not lose the cached data. In this mode the memory in Tachyon is discardable, so Tachyon will not attempt to rebuild blocks it evicts. |

Recommendation: prefer the default level. If memory is tight, switch to MEMORY_ONLY_SER to save space. Avoid disk-backed levels unless there is no alternative, e.g. for very large datasets.

cjuexuan avatar Feb 18 '16 05:02 cjuexuan