前言
本次解读主要以阅读代码的逻辑编写,而非直接归总性的编写。虽然可能会在阅读时产生疑惑,但是胜在推敲理解的过程。
**
egg-cluster目录结构
.
├── History.md
├── LICENSE
├── README.md
├── index.js
├── lib
│ ├── agent_worker.js
│ ├── app_worker.js
│ ├── master.js
│ └── utils
│ ├── manager.js
│ ├── messenger.js
│ ├── options.js
│ └── terminate.js
└── package.json
承接上文Eggjs 源码解读——01.egg-bin
**./node_modules/egg-bin/lib/start-cluster**
#!/usr/bin/env node
'use strict';
const debug = require('debug')('egg-bin:start-cluster');
const options = JSON.parse(process.argv[2]);
debug('start cluster options: %j', options);
require(options.framework).startCluster(options);
上面代码启动了./node_modules/egg/index.js
的startCluster
exports.startCluster = require('egg-cluster').startCluster;
根据线索找到./node_modules/egg/node_modules/egg-cluster/index.js
,文章之后的路径省略./node_modules/egg/node_modules/egg-cluster
默认在egg-cluster
下
'use strict';
const Master = require('./lib/master');
exports.startCluster = function(options, callback) {
new Master(options).ready(callback);
};
从该文件得知,实例化了Master,查看./lib/master.js
,看见大概700多行代码,第一次看的时候确实头大。我的方法是大体的初略先过一遍,再去egg官网看文档,多进程模型和进程间通讯 再回来第二遍细看代码。
理解的过程就会清晰很多,该文件代码有很多知识点,算是学习一遍以前用过的,认识的没用过的,不认识的。
解读过程将在代码块中直接紧跟代码注释说明,非在主流程上的代码暂时省略,以免影响阅读
Master
// Master 继承了 EventEmitter模块,具有了事件的监听和订阅的功能
class Master extends EventEmitter {
constructor(options) {
super();
this.options = parseOptions(options);
/**
* const parseOptions = require('./utils/options');
* 主要做了参数的转换和增加,例如验证https并给出返回是否是https,
* 例如:egg-scripts start --port=8443 --daemon --https.key=cer.xxx.xxx.key --https.cert=cer.xxx.xxx.pem --workers=1
* 将RSA PRIVATE KEY文件和CERTIFICATE文件放在根目录下便可以启用https模式了
*/
this.workerManager = new Manager();
/**
* 第一步:启动workerManager
* 用来记录workers和agent的信息,workers用的是Map存储,具有增删查等方法
*/
this.messenger = new Messenger(this);
/**
* 步骤二:启动Messenger,用来负责进程之间的通信
* 封装了send方法:
* to: 消息去哪里
* from: 消息从哪里来
* action: 消息是干什么的
* data: 消息的具体内容
* ┌────────┐
* │ parent │
* /└────────┘\
* / | \
* / ┌────────┐ \
* / │ master │ \
* / └────────┘ \
* / / \ \
* ┌───────┐ ┌───────┐
* │ agent │ ------- │ app │
* └───────┘ └───────┘
*/
ready.mixin(this);
/**
* 步骤三:给this注册了ready方法,并且给ready方法添加了回调
* 回调的内容是开始监听
*/
this.ready(() => {
// ...省略代码
});
// ...省略代码,主要做了一些版本输出,变量保存的工作
// ...省略代码,监听一些事件,执行对应的方法,例如:监听agent-start之后去启动workers
this.once('agent-start', this.forkAppWorkers.bind(this));
// 步骤四:先检测可用端口后通过this.options.clusterPort 存储,并且启动Agent
this.detectPorts()
.then(() => {
// 启动agent
this.forkAgentWorker();
});
// ...省略代码
}
}
forkAgentWorker
方法
forkAgentWorker() {
// ... 省略代码
// this.getAgentWorkerFile() == agent_worker.js文件
// 通过childprocess fork方法创建Agent子进程
const agentWorker = childprocess.fork(this.getAgentWorkerFile(), args, opt);
agentWorker.status = 'starting';
agentWorker.id = ++this.agentWorkerIndex;
// 通过上面代码构造函数实例化的workerManager保存agentWorker
this.workerManager.setAgent(agentWorker);
this.log('[master] agent_worker#%s:%s start with clusterPort:%s',
agentWorker.id, agentWorker.pid, this.options.clusterPort);
// ...省略代码,具体内容当agent出现错误、异常关闭、收到消息等等时候,通过之前建立 的messenger发才能够消息给master
}
agent_worker.js
./lib/agent_worker.js
// options.framework == 'egg-example/node_modules/egg' 指向egg
// 可以看到通过egg里的Agent实例化了agent,具体功能后面再做解读
const Agent = require(options.framework).Agent;
debug('new Agent with options %j', options);
const agent = new Agent(options);
function startErrorHandler(err) {
consoleLogger.error(err);
consoleLogger.error('[agent_worker] start error, exiting with code:1');
process.exitCode = 1;
process.kill(process.pid);
}
agent.ready(err => {
// don't send started message to master when start error
if (err) return;
agent.removeListener('error', startErrorHandler);
// 当agent启动成功后,向master发送消息,agent-start
process.send({ action: 'agent-start', to: 'master' });
});
// exit if agent start error
agent.once('error', startErrorHandler);
// 通过forkAgentWorker方法,可以看到agent worker 是 master 通过 child_process.fork 出来的。
// 但是在 master 意外被强杀,如 kill -9 杀掉, 那么agent worker 缺变成了孤儿进程,所以用了graceful-process
gracefulExit({
logger: consoleLogger,
label: 'agent_worker',
beforeExit: () => agent.close(),
});
回到./lib/master.js
查看当监听到agent-start
后是如何处理的。以下代码可以跳过不看。
// 监听器
this.on('agent-start', this.onAgentStart.bind(this));
// 单次监听器,监听销毁,下次就不会再执行forkAppWorkers
this.once('agent-start', this.forkAppWorkers.bind(this));
// 事件监听器会按照添加的顺序依次调用,所以按照顺序先看onAgentStart,再看forkAppWorkers
onAgentStart() {
this.agentWorker.status = 'started';
// 可以从代码找到,this.isAllAppWorkerStarted还没定义,要等到再看forkAppWorkers才会定义,所以此处启动过程不执行
// Send egg-ready when agent is started after launched
if (this.isAllAppWorkerStarted) {
this.messenger.send({
action: 'egg-ready',
to: 'agent',
data: this.options,
});
}
// 此处的消息最后由./lib/utils/messenger.js中的sendToAppWorker处理,因为cluster.workers还为空,不执行
this.messenger.send({
action: 'egg-pids',
to: 'app',
data: [ this.agentWorker.pid ],
});
// ready之后this.isStarted才等于true,此处启动过程不执行
// should send current worker pids when agent restart
if (this.isStarted) {
this.messenger.send({
action: 'egg-pids',
to: 'agent',
data: this.workerManager.getListeningWorkerIds(),
});
}
// 此处的消息最后由./lib/utils/messenger.js中的sendToAppWorker处理,因为cluster.workers还为空,不执行
this.messenger.send({
action: 'agent-start',
to: 'app',
});
this.logger.info('[master] agent_worker#%s:%s started (%sms)',
this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime);
// 在终端看到的[master] agent_worker#1:17780 started (1086ms)来自于此处。
}
上述代码看了一圈发现什么都没执行,我在这里就产生了疑惑,为什么on监听器要写once监听器前面,虽然肯定会过滤掉不会执行的,但是没必要空执行一次,如果once写在on前面,那么代码是可以取到work去执行想执行的任务。只能带着疑惑,继续看forkAppWorkers
方法了,要等执行完forkAppWorkers
后再重新回看上述代码
forkAppWorkers
方法
forkAppWorkers() {
// ...省略代码
cfork({
exec: this.getAppWorkerFile(), // app_worker.js文件
args,
silent: false,
count: this.options.workers, // 子进程个数
// don't refork in local env
refork: this.isProduction,
windowsHide: process.platform === 'win32',
});
// ...省略代码
}
相关知识和源码cfork
app_worker.js
// ...省略代码
// 可以看到通过egg里的Application实例化了app
const Application = require(options.framework).Application;
const app = new Application(options);
app.ready(startServer);
// 启动 http 服务
function startServer(err) {
// ...省略代码
let server;
// https config
if (httpsOptions.key && httpsOptions.cert) {
httpsOptions.key = fs.readFileSync(httpsOptions.key);
httpsOptions.cert = fs.readFileSync(httpsOptions.cert);
httpsOptions.ca = httpsOptions.ca && fs.readFileSync(httpsOptions.ca);
// 如果是https,启动https server
server = require('https').createServer(httpsOptions, app.callback());
} else {
// 如果是http,启动http server
server = require('http').createServer(app.callback());
}
// ...省略代码
// server启动后通过触发事件的方式,作为参数传入app,在Application中会有一个方法监听,具体内容放在下一章节
// this.once('server', server => this.onServer(server));
app.emit('server', server);
if (options.sticky) {
server.listen(options.stickyWorkerPort, '127.0.0.1');
// Listen to messages sent from the master. Ignore everything else.
process.on('message', (message, connection) => {
if (message !== 'sticky-session:connection') {
return;
}
// Emulate a connection event on the server by emitting the
// event with the connection the master sent us.
server.emit('connection', connection);
connection.resume();
});
} else {
if (listenConfig.path) {
server.listen(listenConfig.path);
} else {
if (typeof port !== 'number') {
consoleLogger.error('[app_worker] port should be number, but got %s(%s)', port, typeof port);
exitProcess();
return;
}
const args = [ port ];
if (listenConfig.hostname) args.push(listenConfig.hostname);
debug('listen options %s', args);
server.listen(...args);
}
}
}
// ...省略代码
agent和app启动完成
除了以上提到的内容之外,master监听了很多事件来管理agent和app。
this.on('agent-exit', this.onAgentExit.bind(this));// agent退出时
this.on('agent-start', this.onAgentStart.bind(this));// agent启动时
this.on('app-exit', this.onAppExit.bind(this));// app退出时
this.on('app-start', this.onAppStart.bind(this));// app启动时
this.on('reload-worker', this.onReload.bind(this));// agent重新启动时
// kill(2) Ctrl-C
process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
// kill(3) Ctrl-\
process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
// kill(15) default
process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));
process.once('exit', this.onExit.bind(this));
总结
通过ps aux | grep egg,可以抓取到当前运行的进程,输出符合预想。
Green 18288 10.0 0.8 4685840 66084 s001 S+ 2:27下午 0:00.96 /usr/local/bin/node --require /Volumes/sec/projects/egg-example/node_modules/_egg-ts-helper@1.25.8@egg-ts-helper/register.js /Volumes/sec/projects/egg-example/node_modules/_egg-bin@4.14.1@egg-bin/lib/start-cluster {"typescript":false,"declarations":true,"workers":1,"baseDir":"/Volumes/sec/projects/egg-example","framework":"/Volumes/sec/projects/egg-example/node_modules/egg"}
Green 18300 0.0 0.0 4287740 748 s003 S+ 2:28下午 0:00.01 grep egg
Green 18294 0.0 1.1 4651768 90768 s001 S+ 2:27下午 0:01.10 /usr/local/bin/node --require /Volumes/sec/projects/egg-example/node_modules/_egg-ts-helper@1.25.8@egg-ts-helper/register.js /Volumes/sec/projects/egg-example/node_modules/_egg-cluster@1.26.0@egg-cluster/lib/app_worker.js {"framework":"/Volumes/sec/projects/egg-example/node_modules/egg","baseDir":"/Volumes/sec/projects/egg-example","workers":1,"plugins":null,"https":false,"typescript":false,"declarations":true,"clusterPort":58502}
Green 18293 0.0 1.0 4634148 86752 s001 S+ 2:27下午 0:00.99 /usr/local/bin/node --require /Volumes/sec/projects/egg-example/node_modules/_egg-ts-helper@1.25.8@egg-ts-helper/register.js /Volumes/sec/projects/egg-example/node_modules/_egg-cluster@1.26.0@egg-cluster/lib/agent_worker.js {"framework":"/Volumes/sec/projects/egg-example/node_modules/egg","baseDir":"/Volumes/sec/projects/egg-example","workers":1,"plugins":null,"https":false,"typescript":false,"declarations":true,"clusterPort":58502}
Green 18287 0.0 0.3 4603364 28020 s001 S+ 2:27下午 0:00.69 node /Volumes/sec/projects/egg-example/node_modules/.bin/egg-bin dev
官方提供的流程如下图
+---------+ +---------+ +---------+
| Master | | Agent | | Worker |
+---------+ +----+----+ +----+----+
| fork agent | |
+-------------------->| |
| agent ready | |
|<--------------------+ |
| | fork worker |
+----------------------------------------->|
| worker ready | |
|<-----------------------------------------+
| Egg ready | |
+-------------------->| |
| Egg ready | |
+----------------------------------------->|
- Master 启动后先 fork Agent 进程
- Agent 初始化成功后,通过 IPC 通道通知 Master
- Master 再 fork 多个 App Worker
- App Worker 初始化成功,通知 Master
- 所有的进程初始化成功后,Master 通知 Agent 和 Worker 应用启动成功
总结来说:egg-cluster 是 Egg 的多进程模型的启动模式,在使用 cluster 模式启动 Egg 应用时,我们只需要配置相关启动参数, egg-cluster 会自动创建相关进程,并管理各个进程之间的通信以及异常处理等问题。主进程 Master 通过 child_process 模块的 fork 函数创建 Agent 子进程,并通过 cluster 模块创建 Worker 子进程。Master/Agent/Worker 三者各司其职,共同保证 Egg 应用的正常运行。按官方的玩笑话来说,Master就是个包工头,不干活,只负责盯着下面的工人活干的怎么样,有没有出问题,而Agent和Worker才是干活的工人。而两个工人具体干的是什么活,请看下一章。