ApacheSparkBook
ApacheSparkBook copied to clipboard
第二章《Spark 系统部署与应用运行的基本流程》勘误与修改建议
p35第三行和p36页,ParallelCollectonRDD笔误,应为ParallelCollectionRDD p35第四行,MapRartitionsRDD笔误,应为MapPartitionsRDD
P21 图2.1关于Driver,我看了一下源码 Driver应该是线程不是进程,以下为源码信息
ApplicationMaster
--main函数
//处理参数
val amArgs = new ApplicationMasterArguments(args)
--点进来
//解析参数 --class=>userclass=>你要运行的jar包
parseArgs(args.toList)
//创建AM
val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
--点进ApplicationMaster
//发现他创建了
private val client = new YarnRMClient()
--点进YarnRMClient
//发现一个参数AMRMClient它负责am与rm之间的通信
private var amClient: AMRMClient[ContainerRequest] = _
//am.run
override def run(): Unit = System.exit(master.run())
--点进run方法
//如果是集群模式,跑Driver
if (isClusterMode) {
runDriver()
} else {
runExecutorLauncher()
}
--点进runDriver()方法
//启动用户的应用程序,就是我们的--class后面的jar包
userClassThread = startUserApplication
--点进startUserApplication
//通过类加载器获取我们程序的主函数
val mainMethod = userClassLoader.loadClass(args.userClass)
.getMethod("main", classOf[Array[String]])
//发现他创建了一个线程,这个线程就是Driver
val userThread = new Thread {
//run函数中
run(){
//判断mainMethod是否为静态,是静态就调用,不是静态就报错
if (!Modifier.isStatic(mainMethod.getModifiers)) {......}
else{......}
......
}
}
userThread.setContextClassLoader(userClassLoader)
userThread.setName("Driver")
userThread.start()
......
//等待sparkcontext的创建,如果sc没有被创建,那么当前线程会被阻塞
val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
//sc不为空时
if (sc != null) {
//创建rpc通信环境
val rpcEnv = sc.env.rpcEnv
......
//注册AM
registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
//返回可用资源列表
createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
--点进createAllocator
//拿到可用资源
allocator = client.createAllocator(......)
//分配资源
allocator.allocateResources()
handleAllocatedContainers(allocatedContainers.asScala)
--点进handleAllocatedContainers(allocatedContainers.asScala)
//发现这个函数会根据节点的位置,选择自己最需要的资源(根据拓扑距离......)
//运行分配的容器
runAllocatedContainers(containersToUse)
--点进runAllocatedContainers(containersToUse)
//发现这个函数的作用是:在分配的容器中启动executor
//创建executorRunnable
new ExecutorRunnable(......)
//执行ExecutorRunnable.run
--点进run方法
//发现他创建了NMClient,用以与NM联系,并启动NM的容器
nmClient = NMClient.createNMClient()
nmClient.init(conf)
nmClient.start()
startContainer()
--点进startContainer()
//发现准备指令
val commands = prepareCommand()
--点击prepareCommand
//发现其commands = /bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend
//将准备指令发给ctx
ctx.setCommands(commands.asJava)
//nmClient发送指令给NM
nmClient.startContainer(container.get, ctx)
}
page 38,第2行
2014年Google发表了MapReduce论文
应该是2004年吧。