blog-frontend icon indicating copy to clipboard operation
blog-frontend copied to clipboard

Egg-cluster源码学习(水)

Open Caaalabash opened this issue 4 years ago • 0 comments

参考

Egg 源码解析之 egg-cluster

1. 整体了解一下

1. Egg多进程模型

为了将多核CPU的性能发挥到极致,最大程度地榨干服务器资源,egg采用了多进程模型,解决了一个Node.js进程只能运行在一个CPU上的问题,egg-cluster是用于egg多进程管理的基础模块,负责底层的IPC通道的建立以及处理各进程的通信

  • master: 主进程

  • worker: master的子进程,一般是根据服务器有多少个CPU启动多少个这样的worker进程,主要用于对外服务,处理业务层面上的事情

  • agent: master的子进程,主要处理公共资源的访问,或者帮worker处理一些公共事务

master类似于一个守护进程的存在

  • 负责agent的启动,退出,重启

  • 负责各个worker的启动,退出,refork,重启

  • 负责agent和各个worker之间的通信

  • 负责各个worker之间的通信

各进程的启动顺序

  • master启动后先启动agent进程

  • agent初始化成功后,通过IPC通道通知master

  • master根据CPU个数启动相同数目的worker进程

  • worker进程初始化成功后,通过IPC通道通知master

  • 所有进程都初始化成功后,master通知agent和各个worker进程应用启动成功

启动差异

master启动agent使用的是child_processfork模式,而启动各个worker使用的是clusterfork模式,这是为什么呢?

cluster意为集成,集成了两个方面,第一个方面就是集成了child_process.fork方法创建node子进程的方式,第二个方面就是集成了根据多核CPU创建子进程后,自动控制负载均衡的方式。

因为agent是类似于作为各个worker秘书的存在,只负责帮他们处理轻量级的服务,是不直接对外提供http访问的,而worker是需要对提供对外http访问的,需要使用cluster模块的fork进行负载均衡的处理

进程间通信

从上图可以观察到

  • master和agent/worker是real communication

  • agent和worker之间以及各个worker之间是virtual communication

这里这么称呼的原因是

  • master继承的是events模块,events拥有监听,发消息的能力,本身是通过订阅者模式来进行事务处理的,并且master是agent/worker的父进程,相互可以通过IPC通道进行通信

  • agent和各个worker之间毕竟是不同进程,是无法直接进行通信的,需要借助master的力量进行转发,在源码中体现在messenger工具类

上面两个词理解为:直接通信/间接通信

2.源码

2.1 入口文件index.js

exports.startCluster = function(options, callback) {
  new Master(options).ready(callback);
};

可以发现**egg.startClusteregg-cluster模块暴露的一个API**,startCluster主要负责启动master进程,在启动成功后执行callbak方法,比如一些非业务的初始化操作,master进程不应该有业务逻辑,代码越精简越好,业务上的初始化操作应该在app.js/agent.js里面的beforeStart进行

2.2 主进程Master.js Master类

constructor部分,把日志相关的都省略了

  constructor(options) {
    super();
    // 参数处理
    this.options = parseOptions(options);
    // 实例化一个信使,一个管理者
    this.workerManager = new Manager();
    this.messenger = new Messenger(this);
    // 借用ready模块的方法
    ready.mixin(this);
    // 标志位
    this.isProduction = isProduction();
    this.agentWorkerIndex = 0;
    this.closed = false;
    this[REALPORT] = this.options.port;
    this.isStarted = false;
    const startTime = Date.now();

    // master启动成功后通知parent,app worker, agent
    this.ready(() => {
      this.isStarted = true;
      const action = 'egg-ready';
      this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
      this.messenger.send({ action, to: 'app', data: this.options });
      this.messenger.send({ action, to: 'agent', data: this.options });

      // 开始监听agent/worker的状态
      this.workerManager.startCheck();
    });
    // 监听agent 退出/启动
    this.on('agent-exit', this.onAgentExit.bind(this));
    this.on('agent-start', this.onAgentStart.bind(this));
    // 监听 app worker的退出启动
    this.on('app-exit', this.onAppExit.bind(this));
    this.on('app-start', this.onAppStart.bind(this));
    // 开发环境下监听app worker重启
    this.on('reload-worker', this.onReload.bind(this));

    // 监听agent启动,只执行一次
    this.once('agent-start', this.forkAppWorkers.bind(this));
    this.on('realport', port => {
      if (port) this[REALPORT] = port;
    });

    // kill(2) Ctrl-C  监听SIGINT信号
    process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
    // kill(3) Ctrl-\  监听SIGOUT信号
    process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
    // kill(15) default  监听SIGTERM信号
    process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));
    // 监听exit事件
    process.once('exit', this.onExit.bind(this));
    // 监听端口冲突
    detectPort((err, port) => {
      /* istanbul ignore if */
      if (err) {
        err.name = 'ClusterPortConflictError';
        err.message = '[master] try get free port error, ' + err.message;
        this.logger.error(err);
        process.exit(1);
      }
      this.options.clusterPort = port;
      // 没有端口冲突则执行该方法
      this.forkAgentWorker();
    });

    // agent/worker出事儿了怎么办?
    this.workerManager.on('exception', ({ agent, worker }) => {
      const err = new Error(`[master] ${agent} agent and ${worker} worker(s) alive, exit to avoid unknown state`);
      err.name = 'ClusterWorkerExceptionError';
      err.count = { agent, worker };
      this.logger.error(err);
      process.exit(1);
    });
  }

1. 对于参数的处理

我寻思他对于参数的处理特别nice,学习一手

constructor(options) {
  this.options = parseOptions(options)
}

进入到util/options文件

module.exports = function(options) {
  const defaults = {
    // 略
  }
 options = extend(defaults, options);
  // 略
}

结下来就是extend函数

function extend(target, src) {
  const keys = Object.keys(src);
  for (const key of keys) {
    if (src[key] != null) {
      target[key] = src[key];
    }
  }
  return target;
}

龟龟,酷炫,接下来的部分就是对参数进行校验,如果参数不对,就该assert模块显神威了

简略的过程就像这样:)

const assert = require('assert')

let a  = (function () {
  assert(false, "应该为true")
  return 1
})()

console.log(a)
assert.js:42
  throw new errors.AssertionError({
  ^

AssertionError [ERR_ASSERTION]: 应该为true
    at /Users/lilongyao/Documents/start/test-cluster/1.js:4:3
    at Object.<anonymous> (/Users/lilongyao/Documents/start/test-cluster/1.js:6:3)
    at Module._compile (module.js:652:30)
    at Object.Module._extensions..js (module.js:663:10)
    at Module.load (module.js:565:32)
    at tryModuleLoad (module.js:505:12)
    at Function.Module._load (module.js:497:3)
    at Function.Module.runMain (module.js:693:10)
    at startup (bootstrap_node.js:191:16)
    at bootstrap_node.js:612:3

2.3 管理你的孩子manager.js

这个文件主要是Master对自己子进程的管理

  • 对agent进行设置/删除

  • 对worker进行设置/删/查

'use strict';

const EventEmitter = require('events');

// worker manager to record agent and worker forked by egg-cluster
// can do some check stuff here to monitor the healthy
class Manager extends EventEmitter {
  constructor() {
    super();
    this.workers = new Map();
    this.agent = null;
  }

  setAgent(agent) {
    this.agent = agent;
  }

  deleteAgent() {
    this.agent = null;
  }

  setWorker(worker) {
    this.workers.set(worker.process.pid, worker);
  }

  getWorker(pid) {
    return this.workers.get(pid);
  }

  deleteWorker(pid) {
    this.workers.delete(pid);
  }

  listWorkerIds() {
    return Array.from(this.workers.keys());
  }

  getListeningWorkerIds() {
    const keys = [];
    for (const id of this.workers.keys()) {
      if (this.getWorker(id).state === 'listening') {
        keys.push(id);
      }
    }
    return keys;
  }

  count() {
    return {
      agent: (this.agent && this.agent.status === 'started') ? 1 : 0,
      worker: this.listWorkerIds().length,
    };
  }

  // check agent and worker must both alive
  // if exception appear 3 times, emit an exception event
  startCheck() {
    this.exception = 0;
    this.timer = setInterval(() => {
      const count = this.count();
      if (count.agent && count.worker) {
        this.exception = 0;
        return;
      }
      this.exception++;
      if (this.exception >= 3) {
        this.emit('exception', count);
        clearInterval(this.timer);
      }
    }, 10000);
  }
}

module.exports = Manager;

瞅瞅,Class,Map着实赏心悦目,最后一个startCheck方法在master.js中的ready最后执行: 每隔10秒检查agent/worker是否都活着,连续三次检查不通过就报异常~

2.4 信使类messenger.js

提供parent,master,agent,app之间的交流能力,这个注释就已经带秀了

 *
 *             ┌────────┐
 *             │ parent │
 *            /└────────┘\
 *           /     |      \
 *          /  ┌────────┐  \
 *         /   │ master │   \
 *        /    └────────┘    \
 *       /     /         \    \
 *     ┌───────┐         ┌───────┐
 *     │ agent │ ------- │  app  │
 *     └───────┘         └───────┘
 *
 *

概览

class Messenger {
  constructor(mater) {

  }
  send(data) {

  }
  sendToMaster(data) {

  }
  sendToParent(data) {

  }
  sendToAppWorker(data) {

  }
  sendToAgentWorker(data) {

  }
}

上面提到了,所有的沟通都需要进过master进程

如果要发消息给master本身,因为其是events子类,用emiton就完事了`

sendToMaster(data) {
  this.master.emit(data.action, data.data)
}

如果要发消息给父进程,通过process.send方法

sendToParent(data) {
  process.send && process.send(data);
}

如果发消息给worker,遍历/检查一下就完事了

  sendToAppWorker(data) {
    for (const id in cluster.workers) {
      const worker = cluster.workers[id];
      if (worker.state === 'disconnected') {
        continue;
      }
      // check receiverPid
      if (data.receiverPid && data.receiverPid !== String(worker.process.pid)) {
        continue;
      }
      sendmessage(worker, data);
    }
  }

2.5 回到主线

接着回到主线~~~, 看看constructor结束后的方法有些什么

class Master extends EventEmitter {
  
  forkAgentWorker(){}
  forkAppWorker(){}
  killAgentWorker(){}
  killAppWorker(){}
  onAgentExit(data){}
  onAgentStart(){}
  onAppExit(data){}
  onAppStart(data){}
  onExit(code){}
  onSignal(signal){}
  onReload(){}
  close(){}
}

2.5.1 forkAgentWorker

前面说到的流程

  • master启动后通过child_process启动agent进程

  • agent初始化成功后,通过ipc通道通知master

agent.ready(() => {
  process.send({ action: 'agent-start', to: 'master' });
})

这里简化一下源码

forkAgentWorker() {
  // 来啦.我滴儿
  const agentWorker = childprocess.fork(agentWorkerFile, args, opt);
  // 来都来了,就让管理者管一下把
  this.workerManager.setAgent(agentWorker);
  // agent也要做点自己的事情
  agentWorker.on('message',msg => {})
  agentWorker.on('error', msg => {})
  agentWorker.once('exit', msg => {})
}

2.5.2 forkAppWorkers

根据前面的流程

  • master根据cpu个数通过cluster启动相同数目的worker进程

  • worker进程初始化成功后,通过IPC通道通知master

  • 所有进程都初始化成功之后,master通知agent和各个worker进程应用启动成功

看看简化滴代码

forkAppWorkers(){
  // 通过cfork库启动孩儿们
  cfork({
    exec: appWorkerFile,
    args,
    silent: false,
    // 有几个儿子呢? 
    count: this.options.workers,
    // don't refork in local env
    refork: this.isProduction,
  });
  // 触发了cluster模块fork事件时,交给管理者管理
  cluster.on('fork', worker => {
    this.workerManager.setWorker(worker);
    // 给agent发消息
    worker.on('message', msg => {
      msg.from = 'app'
      this.messenger.send(msg);
    })
  })
  // 记下日志把
  cluster.on('disconnect', worker => {})
  // 告诉master,我挂了
  cluster.on('exit', ...)
  // 告诉master, 我启动成功了
  cluster.on('listenning', ...)
}

水了,看过程谁不会呢- -先这样8

Caaalabash avatar May 02 '20 08:05 Caaalabash