Node.js默认单进程运行,对于64位最高可以使用1GB内存。
只有一个核在运行,就不能发挥多核CPU的优势。cluster模块就是为了解决这个问题而提出的。
Node中解决这个问题的方式有:

  • child_process 子进程
  • worker 多线程
  • cluster

这里重点学习下cluster模块

cluster基本概念

cluster 多线程池(集群)是node的一个内置模块,自动创建子进程,同时,程序员不需要理会master和child的通信过程,这些都是封装的。
下面展示了基本使用的方法:需要isMaster检查当前进程在子进程还是主进程,因为文件是脚本,子进程也会执行。如果是主进程,那么就fork出子进程,下面fork了几个子进程:

  1. const cluster = require('cluster');
  2. const os = require('os');
  3. if (cluster.isMaster){ // 判断是主进程
  4. for (let i = 0, n = os.cpus().length; i < n; i += 1){
  5. cluster.fork();
  6. }
  7. } else { // 子进程走这里
  8. http.createServer(function(req, res) {
  9. res.writeHead(200);
  10. res.end("hello world\n");
  11. }).listen(8000);
  12. }

cluster.fork()返回worker对象,此对象有属性和方法:

  • worker.id :唯一PID
  • worker.process: worker的process对象
  • worker.send(), 可以理解为:send to me 是要在主进程中调用,向子进程发消息用的,可以发送:string、json对象等

    1. if (cluster.isMaster) {
    2. const worker = cluster.fork();
    3. worker.send('hi son'); // 向worker发消息
    4. }

    在worker进程中用process.on监听message事件就能得到消息

    1. process.on('message', function(message) {
    2. console.log(message);
    3. });

    cluster 的属性和方法有:

  • isMaster: 主进程判断

  • isWorker: 子进程判断
  • workers:是一个对象,key是进程PID,value是进程对象
  • fork(): 新建进程
  • kill(): 终止进程

    进程通信

    cluster可以监听 online、exit事件,当一个子线程被fork,会emit一个online事件,当一个进程退出会有一个exit事件,所以可以利用这个简单做一个进程守护: ```javascript var cluster = require(‘cluster’);

if(cluster.isMaster) { var numWorkers = require(‘os’).cpus().length; console.log(‘Master cluster setting up ‘ + numWorkers + ‘ workers…’);

for(var i = 0; i < numWorkers; i++) { cluster.fork(); }

cluster.on(‘online’, function(worker) { console.log(‘Worker ‘ + worker.process.pid + ‘ is online’); });

// 当有进程退出就重新fork一个 cluster.on(‘exit’, function(worker, code, signal) { console.log( ‘Worker ‘ + worker.process.pid + ‘ died with code: ‘ + code + ‘, and signal: ‘ + signal ); console.log(‘Starting a new worker’); cluster.fork(); }); }

  1. 除了exitonline事件之外,worker进程还可以监听自定义事件:message,以及send事件(父子进程通信的话,都用process.on() \ process.send() 接受和发送消息)
  2. ```javascript
  3. worker.on('message', function(msg) { /* do sth */ })
  4. worker.send('my_event' + worker.process.pid);

利用这个也可以做一个有心跳检测机制的进程守护,见后文。

进程守护

当程序出现全局错误(未被catch到),这时候没有其他机制的情况下,node server将因为异常而终止。
这时候可能需要再把这个服务拉起来,这个操作就是进程守护。
在处理异常时候,相关的策略有:

  • 子进程捕获uncaughtException异常,并主动退出;
  • 父进程监听子进程退出事件,并延迟创建子进程;
  • 子进程监控内存占用过大,主动退出;
  • 防止僵尸进程:父进程监听子进程心跳,3次没心跳,杀掉子进程; ```javascript const cluster = require(‘cluster’);

if (cluster.isMaster) { // 核心框架 } else {

}

  1. <a name="IahL6"></a>
  2. ##### 子进程捕获uncaught异常
  3. 子进程捕获uncaught异常,并主动退出
  4. ```javascript
  5. // 子进程
  6. process.on('uncaughtException', function (err) {
  7. // 这里可以做写日志的操作
  8. console.log(err);
  9. // 退出进程
  10. process.exit(1);
  11. });

主动退出之前注意日志上报或者记录,这是一种兜底机制。

监听子进程退出事件

父进程监听子进程退出事件,并延迟创建子进程(延迟的目的是害怕子进程一死主进程立即唤起,然后子进程又死了,又唤起,这样一直循环)

  1. cluster.on('exit', function () { // 父进程监听子进程退出事件,并延迟创建子进程
  2. setTimeout(() => {
  3. createWorker() // 封装的cluster.fork(),以及其他操作
  4. }, 5000)
  5. })

子进程监控内存占用过大,主动退出

这个算是内存泄漏的兜底。
有的时候可能存在为被定位到的内存泄漏,就需要这种方式了,process.memoryUsage().rss 可以告诉你当前的进程

  1. // 子进程代码
  2. // 内存使用过多,自己退出进程
  3. setTimeout(() => {
  4. if (process.memoryUsage().rss > 734003200) {
  5. process.exit(1);
  6. }
  7. }, 5000)

通过心跳包,防止僵尸进程

比如代码中出现不期望的while(true){ } 同时没有退出,这样的进程就是僵尸进程了——没有死,但是已经不能响应其他请求了
这样的话需要主进程把僵尸进程杀掉,同时再拉起一个新的进程替代。
但是怎么检测这是一个僵尸进程进程呢?主进程定时发送心跳,能正确回应的进程就不是僵尸进程。

  1. /**
  2. * 简单的进程守护器
  3. */
  4. const cluster = require('cluster');
  5. if (cluster.isMaster) {
  6. // 构建进程
  7. for (let i = 0; i < require('os').cpus().length / 2; i++) {
  8. createWorker();
  9. }
  10. function createWorker() {
  11. // 创建子进程并进行心跳监控
  12. var worker = cluster.fork();
  13. var missed = 0;// 没有回应的ping次数
  14. // 心跳
  15. var timer = setInterval(function () {
  16. // 三次没回应,杀之
  17. if (missed == 3) {
  18. clearInterval(timer);
  19. console.log(worker.process.pid + ' has become a zombie!');
  20. process.kill(worker.process.pid);
  21. return;
  22. }
  23. // 开始心跳
  24. missed++;
  25. worker.send('ping#' + worker.process.pid);
  26. }, 10000);
  27. worker.on('message', function (msg) {
  28. // 确认心跳回应。
  29. if (msg == 'pong#' + worker.process.pid) {
  30. missed--;
  31. }
  32. });
  33. // 挂了就没必要再进行心跳了
  34. worker.on('exit', function () {
  35. clearInterval(timer);
  36. });
  37. }
  38. } else {
  39. // 回应心跳信息
  40. process.on('message', function (msg) {
  41. if (msg == 'ping#' + process.pid) {
  42. process.send('pong#' + process.pid);
  43. }
  44. });
  45. // 执行任务代码
  46. require('./app')
  47. }

上面的代码中,我们看到:

  • 主进程通过var worker = cluster.fork();构造进程之后,通过worker.send(‘ping#’ + worker.process.pid);给子进程发送心跳消息。主进程也同时监听着子进程的消息;
  • 子进程通过process.on(‘message’, fn)监听,判断是心跳消息后,及时按照约定回复主进程;
  • 主进程每发一条消息,计数器加一,每收到子进程的回调,计数器减一,
  • 这样,每次定时发送消息前,判断计数器有没有达到判定子进程是僵尸进程次数,如果达到了,将子进程杀死process.kill(worker.process.pid)。

cluster和pm2有什么关系?

PM2模块是cluster模块的一个包装层。它的作用是尽量将cluster模块抽象掉,让用户像使用单进程一样,部署多进程Node应用

  1. pm2 start app.js -i 4 // 开启
  2. pm2 reload all // 零停机热重启

i参数告诉PM2,这段代码应该在cluster_mode启动,且新建worker进程的数量是4个。
如果i参数的值是0,那么当前机器有几个CPU内核,PM2就会启动几个worker进程

多个进程监听同一个端口

cluster 实现了多个进程监听同一个端口的功能,本质还是由master进行真正的端口监听,然后通过一定的策略将链接转交于子进程。
这个转交(也可以理解为负载均衡)的策略是: round-robin,时间片轮转法,所有平台的默认方案(除了 windows)。
主进程监听端口,接收到新连接之后,通过 时间片轮转法 来决定将接收到的客户端的 socket 句柄传递给指定的 worker 处理。
(通常在 Nginx 上采用另一种算法:WRR,加权轮转法)

实现类似pm2零停机热重启的思路

我们的目的是期望实现所有子进程挨个重启一遍,同时server正常运行,那也就是说至少要保证有一个子进程工作着,最简单的思路就是,遍历当前所有子进程,一个一个重启,重启完了一个再重启下一个:

  1. Linux向主进程发送信号: SIGUSR2 一个用户命令
    1. kill -SIGUSR2 PID

    2. 主进程收到信号之后,执行热重启操作 ```javascript const workers = Object.values(cluster.workers);

const restartWorker = workerIndex => { const worker = workers[workerIndex]; // 获得一个子进程 if (!worker) return;

worker.on(“exit”, () => { if (!worker.exitedAfterDisconnect) return; console.log(Exited process ${worker.process.pid});

  1. // 重新启动
  2. cluster.fork().on("listening", () => { // 启动起来了就启动下一个
  3. restartWorker(workerIndex + 1);
  4. });

});

worker.disconnect(); };

// 当需要重启的时候,从第一个开始 restartWorker(0);

``` disconnect方法杀死worker进程,这种情况下 worker.exitedAfterDisconnect值为true
新启动的进程监听listening事件,表示启动成功,这时候再去重新启动下一个子进程