Node.js默认单进程运行,对于64位最高可以使用1GB内存。
只有一个核在运行,就不能发挥多核CPU的优势。cluster模块就是为了解决这个问题而提出的。
Node中解决这个问题的方式有:
- child_process 子进程
- worker 多线程
- cluster
cluster基本概念
cluster 多线程池(集群)是node的一个内置模块,自动创建子进程,同时,程序员不需要理会master和child的通信过程,这些都是封装的。
下面展示了基本使用的方法:需要isMaster检查当前进程在子进程还是主进程,因为文件是脚本,子进程也会执行。如果是主进程,那么就fork出子进程,下面fork了几个子进程:
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster){ // 判断是主进程
for (let i = 0, n = os.cpus().length; i < n; i += 1){
cluster.fork();
}
} else { // 子进程走这里
http.createServer(function(req, res) {
res.writeHead(200);
res.end("hello world\n");
}).listen(8000);
}
cluster.fork()返回worker对象,此对象有属性和方法:
- worker.id :唯一PID
- worker.process: worker的process对象
worker.send(), 可以理解为:send to me 是要在主进程中调用,向子进程发消息用的,可以发送:string、json对象等
if (cluster.isMaster) {
const worker = cluster.fork();
worker.send('hi son'); // 向worker发消息
}
在worker进程中用process.on监听message事件就能得到消息
process.on('message', function(message) {
console.log(message);
});
cluster 的属性和方法有:
isMaster: 主进程判断
- isWorker: 子进程判断
- workers:是一个对象,key是进程PID,value是进程对象
- fork(): 新建进程
- kill(): 终止进程
进程通信
cluster可以监听 online、exit事件,当一个子线程被fork,会emit一个online事件,当一个进程退出会有一个exit事件,所以可以利用这个简单做一个进程守护: ```javascript var cluster = require(‘cluster’);
if(cluster.isMaster) { var numWorkers = require(‘os’).cpus().length; console.log(‘Master cluster setting up ‘ + numWorkers + ‘ workers…’);
for(var i = 0; i < numWorkers; i++) { cluster.fork(); }
cluster.on(‘online’, function(worker) { console.log(‘Worker ‘ + worker.process.pid + ‘ is online’); });
// 当有进程退出就重新fork一个 cluster.on(‘exit’, function(worker, code, signal) { console.log( ‘Worker ‘ + worker.process.pid + ‘ died with code: ‘ + code + ‘, and signal: ‘ + signal ); console.log(‘Starting a new worker’); cluster.fork(); }); }
除了exit,online事件之外,worker进程还可以监听自定义事件:message,以及send事件(父子进程通信的话,都用process.on() \ process.send() 接受和发送消息)
```javascript
worker.on('message', function(msg) { /* do sth */ })
worker.send('my_event' + worker.process.pid);
进程守护
当程序出现全局错误(未被catch到),这时候没有其他机制的情况下,node server将因为异常而终止。
这时候可能需要再把这个服务拉起来,这个操作就是进程守护。
在处理异常时候,相关的策略有:
- 子进程捕获uncaughtException异常,并主动退出;
- 父进程监听子进程退出事件,并延迟创建子进程;
- 子进程监控内存占用过大,主动退出;
- 防止僵尸进程:父进程监听子进程心跳,3次没心跳,杀掉子进程; ```javascript const cluster = require(‘cluster’);
if (cluster.isMaster) { // 核心框架 } else {
}
<a name="IahL6"></a>
##### 子进程捕获uncaught异常
子进程捕获uncaught异常,并主动退出
```javascript
// 子进程
process.on('uncaughtException', function (err) {
// 这里可以做写日志的操作
console.log(err);
// 退出进程
process.exit(1);
});
主动退出之前注意日志上报或者记录,这是一种兜底机制。
监听子进程退出事件
父进程监听子进程退出事件,并延迟创建子进程(延迟的目的是害怕子进程一死主进程立即唤起,然后子进程又死了,又唤起,这样一直循环)
cluster.on('exit', function () { // 父进程监听子进程退出事件,并延迟创建子进程
setTimeout(() => {
createWorker() // 封装的cluster.fork(),以及其他操作
}, 5000)
})
子进程监控内存占用过大,主动退出
这个算是内存泄漏的兜底。
有的时候可能存在为被定位到的内存泄漏,就需要这种方式了,process.memoryUsage().rss 可以告诉你当前的进程
// 子进程代码
// 内存使用过多,自己退出进程
setTimeout(() => {
if (process.memoryUsage().rss > 734003200) {
process.exit(1);
}
}, 5000)
通过心跳包,防止僵尸进程
比如代码中出现不期望的while(true){ } 同时没有退出,这样的进程就是僵尸进程了——没有死,但是已经不能响应其他请求了
这样的话需要主进程把僵尸进程杀掉,同时再拉起一个新的进程替代。
但是怎么检测这是一个僵尸进程进程呢?主进程定时发送心跳,能正确回应的进程就不是僵尸进程。
/**
* 简单的进程守护器
*/
const cluster = require('cluster');
if (cluster.isMaster) {
// 构建进程
for (let i = 0; i < require('os').cpus().length / 2; i++) {
createWorker();
}
function createWorker() {
// 创建子进程并进行心跳监控
var worker = cluster.fork();
var missed = 0;// 没有回应的ping次数
// 心跳
var timer = setInterval(function () {
// 三次没回应,杀之
if (missed == 3) {
clearInterval(timer);
console.log(worker.process.pid + ' has become a zombie!');
process.kill(worker.process.pid);
return;
}
// 开始心跳
missed++;
worker.send('ping#' + worker.process.pid);
}, 10000);
worker.on('message', function (msg) {
// 确认心跳回应。
if (msg == 'pong#' + worker.process.pid) {
missed--;
}
});
// 挂了就没必要再进行心跳了
worker.on('exit', function () {
clearInterval(timer);
});
}
} else {
// 回应心跳信息
process.on('message', function (msg) {
if (msg == 'ping#' + process.pid) {
process.send('pong#' + process.pid);
}
});
// 执行任务代码
require('./app')
}
上面的代码中,我们看到:
- 主进程通过var worker = cluster.fork();构造进程之后,通过worker.send(‘ping#’ + worker.process.pid);给子进程发送心跳消息。主进程也同时监听着子进程的消息;
- 子进程通过process.on(‘message’, fn)监听,判断是心跳消息后,及时按照约定回复主进程;
- 主进程每发一条消息,计数器加一,每收到子进程的回调,计数器减一,
- 这样,每次定时发送消息前,判断计数器有没有达到判定子进程是僵尸进程次数,如果达到了,将子进程杀死process.kill(worker.process.pid)。
cluster和pm2有什么关系?
PM2模块是cluster模块的一个包装层。它的作用是尽量将cluster模块抽象掉,让用户像使用单进程一样,部署多进程Node应用
pm2 start app.js -i 4 // 开启
pm2 reload all // 零停机热重启
i参数告诉PM2,这段代码应该在cluster_mode启动,且新建worker进程的数量是4个。
如果i参数的值是0,那么当前机器有几个CPU内核,PM2就会启动几个worker进程
多个进程监听同一个端口
cluster 实现了多个进程监听同一个端口的功能,本质还是由master进行真正的端口监听,然后通过一定的策略将链接转交于子进程。
这个转交(也可以理解为负载均衡)的策略是: round-robin,时间片轮转法,所有平台的默认方案(除了 windows)。
主进程监听端口,接收到新连接之后,通过 时间片轮转法 来决定将接收到的客户端的 socket 句柄传递给指定的 worker 处理。
(通常在 Nginx 上采用另一种算法:WRR,加权轮转法)
实现类似pm2零停机热重启的思路
我们的目的是期望实现所有子进程挨个重启一遍,同时server正常运行,那也就是说至少要保证有一个子进程工作着,最简单的思路就是,遍历当前所有子进程,一个一个重启,重启完了一个再重启下一个:
- Linux向主进程发送信号: SIGUSR2 一个用户命令
kill -SIGUSR2 PID
2. 主进程收到信号之后,执行热重启操作 ```javascript const workers = Object.values(cluster.workers);
const restartWorker = workerIndex => { const worker = workers[workerIndex]; // 获得一个子进程 if (!worker) return;
worker.on(“exit”, () => {
if (!worker.exitedAfterDisconnect) return;
console.log(Exited process ${worker.process.pid}
);
// 重新启动
cluster.fork().on("listening", () => { // 启动起来了就启动下一个
restartWorker(workerIndex + 1);
});
});
worker.disconnect(); };
// 当需要重启的时候,从第一个开始 restartWorker(0);
```
disconnect方法杀死worker进程,这种情况下 worker.exitedAfterDisconnect值为true
新启动的进程监听listening事件,表示启动成功,这时候再去重新启动下一个子进程