teeyog

Results 30 issues of teeyog

#### 预备知识 先介绍在Spark SQL中两个非常重要的数据结构:Tree和Rule。 SparkSql的第一件事就是把SQLText解析成语法树,这棵树包含了很多节点对象,节点可以有特定的数据类型,同时可以有0个或者多个子节点,节点在SparkSQL中的表现形式为TreeNode对象。举个实际的例子: - Literal(value: Int): 一个常量 - Attribute(name: String): 变量name - Add(left: TreeNode, right: TreeNode): 两个表达式的和 x + (1 + 2) 在代码中的表现形式为:Add(Attribute(x), Add(Literal(1), Literal(2))) ![](https://upload-images.jianshu.io/upload_images/3597066-b0757ab170f9e845.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 而Rule则是应用在Tree上的规则,通过模式匹配,匹配成功的就进行相应的规则变换,若不成功则继续匹配子节点,如在Optimizer模块中有个常量累加的优化规则,通过该规则,可以将两个常量节点直接转化为值相加后的一个常量节点,如下图: ![](https://upload-images.jianshu.io/upload_images/3597066-c9559cd6b8f63f6b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)...

SparkSql

## 前言 SparkStreaming 7*24 小时不间断的运行,有时需要管理一些状态,比如wordCount,每个batch的数据不是独立的而是需要累加的,这时就需要sparkStreaming来维护一些状态,目前有两种方案updateStateByKey&mapWithState,mapWithState是spark1.6新加入的保存状态的方案,官方声称有10倍性能提升。 ## updateStateByKey 先上一个示例: ``` def updateFunction(currValues:Seq[Int],preValue:Option[Int]): Option[Int] = { val currValueSum = currValues.sum //上面的Int类型都可以用对象类型替换 Some(currValueSum + preValue.getOrElse(0)) //当前值的和加上历史值 } kafkaStream.map(r => (r._2,1)).updateStateByKey(updateFunction _) ``` 这里的updateFunction方法就是需要我们自己去实现的状态跟新的逻辑,```currValues```就是当前批次的所有值,```preValue```是历史维护的状态,```updateStateByKey```返回的是包含历史所有状态信息的DStream,下面我们来看底层是怎么实现状态的管理的,通过跟踪源码看到最核心的实现方法:...

spark streaming

## 前言 Spark Streaming Job的生成是通过```JobGenerator```每隔 batchDuration 长时间动态生成的,每个batch 对应提交一个JobSet,因为针对一个batch可能有多个输出操作。 概述流程: - 定时器定时向 eventLoop 发送生成job的请求 - 通过receiverTracker 为当前batch分配block - 为当前batch生成对应的 Jobs - 将Jobs封装成JobSet 提交执行 ## 入口 在 JobGenerator 初始化的时候就创建了一个定时器: ``` private val...

spark streaming

> 看 spark streaming 源码解析之前最好先了解spark core的内容。 ## 前言 Spark Streaming 是基于Spark Core将流式计算分解成一系列的小批处理任务来执行。 在Spark Streaming里,总体负责任务的动态调度是```JobScheduler```,而```JobScheduler```有两个很重要的成员:```JobGenerator``` 和 ```ReceiverTracker```。```JobGenerator``` 负责将每个 batch 生成具体的 RDD DAG ,而```ReceiverTracker```负责数据的来源。 Spark Streaming里的```DStream```可以看成是Spark Core里的RDD的模板,```DStreamGraph```是RDD DAG的模板。 ## 跟着例子看流程 DStream 也和...

spark streaming

> 看 spark streaming 源码解析之前最好先了解spark core的内容。 ## 前言 Spark Streaming 是基于Spark Core将流式计算分解成一系列的小批处理任务来执行。 在Spark Streaming里,总体负责任务的动态调度是```JobScheduler```,而```JobScheduler```有两个很重要的成员:```JobGenerator``` 和 ```ReceiverTracker```。```JobGenerator``` 负责将每个 batch 生成具体的 RDD DAG ,而```ReceiverTracker```负责数据的来源。 Spark Streaming里的```DStream```可以看成是Spark Core里的RDD的模板,```DStreamGraph```是RDD DAG的模板。 ## 跟着例子看流程 DStream 也和...

spark streaming

## 概述 spark的内存管理有两套方案,新旧方案分别对应的类是UnifiedMemoryManager和StaticMemoryManager。 旧方案是静态的,storageMemory(存储内存)和executionMemory(执行内存)拥有的内存是独享的不可相互借用,故在其中一方内存充足,另一方内存不足但又不能借用的情况下会造成资源的浪费。新方案是统一管理的,初始状态是内存各占一半,但其中一方内存不足时可以向对方借用,对内存资源进行合理有效的利用,提高了整体资源的利用率。 总的来说内存分为三大块,包括storageMemory、executionMemory、系统预留,其中storageMemory用来缓存rdd,unroll partition,存放direct task result、广播变量,在 Spark Streaming receiver 模式中存放每个 batch 的 blocks。executionMemory用于shuffle、join、sort、aggregation 中的缓存。除了这两者以外的内存都是预留给系统的。 ## 旧方案 StaticMemoryManager 在SparkEnv中会创建memoryManager: ``` val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false) val memoryManager: MemoryManager =...

spark

## 概述 BlockManager是spark自己的存储系统,RDD-Cache、 Shuffle-output、broadcast 等的实现都是基于BlockManager来实现的,BlockManager也是分布式结构,在driver和所有executor上都会有blockmanager节点,每个节点上存储的block信息都会汇报给driver端的blockManagerMaster作统一管理,BlockManager对外提供get和set数据接口,可将数据存储在memory, disk, off-heap。 ## blockManager的创建与注册 blockManagerMaster和blockManager都是在构造SparkEnv的时候创建的,Driver端是创建SparkContext的时候创建SparkEnv,Executor端的SparkEnv是在其守护进程CoarseGrainedExecutorBackend创建的时候创建的,下面看blockManager是怎么在sparkEnv中创建的: ``` // get&put 远程block的时候就是通过blockTransferService 完成的 val blockTransferService = new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress, blockManagerPort, numUsableCores) val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(...

spark

> 本文基于 Spark 2.1 进行解析 ## 前言 从 Spark 2.0 开始移除了Hash Based Shuffle,想要了解可参考[Shuffle 过程](https://github.com/JerryLead/SparkInternals/blob/master/markdown/4-shuffleDetails.md),本文将讲解 Sort Based Shuffle。 ShuffleMapTask的结果(ShuffleMapStage中FinalRDD的数据)都将写入磁盘,以供后续Stage拉取,即整个Shuffle包括前Stage的Shuffle Write和后Stage的Shuffle Read,由于内容较多,本文先解析Shuffle Write。 概述: - 写records到内存缓冲区(一个数组维护的map),每次insert&update都需要检查是否达到溢写条件。 - 若需要溢写,将集合中的数据根据partitionId和key(若需要)排序后顺序溢写到一个临时的磁盘文件,并释放内存新建一个map放数据,每次溢写都是写一个新的临时文件。 - 一个task最终对应一个文件,将还在内存中的数据和已经spill的文件根据reduce端的partitionId进行合并,合并后需要再次聚合排序(有需要情况下),再根据partition的顺序写入最终文件,并返回每个partition在文件中的偏移量,最后以MapStatus对象返回给driver并注册到MapOutputTrackerMaster中,后续reduce好通过它来访问。 ## 入口...

spark

> 本文基于spark2.1进行解析 ## 前言 Spark作为分布式的计算框架可支持多种运行模式: - 本地运行模式 (单机) - 本地伪集群运行模式(单机模拟集群) - Standalone Client模式(集群) - Standalone Cluster模式(集群) - YARN Client模式(集群) - YARN Cluster模式(集群) 而Standalone 作为spark自带cluster manager,需要启动Master和Worker守护进程,本文将从源码角度解析两者的启动流程。Master和Worker之间的通信使用的是基于netty的RPC,Spark的Rpc推荐看[深入解析Spark中的RPC](https://zhuanlan.zhihu.com/p/28893155)。 ## Master 启动 启动Master是通过脚本start-master.sh启动的,里面实际调用的类是: ```...

spark

## 前言 在文章[Task执行流程](http://www.jianshu.com/p/959954008583) 中介绍了task是怎么被分配到executor上执行的,本文讲解task成功执行时将结果返回给driver的处理流程。 ## Driver端接收task完成事件 在executor上成功执行完task并拿到serializedResult 之后,通过CoarseGrainedExecutorBackend的statusUpdate方法来返回结果给driver,该方法会使用driverRpcEndpointRef 发送一条包含 serializedResult 的 StatusUpdate 消息给 driver。 ``` execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { val msg =...

spark