许世伟的架构课

:::tips 对于下面的讨论, 一定要记住是在 讨论 进程内!

:::

锁机制

锁真的慢吗

经常看到这样的忠告

不要通过共享内存(锁)来通信, 要通过通信(channel)来共享内存

对于我这样的菜鸟来说, 以前也确实是这样想的, 这就体现了 知其然不知其所以然 实际上由于对 机制和 channel的误解比较深

锁真正的危害

对于进程内通讯的原语来说, 锁并不慢, 在这个条件下, 只有原子操作会比锁更快

channel本身是共享变量, channel的操作也是必然有锁的

  • 锁真正的危害在于不容易控制, 锁之后忘记解锁或意外导致未解锁才是锁的危害
  • 锁中不要执行费时的操作

执行体

执行体的互斥

如果一组数据的并发访问,符合大部分情况下是读, 少量情况有写这种读写操作那么应该用读写锁

读锁

  1. mutex.RLock()
  2. defer mutex.RUnlock()
  3. doReadOnlyThings

写锁

  1. mutex.Lock()
  2. defer mutex.Unlock()
  3. doWriteThings

写操作和普通锁一样, 整体来说是这样的

读锁阻止写操作

写锁阻止读和写操作

执行体的同步

信号量

一个常用的场景就是将一个任务分为多个小人务, 分配给n个执行体并行做

大概如下

  1. func (wg *WaitGroup) Add(n int)
  2. func (wg *WaitGroup) Done()
  3. func (wg *WaitGroup) Wait()

用法大概如下

  1. var wg WaitGroup
  2. ...
  3. wg.Add(n)
  4. for 循环n {
  5. go func() {
  6. defer wg.Done()
  7. doTaski // 执行第i个任务
  8. }()
  9. }
  10. wg.Wait()

条件变量(Condtion Variable)

:::tips 变量: 一组要在多个执行体之间协同的数据

条件: 做任务前的前置条件, 和做任务时需要唤醒其他人的 唤醒条件

:::

一个更通用的同步原语, 我们用它来模拟一下 channel的通讯机制

  1. func NewCond(l Locker) *Cond
  2. func (c *Cond) Broadcast()
  3. func (c *Cond) Signal()
  4. func (c *Cond) Wait()

初始化, 初始化时需要传入一个互斥体, 可以是普通锁, 也可以是读写锁

  1. var mutex sync.Mutex // 或者 sync.RWMutex
  2. var cond = sync.NewCond(&mutex)
  3. ...

传入锁的原因: cond.Wait()中需要

  1. 把自己加入到挂起队列
  2. mutex.Unlock()
  3. 等待被唤醒 // 挂起的执行体会被后续的 cond.Broadcast 或 cond.Signal() 唤醒
  4. mutex.Lock()

条件变量的使用方法大致如下

  1. mutex.Lock()
  2. defer mutex.Unlock()
  3. for conditionNotMetToDo {
  4. cond.Wait()
  5. }
  6. doSomething
  7. if conditionNeedNotify {
  8. cond.Broadcast()
  9. // 有时可以优化为 cond.Signal()
  10. }
  1. 加锁
  2. 用一个循环判断是否能do, 如果不行就调用cond.Wait()进行等待
    1. 这里使用for是因为cond.Wait()获得权限后不一定就一定是可以继续do了, 所以要再次判断
  3. doSomething
  4. 如果挂起队列中的部分执行体满足了重新执行的条件, 就用 cond.Broadcastcond.Signal唤醒他们
    1. cond.Broadcast会唤醒所有在这个条件变量挂起的执行体
    2. cond.Signal只会唤醒一个
      1. 挂起在这个条件变量上的执行体,他们的条件是一致的
      2. 本次doSometing完成后, 所释放的资源只能够一个执行体来做事情

实现简易Channel的机制(不是真正的channel实现机制)

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. type Queue struct {
  7. element []interface{}
  8. }
  9. //创建一个新队列
  10. func NewQueue()*Queue{
  11. return &Queue{}
  12. }
  13. //判断队列是否为空
  14. func (s *Queue)IsEmpty()bool{
  15. if len(s.element) == 0 {
  16. return true
  17. }else {
  18. return false
  19. }
  20. }
  21. //求队列的长度
  22. func (s *Queue)Len()int{
  23. return len(s.element)
  24. }
  25. //进队操作
  26. func (s *Queue)Push(value interface{}) {
  27. s.element = append(s.element, value)
  28. }
  29. //出队操作
  30. func (s *Queue)Pop()bool{
  31. if s.IsEmpty(){
  32. return false
  33. }else{
  34. s.element = s.element[1:]
  35. }
  36. return true
  37. }
  38. //打印队列
  39. func (s *Queue)Print(){
  40. for i := 0;i <= s.Len()-1;i++{
  41. fmt.Printf("%d ", s.element[i])
  42. }
  43. fmt.Printf("\n")
  44. }
  45. type Channel struct {
  46. mutex sync.Mutex
  47. cond *sync.Cond
  48. queue *Queue
  49. n int
  50. }
  51. func NewChannel(n int) *Channel{
  52. if n < 1 {
  53. panic("todo: support unbuffered channel")
  54. }
  55. c := new(Channel)
  56. c.cond = sync.NewCond(&c.mutex)
  57. c.queue = NewQueue()
  58. c.n = n
  59. return c
  60. }
  61. func (c *Channel) Push(v interface{}) {
  62. c.mutex.Lock()
  63. defer c.mutex.Unlock()
  64. for c.queue.Len() == c.n { // 等待队列不满
  65. c.cond.Wait()
  66. }
  67. if c.queue.Len() == 0 {// 队列为空, 可能有人在等待数据, 通知它们
  68. c.cond.Broadcast()
  69. }
  70. c.queue.Push(v)
  71. }
  72. func (c *Channel) Pop() (v interface{}) {
  73. c.mutex.Lock()
  74. defer c.mutex.Unlock()
  75. for c.queue.Len() == 0 { // 等待对列不为空
  76. c.cond.Wait()
  77. }
  78. if c.queue.Len() == c.n{// 队列满, 可能有人在写数据, 通知它们
  79. c.cond.Broadcast()
  80. }
  81. return c.queue.Pop()
  82. }
  83. func (c *Channel) TryPop() (v interface{}, ok bool) {
  84. c.mutex.Lock()
  85. defer c.mutex.Unlock()
  86. for c.queue.Len() == 0 { // 等待对列不为空
  87. return
  88. } // 队列空直接返回
  89. if c.queue.Len() == c.n{// 队列满, 可能有人在写数据, 通知它们
  90. c.cond.Broadcast()
  91. }
  92. return c.queue.Pop(), true
  93. }
  94. func (c *Channel) TryPush(v interface{})(ok bool) {
  95. c.mutex.Lock()
  96. defer c.mutex.Unlock()
  97. for c.queue.Len() == c.n{ // 等待对列不为空
  98. return
  99. }// 对列满 直接返回
  100. if c.queue.Len() == 0 {// 队列空, 可能有人在等待数据, 通知它们
  101. c.cond.Broadcast()
  102. }
  103. c.queue.Push(v)
  104. return true
  105. }
  106. func main() {
  107. }

这个是有缓冲的channel 不支持 0的情况

执行体的通讯

管道

大致如下

  1. func Pipe() (pr *PipeReader , pw PipeWriter)

先调用 pr, pw:= io.Pipe()得到管道写入端和读出端, 分别给两个并行的goroutine, 一个负责读 , 一个负责写

例:读写转换

现在我有一个算法

  1. func Foo(w io.Writer) error

但是这个数据流的输入是 io.Reader

  1. func Bar(r io.Reader)

使用pipe串联它们

  1. func FooReader() io.ReadCloser{
  2. pr , pw := io.Pipe()
  3. go func(){
  4. err := Foo(pw)
  5. pw.CloseWithError(err)
  6. }()
  7. return pr
  8. }