blog-frontend
blog-frontend copied to clipboard
Egg-cluster源码学习(水)
参考
1. 整体了解一下
1. Egg多进程模型
为了将多核CPU的性能发挥到极致,最大程度地榨干服务器资源,egg采用了多进程模型,解决了一个Node.js进程只能运行在一个CPU上的问题,egg-cluster
是用于egg多进程管理的基础模块,负责底层的IPC通道的建立以及处理各进程的通信
-
master: 主进程
-
worker: master的子进程,一般是根据服务器有多少个CPU启动多少个这样的worker进程,主要用于对外服务,处理业务层面上的事情
-
agent: master的子进程,主要处理公共资源的访问,或者帮worker处理一些公共事务
master类似于一个守护进程的存在
-
负责agent的启动,退出,重启
-
负责各个worker的启动,退出,refork,重启
-
负责agent和各个worker之间的通信
-
负责各个worker之间的通信
各进程的启动顺序
-
master启动后先启动agent进程
-
agent初始化成功后,通过IPC通道通知master
-
master根据CPU个数启动相同数目的worker进程
-
worker进程初始化成功后,通过IPC通道通知master
-
所有进程都初始化成功后,master通知agent和各个worker进程应用启动成功
启动差异
master启动agent使用的是child_process
的fork
模式,而启动各个worker使用的是cluster
的fork
模式,这是为什么呢?
cluster意为集成,集成了两个方面,第一个方面就是集成了child_process.fork方法创建node子进程的方式,第二个方面就是集成了根据多核CPU创建子进程后,自动控制负载均衡的方式。
因为agent是类似于作为各个worker秘书的存在,只负责帮他们处理轻量级的服务,是不直接对外提供http访问的,而worker是需要对提供对外http访问的,需要使用cluster模块的fork进行负载均衡的处理
进程间通信
从上图可以观察到
-
master和agent/worker是
real communication
-
agent和worker之间以及各个worker之间是
virtual communication
这里这么称呼的原因是
-
master继承的是
events
模块,events拥有监听,发消息的能力,本身是通过订阅者模式来进行事务处理的,并且master是agent/worker的父进程,相互可以通过IPC通道进行通信 -
agent和各个worker之间毕竟是不同进程,是无法直接进行通信的,需要借助master的力量进行转发,在源码中体现在
messenger
工具类
上面两个词理解为:直接通信/间接通信
2.源码
2.1 入口文件index.js
exports.startCluster = function(options, callback) {
new Master(options).ready(callback);
};
可以发现**egg.startCluster
是egg-cluster
模块暴露的一个API**,startCluster
主要负责启动master进程,在启动成功后执行callbak方法,比如一些非业务的初始化操作,master进程不应该有业务逻辑,代码越精简越好,业务上的初始化操作应该在app.js/agent.js
里面的beforeStart
进行
2.2 主进程Master.js Master类
constructor部分,把日志相关的都省略了
constructor(options) {
super();
// 参数处理
this.options = parseOptions(options);
// 实例化一个信使,一个管理者
this.workerManager = new Manager();
this.messenger = new Messenger(this);
// 借用ready模块的方法
ready.mixin(this);
// 标志位
this.isProduction = isProduction();
this.agentWorkerIndex = 0;
this.closed = false;
this[REALPORT] = this.options.port;
this.isStarted = false;
const startTime = Date.now();
// master启动成功后通知parent,app worker, agent
this.ready(() => {
this.isStarted = true;
const action = 'egg-ready';
this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
this.messenger.send({ action, to: 'app', data: this.options });
this.messenger.send({ action, to: 'agent', data: this.options });
// 开始监听agent/worker的状态
this.workerManager.startCheck();
});
// 监听agent 退出/启动
this.on('agent-exit', this.onAgentExit.bind(this));
this.on('agent-start', this.onAgentStart.bind(this));
// 监听 app worker的退出启动
this.on('app-exit', this.onAppExit.bind(this));
this.on('app-start', this.onAppStart.bind(this));
// 开发环境下监听app worker重启
this.on('reload-worker', this.onReload.bind(this));
// 监听agent启动,只执行一次
this.once('agent-start', this.forkAppWorkers.bind(this));
this.on('realport', port => {
if (port) this[REALPORT] = port;
});
// kill(2) Ctrl-C 监听SIGINT信号
process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
// kill(3) Ctrl-\ 监听SIGOUT信号
process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
// kill(15) default 监听SIGTERM信号
process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));
// 监听exit事件
process.once('exit', this.onExit.bind(this));
// 监听端口冲突
detectPort((err, port) => {
/* istanbul ignore if */
if (err) {
err.name = 'ClusterPortConflictError';
err.message = '[master] try get free port error, ' + err.message;
this.logger.error(err);
process.exit(1);
}
this.options.clusterPort = port;
// 没有端口冲突则执行该方法
this.forkAgentWorker();
});
// agent/worker出事儿了怎么办?
this.workerManager.on('exception', ({ agent, worker }) => {
const err = new Error(`[master] ${agent} agent and ${worker} worker(s) alive, exit to avoid unknown state`);
err.name = 'ClusterWorkerExceptionError';
err.count = { agent, worker };
this.logger.error(err);
process.exit(1);
});
}
1. 对于参数的处理
我寻思他对于参数的处理特别nice,学习一手
constructor(options) {
this.options = parseOptions(options)
}
进入到util/options
文件
module.exports = function(options) {
const defaults = {
// 略
}
options = extend(defaults, options);
// 略
}
结下来就是extend
函数
function extend(target, src) {
const keys = Object.keys(src);
for (const key of keys) {
if (src[key] != null) {
target[key] = src[key];
}
}
return target;
}
龟龟,酷炫,接下来的部分就是对参数进行校验,如果参数不对,就该assert模块显神威了
简略的过程就像这样:)
const assert = require('assert')
let a = (function () {
assert(false, "应该为true")
return 1
})()
console.log(a)
assert.js:42
throw new errors.AssertionError({
^
AssertionError [ERR_ASSERTION]: 应该为true
at /Users/lilongyao/Documents/start/test-cluster/1.js:4:3
at Object.<anonymous> (/Users/lilongyao/Documents/start/test-cluster/1.js:6:3)
at Module._compile (module.js:652:30)
at Object.Module._extensions..js (module.js:663:10)
at Module.load (module.js:565:32)
at tryModuleLoad (module.js:505:12)
at Function.Module._load (module.js:497:3)
at Function.Module.runMain (module.js:693:10)
at startup (bootstrap_node.js:191:16)
at bootstrap_node.js:612:3
2.3 管理你的孩子manager.js
这个文件主要是Master对自己子进程的管理
-
对agent进行设置/删除
-
对worker进行设置/删/查
'use strict';
const EventEmitter = require('events');
// worker manager to record agent and worker forked by egg-cluster
// can do some check stuff here to monitor the healthy
class Manager extends EventEmitter {
constructor() {
super();
this.workers = new Map();
this.agent = null;
}
setAgent(agent) {
this.agent = agent;
}
deleteAgent() {
this.agent = null;
}
setWorker(worker) {
this.workers.set(worker.process.pid, worker);
}
getWorker(pid) {
return this.workers.get(pid);
}
deleteWorker(pid) {
this.workers.delete(pid);
}
listWorkerIds() {
return Array.from(this.workers.keys());
}
getListeningWorkerIds() {
const keys = [];
for (const id of this.workers.keys()) {
if (this.getWorker(id).state === 'listening') {
keys.push(id);
}
}
return keys;
}
count() {
return {
agent: (this.agent && this.agent.status === 'started') ? 1 : 0,
worker: this.listWorkerIds().length,
};
}
// check agent and worker must both alive
// if exception appear 3 times, emit an exception event
startCheck() {
this.exception = 0;
this.timer = setInterval(() => {
const count = this.count();
if (count.agent && count.worker) {
this.exception = 0;
return;
}
this.exception++;
if (this.exception >= 3) {
this.emit('exception', count);
clearInterval(this.timer);
}
}, 10000);
}
}
module.exports = Manager;
瞅瞅,Class
,Map
着实赏心悦目,最后一个startCheck
方法在master.js中的ready
最后执行: 每隔10秒检查agent/worker是否都活着,连续三次检查不通过就报异常~
2.4 信使类messenger.js
提供parent,master,agent,app之间的交流能力,这个注释就已经带秀了
*
* ┌────────┐
* │ parent │
* /└────────┘\
* / | \
* / ┌────────┐ \
* / │ master │ \
* / └────────┘ \
* / / \ \
* ┌───────┐ ┌───────┐
* │ agent │ ------- │ app │
* └───────┘ └───────┘
*
*
概览
class Messenger {
constructor(mater) {
}
send(data) {
}
sendToMaster(data) {
}
sendToParent(data) {
}
sendToAppWorker(data) {
}
sendToAgentWorker(data) {
}
}
上面提到了,所有的沟通都需要进过master进程
如果要发消息给master本身,因为其是events子类,用emit
和on
就完事了`
sendToMaster(data) {
this.master.emit(data.action, data.data)
}
如果要发消息给父进程,通过process.send
方法
sendToParent(data) {
process.send && process.send(data);
}
如果发消息给worker
,遍历/检查一下就完事了
sendToAppWorker(data) {
for (const id in cluster.workers) {
const worker = cluster.workers[id];
if (worker.state === 'disconnected') {
continue;
}
// check receiverPid
if (data.receiverPid && data.receiverPid !== String(worker.process.pid)) {
continue;
}
sendmessage(worker, data);
}
}
2.5 回到主线
接着回到主线~~~, 看看constructor
结束后的方法有些什么
class Master extends EventEmitter {
forkAgentWorker(){}
forkAppWorker(){}
killAgentWorker(){}
killAppWorker(){}
onAgentExit(data){}
onAgentStart(){}
onAppExit(data){}
onAppStart(data){}
onExit(code){}
onSignal(signal){}
onReload(){}
close(){}
}
2.5.1 forkAgentWorker
前面说到的流程
-
master启动后通过
child_process
启动agent进程 -
agent初始化成功后,通过ipc通道通知master
agent.ready(() => {
process.send({ action: 'agent-start', to: 'master' });
})
这里简化一下源码
forkAgentWorker() {
// 来啦.我滴儿
const agentWorker = childprocess.fork(agentWorkerFile, args, opt);
// 来都来了,就让管理者管一下把
this.workerManager.setAgent(agentWorker);
// agent也要做点自己的事情
agentWorker.on('message',msg => {})
agentWorker.on('error', msg => {})
agentWorker.once('exit', msg => {})
}
2.5.2 forkAppWorkers
根据前面的流程
-
master根据cpu个数通过
cluster
启动相同数目的worker进程 -
worker进程初始化成功后,通过IPC通道通知master
-
所有进程都初始化成功之后,master通知agent和各个worker进程应用启动成功
看看简化滴代码
forkAppWorkers(){
// 通过cfork库启动孩儿们
cfork({
exec: appWorkerFile,
args,
silent: false,
// 有几个儿子呢?
count: this.options.workers,
// don't refork in local env
refork: this.isProduction,
});
// 触发了cluster模块fork事件时,交给管理者管理
cluster.on('fork', worker => {
this.workerManager.setWorker(worker);
// 给agent发消息
worker.on('message', msg => {
msg.from = 'app'
this.messenger.send(msg);
})
})
// 记下日志把
cluster.on('disconnect', worker => {})
// 告诉master,我挂了
cluster.on('exit', ...)
// 告诉master, 我启动成功了
cluster.on('listenning', ...)
}
水了,看过程谁不会呢- -先这样8