为什么需要mutex

相比于 Go 语言宣扬的“用通讯的方式共享数据”,通过共享数据的方式来传递信息和协调线程运行的做法其实更加主流,毕竟大多数的现代编程语言,都是用后一种方式作为并发编程的解决方案的(这种方案的历史非常悠久,恐怕可以追溯到上个世纪多进程编程时代伊始了)

同步的用途有两个,一个是避免多个线程在同一时刻操作同一个数据块,另一个是协调多个线程,以避免它们在同一时刻执行同一个代码块。

使用互斥锁的注意事项

  • 不要重复锁定互斥锁
  • 不要忘记解锁互斥锁,必要时使用defer语句
  • 不要对尚未锁定或者已解锁的互斥锁解锁
  • 不要在多个函数之间直接传递互斥锁


读写锁

  • 对于某个受到读写锁保护的共享资源,多个写操作不能同时进行
  • 写操作和读操作也不能同时进行,但多个读操作却可以同时进行
  • 对写锁进行解锁,会唤醒“所有因试图锁定读锁,而被阻塞的 goroutine”
  • 对读锁进行解锁,只会在没有其他读锁锁定的前提下,唤醒“因试图锁定写锁,而被阻塞的 goroutine”;并且,最终只会有一个被唤醒的 goroutine 能够成功完成对写锁的锁定,其他的 goroutine 还要在原处继续等待。至于是哪一个 goroutine,那就要看谁的等待时间最长


  1. package main
  2. import (
  3. "bytes"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "log"
  9. "sync"
  10. )
  11. var protecting uint
  12. func init() {
  13. flag.UintVar(&protecting, "protecting", 0, "It indicates whether to use a mutex to protect data writing.")
  14. }
  15. func main() {
  16. flag.Parse()
  17. var buffer bytes.Buffer
  18. const (
  19. max1 = 5 // 代表启动goroutine的数量
  20. max2 = 10 // 代表每个goruntine需要写入的数据块的数量
  21. max3 = 10 // 代表每个数据块有多少重复的数字
  22. )
  23. var mu sync.Mutex
  24. sign := make(chan struct{}, max1)
  25. for i := 1; i <= max1; i++ {
  26. go func(id int, writer io.Writer) {
  27. defer func() {
  28. sign <- struct{}{}
  29. }()
  30. for j := 1; j <= max2; j ++ {
  31. header := fmt.Sprintf("\n[id: %d, iteration: %d]", id , j)
  32. data := fmt.Sprintf(" %d", id * j)
  33. if protecting > 0 {
  34. mu.Lock()
  35. }
  36. _, err := writer.Write([]byte(header))
  37. if err != nil {
  38. log.Printf("error: %s [%d]", err, id)
  39. }
  40. for k := 0; k < max3; k++ {
  41. _, err := writer.Write([]byte(data))
  42. if err != nil {
  43. log.Printf("error: %s [%d]", err, id)
  44. }
  45. }
  46. if protecting > 0 {
  47. mu.Unlock()
  48. }
  49. }
  50. }(i, &buffer)
  51. }
  52. for i := 0; i < max1; i++ {
  53. <- sign
  54. }
  55. data, err := ioutil.ReadAll(&buffer)
  56. if err != nil {
  57. log.Fatalf("fatal error: %s", err)
  58. }
  59. log.Printf("The contents:\n%s",data)
  60. }
  1. package main
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "log"
  8. "sync"
  9. "time"
  10. )
  11. // singleHandler 代表处理函数的类型
  12. type singleHandler func() (data string, n int, err error)
  13. // handlerConfig 代表处理流程配置的类型
  14. type handlerConfig struct {
  15. handler singleHandler // 单次处理函数
  16. goNum int // 需要启用的goroutine的数量
  17. number int // 单个goroutine中处理次数
  18. interval time.Duration // 单个goroutine中处理的时间间隔
  19. counter int // 数据量计数器,以字节为单位
  20. counterMu sync.Mutex // 数据量计数器专用互斥锁
  21. }
  22. // count 会增加计数器的值,并会返回增加后的计数
  23. func (hc *handlerConfig) count(increment int) int {
  24. hc.counterMu.Lock()
  25. defer hc.counterMu.Unlock()
  26. hc.counter += increment
  27. return hc.counter
  28. }
  29. func main() {
  30. // mu 代表以下流程要使用的互斥锁
  31. var mu sync.Mutex
  32. // genWriter 代表的是用于生成写入函数的函数
  33. genWriter := func(writer io.Writer) singleHandler {
  34. return func() (data string, n int, err error) {
  35. // 准备数据
  36. data = fmt.Sprintf("%s\t", time.Now().Format(time.StampNano))
  37. // 写入数据
  38. mu.Lock()
  39. defer mu.Unlock()
  40. n, err = writer.Write([]byte(data))
  41. return
  42. }
  43. }
  44. // genReader 代表的是用于生成读取函数的函数
  45. genReader := func(reader io.Reader) singleHandler{
  46. return func() (data string, n int, err error) {
  47. buffer, ok := reader.(*bytes.Buffer)
  48. if !ok {
  49. err = errors.New("unsupported reader")
  50. return
  51. }
  52. // 读取数据
  53. mu.Lock()
  54. defer mu.Unlock()
  55. data, err = buffer.ReadString('\t')
  56. n = len(data)
  57. return
  58. }
  59. }
  60. // buffer 代表缓冲区
  61. var buffer bytes.Buffer
  62. // 数据写入配置
  63. writingConfig := handlerConfig {
  64. handler: genWriter(&buffer),
  65. goNum: 5,
  66. number: 4,
  67. interval: time.Millisecond * 100,
  68. }
  69. // sign 代表信号的通道
  70. readingConfig := handlerConfig{
  71. handler: genReader(&buffer),
  72. goNum: 10,
  73. number: 2,
  74. interval: time.Millisecond * 100,
  75. }
  76. // sign 代表信号的通道
  77. sign := make(chan struct{}, writingConfig.goNum + readingConfig.goNum)
  78. // 启动多个goroutine对换重组进行多次数据写入
  79. for i := 1; i <= writingConfig.goNum; i++ {
  80. go func(i int) {
  81. defer func() {
  82. sign <- struct{}{}
  83. }()
  84. for j := 1; j <= writingConfig.number; j ++ {
  85. time.Sleep(writingConfig.interval)
  86. data, n, err := writingConfig.handler()
  87. if err != nil {
  88. log.Printf("writer [%d-%d]: error: %s", i, j, err)
  89. continue
  90. }
  91. total := writingConfig.count(n)
  92. log.Printf("writer [%d-%d]: %s (total: %d)", i, j, data, total)
  93. }
  94. }(i)
  95. }
  96. // 启用多个goroutine对缓冲区进行多次数据读取
  97. for i := 1; i <= readingConfig.goNum; i++ {
  98. go func(i int){
  99. defer func() {
  100. sign <- struct{}{}
  101. }()
  102. for j := 1; j<= readingConfig.number; j ++ {
  103. time.Sleep(readingConfig.interval)
  104. var data string
  105. var n int
  106. var err error
  107. for {
  108. data, n, err = readingConfig.handler()
  109. if err == nil || err != io.EOF {
  110. break
  111. }
  112. // 如果读比写快(读时会发生EOF错误),那就等一会再读
  113. time.Sleep(readingConfig.interval)
  114. }
  115. if err != nil {
  116. log.Printf("read [%d-%d]: error: %s", i, j, err)
  117. continue
  118. }
  119. total := readingConfig.count(n)
  120. log.Printf("reader [%d-%d]: %s (totl: %d)", i, j, data, total)
  121. }
  122. }(i)
  123. }
  124. // signNumber 代表需要接受信号的数量
  125. signNumber := writingConfig.goNum + readingConfig.goNum
  126. // 等待上面启用的所有goroutine的运行全部结束
  127. for j := 0; j < signNumber; j ++ {
  128. <-sign
  129. }
  130. }