<参考>如何优雅地关闭channel

关闭channel的基本原则

只在发送端关闭channel,而且只能在有且只有一个发送者的情况下

多sender或者在receive端关闭

比较粗暴的方式

如果必须在接收方关闭,或者在多sender中的任意一个sender中关闭channel,可以尝试使用recover的方式处理panic(which is not very recommended),例如

  1. func SafeClose(ch chan T) (justClosed bool) {
  2. defer func() {
  3. if recover() != nil {
  4. // The return result can be altered
  5. // in a defer function call.
  6. justClosed = false
  7. }
  8. }()
  9. // assume ch != nil here.
  10. close(ch) // panic if ch is closed
  11. return true // <=> justClosed = true; return
  12. }

同样的,相同的方法可以用来处理向可能已经关闭了的ch中发送内容

  1. func SafeSend(ch chan T, value T) (closed bool) {
  2. defer func() {
  3. if recover() != nil {
  4. closed = true
  5. }
  6. }()
  7. ch <- value // panic if ch is closed
  8. return false // <=> closed = false; return
  9. }

比较优雅的方式

sync.Once

  1. type MyChannel struct {
  2. C chan T
  3. once sync.Once
  4. }
  5. func NewMyChannel() *MyChannel {
  6. return &MyChannel{C: make(chan T)}
  7. }
  8. func (mc *MyChannel) SafeClose() {
  9. mc.once.Do(func() {
  10. close(mc.C)
  11. })
  12. }

但是如果用once,再多sender的情况下,还是会造成其他sender向已经关闭了的channel中塞数据,所以使用一个锁是不错的选择

  1. type MyChannel struct {
  2. C chan T
  3. closed bool
  4. mutex sync.Mutex
  5. }
  6. func NewMyChannel() *MyChannel {
  7. return &MyChannel{C: make(chan T)}
  8. }
  9. func (mc *MyChannel) SafeClose() {
  10. mc.mutex.Lock()
  11. defer mc.mutex.Unlock()
  12. if !mc.closed {
  13. close(mc.C)
  14. mc.closed = true
  15. }
  16. }
  17. func (mc *MyChannel) IsClosed() bool {
  18. mc.mutex.Lock()
  19. defer mc.mutex.Unlock()
  20. return mc.closed
  21. }

再次强调,不推荐在接收方close(ch)

优雅的方式

一个sender,多个receiver

最简单的一个模型,sender关闭channel即可

一个receiver,多个sender

可以通过关闭额外的一个channel去通知那多个sender

  1. package main
  2. import (
  3. "time"
  4. "math/rand"
  5. "sync"
  6. "log"
  7. )
  8. func main() {
  9. rand.Seed(time.Now().UnixNano())
  10. log.SetFlags(0)
  11. // ...
  12. const MaxRandomNumber = 100000
  13. const NumSenders = 1000
  14. wgReceivers := sync.WaitGroup{}
  15. wgReceivers.Add(1)
  16. // ...
  17. dataCh := make(chan int, 100)
  18. stopCh := make(chan struct{})
  19. // stopCh is an additional signal channel.
  20. // Its sender is the receiver of channel dataCh.
  21. // Its reveivers are the senders of channel dataCh.
  22. // senders
  23. for i := 0; i < NumSenders; i++ {
  24. go func() {
  25. for {
  26. // The first select is to try to exit the goroutine
  27. // as early as possible. In fact, it is not essential
  28. // for this specified example, so it can be omitted.
  29. select {
  30. case <- stopCh:
  31. return
  32. default:
  33. }
  34. // Even if stopCh is closed, the first branch in the
  35. // second select may be still not selected for some
  36. // loops if the send to dataCh is also unblocked.
  37. // But this is acceptable for this example, so the
  38. // first select block above can be omitted.
  39. select {
  40. case <- stopCh:
  41. return
  42. case dataCh <- rand.Intn(MaxRandomNumber):
  43. }
  44. }
  45. }()
  46. }
  47. // the receiver
  48. go func() {
  49. defer wgReceivers.Done()
  50. for value := range dataCh {
  51. if value == MaxRandomNumber-1 {
  52. // The receiver of the dataCh channel is
  53. // also the sender of the stopCh channel.
  54. // It is safe to close the stop channel here.
  55. close(stopCh)
  56. return
  57. }
  58. log.Println(value)
  59. }
  60. }()
  61. // ...
  62. wgReceivers.Wait()
  63. }

在golang中可以不用关闭channel,channel在不被任何goroutine使用的时候,最后都会被垃圾回收机制回收,无论channel已经关闭(原作者说的,我还不太确定,因为还没找到相关资料也还没去看源码)

多个receiver,多个sender

可以由任意一个玩家(receiver或者sender皆可)告诉一个管理者(可以单独启一个goroutine之类),然后管理者去关闭一个signalChannel来达到关闭所有channel的目的

  1. package main
  2. import (
  3. "time"
  4. "math/rand"
  5. "sync"
  6. "log"
  7. "strconv"
  8. )
  9. func main() {
  10. rand.Seed(time.Now().UnixNano())
  11. log.SetFlags(0)
  12. // ...
  13. const MaxRandomNumber = 100000
  14. const NumReceivers = 10
  15. const NumSenders = 1000
  16. wgReceivers := sync.WaitGroup{}
  17. wgReceivers.Add(NumReceivers)
  18. // ...
  19. dataCh := make(chan int, 100)
  20. stopCh := make(chan struct{})
  21. // stopCh is an additional signal channel.
  22. // Its sender is the moderator goroutine shown below.
  23. // Its reveivers are all senders and receivers of dataCh.
  24. toStop := make(chan string, 1)
  25. // The channel toStop is used to notify the moderator
  26. // to close the additional signal channel (stopCh).
  27. // Its senders are any senders and receivers of dataCh.
  28. // Its reveiver is the moderator goroutine shown below.
  29. var stoppedBy string
  30. // moderator
  31. go func() {
  32. stoppedBy = <-toStop
  33. close(stopCh)
  34. }()
  35. // senders
  36. for i := 0; i < NumSenders; i++ {
  37. go func(id string) {
  38. for {
  39. value := rand.Intn(MaxRandomNumber)
  40. if value == 0 {
  41. // Here, a trick is used to notify the moderator
  42. // to close the additional signal channel.
  43. select {
  44. case toStop <- "sender#" + id:
  45. default:
  46. }
  47. return
  48. }
  49. // The first select here is to try to exit the goroutine
  50. // as early as possible. This select blocks with one
  51. // receive operation case and one default branches will
  52. // be specially optimized as a try-receive operation by
  53. // the standard Go compiler.
  54. select {
  55. case <- stopCh:
  56. return
  57. default:
  58. }
  59. // Even if stopCh is closed, the first branch in the
  60. // second select may be still not selected for some
  61. // loops (and for ever in theory) if the send to
  62. // dataCh is also non-blocking.
  63. // This is why the first select block above is needed.
  64. select {
  65. case <- stopCh:
  66. return
  67. case dataCh <- value:
  68. }
  69. }
  70. }(strconv.Itoa(i))
  71. }
  72. // receivers
  73. for i := 0; i < NumReceivers; i++ {
  74. go func(id string) {
  75. defer wgReceivers.Done()
  76. for {
  77. // Same as the sender goroutine, the first select here
  78. // is to try to exit the goroutine as early as possible.
  79. // This select blocks with one send operation case and
  80. // one default branches will be specially optimized as
  81. // a try-send operation by the standard Go compiler.
  82. select {
  83. case <- stopCh:
  84. return
  85. default:
  86. }
  87. // Even if stopCh is closed, the first branch in the
  88. // second select may be still not selected for some
  89. // loops (and for ever in theory) if the receive from
  90. // dataCh is also non-blocking.
  91. // This is why the first select block is needed.
  92. select {
  93. case <- stopCh:
  94. return
  95. case value := <-dataCh:
  96. if value == MaxRandomNumber-1 {
  97. // The same trick is used to notify
  98. // the moderator to close the
  99. // additional signal channel.
  100. select {
  101. case toStop <- "receiver#" + id:
  102. default:
  103. }
  104. return
  105. }
  106. log.Println(value)
  107. }
  108. }
  109. }(strconv.Itoa(i))
  110. }
  111. // ...
  112. wgReceivers.Wait()
  113. log.Println("stopped by", stoppedBy)
  114. }