前言

本次解读主要以阅读代码的逻辑编写,而非直接归总性的编写。虽然可能会在阅读时产生疑惑,但是胜在推敲理解的过程。
**

egg-cluster目录结构

  1. .
  2. ├── History.md
  3. ├── LICENSE
  4. ├── README.md
  5. ├── index.js
  6. ├── lib
  7. ├── agent_worker.js
  8. ├── app_worker.js
  9. ├── master.js
  10. └── utils
  11. ├── manager.js
  12. ├── messenger.js
  13. ├── options.js
  14. └── terminate.js
  15. └── package.json

**

承接上文Eggjs 源码解读——01.egg-bin

**./node_modules/egg-bin/lib/start-cluster**

  1. #!/usr/bin/env node
  2. 'use strict';
  3. const debug = require('debug')('egg-bin:start-cluster');
  4. const options = JSON.parse(process.argv[2]);
  5. debug('start cluster options: %j', options);
  6. require(options.framework).startCluster(options);

上面代码启动了./node_modules/egg/index.jsstartCluster

  1. exports.startCluster = require('egg-cluster').startCluster;

根据线索找到./node_modules/egg/node_modules/egg-cluster/index.js ,文章之后的路径省略./node_modules/egg/node_modules/egg-cluster默认在egg-cluster

  1. 'use strict';
  2. const Master = require('./lib/master');
  3. exports.startCluster = function(options, callback) {
  4. new Master(options).ready(callback);
  5. };

从该文件得知,实例化了Master,查看./lib/master.js ,看见大概700多行代码,第一次看的时候确实头大。我的方法是大体的初略先过一遍,再去egg官网看文档,多进程模型和进程间通讯 再回来第二遍细看代码。
理解的过程就会清晰很多,该文件代码有很多知识点,算是学习一遍以前用过的,认识的没用过的,不认识的。

解读过程将在代码块中直接紧跟代码注释说明,非在主流程上的代码暂时省略,以免影响阅读

Master

  1. // Master 继承了 EventEmitter模块,具有了事件的监听和订阅的功能
  2. class Master extends EventEmitter {
  3. constructor(options) {
  4. super();
  5. this.options = parseOptions(options);
  6. /**
  7. * const parseOptions = require('./utils/options');
  8. * 主要做了参数的转换和增加,例如验证https并给出返回是否是https,
  9. * 例如:egg-scripts start --port=8443 --daemon --https.key=cer.xxx.xxx.key --https.cert=cer.xxx.xxx.pem --workers=1
  10. * 将RSA PRIVATE KEY文件和CERTIFICATE文件放在根目录下便可以启用https模式了
  11. */
  12. this.workerManager = new Manager();
  13. /**
  14. * 第一步:启动workerManager
  15. * 用来记录workers和agent的信息,workers用的是Map存储,具有增删查等方法
  16. */
  17. this.messenger = new Messenger(this);
  18. /**
  19. * 步骤二:启动Messenger,用来负责进程之间的通信
  20. * 封装了send方法:
  21. * to: 消息去哪里
  22. * from: 消息从哪里来
  23. * action: 消息是干什么的
  24. * data: 消息的具体内容
  25. * ┌────────┐
  26. * │ parent │
  27. * /└────────┘\
  28. * / | \
  29. * / ┌────────┐ \
  30. * / │ master │ \
  31. * / └────────┘ \
  32. * / / \ \
  33. * ┌───────┐ ┌───────┐
  34. * │ agent │ ------- │ app │
  35. * └───────┘ └───────┘
  36. */
  37. ready.mixin(this);
  38. /**
  39. * 步骤三:给this注册了ready方法,并且给ready方法添加了回调
  40. * 回调的内容是开始监听
  41. */
  42. this.ready(() => {
  43. // ...省略代码
  44. });
  45. // ...省略代码,主要做了一些版本输出,变量保存的工作
  46. // ...省略代码,监听一些事件,执行对应的方法,例如:监听agent-start之后去启动workers
  47. this.once('agent-start', this.forkAppWorkers.bind(this));
  48. // 步骤四:先检测可用端口后通过this.options.clusterPort 存储,并且启动Agent
  49. this.detectPorts()
  50. .then(() => {
  51. // 启动agent
  52. this.forkAgentWorker();
  53. });
  54. // ...省略代码
  55. }
  56. }

forkAgentWorker方法

  1. forkAgentWorker() {
  2. // ... 省略代码
  3. // this.getAgentWorkerFile() == agent_worker.js文件
  4. // 通过childprocess fork方法创建Agent子进程
  5. const agentWorker = childprocess.fork(this.getAgentWorkerFile(), args, opt);
  6. agentWorker.status = 'starting';
  7. agentWorker.id = ++this.agentWorkerIndex;
  8. // 通过上面代码构造函数实例化的workerManager保存agentWorker
  9. this.workerManager.setAgent(agentWorker);
  10. this.log('[master] agent_worker#%s:%s start with clusterPort:%s',
  11. agentWorker.id, agentWorker.pid, this.options.clusterPort);
  12. // ...省略代码,具体内容当agent出现错误、异常关闭、收到消息等等时候,通过之前建立 的messenger发才能够消息给master
  13. }

agent_worker.js

./lib/agent_worker.js

  1. // options.framework == 'egg-example/node_modules/egg' 指向egg
  2. // 可以看到通过egg里的Agent实例化了agent,具体功能后面再做解读
  3. const Agent = require(options.framework).Agent;
  4. debug('new Agent with options %j', options);
  5. const agent = new Agent(options);
  6. function startErrorHandler(err) {
  7. consoleLogger.error(err);
  8. consoleLogger.error('[agent_worker] start error, exiting with code:1');
  9. process.exitCode = 1;
  10. process.kill(process.pid);
  11. }
  12. agent.ready(err => {
  13. // don't send started message to master when start error
  14. if (err) return;
  15. agent.removeListener('error', startErrorHandler);
  16. // 当agent启动成功后,向master发送消息,agent-start
  17. process.send({ action: 'agent-start', to: 'master' });
  18. });
  19. // exit if agent start error
  20. agent.once('error', startErrorHandler);
  21. // 通过forkAgentWorker方法,可以看到agent worker 是 master 通过 child_process.fork 出来的。
  22. // 但是在 master 意外被强杀,如 kill -9 杀掉, 那么agent worker 缺变成了孤儿进程,所以用了graceful-process
  23. gracefulExit({
  24. logger: consoleLogger,
  25. label: 'agent_worker',
  26. beforeExit: () => agent.close(),
  27. });

回到./lib/master.js 查看当监听到agent-start后是如何处理的。以下代码可以跳过不看。

  1. // 监听器
  2. this.on('agent-start', this.onAgentStart.bind(this));
  3. // 单次监听器,监听销毁,下次就不会再执行forkAppWorkers
  4. this.once('agent-start', this.forkAppWorkers.bind(this));
  5. // 事件监听器会按照添加的顺序依次调用,所以按照顺序先看onAgentStart,再看forkAppWorkers
  6. onAgentStart() {
  7. this.agentWorker.status = 'started';
  8. // 可以从代码找到,this.isAllAppWorkerStarted还没定义,要等到再看forkAppWorkers才会定义,所以此处启动过程不执行
  9. // Send egg-ready when agent is started after launched
  10. if (this.isAllAppWorkerStarted) {
  11. this.messenger.send({
  12. action: 'egg-ready',
  13. to: 'agent',
  14. data: this.options,
  15. });
  16. }
  17. // 此处的消息最后由./lib/utils/messenger.js中的sendToAppWorker处理,因为cluster.workers还为空,不执行
  18. this.messenger.send({
  19. action: 'egg-pids',
  20. to: 'app',
  21. data: [ this.agentWorker.pid ],
  22. });
  23. // ready之后this.isStarted才等于true,此处启动过程不执行
  24. // should send current worker pids when agent restart
  25. if (this.isStarted) {
  26. this.messenger.send({
  27. action: 'egg-pids',
  28. to: 'agent',
  29. data: this.workerManager.getListeningWorkerIds(),
  30. });
  31. }
  32. // 此处的消息最后由./lib/utils/messenger.js中的sendToAppWorker处理,因为cluster.workers还为空,不执行
  33. this.messenger.send({
  34. action: 'agent-start',
  35. to: 'app',
  36. });
  37. this.logger.info('[master] agent_worker#%s:%s started (%sms)',
  38. this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime);
  39. // 在终端看到的[master] agent_worker#1:17780 started (1086ms)来自于此处。
  40. }

上述代码看了一圈发现什么都没执行,我在这里就产生了疑惑,为什么on监听器要写once监听器前面,虽然肯定会过滤掉不会执行的,但是没必要空执行一次,如果once写在on前面,那么代码是可以取到work去执行想执行的任务。只能带着疑惑,继续看forkAppWorkers方法了,要等执行完forkAppWorkers后再重新回看上述代码

forkAppWorkers方法

  1. forkAppWorkers() {
  2. // ...省略代码
  3. cfork({
  4. exec: this.getAppWorkerFile(), // app_worker.js文件
  5. args,
  6. silent: false,
  7. count: this.options.workers, // 子进程个数
  8. // don't refork in local env
  9. refork: this.isProduction,
  10. windowsHide: process.platform === 'win32',
  11. });
  12. // ...省略代码
  13. }

相关知识和源码cfork

app_worker.js

  1. // ...省略代码
  2. // 可以看到通过egg里的Application实例化了app
  3. const Application = require(options.framework).Application;
  4. const app = new Application(options);
  5. app.ready(startServer);
  6. // 启动 http 服务
  7. function startServer(err) {
  8. // ...省略代码
  9. let server;
  10. // https config
  11. if (httpsOptions.key && httpsOptions.cert) {
  12. httpsOptions.key = fs.readFileSync(httpsOptions.key);
  13. httpsOptions.cert = fs.readFileSync(httpsOptions.cert);
  14. httpsOptions.ca = httpsOptions.ca && fs.readFileSync(httpsOptions.ca);
  15. // 如果是https,启动https server
  16. server = require('https').createServer(httpsOptions, app.callback());
  17. } else {
  18. // 如果是http,启动http server
  19. server = require('http').createServer(app.callback());
  20. }
  21. // ...省略代码
  22. // server启动后通过触发事件的方式,作为参数传入app,在Application中会有一个方法监听,具体内容放在下一章节
  23. // this.once('server', server => this.onServer(server));
  24. app.emit('server', server);
  25. if (options.sticky) {
  26. server.listen(options.stickyWorkerPort, '127.0.0.1');
  27. // Listen to messages sent from the master. Ignore everything else.
  28. process.on('message', (message, connection) => {
  29. if (message !== 'sticky-session:connection') {
  30. return;
  31. }
  32. // Emulate a connection event on the server by emitting the
  33. // event with the connection the master sent us.
  34. server.emit('connection', connection);
  35. connection.resume();
  36. });
  37. } else {
  38. if (listenConfig.path) {
  39. server.listen(listenConfig.path);
  40. } else {
  41. if (typeof port !== 'number') {
  42. consoleLogger.error('[app_worker] port should be number, but got %s(%s)', port, typeof port);
  43. exitProcess();
  44. return;
  45. }
  46. const args = [ port ];
  47. if (listenConfig.hostname) args.push(listenConfig.hostname);
  48. debug('listen options %s', args);
  49. server.listen(...args);
  50. }
  51. }
  52. }
  53. // ...省略代码

agent和app启动完成

除了以上提到的内容之外,master监听了很多事件来管理agent和app。

  1. this.on('agent-exit', this.onAgentExit.bind(this));// agent退出时
  2. this.on('agent-start', this.onAgentStart.bind(this));// agent启动时
  3. this.on('app-exit', this.onAppExit.bind(this));// app退出时
  4. this.on('app-start', this.onAppStart.bind(this));// app启动时
  5. this.on('reload-worker', this.onReload.bind(this));// agent重新启动时
  6. // kill(2) Ctrl-C
  7. process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
  8. // kill(3) Ctrl-\
  9. process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
  10. // kill(15) default
  11. process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));
  12. process.once('exit', this.onExit.bind(this));

总结

通过ps aux | grep egg,可以抓取到当前运行的进程,输出符合预想。

  1. Green 18288 10.0 0.8 4685840 66084 s001 S+ 2:27下午 0:00.96 /usr/local/bin/node --require /Volumes/sec/projects/egg-example/node_modules/_egg-ts-helper@1.25.8@egg-ts-helper/register.js /Volumes/sec/projects/egg-example/node_modules/_egg-bin@4.14.1@egg-bin/lib/start-cluster {"typescript":false,"declarations":true,"workers":1,"baseDir":"/Volumes/sec/projects/egg-example","framework":"/Volumes/sec/projects/egg-example/node_modules/egg"}
  2. Green 18300 0.0 0.0 4287740 748 s003 S+ 2:28下午 0:00.01 grep egg
  3. Green 18294 0.0 1.1 4651768 90768 s001 S+ 2:27下午 0:01.10 /usr/local/bin/node --require /Volumes/sec/projects/egg-example/node_modules/_egg-ts-helper@1.25.8@egg-ts-helper/register.js /Volumes/sec/projects/egg-example/node_modules/_egg-cluster@1.26.0@egg-cluster/lib/app_worker.js {"framework":"/Volumes/sec/projects/egg-example/node_modules/egg","baseDir":"/Volumes/sec/projects/egg-example","workers":1,"plugins":null,"https":false,"typescript":false,"declarations":true,"clusterPort":58502}
  4. Green 18293 0.0 1.0 4634148 86752 s001 S+ 2:27下午 0:00.99 /usr/local/bin/node --require /Volumes/sec/projects/egg-example/node_modules/_egg-ts-helper@1.25.8@egg-ts-helper/register.js /Volumes/sec/projects/egg-example/node_modules/_egg-cluster@1.26.0@egg-cluster/lib/agent_worker.js {"framework":"/Volumes/sec/projects/egg-example/node_modules/egg","baseDir":"/Volumes/sec/projects/egg-example","workers":1,"plugins":null,"https":false,"typescript":false,"declarations":true,"clusterPort":58502}
  5. Green 18287 0.0 0.3 4603364 28020 s001 S+ 2:27下午 0:00.69 node /Volumes/sec/projects/egg-example/node_modules/.bin/egg-bin dev

官方提供的流程如下图

+---------+           +---------+          +---------+
|  Master |           |  Agent  |          |  Worker |
+---------+           +----+----+          +----+----+
     |      fork agent     |                    |
     +-------------------->|                    |
     |      agent ready    |                    |
     |<--------------------+                    |
     |                     |     fork worker    |
     +----------------------------------------->|
     |     worker ready    |                    |
     |<-----------------------------------------+
     |      Egg ready      |                    |
     +-------------------->|                    |
     |      Egg ready      |                    |
     +----------------------------------------->|
  1. Master 启动后先 fork Agent 进程
  2. Agent 初始化成功后,通过 IPC 通道通知 Master
  3. Master 再 fork 多个 App Worker
  4. App Worker 初始化成功,通知 Master
  5. 所有的进程初始化成功后,Master 通知 Agent 和 Worker 应用启动成功

总结来说:egg-cluster 是 Egg 的多进程模型的启动模式,在使用 cluster 模式启动 Egg 应用时,我们只需要配置相关启动参数, egg-cluster 会自动创建相关进程,并管理各个进程之间的通信以及异常处理等问题。主进程 Master 通过 child_process 模块的 fork 函数创建 Agent 子进程,并通过 cluster 模块创建 Worker 子进程。Master/Agent/Worker 三者各司其职,共同保证 Egg 应用的正常运行。按官方的玩笑话来说,Master就是个包工头,不干活,只负责盯着下面的工人活干的怎么样,有没有出问题,而Agent和Worker才是干活的工人。而两个工人具体干的是什么活,请看下一章。