克隆与负载均衡

要给应用程序做扩展,其中一项必要的前提,就是确保该程序的每个实例,都不需要把有待共享的信息,保存在无法分享的资源里面,例如内存或磁盘中。
比如:如果 Web 服务器程序把会话数据(session data)保存在内存或磁盘里面,那么这个程序将不好扩展。如果让这些实例都访问同一套共享数据库,则无论以后部署在何处,它们都可以访问到同一份信息。

cluster 模块

能够对应用程序做分支(fork),让它产生新的实例,并且会把外界传来的连接请求,自动分布到各个实例上面

image.png
主进程(master process)负责产生一批工作进程(worker process),每条工作进程,都是我们要扩展的这个应用程序的一个实例。传入的连接请求会分摊到这些工作进程上面,让每条进程都只承担整个负载量之中的一部分。
由于每条工作进程都是独立的,所以可以开启许多条这样的进程,直至数量与系统所能使用的 CPU 核心数相同为止。

注意 cluster 模块的一些行为

在大多数操作系统上,cluster 模块都会明确采用轮询式的负载均衡(round-robin load balancing)算法来运作

轮询算法会轮流分配请求,确保每个服务器实例所收到工作量大致相同。它会把第一条请求发给第一个服务器实例,把第二条请求发给列表中的下一个服务器,依此类推,如果走到了列表末尾,那么就返回开头,继续分配。 cluster 模块会添加一些额外的机制,以防止某条工作进程担负的工作量过大

构建简单的 HTTP 服务器

  1. import { createServer } from 'http'
  2. // 进程标识符(process identifier, PID)
  3. // 可以用来辨别这项请求是由服务器程序的哪一个实例来处理的
  4. const { pid } = process
  5. const server = createServer((req, res) => {
  6. // simulates(模拟) CPU intensive work(密集型任务)
  7. let i = 1e7; while (i > 0) { i-- }
  8. console.log(`Handling request from ${pid}`)
  9. res.end(`Hello from ${pid}\n`)
  10. })
  11. server.listen(8080, () => console.log(`Started at ${pid}`))

使用 cluster 模块来扩展应用程序

  1. if (cluster.isMaster) {
  2. // 调用 fork() 做分支
  3. } else {
  4. // 执行实际工作
  5. }
  1. // app.js
  2. import { createServer } from 'http'
  3. import { cpus } from 'os'
  4. import cluster from 'cluster'
  5. if (cluster.isMaster) { // ①
  6. const availableCpus = cpus()
  7. console.log(`Clustering to ${availableCpus.length} processes`)
  8. availableCpus.forEach(() => cluster.fork())
  9. } else { // ②
  10. const { pid } = process
  11. const server = createServer((req, res) => {
  12. let i = 1e7; while (i > 0) { i-- }
  13. console.log(`Handling request from ${pid}`)
  14. res.end(`Hello from ${pid}\n`)
  15. })
  16. server.listen(8080, () => console.log(`Started at ${pid}`))
  17. }

在主进程通过 cluster.fork() 做分支时,分出来的这条进程执行的也是同一个模块,即 app.js 模块。这次该模块是在工作进程里面执行的,因此 cluster.isMaster 的值是 false,而 cluster.isWorker 的值是 true

每条工作进程都是一条独立的 Node.js 进程,它有自己的事件循环、内存空间,并且可以加载自己的一套模块

cluster.fork 函数是通过 child_process.fork 这个 API 来实现的,这意味着主进程与工作进程之间会自动建立一条通信渠道。通过 cluster.workers 变量查出当前的所有工作进程后,就可以给这些进程发送广播消息

  1. Object.values(cluster.workers).forEach(worker => {
  2. worker.send('hello from the master')
  3. })

用 cluster 模块提升程序的弹性与可用性

所有的工作进程都是单独运作的,可以根据程序需要,专门停掉其中的某条进程,或专门让这条进程重生(respawn),而不影响其他的工作进程。只要程序中还有活跃的工作进程处于活跃状态,服务器就能继续接受连接请求。Node.js 本身不会自动管理工作进程的数量,需要应用程序根据自己的需求来管理进程池中的这些进程。

  1. import { createServer } from 'http'
  2. import { cpus } from 'os'
  3. import cluster from 'cluster'
  4. if (cluster.isMaster) {
  5. const availableCpus = cpus()
  6. console.log(`Clustering to ${availableCpus.length} processes`)
  7. availableCpus.forEach(() => cluster.fork())
  8. cluster.on('exit', (worker, code) => {
  9. if (code !== 0 && !worker.exitedAfterDisconnect) {
  10. console.log(`Worker ${worker.process.pid} crashed. Starting a new worker`)
  11. cluster.fork()
  12. }
  13. })
  14. } else {
  15. setTimeout(
  16. () => { throw new Error('Ooops') },
  17. Math.ceil(Math.random() * 3) * 1000
  18. )
  19. const { pid } = process
  20. const server = createServer((req, res) => {
  21. let i = 1e7; while (i > 0) { i-- }
  22. console.log(`Handling request from ${pid}`)
  23. res.end(`Hello from ${pid}\n`)
  24. })
  25. server.listen(8080, () => console.log(`Started at ${pid}`))
  26. }
  • code !== 0:表示该进程因错误而退出
  • !worker.exitedAfterDisconnect:表示这条进程不是主进程故意叫停的

    Zero-downtime restart(无宕机时间的重启)

    使用 cluster 模块实现,只需要每次重启其中一条工作进程就好,其余的工作进程能够照常运行,以确保应用程序的服务能力不受影响

  1. import { createServer } from 'http'
  2. import { cpus } from 'os'
  3. import { once } from 'events'
  4. import cluster from 'cluster'
  5. if (cluster.isMaster) {
  6. const availableCpus = cpus()
  7. console.log(`Clustering to ${availableCpus.length} processes`)
  8. availableCpus.forEach(() => cluster.fork())
  9. cluster.on('exit', (worker, code) => {
  10. if (code !== 0 && !worker.exitedAfterDisconnect) {
  11. console.log(`Worker ${worker.process.pid} crashed. Starting a new worker`)
  12. cluster.fork()
  13. }
  14. })
  15. process.on('SIGUSR2', async () => { // ①
  16. const workers = Object.values(cluster.workers)
  17. for (const worker of workers) { // ②
  18. console.log(`Stopping worker: ${worker.process.pid}`)
  19. worker.disconnect() // ③
  20. await once(worker, 'exit')
  21. if (!worker.exitedAfterDisconnect) continue
  22. const newWorker = cluster.fork() // ④
  23. await once(newWorker, 'listening') // ⑤
  24. }
  25. })
  26. } else {
  27. const { pid } = process
  28. const server = createServer((req, res) => {
  29. let i = 1e7; while (i > 0) { i-- }
  30. console.log(`Handling request from ${pid}`)
  31. res.end(`Hello from ${pid}\n`)
  32. })
  33. server.listen(8080, () => console.log(`Started at ${pid}`))
  34. }
  • 调用 worker.disconnect方法,让这条进程能够正常终止。如果该进程正在处理某项请求,程序不会让这个处理过程突然中断,而是会等所有正在处理的请求全部处理完毕,才让这条工作进程退出

参考

  1. github