来源: Go编程时光
偶然间看到一篇写于15年的文章,说实话,标题确实吸引了我。
关于这篇文章,我就不直接翻译了,原文地址我放在文章最后了。
项目的需求就是很简单,客户端发送请求,服务端接收请求处理数据(原文是把资源上传至 Amazon S3 资源中)。本质上就是这样,如何使用 Go 每分钟处理百万请求? - 图1
我稍微改动了原文的业务代码,但是并不影响核心模块。在第一版中,每收到一个 Request,开启一个 G 进行处理,很常规的操作。

初版

  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "net/http"
  6. "time"
  7. )
  8. type Payload struct {
  9. // 传啥不重要
  10. }
  11. func (p *Payload) UpdateToS3() error {
  12. //存储逻辑,模拟操作耗时
  13. time.Sleep(500 * time.Millisecond)
  14. fmt.Println("上传成功")
  15. return nil
  16. }
  17. func payloadHandler(w http.ResponseWriter, r *http.Request) {
  18. // 业务过滤
  19. // 请求body解析......
  20. var p Payload
  21. go p.UpdateToS3()
  22. w.Write([]byte("操作成功"))
  23. }
  24. func main() {
  25. http.HandleFunc("/payload", payloadHandler)
  26. log.Fatal(http.ListenAndServe(":8099", nil))
  27. }

这样操作存在什么问题呢?一般情况下,没什么问题。但是如果是高并发的场景下,不对 G 进行控制,你的 CPU 使用率暴涨,内存占用暴涨……,直至程序奔溃。

如果此操作落地至数据库,例如mysql。相应的,你数据库服务器的磁盘IO、网络带宽 、CPU负载、内存消耗都会达到非常高的情况,一并奔溃。所以,一旦程序中出现不可控的事物,往往是危险的信号。

中版


  • ```go

package main

import ( “fmt” “log” “net/http” “time” )

const MaxQueue = 400

var Queue chan Payload

func init() { Queue = make(chan Payload, MaxQueue) }

type Payload struct { // 传啥不重要 }

func (p Payload) UpdateToS3() error { //存储逻辑,模拟操作耗时 time.Sleep(500 time.Millisecond) fmt.Println(“上传成功”) return nil }

func payloadHandler(w http.ResponseWriter, r *http.Request) { // 业务过滤 // 请求body解析…… var p Payload //go p.UpdateToS3() Queue <- p w.Write([]byte(“操作成功”)) }

// 处理任务 func StartProcessor() { for { select { case payload := <-Queue: payload.UpdateToS3() } } }

func main() { http.HandleFunc(“/payload”, payloadHandler) //单独开一个g接收与处理任务 go StartProcessor() log.Fatal(http.ListenAndServe(“:8099”, nil)) }

  1. 这一版借助带 buffered channel 来完成这个功能,这样控制住了无限制的G,但是依然没有解决问题。<br />原因是处理请求是一个同步的操作,每次只会处理一个任务,然而高并发下请求进来的速度会远远超过处理的速度。这种情况,一旦 channel 满了之后, 后续的请求将会被阻塞等待。然后你会发现,响应的时间会大幅度的开始增加, 甚至不再有任何的响应。
  2. <a name="xwNUA"></a>
  3. ### 终版
  4. ```go
  5. package main
  6. import (
  7. "fmt"
  8. "log"
  9. "net/http"
  10. "time"
  11. )
  12. const (
  13. MaxWorker = 100 //随便设置值
  14. MaxQueue = 200 // 随便设置值
  15. )
  16. // 一个可以发送工作请求的缓冲 channel
  17. var JobQueue chan Job
  18. func init() {
  19. JobQueue = make(chan Job, MaxQueue)
  20. }
  21. type Payload struct{}
  22. type Job struct {
  23. PayLoad Payload
  24. }
  25. type Worker struct {
  26. WorkerPool chan chan Job
  27. JobChannel chan Job
  28. quit chan bool
  29. }
  30. func NewWorker(workerPool chan chan Job) Worker {
  31. return Worker{
  32. WorkerPool: workerPool,
  33. JobChannel: make(chan Job),
  34. quit: make(chan bool),
  35. }
  36. }
  37. // Start 方法开启一个 worker 循环,监听退出 channel,可按需停止这个循环
  38. func (w Worker) Start() {
  39. go func() {
  40. for {
  41. // 将当前的 worker 注册到 worker 队列中
  42. w.WorkerPool <- w.JobChannel
  43. select {
  44. case job := <-w.JobChannel:
  45. // 真正业务的地方
  46. // 模拟操作耗时
  47. time.Sleep(500 * time.Millisecond)
  48. fmt.Printf("上传成功:%v\n", job)
  49. case <-w.quit:
  50. return
  51. }
  52. }
  53. }()
  54. }
  55. func (w Worker) stop() {
  56. go func() {
  57. w.quit <- true
  58. }()
  59. }
  60. // 初始化操作
  61. type Dispatcher struct {
  62. // 注册到 dispatcher 的 worker channel 池
  63. WorkerPool chan chan Job
  64. }
  65. func NewDispatcher(maxWorkers int) *Dispatcher {
  66. pool := make(chan chan Job, maxWorkers)
  67. return &Dispatcher{WorkerPool: pool}
  68. }
  69. func (d *Dispatcher) Run() {
  70. // 开始运行 n 个 worker
  71. for i := 0; i < MaxWorker; i++ {
  72. worker := NewWorker(d.WorkerPool)
  73. worker.Start()
  74. }
  75. go d.dispatch()
  76. }
  77. func (d *Dispatcher) dispatch() {
  78. for {
  79. select {
  80. case job := <-JobQueue:
  81. go func(job Job) {
  82. // 尝试获取一个可用的 worker job channel,阻塞直到有可用的 worker
  83. jobChannel := <-d.WorkerPool
  84. // 分发任务到 worker job channel 中
  85. jobChannel <- job
  86. }(job)
  87. }
  88. }
  89. }
  90. // 接收请求,把任务筛入JobQueue。
  91. func payloadHandler(w http.ResponseWriter, r *http.Request) {
  92. work := Job{PayLoad: Payload{}}
  93. JobQueue <- work
  94. _, _ = w.Write([]byte("操作成功"))
  95. }
  96. func main() {
  97. // 通过调度器创建worker,监听来自 JobQueue的任务
  98. d := NewDispatcher(MaxWorker)
  99. d.Run()
  100. http.HandleFunc("/payload", payloadHandler)
  101. log.Fatal(http.ListenAndServe(":8099", nil))
  102. }

最终采用的是两级 channel,一级是将用户请求数据放入到 chan Job 中,这个 channel job 相当于待处理的任务队列。
另一级用来存放可以处理任务的 work 缓存队列,类型为 chan chan Job。调度器把待处理的任务放入一个空闲的缓存队列当中,work 会一直处理它的缓存队列。通过这种方式,实现了一个 worker 池。大致画了一个图帮助理解,如何使用 Go 每分钟处理百万请求? - 图2
首先我们在接收到一个请求后,创建 Job 任务,把它放入到任务队列中等待 work 池处理。

  1. func payloadHandler(w http.ResponseWriter, r *http.Request) {
  2. job := Job{PayLoad: Payload{}}
  3. JobQueue <- work
  4. _, _ = w.Write([]byte("操作成功"))
  5. }

调度器初始化work池后,在 dispatch 中,一旦我们接收到 JobQueue 的任务,就去尝试获取一个可用的 worker,分发任务给 worker 的 job channel 中。 注意这个过程不是同步的,而是每接收到一个 job,就开启一个 G 去处理。这样可以保证 JobQueue 不需要进行阻塞,对应的往 JobQueue 理论上也不需要阻塞地写入任务。


  • ```go

func (d *Dispatcher) Run() { // 开始运行 n 个 worker for i := 0; i < MaxWorker; i++ { worker := NewWorker(d.WorkerPool) worker.Start() } go d.dispatch() }

func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: go func(job Job) { // 尝试获取一个可用的 worker job channel,阻塞直到有可用的 worker jobChannel := <-d.WorkerPool // 分发任务到 worker job channel 中 jobChannel <- job }(job) } } } ```

这里”不可控”的 G 和上面还是又所不同的。仅仅极短时间内处于阻塞读 Chan 状态, 当有空闲的 worker 被唤醒,然后分发任务,整个生命周期远远短于上面的操作。

附录
[1]http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/