14.10 复用

14.10.1 典型的客户端/服务器(C/S)模式

客户端-服务器应用正是 goroutines 和 channels 的亮点所在。

客户端 (Client) 可以是运行在任意设备上的任意程序,它会按需发送请求 (request) 至服务器。服务器 (Server) 接收到这个请求后开始相应的工作,然后再将响应 (response) 返回给客户端。典型情况下一般是多个客户端(即多个请求)对应一个(或少量)服务器。例如我们日常使用的浏览器客户端,其功能就是向服务器请求网页。而 Web 服务器则会向浏览器响应网页数据。

使用 Go 的服务器通常会在协程中执行向客户端的响应,故而会对每一个客户端请求启动一个协程。一个常用的操作方法是客户端请求自身中包含一个通道,而服务器则向这个通道发送响应。

例如下面这个 Request 结构,其中内嵌了一个 replyc 通道。

  1. type Request struct {
  2. a, b int
  3. replyc chan int // reply channel inside the Request
  4. }

或者更通俗的:

  1. type Reply struct{...}
  2. type Request struct{
  3. arg1, arg2, arg3 some_type
  4. replyc chan *Reply
  5. }

接下来先使用简单的形式,服务器会为每一个请求启动一个协程并在其中执行 run() 函数,此举会将类型为 binOpop 操作返回的 int 值发送到 replyc 通道。

  1. type binOp func(a, b int) int
  2. func run(op binOp, req *Request) {
  3. req.replyc <- op(req.a, req.b)
  4. }

server() 协程会无限循环以从 chan *Request 接收请求,并且为了避免被长时间操作所堵塞,它将为每一个请求启动一个协程来做具体的工作:

  1. func server(op binOp, service chan *Request) {
  2. for {
  3. req := <-service; // requests arrive here
  4. // start goroutine for request:
  5. go run(op, req); // don’t wait for op to complete
  6. }
  7. }

server() 本身则是以协程的方式在 startServer() 函数中启动:

  1. func startServer(op binOp) chan *Request {
  2. reqChan := make(chan *Request);
  3. go server(op, reqChan);
  4. return reqChan;
  5. }

startServer() 则会在 main 协程中被调用。

在以下测试例子中,100 个请求会被发送到服务器,只有它们全部被送达后我们才会按相反的顺序检查响应:

  1. func main() {
  2. adder := startServer(func(a, b int) int { return a + b })
  3. const N = 100
  4. var reqs [N]Request
  5. for i := 0; i < N; i++ {
  6. req := &reqs[i]
  7. req.a = i
  8. req.b = i + N
  9. req.replyc = make(chan int)
  10. adder <- req // adder is a channel of requests
  11. }
  12. // checks:
  13. for i := N - 1; i >= 0; i-- {
  14. // doesn’t matter what order
  15. if <-reqs[i].replyc != N+2*i {
  16. fmt.Println(“fail at”, i)
  17. } else {
  18. fmt.Println(“Request “, i, is ok!”)
  19. }
  20. }
  21. fmt.Println(“done”)
  22. }

这些代码可以在 multiplex_server.go 找到

输出:

  1. Request 99 is ok!
  2. Request 98 is ok!
  3. ...
  4. Request 1 is ok!
  5. Request 0 is ok!
  6. done

这个程序仅启动了 100 个协程。然而即使执行 100,000 个协程我们也能在数秒内看到它完成。这说明了 Go 的协程是如何的轻量:如果我们启动相同数量的真实的线程,程序早就崩溃了。

示例: 14.14-multiplex_server.go

  1. package main
  2. import "fmt"
  3. type Request struct {
  4. a, b int
  5. replyc chan int // reply channel inside the Request
  6. }
  7. type binOp func(a, b int) int
  8. func run(op binOp, req *Request) {
  9. req.replyc <- op(req.a, req.b)
  10. }
  11. func server(op binOp, service chan *Request) {
  12. for {
  13. req := <-service // requests arrive here
  14. // start goroutine for request:
  15. go run(op, req) // don't wait for op
  16. }
  17. }
  18. func startServer(op binOp) chan *Request {
  19. reqChan := make(chan *Request)
  20. go server(op, reqChan)
  21. return reqChan
  22. }
  23. func main() {
  24. adder := startServer(func(a, b int) int { return a + b })
  25. const N = 100
  26. var reqs [N]Request
  27. for i := 0; i < N; i++ {
  28. req := &reqs[i]
  29. req.a = i
  30. req.b = i + N
  31. req.replyc = make(chan int)
  32. adder <- req
  33. }
  34. // checks:
  35. for i := N - 1; i >= 0; i-- { // doesn't matter what order
  36. if <-reqs[i].replyc != N+2*i {
  37. fmt.Println("fail at", i)
  38. } else {
  39. fmt.Println("Request ", i, " is ok!")
  40. }
  41. }
  42. fmt.Println("done")
  43. }

14.10.2 卸载 (Teardown):通过信号通道关闭服务器

在上一个版本中 server()main() 函数返回后并没有完全关闭,而被强制结束了。为了改进这一点,我们可以提供一个退出通道给 server()

  1. func startServer(op binOp) (service chan *Request, quit chan bool) {
  2. service = make(chan *Request)
  3. quit = make(chan bool)
  4. go server(op, service, quit)
  5. return service, quit
  6. }

server() 函数现在则使用 selectservice 通道和 quit 通道之间做出选择:

  1. func server(op binOp, service chan *request, quit chan bool) {
  2. for {
  3. select {
  4. case req := <-service:
  5. go run(op, req)
  6. case <-quit:
  7. return
  8. }
  9. }
  10. }

quit 通道接收到一个 true 值时,server 就会返回并结束。

main() 函数中我们做出如下更改:

  1. adder, quit := startServer(func(a, b int) int { return a + b })

main() 函数的结尾处我们放入这一行:quit <- true

完整的代码在 multiplex_server2.go,输出和上一个版本是一样的。

示例: 14.15-multiplex_server2.go

  1. package main
  2. import "fmt"
  3. type Request struct {
  4. a, b int
  5. replyc chan int // reply channel inside the Request
  6. }
  7. type binOp func(a, b int) int
  8. func run(op binOp, req *Request) {
  9. req.replyc <- op(req.a, req.b)
  10. }
  11. func server(op binOp, service chan *Request, quit chan bool) {
  12. for {
  13. select {
  14. case req := <-service:
  15. go run(op, req)
  16. case <-quit:
  17. return
  18. }
  19. }
  20. }
  21. func startServer(op binOp) (service chan *Request, quit chan bool) {
  22. service = make(chan *Request)
  23. quit = make(chan bool)
  24. go server(op, service, quit)
  25. return service, quit
  26. }
  27. func main() {
  28. adder, quit := startServer(func(a, b int) int { return a + b })
  29. const N = 100
  30. var reqs [N]Request
  31. for i := 0; i < N; i++ {
  32. req := &reqs[i]
  33. req.a = i
  34. req.b = i + N
  35. req.replyc = make(chan int)
  36. adder <- req
  37. }
  38. // checks:
  39. for i := N - 1; i >= 0; i-- { // doesn't matter what order
  40. if <-reqs[i].replyc != N+2*i {
  41. fmt.Println("fail at", i)
  42. } else {
  43. fmt.Println("Request ", i, " is ok!")
  44. }
  45. }
  46. quit <- true
  47. fmt.Println("done")
  48. }

练习 14.13 multiplex_server3.go:使用之前的例子,编写一个在 Request 结构上带有 String() 方法的版本,它能决定服务器如何输出;并使用以下两个请求来测试这个程序:

  1. req1 := &Request{3, 4, make(chan int)}
  2. req2 := &Request{150, 250, make(chan int)}
  3. ...
  4. // show the output
  5. fmt.Println(req1,"\n",req2)

链接