前言
本次解读主要以阅读代码的逻辑编写,而非直接归总性的编写。虽然可能会在阅读时产生疑惑,但是胜在推敲理解的过程。
**
egg-cluster目录结构
.├── History.md├── LICENSE├── README.md├── index.js├── lib│ ├── agent_worker.js│ ├── app_worker.js│ ├── master.js│ └── utils│ ├── manager.js│ ├── messenger.js│ ├── options.js│ └── terminate.js└── package.json
承接上文Eggjs 源码解读——01.egg-bin
**./node_modules/egg-bin/lib/start-cluster**
#!/usr/bin/env node'use strict';const debug = require('debug')('egg-bin:start-cluster');const options = JSON.parse(process.argv[2]);debug('start cluster options: %j', options);require(options.framework).startCluster(options);
上面代码启动了./node_modules/egg/index.js的startCluster
exports.startCluster = require('egg-cluster').startCluster;
根据线索找到./node_modules/egg/node_modules/egg-cluster/index.js ,文章之后的路径省略./node_modules/egg/node_modules/egg-cluster默认在egg-cluster下
'use strict';const Master = require('./lib/master');exports.startCluster = function(options, callback) {new Master(options).ready(callback);};
从该文件得知,实例化了Master,查看./lib/master.js ,看见大概700多行代码,第一次看的时候确实头大。我的方法是大体的初略先过一遍,再去egg官网看文档,多进程模型和进程间通讯 再回来第二遍细看代码。
理解的过程就会清晰很多,该文件代码有很多知识点,算是学习一遍以前用过的,认识的没用过的,不认识的。
解读过程将在代码块中直接紧跟代码注释说明,非在主流程上的代码暂时省略,以免影响阅读
Master
// Master 继承了 EventEmitter模块,具有了事件的监听和订阅的功能class Master extends EventEmitter {constructor(options) {super();this.options = parseOptions(options);/*** const parseOptions = require('./utils/options');* 主要做了参数的转换和增加,例如验证https并给出返回是否是https,* 例如:egg-scripts start --port=8443 --daemon --https.key=cer.xxx.xxx.key --https.cert=cer.xxx.xxx.pem --workers=1* 将RSA PRIVATE KEY文件和CERTIFICATE文件放在根目录下便可以启用https模式了*/this.workerManager = new Manager();/*** 第一步:启动workerManager* 用来记录workers和agent的信息,workers用的是Map存储,具有增删查等方法*/this.messenger = new Messenger(this);/*** 步骤二:启动Messenger,用来负责进程之间的通信* 封装了send方法:* to: 消息去哪里* from: 消息从哪里来* action: 消息是干什么的* data: 消息的具体内容* ┌────────┐* │ parent │* /└────────┘\* / | \* / ┌────────┐ \* / │ master │ \* / └────────┘ \* / / \ \* ┌───────┐ ┌───────┐* │ agent │ ------- │ app │* └───────┘ └───────┘*/ready.mixin(this);/*** 步骤三:给this注册了ready方法,并且给ready方法添加了回调* 回调的内容是开始监听*/this.ready(() => {// ...省略代码});// ...省略代码,主要做了一些版本输出,变量保存的工作// ...省略代码,监听一些事件,执行对应的方法,例如:监听agent-start之后去启动workersthis.once('agent-start', this.forkAppWorkers.bind(this));// 步骤四:先检测可用端口后通过this.options.clusterPort 存储,并且启动Agentthis.detectPorts().then(() => {// 启动agentthis.forkAgentWorker();});// ...省略代码}}
forkAgentWorker方法
forkAgentWorker() {// ... 省略代码// this.getAgentWorkerFile() == agent_worker.js文件// 通过childprocess fork方法创建Agent子进程const agentWorker = childprocess.fork(this.getAgentWorkerFile(), args, opt);agentWorker.status = 'starting';agentWorker.id = ++this.agentWorkerIndex;// 通过上面代码构造函数实例化的workerManager保存agentWorkerthis.workerManager.setAgent(agentWorker);this.log('[master] agent_worker#%s:%s start with clusterPort:%s',agentWorker.id, agentWorker.pid, this.options.clusterPort);// ...省略代码,具体内容当agent出现错误、异常关闭、收到消息等等时候,通过之前建立 的messenger发才能够消息给master}
agent_worker.js
./lib/agent_worker.js
// options.framework == 'egg-example/node_modules/egg' 指向egg// 可以看到通过egg里的Agent实例化了agent,具体功能后面再做解读const Agent = require(options.framework).Agent;debug('new Agent with options %j', options);const agent = new Agent(options);function startErrorHandler(err) {consoleLogger.error(err);consoleLogger.error('[agent_worker] start error, exiting with code:1');process.exitCode = 1;process.kill(process.pid);}agent.ready(err => {// don't send started message to master when start errorif (err) return;agent.removeListener('error', startErrorHandler);// 当agent启动成功后,向master发送消息,agent-startprocess.send({ action: 'agent-start', to: 'master' });});// exit if agent start erroragent.once('error', startErrorHandler);// 通过forkAgentWorker方法,可以看到agent worker 是 master 通过 child_process.fork 出来的。// 但是在 master 意外被强杀,如 kill -9 杀掉, 那么agent worker 缺变成了孤儿进程,所以用了graceful-processgracefulExit({logger: consoleLogger,label: 'agent_worker',beforeExit: () => agent.close(),});
回到./lib/master.js 查看当监听到agent-start后是如何处理的。以下代码可以跳过不看。
// 监听器this.on('agent-start', this.onAgentStart.bind(this));// 单次监听器,监听销毁,下次就不会再执行forkAppWorkersthis.once('agent-start', this.forkAppWorkers.bind(this));// 事件监听器会按照添加的顺序依次调用,所以按照顺序先看onAgentStart,再看forkAppWorkersonAgentStart() {this.agentWorker.status = 'started';// 可以从代码找到,this.isAllAppWorkerStarted还没定义,要等到再看forkAppWorkers才会定义,所以此处启动过程不执行// Send egg-ready when agent is started after launchedif (this.isAllAppWorkerStarted) {this.messenger.send({action: 'egg-ready',to: 'agent',data: this.options,});}// 此处的消息最后由./lib/utils/messenger.js中的sendToAppWorker处理,因为cluster.workers还为空,不执行this.messenger.send({action: 'egg-pids',to: 'app',data: [ this.agentWorker.pid ],});// ready之后this.isStarted才等于true,此处启动过程不执行// should send current worker pids when agent restartif (this.isStarted) {this.messenger.send({action: 'egg-pids',to: 'agent',data: this.workerManager.getListeningWorkerIds(),});}// 此处的消息最后由./lib/utils/messenger.js中的sendToAppWorker处理,因为cluster.workers还为空,不执行this.messenger.send({action: 'agent-start',to: 'app',});this.logger.info('[master] agent_worker#%s:%s started (%sms)',this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime);// 在终端看到的[master] agent_worker#1:17780 started (1086ms)来自于此处。}
上述代码看了一圈发现什么都没执行,我在这里就产生了疑惑,为什么on监听器要写once监听器前面,虽然肯定会过滤掉不会执行的,但是没必要空执行一次,如果once写在on前面,那么代码是可以取到work去执行想执行的任务。只能带着疑惑,继续看forkAppWorkers方法了,要等执行完forkAppWorkers后再重新回看上述代码
forkAppWorkers方法
forkAppWorkers() {// ...省略代码cfork({exec: this.getAppWorkerFile(), // app_worker.js文件args,silent: false,count: this.options.workers, // 子进程个数// don't refork in local envrefork: this.isProduction,windowsHide: process.platform === 'win32',});// ...省略代码}
相关知识和源码cfork
app_worker.js
// ...省略代码// 可以看到通过egg里的Application实例化了appconst Application = require(options.framework).Application;const app = new Application(options);app.ready(startServer);// 启动 http 服务function startServer(err) {// ...省略代码let server;// https configif (httpsOptions.key && httpsOptions.cert) {httpsOptions.key = fs.readFileSync(httpsOptions.key);httpsOptions.cert = fs.readFileSync(httpsOptions.cert);httpsOptions.ca = httpsOptions.ca && fs.readFileSync(httpsOptions.ca);// 如果是https,启动https serverserver = require('https').createServer(httpsOptions, app.callback());} else {// 如果是http,启动http serverserver = require('http').createServer(app.callback());}// ...省略代码// server启动后通过触发事件的方式,作为参数传入app,在Application中会有一个方法监听,具体内容放在下一章节// this.once('server', server => this.onServer(server));app.emit('server', server);if (options.sticky) {server.listen(options.stickyWorkerPort, '127.0.0.1');// Listen to messages sent from the master. Ignore everything else.process.on('message', (message, connection) => {if (message !== 'sticky-session:connection') {return;}// Emulate a connection event on the server by emitting the// event with the connection the master sent us.server.emit('connection', connection);connection.resume();});} else {if (listenConfig.path) {server.listen(listenConfig.path);} else {if (typeof port !== 'number') {consoleLogger.error('[app_worker] port should be number, but got %s(%s)', port, typeof port);exitProcess();return;}const args = [ port ];if (listenConfig.hostname) args.push(listenConfig.hostname);debug('listen options %s', args);server.listen(...args);}}}// ...省略代码
agent和app启动完成
除了以上提到的内容之外,master监听了很多事件来管理agent和app。
this.on('agent-exit', this.onAgentExit.bind(this));// agent退出时this.on('agent-start', this.onAgentStart.bind(this));// agent启动时this.on('app-exit', this.onAppExit.bind(this));// app退出时this.on('app-start', this.onAppStart.bind(this));// app启动时this.on('reload-worker', this.onReload.bind(this));// agent重新启动时// kill(2) Ctrl-Cprocess.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));// kill(3) Ctrl-\process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));// kill(15) defaultprocess.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));process.once('exit', this.onExit.bind(this));
总结
通过ps aux | grep egg,可以抓取到当前运行的进程,输出符合预想。
Green 18288 10.0 0.8 4685840 66084 s001 S+ 2:27下午 0:00.96 /usr/local/bin/node --require /Volumes/sec/projects/egg-example/node_modules/_egg-ts-helper@1.25.8@egg-ts-helper/register.js /Volumes/sec/projects/egg-example/node_modules/_egg-bin@4.14.1@egg-bin/lib/start-cluster {"typescript":false,"declarations":true,"workers":1,"baseDir":"/Volumes/sec/projects/egg-example","framework":"/Volumes/sec/projects/egg-example/node_modules/egg"}Green 18300 0.0 0.0 4287740 748 s003 S+ 2:28下午 0:00.01 grep eggGreen 18294 0.0 1.1 4651768 90768 s001 S+ 2:27下午 0:01.10 /usr/local/bin/node --require /Volumes/sec/projects/egg-example/node_modules/_egg-ts-helper@1.25.8@egg-ts-helper/register.js /Volumes/sec/projects/egg-example/node_modules/_egg-cluster@1.26.0@egg-cluster/lib/app_worker.js {"framework":"/Volumes/sec/projects/egg-example/node_modules/egg","baseDir":"/Volumes/sec/projects/egg-example","workers":1,"plugins":null,"https":false,"typescript":false,"declarations":true,"clusterPort":58502}Green 18293 0.0 1.0 4634148 86752 s001 S+ 2:27下午 0:00.99 /usr/local/bin/node --require /Volumes/sec/projects/egg-example/node_modules/_egg-ts-helper@1.25.8@egg-ts-helper/register.js /Volumes/sec/projects/egg-example/node_modules/_egg-cluster@1.26.0@egg-cluster/lib/agent_worker.js {"framework":"/Volumes/sec/projects/egg-example/node_modules/egg","baseDir":"/Volumes/sec/projects/egg-example","workers":1,"plugins":null,"https":false,"typescript":false,"declarations":true,"clusterPort":58502}Green 18287 0.0 0.3 4603364 28020 s001 S+ 2:27下午 0:00.69 node /Volumes/sec/projects/egg-example/node_modules/.bin/egg-bin dev
官方提供的流程如下图
+---------+ +---------+ +---------+
| Master | | Agent | | Worker |
+---------+ +----+----+ +----+----+
| fork agent | |
+-------------------->| |
| agent ready | |
|<--------------------+ |
| | fork worker |
+----------------------------------------->|
| worker ready | |
|<-----------------------------------------+
| Egg ready | |
+-------------------->| |
| Egg ready | |
+----------------------------------------->|
- Master 启动后先 fork Agent 进程
- Agent 初始化成功后,通过 IPC 通道通知 Master
- Master 再 fork 多个 App Worker
- App Worker 初始化成功,通知 Master
- 所有的进程初始化成功后,Master 通知 Agent 和 Worker 应用启动成功
总结来说:egg-cluster 是 Egg 的多进程模型的启动模式,在使用 cluster 模式启动 Egg 应用时,我们只需要配置相关启动参数, egg-cluster 会自动创建相关进程,并管理各个进程之间的通信以及异常处理等问题。主进程 Master 通过 child_process 模块的 fork 函数创建 Agent 子进程,并通过 cluster 模块创建 Worker 子进程。Master/Agent/Worker 三者各司其职,共同保证 Egg 应用的正常运行。按官方的玩笑话来说,Master就是个包工头,不干活,只负责盯着下面的工人活干的怎么样,有没有出问题,而Agent和Worker才是干活的工人。而两个工人具体干的是什么活,请看下一章。
