teeyog
teeyog
## 前言 在文章[TaskScheduler 任务提交与调度源码解析](http://www.jianshu.com/p/d3b620581dc2) 中介绍了Task在executor上的逻辑分配,调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq[Seq[TaskDescription]],即对某个task需要在某个executor上执行的描述,仅仅是逻辑上的,还并未真正到executor上执行,本文将从源码角度解析Task是怎么被分配到executor上执行的。 ## Driver端发送LaunchTask事件 通过resourceOffers逻辑分配完task后,CoarseGrainedSchedulerBackend以Seq[Seq[TaskDescription]]参数调用了launchTasks方法: ``` private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task = maxRpcMessageSize) { scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => try { var msg = "Serialized task...
## 概述 推测任务是指对于一个Stage里面拖后腿的Task,会在其他节点的Executor上再次启动这个task,如果其中一个Task实例运行成功则将这个最先完成的Task的计算结果作为最终结果,同时会干掉其他Executor上运行的实例。spark推测式执行默认是关闭的,可通过spark.speculation属性来开启。 ## 检测是否有需要推测式执行的Task 在SparkContext创建了schedulerBackend和taskScheduler后,立即调用了taskScheduler 的start方法: ``` override def start() { backend.start() if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") speculationScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit...
## 前言 Spark数据本地化即移动计算而不是移动数据,而现实又是残酷的,不是想要在数据块的地方计算就有足够的资源提供,为了让task能尽可能的以最优本地化级别(Locality Levels)来启动,Spark的延迟调度应运而生,资源不够可在该Locality Levels对应的限制时间内重试,超过限制时间后还无法启动则降低Locality Levels再尝试启动…… ## 本地化级别(Locality Levels) - PROCESS_LOCAL:进程本地化,代码和数据在同一个进程中,也就是在同一个executor中;计算数据的task由executor执行,数据在executor的BlockManager中,性能最好 - NODE_LOCAL:节点本地化,代码和数据在同一个节点中;比如说,数据作为一个HDFS block块在节点上,而task在节点上某个executor中运行;或者是数据和task在一个节点上的不同executor中,数据需要在进程间进行传输 - NO_PREF:对于task来说,数据从哪里获取都一样,没有好坏之分,比如说SparkSQL读取MySql中的数据 - RACK_LOCAL:机架本地化,数据和task在一个机架的两个节点上,数据需要通过网络在节点之间进行传输 - ANY:数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差 这些Task的本地化级别其实描述的就是计算与数据的位置关系,这个最终的关系是如何产生的呢?接下来对其来龙去脉进行详细的讲解。 ## DAGScheduler提交tasks DAGScheduler对job进行stage划分完后,会通过submitMissingTasks方法将Stage以TaskSet的形式提交给TaskScheduler,看看该方法关于位置优先的一些代码: ``` ... // 获取还未执行或未成功执行分区的id val partitionsToCompute:...
## 前言 spark应用程序的调度体现在两个地方,第一个是Yarn对spark应用间的调度,第二个是spark应用内(同一个SparkContext)的多个TaskSetManager的调度,这里暂时只对应用内部调度进行分析。 spark的调度模式分为两种:FIFO(先进先出)和FAIR(公平调度)。默认是FIFO,即谁先提交谁先执行,而FAIR支持在调度池中再进行分组,可以有不同的权重,根据权重、资源等来决定谁先执行。spark的调度模式可以通过spark.scheduler.mode进行设置。 ## 调度池初始化 在DAGScheluer对job划分好stage并以TaskSet的形式提交给TaskScheduler后,TaskScheduler的实现类会为每个TaskSet创建一个TaskSetMagager对象,并将该对象添加到调度池中: ``` schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) ``` schedulableBuilder是SparkContext 中newTaskSchedulerImpl(sc)在创建TaskSchedulerImpl的时候通过scheduler.initialize(backend)的initialize方法对schedulableBuilder进行了实例化。 ``` def initialize(backend: SchedulerBackend) { this.backend = backend // temporarily set rootPool name to empty rootPool =...
在DAGScheduler划分为Stage并以TaskSet的形式提交给TaskScheduler后,再由TaskScheduler通过TaskSetMagager对taskSet的task进行调度与执行。 ``` taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) ``` submitTasks方法的实现在TaskScheduler的实现类TaskSchedulerImpl中。先看整个实现: ``` override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with...
DAGScheduler在划分完Stage后([[spark] DAGScheduler划分stage源码解析](http://www.jianshu.com/p/a1ec03185b82) ),将会通过submitStage(finalStage)来提交stage: ``` private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { //获取未计算完的parentStage,判断是否计算完的条件是 //_numAvailableOutputs...
## 概述 Spark Application只有遇到action操作时才会真正的提交任务并进行计算,DAGScheduler 会根据各个RDD之间的依赖关系形成一个DAG,并根据ShuffleDependency来进行stage的划分,stage包含多个tasks,个数由该stage的finalRDD决定,stage里面的task完全相同,DAGScheduler 完成stage的划分后基于每个Stage生成TaskSet,并提交给TaskScheduler,TaskScheduler负责具体的task的调度,在Worker节点上启动task。  ## Job的提交 以count为例,直接看源码都有哪些步骤: ``` def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum DAGScheduler#runJob DAGScheduler#runJob DAGScheduler#runJob DAGScheduler#dagScheduler.runJob DAGScheduler#submitJob eventProcessLoop.post(JobSubmitted(**)) ``` eventProcessLoop是一个DAGSchedulerEventProcessLoop(this)对象,可以把DAGSchedulerEventProcessLoop理解成DAGScheduler的对外的功能接口。它对外隐藏了自己内部实现的细节。无论是内部还是外部消息,DAGScheduler可以共用同一消息处理代码,逻辑清晰,处理方式统一。 eventProcessLoop接收各种消息并进行处理,处理的逻辑在其doOnReceive方法中: ``` private def...
## 前言 在spark应用程序中,常常会遇到运算量很大经过很复杂的 Transformation才能得到的RDD即Lineage链较长、宽依赖的RDD,此时我们可以考虑将这个RDD持久化。 cache也是可以持久化到磁盘,只不过是直接将partition的输出数据写到磁盘,而checkpoint是在逻辑job完成后,若有需要checkpoint的RDD,再单独启动一个job去完成checkpoint,这样该RDD就被计算了两次,所以建议在有checkpoint的时候先将该RDD cache到内存,到时候直接写到磁盘就行了。 ## checkpoint的实现 需要使用checkpoint都需要通过sparkcontext的setCheckpointDir方法设置一个目录以存checkpoint的各种信息数据,下面我们来看看该方法: ``` def setCheckpointDir(directory: String) { if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) { logWarning("Spark is not running in local mode, therefore the checkpoint directory...
> spark的缓存机制保证了需要访问重复数据的应用(如迭代型算法和交互式应用)可以运行的更快。 完整的存储级别介绍如下所示: | Storage Level| Meaning| | ------------- |:-------------:| | MEMORY_ONLY | 将RDD作为非序列化的Java对象存储在jvm中。如果RDD不能被内存装下,一些分区将不会被缓存,并且在需要的时候被重新计算。这是系统默认的存储级别。 | | MEMORY_AND_DISK | 将RDD作为非序列化的Java对象存储在jvm中。如果RDD不能被与内存装下,超出的分区将被保存在硬盘上,并且在需要时被读取。| | MEMORY_ONLY_SER | 将RDD作为序列化的Java对象存储(每个分区一个byte数组)。这种方式比非序列化方式更节省空间,特别是用到快速的序列化工具时,但是会更耗费cpu资源—密集的读操作。 | |MEMORY_AND_DISK_SER | 和MEMORY_ONLY_SER类似,但不是在每次需要时重复计算这些不适合存储到内存中的分区,而是将这些分区存储到磁盘中。 | | DISK_ONLY|...
> RDD(Resilient Distributed Dataset):弹性分布式数据集。 ## 特性 - A list of partitions (可分片) - A function for computing each split (compute func) - A list of dependencies on other RDDs (依赖)...