goroutine是Go语言实现并发编程的利器,简单的一个指令go function就能启动一个goroutine。但是,Go语言并没有提供终止goroutine的接口,也就是说,我们不能从外部去停止一个goroutine,只能由goroutine内部退出(main函数终止除外)。但是我们有很多情况下需要主动关闭goroutine,如需要实现一个系统自动熔断的功能就需要主动关闭goroutine

    一、使用channel进行控制
    Go语言有一个著名的设计哲学:Do not communicate by sharing memory; instead, share memory by communicating.——通过通信共享内存,而不是通过共享内存来进行通信。Go语言中实现goroutine之间通信的机制就是channel。因此我们可以使用channel来给goroutine发送消息来变更goroutine的行为。下面是使用channel控制的几种方式。

    1.1 for-range结构
    for-rang从channel上接收值,直到channel关闭,该结构在Go并发编程中很常用,这对于从单一通道上获取数据去执行某些任务是十分方便的。示例如下

    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. )
    6. var wg sync.WaitGroup
    7. func worker(ch chan int) {
    8. defer wg.Done()
    9. for value := range ch {
    10. fmt.Println(value) // do something
    11. }
    12. }
    13. func main() {
    14. ch := make(chan int)
    15. wg.Add(1)
    16. go worker(ch)
    17. for i := 0; i < 3; i++ {
    18. ch <- i
    19. }
    20. close(ch)
    21. wg.Wait()

    1.2 for-select结构
    当channel比较多时,for-range结构借不是很方便了。Go语言提供了另外一种和channel相关的语法: selectselect能够让goroutine在多个通信操作上等待(可以理解为监听多个channel)。由于这个特性,for-select结构在Go并发编程中使用的频率很高。我在使用Go的开发中,这是我用的最多的一种组合形式:

    1. for {
    2. select {
    3. }
    4. }

    for-select的使用十分灵活,这里我举两个例子

    1.2.1 指定一个退出通道
    对于for-select结构,一般我会定义一个特定的退出通道,用于接收退出的信号,如quit。退出通道的使用也分两情况,下面看两个示例。

    • 向退出通道发送退出信号

      1. package main
      2. import (
      3. "fmt"
      4. "sync"
      5. "time"
      6. )
      7. var wg sync.WaitGroup
      8. func worker(in, quit <-chan int) {
      9. defer wg.Done()
      10. for {
      11. select {
      12. case <-quit:
      13. fmt.Println("收到退出信号")
      14. return // 必须return,否则goroutine是不会结束的
      15. case v := <-in:
      16. fmt.Println(v)
      17. }
      18. }
      19. }
      20. func main() {
      21. quit := make(chan int) // 退出通道
      22. in := make(chan int)
      23. wg.Add(1)
      24. go worker(in, quit)
      25. for i := 0; i < 3; i++ {
      26. in <- i
      27. time.Sleep(1 * time.Second)
      28. }
      29. quit <- 1 // 向quit通道发送退出信号
      30. wg.Wait()
      31. }
    • 关闭退出通道

    上面这个例子中,如果启动了100个groutine,那么我们就需要向quit通道中发送100次数据,这就很麻烦。怎么办呢?很简单,关闭channel,这样所有监听quit channel的goroutine就都会收到关闭信号。上面的代码只要做一个很小的替换就能工作:

    1. // wg.Add(1)
    2. wg.Add(100) //前提是你真的有100个goroutine
    3. // quit <- 1 替换为下面的代码
    4. close(quit)

    1.2.2 多个channel都关闭才能退出
    上面讲了定义一个特定的退出通道的方法。这里再讲另一个场景,如果select上监听了多个通道,需要所有的通道都关闭后才能结束goroutine, 这种要如何处理呢?
    这里就利用select的一个特性,select不会在nil的通道上进行等待,因此将channel赋值为nil即可。此外,还需要利用channel的ok值。

    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. "time"
    6. )
    7. var wg sync.WaitGroup
    8. func worker(in1, in2 <-chan int) {
    9. defer wg.Done()
    10. for {
    11. select {
    12. case v, ok := <-in1:
    13. if !ok {
    14. fmt.Println("收到退出信号")
    15. in1 = nil
    16. }
    17. // do something
    18. fmt.Println(v)
    19. case v, ok := <-in2:
    20. if !ok {
    21. fmt.Println("收到退出信号")
    22. in2 = nil
    23. }
    24. // do something
    25. fmt.Println(v)
    26. }
    27. // select已经结束,我们需要判断两个通道的状态
    28. // 都为nil则结束当前goroutine
    29. if in1 == nil && in2 == nil {
    30. return
    31. }
    32. }
    33. }
    34. func main() {
    35. in1 := make(chan int) // 退出通道,接收
    36. in2 := make(chan int)
    37. wg.Add(2)
    38. go worker(in1, in2)
    39. go worker(in2, in2)
    40. for i := 0; i < 3; i++ {
    41. in1 <- i
    42. time.Sleep(1 * time.Second)
    43. in2 <- i
    44. }
    45. close(in1)
    46. close(in2)
    47. wg.Wait()
    48. }

    二、使用context包
    context包是官方提供的一个用于控制多个goroutine写作的包,篇幅受限,这里只举一个例子,这个例子说明了2个问题:

    1. 使用context的cancel信号,可以终止goroutine的运行
    2. context是可以向下传递的

      1. package main
      2. import (
      3. "context"
      4. "fmt"
      5. "sync"
      6. )
      7. var wg sync.WaitGroup
      8. func gen(ctx context.Context) <-chan int {
      9. // 创建子context
      10. subCtx, _ := context.WithCancel(ctx)
      11. go sub(subCtx) // 这里使用ctx,也能给goroutine通知
      12. dst := make(chan int)
      13. n := 1
      14. go func() {
      15. defer wg.Done()
      16. for {
      17. select {
      18. case <-ctx.Done():
      19. fmt.Println("end")
      20. return // return,防止goroutine泄露
      21. case dst <- n:
      22. n++
      23. }
      24. }
      25. }()
      26. return dst
      27. }
      28. func sub(ctx context.Context) {
      29. defer wg.Done()
      30. for {
      31. select {
      32. case <-ctx.Done():
      33. fmt.Println("end too")
      34. return // returning not to leak the goroutine
      35. default:
      36. fmt.Println("test")
      37. }
      38. }
      39. }
      40. func main() {
      41. wg.Add(2)
      42. ctx, cancel := context.WithCancel(context.Background())
      43. for n := range gen(ctx) {
      44. fmt.Println(n)
      45. if n == 5 {
      46. break
      47. }
      48. }
      49. cancel()
      50. wg.Wait()
      51. }

    三、总结
    在Go语言的并发编程中,goroutine的启动十分方便,但是goroutine的管理是需要自己去编程实现的。尤其是在多个goroutine协作时,更需要小心谨慎处理,否则程序会有意想不到的bug。
    本文主要描述了如何实现从外部主动关闭goroutine的2种方式:

    • channel
    • context

    主动关闭goroutine除了实现特定功能外,还能提升程序性能。goroutine由于某种原因阻塞,不能继续运行,此时程序应该干预,将goroutine结束,而不是让他一直阻塞,如果此类goroutine很多,会耗费更多的资源。因此,有效的管理goroutine是十分有必要的。

    https://zhuanlan.zhihu.com/p/66659719