案例一(累加)

    1. func main() {
    2. var counter Counter
    3. var wg sync.WaitGroup
    4. wg.Add(10)
    5. for i := 0; i < 10; i++ {
    6. go func() {
    7. defer wg.Done()
    8. for j := 0; j < 100000; j++ {
    9. counter.Lock()
    10. counter.Count++
    11. counter.Unlock()
    12. }
    13. }()
    14. }
    15. wg.Wait()
    16. fmt.Println(counter.Count)
    17. }
    18. type Counter struct {
    19. sync.Mutex
    20. Count uint64
    21. }
    1. func main() {
    2. // 封装好的计数器
    3. var counter Counter
    4. var wg sync.WaitGroup
    5. wg.Add(10)
    6. // 启动10个goroutine
    7. for i := 0; i < 10; i++ {
    8. go func() {
    9. defer wg.Done()
    10. // 执行10万次累加
    11. for j := 0; j < 100000; j++ {
    12. counter.Incr() // 受到锁保护的方法
    13. }
    14. }()
    15. }
    16. wg.Wait()
    17. fmt.Println(counter.Count())
    18. }
    19. // 线程安全的计数器类型
    20. type Counter struct {
    21. CounterType int
    22. Name string
    23. mu sync.Mutex
    24. count uint64
    25. }
    26. // 加1的方法,内部使用互斥锁保护
    27. func (c *Counter) Incr() {
    28. c.mu.Lock()
    29. c.count++
    30. c.mu.Unlock()
    31. }
    32. // 得到计数器的值,也需要锁保护
    33. func (c *Counter) Count() uint64 {
    34. c.mu.Lock()
    35. defer c.mu.Unlock()
    36. return c.count
    37. }

    案例二(Mutex)互斥锁,一定要遵循“谁申请,谁释放”的原则

    1. // CAS操作,当时还没有抽象出atomic包
    2. func cas(val *int32, old, new int32) bool
    3. func semacquire(*int32)
    4. func semrelease(*int32)
    5. // 互斥锁的结构,包含两个字段
    6. type Mutex struct {
    7. key int32 // 锁是否被持有的标识
    8. sema int32 // 信号量专用,用以阻塞/唤醒goroutine
    9. }
    10. // 保证成功在val上增加delta的值
    11. func xadd(val *int32, delta int32) (new int32) {
    12. for {
    13. v := *val
    14. if cas(val, v, v+delta) {
    15. return v + delta
    16. }
    17. }
    18. panic("unreached")
    19. }
    20. // 请求锁
    21. func (m *Mutex) Lock() {
    22. if xadd(&m.key, 1) == 1 { //标识加1,如果等于1,成功获取到锁
    23. return
    24. }
    25. semacquire(&m.sema) // 否则阻塞等待
    26. }
    27. func (m *Mutex) Unlock() {
    28. if xadd(&m.key, -1) == 0 { // 将标识减去1,如果等于0,则没有其它等待者
    29. return
    30. }
    31. semrelease(&m.sema) // 唤醒其它阻塞的goroutine
    32. }

    优化

    1. type Mutex struct {
    2. state int32
    3. sema uint32
    4. }
    5. const (
    6. mutexLocked = 1 << iota // mutex is locked
    7. mutexWoken
    8. mutexWaiterShift = iota
    9. )
    1. func (m *Mutex) Lock() {
    2. // Fast path: 幸运case,能够直接获取到锁
    3. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    4. return
    5. }
    6. awoke := false
    7. for {
    8. old := m.state
    9. new := old | mutexLocked // 新状态加锁
    10. if old&mutexLocked != 0 {
    11. new = old + 1<<mutexWaiterShift //等待者数量加一
    12. }
    13. if awoke {
    14. // goroutine是被唤醒的,
    15. // 新状态清除唤醒标志
    16. new &^= mutexWoken
    17. }
    18. if atomic.CompareAndSwapInt32(&m.state, old, new) {//设置新状态
    19. if old&mutexLocked == 0 { // 锁原状态未加锁
    20. break
    21. }
    22. runtime.Semacquire(&m.sema) // 请求信号量
    23. awoke = true
    24. }
    25. }
    26. }
    1. func (m *Mutex) Unlock() {
    2. // Fast path: drop lock bit.
    3. new := atomic.AddInt32(&m.state, -mutexLocked) //去掉锁标志
    4. if (new+mutexLocked)&mutexLocked == 0 { //本来就没有加锁
    5. panic("sync: unlock of unlocked mutex")
    6. }
    7. old := new
    8. for {
    9. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { // 没有等待者,或者有唤醒的waiter,或者锁原来已加锁
    10. return
    11. }
    12. new = (old - 1<<mutexWaiterShift) | mutexWoken // 新状态,准备唤醒goroutine,并设置唤醒标志
    13. if atomic.CompareAndSwapInt32(&m.state, old, new) {
    14. runtime.Semrelease(&m.sema)
    15. return
    16. }
    17. old = m.state
    18. }
    19. }

    自旋

    1. func (m *Mutex) Lock() {
    2. // Fast path: 幸运之路,正好获取到锁
    3. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    4. return
    5. }
    6. awoke := false
    7. iter := 0
    8. for { // 不管是新来的请求锁的goroutine, 还是被唤醒的goroutine,都不断尝试请求锁
    9. old := m.state // 先保存当前锁的状态
    10. new := old | mutexLocked // 新状态设置加锁标志
    11. if old&mutexLocked != 0 { // 锁还没被释放
    12. if runtime_canSpin(iter) { // 还可以自旋
    13. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    14. atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    15. awoke = true
    16. }
    17. runtime_doSpin()
    18. iter++
    19. continue // 自旋,再次尝试请求锁
    20. }
    21. new = old + 1<<mutexWaiterShift
    22. }
    23. if awoke { // 唤醒状态
    24. if new&mutexWoken == 0 {
    25. panic("sync: inconsistent mutex state")
    26. }
    27. new &^= mutexWoken // 新状态清除唤醒标记
    28. }
    29. if atomic.CompareAndSwapInt32(&m.state, old, new) {
    30. if old&mutexLocked == 0 { // 旧状态锁已释放,新状态成功持有了锁,直接返回
    31. break
    32. }
    33. runtime_Semacquire(&m.sema) // 阻塞等待
    34. awoke = true // 被唤醒
    35. iter = 0
    36. }
    37. }
    38. }

    饥饿模式
    只需要记住,Mutex 绝不容忍一个 goroutine 被落下,永远没有机会获取锁。不抛弃不放弃是它的宗旨,而且它也尽可能地让等待较长的 goroutine 更有机会获取到锁。

    1. type Mutex struct {
    2. state int32
    3. sema uint32
    4. }
    5. const (
    6. mutexLocked = 1 << iota // mutex is locked
    7. mutexWoken
    8. mutexStarving // 从state字段中分出一个饥饿标记
    9. mutexWaiterShift = iota
    10. starvationThresholdNs = 1e6
    11. )
    12. func (m *Mutex) Lock() {
    13. // Fast path: 幸运之路,一下就获取到了锁
    14. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    15. return
    16. }
    17. // Slow path:缓慢之路,尝试自旋竞争或饥饿状态下饥饿goroutine竞争
    18. m.lockSlow()
    19. }
    20. func (m *Mutex) lockSlow() {
    21. var waitStartTime int64
    22. starving := false // 此goroutine的饥饿标记
    23. awoke := false // 唤醒标记
    24. iter := 0 // 自旋次数
    25. old := m.state // 当前的锁的状态
    26. for {
    27. // 锁是非饥饿状态,锁还没被释放,尝试自旋
    28. if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
    29. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    30. atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    31. awoke = true
    32. }
    33. runtime_doSpin()
    34. iter++
    35. old = m.state // 再次获取锁的状态,之后会检查是否锁被释放了
    36. continue
    37. }
    38. new := old
    39. if old&mutexStarving == 0 {
    40. new |= mutexLocked // 非饥饿状态,加锁
    41. }
    42. if old&(mutexLocked|mutexStarving) != 0 {
    43. new += 1 << mutexWaiterShift // waiter数量加1
    44. }
    45. if starving && old&mutexLocked != 0 {
    46. new |= mutexStarving // 设置饥饿状态
    47. }
    48. if awoke {
    49. if new&mutexWoken == 0 {
    50. throw("sync: inconsistent mutex state")
    51. }
    52. new &^= mutexWoken // 新状态清除唤醒标记
    53. }
    54. // 成功设置新状态
    55. if atomic.CompareAndSwapInt32(&m.state, old, new) {
    56. // 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
    57. if old&(mutexLocked|mutexStarving) == 0 {
    58. break // locked the mutex with CAS
    59. }
    60. // 处理饥饿状态
    61. // 如果以前就在队列里面,加入到队列头
    62. queueLifo := waitStartTime != 0
    63. if waitStartTime == 0 {
    64. waitStartTime = runtime_nanotime()
    65. }
    66. // 阻塞等待
    67. runtime_SemacquireMutex(&m.sema, queueLifo, 1)
    68. // 唤醒之后检查锁是否应该处于饥饿状态
    69. starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    70. old = m.state
    71. // 如果锁已经处于饥饿状态,直接抢到锁,返回
    72. if old&mutexStarving != 0 {
    73. if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
    74. throw("sync: inconsistent mutex state")
    75. }
    76. // 有点绕,加锁并且将waiter数减1
    77. delta := int32(mutexLocked - 1<<mutexWaiterShift)
    78. if !starving || old>>mutexWaiterShift == 1 {
    79. delta -= mutexStarving // 最后一个waiter或者已经不饥饿了,清除饥饿标记
    80. }
    81. atomic.AddInt32(&m.state, delta)
    82. break
    83. }
    84. awoke = true
    85. iter = 0
    86. } else {
    87. old = m.state
    88. }
    89. }
    90. }
    91. func (m *Mutex) Unlock() {
    92. // Fast path: drop lock bit.
    93. new := atomic.AddInt32(&m.state, -mutexLocked)
    94. if new != 0 {
    95. m.unlockSlow(new)
    96. }
    97. }
    98. func (m *Mutex) unlockSlow(new int32) {
    99. if (new+mutexLocked)&mutexLocked == 0 {
    100. throw("sync: unlock of unlocked mutex")
    101. }
    102. if new&mutexStarving == 0 {
    103. old := new
    104. for {
    105. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    106. return
    107. }
    108. new = (old - 1<<mutexWaiterShift) | mutexWoken
    109. if atomic.CompareAndSwapInt32(&m.state, old, new) {
    110. runtime_Semrelease(&m.sema, false, 1)
    111. return
    112. }
    113. old = m.state
    114. }
    115. } else {
    116. runtime_Semrelease(&m.sema, true, 1)
    117. }
    118. }

    案例三 Mutex 4种错误场景

    1. 第一种
    2. 代码中有太多的 if-else 分支,可能在某个分支中漏写了 Unlock;
    3. 在重构的时候把 Unlock 给删除了;
    4. Unlock 误写成了 Lock;
    5. 第二种
    6. 误用是 Copy 已使用的 Mutex;
    7. 第三种
    8. Mutex 不是可重入的锁。
    9. 第四种
    10. 死锁。

    例如:

    1. type Counter struct {
    2. sync.Mutex
    3. Count int
    4. }
    5. func main() {
    6. var c Counter
    7. c.Lock()
    8. defer c.Unlock()
    9. c.Count++
    10. foo(c) // 复制锁
    11. }
    12. // 这里Counter的参数是通过复制的方式传入的
    13. func foo(c Counter) {
    14. c.Lock()
    15. defer c.Unlock()
    16. fmt.Println("in foo")
    17. }
    1. func foo(l sync.Locker) {
    2. fmt.Println("in foo")
    3. l.Lock()
    4. bar(l)
    5. l.Unlock()
    6. }
    7. func bar(l sync.Locker) {
    8. l.Lock()
    9. fmt.Println("in bar")
    10. l.Unlock()
    11. }
    12. func main() {
    13. l := &sync.Mutex{}
    14. foo(l)
    15. }

    重入锁 (goroutine id)

    1. 方案一:通过 hacker 的方式获取到 goroutine id,记录下获取锁的 goroutine id,它可以实现 Locker 接口。
    2. 方案二:调用 Lock/Unlock 方法时,由 goroutine 提供一个 token,用来标识它自己,而不是我们通过 hacker 的方式获取到 goroutine id,但是,这样一来,就不满足 Locker 接口了。

    获取:goroutine id
    https://github.com/petermattis/goid
    用 goroutine id 做 goroutine 的标识,我们也可以让 goroutine 自己来提供标识。不管怎么说,Go 开发者不期望你利用 goroutine id 做一些不确定的东西,所以,他们没有暴露获取 goroutine id 的方法。

    1. // RecursiveMutex 包装一个Mutex,实现可重入
    2. type RecursiveMutex struct {
    3. sync.Mutex
    4. owner int64 // 当前持有锁的goroutine id
    5. recursion int32 // 这个goroutine 重入的次数
    6. }
    7. func (m *RecursiveMutex) Lock() {
    8. gid := goid.Get()
    9. // 如果当前持有锁的goroutine就是这次调用的goroutine,说明是重入
    10. if atomic.LoadInt64(&m.owner) == gid {
    11. m.recursion++
    12. return
    13. }
    14. m.Mutex.Lock()
    15. // 获得锁的goroutine第一次调用,记录下它的goroutine id,调用次数加1
    16. atomic.StoreInt64(&m.owner, gid)
    17. m.recursion = 1
    18. }
    19. func (m *RecursiveMutex) Unlock() {
    20. gid := goid.Get()
    21. // 非持有锁的goroutine尝试释放锁,错误的使用
    22. if atomic.LoadInt64(&m.owner) != gid {
    23. panic(fmt.Sprintf("wrong the owner(%d): %d!", m.owner, gid))
    24. }
    25. // 调用次数减1
    26. m.recursion--
    27. if m.recursion != 0 { // 如果这个goroutine还没有完全释放,则直接返回
    28. return
    29. }
    30. // 此goroutine最后一次调用,需要释放锁
    31. atomic.StoreInt64(&m.owner, -1)
    32. m.Mutex.Unlock()
    33. }

    重入锁 (token)
    调用者自己提供一个 token,获取锁的时候把这个 token 传入,释放锁的时候也需要把这个 token 传入。通过用户传入的 token 替换方案一中 goroutine id,其它逻辑和方案一一致。

    1. // Token方式的递归锁
    2. type TokenRecursiveMutex struct {
    3. sync.Mutex
    4. token int64
    5. recursion int32
    6. }
    7. // 请求锁,需要传入token
    8. func (m *TokenRecursiveMutex) Lock(token int64) {
    9. if atomic.LoadInt64(&m.token) == token { //如果传入的token和持有锁的token一致,说明是递归调用
    10. m.recursion++
    11. return
    12. }
    13. m.Mutex.Lock() // 传入的token不一致,说明不是递归调用
    14. // 抢到锁之后记录这个token
    15. atomic.StoreInt64(&m.token, token)
    16. m.recursion = 1
    17. }
    18. // 释放锁
    19. func (m *TokenRecursiveMutex) Unlock(token int64) {
    20. if atomic.LoadInt64(&m.token) != token { // 释放其它token持有的锁
    21. panic(fmt.Sprintf("wrong the owner(%d): %d!", m.token, token))
    22. }
    23. m.recursion-- // 当前持有这个锁的token释放锁
    24. if m.recursion != 0 { // 还没有回退到最初的递归调用
    25. return
    26. }
    27. atomic.StoreInt64(&m.token, 0) // 没有递归调用了,释放锁
    28. m.Mutex.Unlock()
    29. }

    死锁

    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. "time"
    6. )
    7. func main() {
    8. // 派出所证明
    9. var psCertificate sync.Mutex
    10. // 物业证明
    11. var propertyCertificate sync.Mutex
    12. var wg sync.WaitGroup
    13. wg.Add(2) // 需要派出所和物业都处理
    14. // 派出所处理goroutine
    15. go func() {
    16. defer wg.Done() // 派出所处理完成
    17. psCertificate.Lock()
    18. defer psCertificate.Unlock()
    19. // 检查材料
    20. time.Sleep(5 * time.Second)
    21. // 请求物业的证明
    22. propertyCertificate.Lock()
    23. propertyCertificate.Unlock()
    24. }()
    25. // 物业处理goroutine
    26. go func() {
    27. defer wg.Done() // 物业处理完成
    28. propertyCertificate.Lock()
    29. defer propertyCertificate.Unlock()
    30. // 检查材料
    31. time.Sleep(5 * time.Second)
    32. // 请求派出所的证明
    33. psCertificate.Lock()
    34. psCertificate.Unlock()
    35. }()
    36. wg.Wait()
    37. fmt.Println("成功完成")
    38. }

    检测死锁工具
    go-deadlock:https://github.com/sasha-s/go-deadlock
    go-tools:https://github.com/dominikh/go-tools
    goroutine id:https://github.com/petermattis/goid
    获取后端返回数据:console.dir(res.data)
    分析代码工具:https://github.com/golang/tools/blob/master/go/analysis/passes/copylock/copylock.go
    vet 工具 : go vet copy.go
    死锁的检查机制:https://golang.org/src/runtime/proc.go?h=checkdead#L4345
    Go 提供了一个检测并发访问共享资源是否有问题的工具:https://blog.golang.org/race-detector
    go run -race counter.go
    Go pprof 工具分析,它提供了一个 block profiler 监控阻塞的 goroutine

    Mutex扩展功能(五)

    实现 TryLock
    当一个 goroutine 调用这个 TryLock 方法请求锁的时候,如果这把锁没有被其他 goroutine 所持有,那么,这个 goroutine 就持有了这把锁,并返回 true;如果这把锁已经被其他 goroutine 所持有,或者是正在准备交给某个被唤醒的 goroutine,那么,这个请求锁的 goroutine 就直接返回 false,不会阻塞在方法调用上。

    1. // 复制Mutex定义的常量
    2. const (
    3. mutexLocked = 1 << iota // 加锁标识位置
    4. mutexWoken // 唤醒标识位置
    5. mutexStarving // 锁饥饿标识位置
    6. mutexWaiterShift = iota // 标识waiter的起始bit位置
    7. )
    8. // 扩展一个Mutex结构
    9. type Mutex struct {
    10. sync.Mutex
    11. }
    12. // 尝试获取锁
    13. func (m *Mutex) TryLock() bool {
    14. // 如果能成功抢到锁
    15. if atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), 0, mutexLocked) {
    16. return true
    17. }
    18. // 如果处于唤醒、加锁或者饥饿状态,这次请求就不参与竞争了,返回false
    19. old := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    20. if old&(mutexLocked|mutexStarving|mutexWoken) != 0 {
    21. return false
    22. }
    23. // 尝试在竞争的状态下请求锁
    24. new := old | mutexLocked
    25. return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), old, new)
    26. }

    实验

    1. func try() {
    2. var mu Mutex
    3. go func() { // 启动一个goroutine持有一段时间的锁
    4. mu.Lock()
    5. time.Sleep(time.Duration(rand.Intn(2)) * time.Second)
    6. mu.Unlock()
    7. }()
    8. time.Sleep(time.Second)
    9. ok := mu.TryLock() // 尝试获取到锁
    10. if ok { // 获取成功
    11. fmt.Println("got the lock")
    12. // do something
    13. mu.Unlock()
    14. return
    15. }
    16. // 没有获取到
    17. fmt.Println("can't get the lock")
    18. }

    获取等待者的数量(获取未暴露字段)
    state 这个字段的第一位是用来标记锁是否被持有,第二位用来标记是否已经唤醒了一个等待者,第三位标记锁是否处于饥饿状态,通过分析这个 state 字段我们就可以得到这些状态信息。

    1. const (
    2. mutexLocked = 1 << iota // mutex is locked
    3. mutexWoken
    4. mutexStarving
    5. mutexWaiterShift = iota
    6. )
    7. type Mutex struct {
    8. sync.Mutex
    9. }
    10. func (m *Mutex) Count() int {
    11. // 获取state字段的值
    12. v := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    13. v = v >> mutexWaiterShift //得到等待者的数值
    14. v = v + (v & mutexLocked) //再加上锁持有者的数量,0或者1
    15. return int(v)
    16. }
    1. // 锁是否被持有
    2. func (m *Mutex) IsLocked() bool {
    3. state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    4. return state&mutexLocked == mutexLocked
    5. }
    6. // 是否有等待者被唤醒
    7. func (m *Mutex) IsWoken() bool {
    8. state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    9. return state&mutexWoken == mutexWoken
    10. }
    11. // 锁是否处于饥饿状态
    12. func (m *Mutex) IsStarving() bool {
    13. state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    14. return state&mutexStarving == mutexStarving
    15. }

    一个线程的安全队列 (使用 Mutex 实现一个线程安全的队列)
    我们可以通过 Slice 来实现,但是通过 Slice 实现的队列不是线程安全的,出队(Dequeue)和入队(Enqueue)会有 data race 的问题。

    1. type SliceQueue struct {
    2. data []interface{}
    3. mu sync.Mutex
    4. }
    5. func NewSliceQueue(n int) (q *SliceQueue) {
    6. return &SliceQueue{data: make([]interface{}, 0, n)}
    7. }
    8. // Enqueue 把值放在队尾
    9. func (q *SliceQueue) Enqueue(v interface{}) {
    10. q.mu.Lock()
    11. q.data = append(q.data, v)
    12. q.mu.Unlock()
    13. }
    14. // Dequeue 移去队头并返回
    15. func (q *SliceQueue) Dequeue() interface{} {
    16. q.mu.Lock()
    17. if len(q.data) == 0 {
    18. q.mu.Unlock()
    19. return nil
    20. }
    21. v := q.data[0]
    22. q.data = q.data[1:]
    23. q.mu.Unlock()
    24. return v
    25. }

    可以为 Mutex 获取锁时加上 Timeout

    1. 最简单直接的是采用channel实现,用select监听锁和timeout两个channel,不在今天的讨论范围内。
    2. 1. for循环+TryLock实现:
    3. 先记录开始的时间,用for循环判断是否超时:没有超时则反复尝试tryLock,直到获取成功;如果超时直接返回失败。
    4. 问题:高频的CAS自旋操作,如果失败的太多,会消耗大量的CPU
    5. 2. 优化1TryLockfast的拆分
    6. TryLock的抢占实现分为两部分,一个是fast path,另一个是竞争状态下的,后者的cas操作很多。我会考虑减少slow方法的频率,比如调用nfast path失败后,再调用一次整个Trylock
    7. 3. 优化2:借鉴TCP重试机制
    8. for循环中的重试增加休眠时间,每次失败将休眠时间乘以一个系数(如1.5),直到达到上限(如10ms),减少自旋带来的性能损耗

    查找知名的数据库系统 TiDB 的 issue,看看有没有 Mutex 相关的 issue,看看它们都是哪些相关的 Bug

    1. https://github.com/pingcap/tidb/pull/20381/files
    2. 这个问题是在当前的函数中Lock,然后在调用的函数中Unlock。这种方式会导致,如果运行子函数时panic了,而外部又有recover机制不希望程序崩溃,就触发不到Unlock,引起死锁。
    3. PR中加了个recover处理,并且判断recovererrorUnlock,这是一种处理方法。
    4. 理想的设计,是将子函数的Unlock挪到与Lock平级的代码,或者不进行recover处理,Let it panic后修复问题。但大型项目项目经常会因为逻辑错综复杂或者各种历史原因,不好改动吧,这种处理方式虽然不好看,但能解决问题,有时候也挺无奈的~

    目前 Mutex 的 state 字段有几个意义,这几个意义分别是由哪些字段表示的?等待一个 Mutex 的 goroutine 数最大是多少?是否能满足现实的需求?

    1. 1. 目前 Mutex state 字段有几个意义,这几个意义分别是由哪些字段表示的?
    2. 和第四个阶段的讲解基本一致:前三个bit分别为mutexLockedmutexWokenmutexStarving,剩余bit表示mutexWaiter
    3. 2. 等待一个 Mutex goroutine 数最大是多少?是否能满足现实的需求?
    4. 单从程序来看,可以支持 1<<(32-3) -1 ,约 0.5 Billion
    5. 其中32state的类型int323waiter字段的shift
    6. 考虑到实际goroutine初始化的空间为2K0.5Billin*2K达到了1TB,单从内存空间来说已经要求极高了,当前的设计肯定可以满足了。

    RWMutex
    Lock/Unlock:写操作时调用的方法。如果锁已经被 reader 或者 writer 持有,那么,Lock 方法会一直阻塞,直到能获取到锁;Unlock 则是配对的释放锁的方法。
    RLock/RUnlock:读操作时调用的方法。如果锁已经被 writer 持有的话,RLock 方法会一直阻塞,直到能获取到锁,否则就直接返回;而 RUnlock 是 reader 释放锁的方法。RLocker:这个方法的作用是为读操作返回一个 Locker 接口的对象。它的 Lock 方法会调用 RWMutex 的 RLock 方法,它的 Unlock 方法会调用 RWMutex 的 RUnlock 方法。

    1. func main() {
    2. var counter Counter
    3. for i := 0; i < 10; i++ { // 10个reader
    4. go func() {
    5. for {
    6. counter.Count() // 计数器读操作
    7. time.Sleep(time.Millisecond)
    8. }
    9. }()
    10. }
    11. for { // 一个writer
    12. counter.Incr() // 计数器写操作
    13. time.Sleep(time.Second)
    14. }
    15. }
    16. // 一个线程安全的计数器
    17. type Counter struct {
    18. mu sync.RWMutex
    19. count uint64
    20. }
    21. // 使用写锁保护
    22. func (c *Counter) Incr() {
    23. c.mu.Lock()
    24. c.count++
    25. c.mu.Unlock()
    26. }
    27. // 使用读锁保护
    28. func (c *Counter) Count() uint64 {
    29. c.mu.RLock()
    30. defer c.mu.RUnlock()
    31. return c.count
    32. }

    在实际使用 RWMutex 的时候,如果我们在 struct 中使用 RWMutex 保护某个字段,一般会把它和这个字段放在一起,用来指示两个字段是一组字段。除此之外,我们还可以采用匿名字段的方式嵌入 struct,这样,在使用这个 struct 时,我们就可以直接调用 Lock/Unlock、RLock/RUnlock 方法了。

    RWMutex 是很常见的并发原语,很多编程语言的库都提供了类似的并发类型。RWMutex 一般都是基于互斥锁、条件变量(condition variables)或者信号量(semaphores)等并发原语来实现。Go 标准库中的 RWMutex 是基于 Mutex 实现的。

    RLock/RUnlock 的实现

    1. func (rw *RWMutex) RLock() {
    2. if atomic.AddInt32(&rw.readerCount, 1) < 0 {
    3. // rw.readerCount是负值的时候,意味着此时有writer等待请求锁,因为writer优先级高,所以把后来的reader阻塞休眠
    4. runtime_SemacquireMutex(&rw.readerSem, false, 0)
    5. }
    6. }
    7. func (rw *RWMutex) RUnlock() {
    8. if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
    9. rw.rUnlockSlow(r) // 有等待的writer
    10. }
    11. }
    12. func (rw *RWMutex) rUnlockSlow(r int32) {
    13. if atomic.AddInt32(&rw.readerWait, -1) == 0 {
    14. // 最后一个reader了,writer终于有机会获得锁了
    15. runtime_Semrelease(&rw.writerSem, false, 1)
    16. }
    17. }

    Lock

    1. func (rw *RWMutex) Lock() {
    2. // 首先解决其他writer竞争问题
    3. rw.w.Lock()
    4. // 反转readerCount,告诉reader有writer竞争锁
    5. r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    6. // 如果当前有reader持有锁,那么需要等待
    7. if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
    8. runtime_SemacquireMutex(&rw.writerSem, false, 0)
    9. }
    10. }

    Unlock

    1. func (rw *RWMutex) Unlock() {
    2. // 告诉reader没有活跃的writer了
    3. r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    4. // 唤醒阻塞的reader们
    5. for i := 0; i < int(r); i++ {
    6. runtime_Semrelease(&rw.readerSem, false, 0)
    7. }
    8. // 释放内部的互斥锁
    9. rw.w.Unlock()
    10. }

    首先,你要理解 readerCount 这个字段的含义以及反转方式。其次,你还要注意字段的更改和内部互斥锁的顺序关系。在 Lock 方法中,是先获取内部互斥锁,才会修改的其他字段;而在 Unlock 方法中,是先修改的其他字段,才会释放内部互斥锁,这样才能保证字段的修改也受到互斥锁的保护。

    RWMutex 的 3 个踩坑点 (不遗漏不多余)
    坑点 1:不可复制
    坑点 2:重入导致死锁
    死锁实现

    1. func main() {
    2. var mu sync.RWMutex
    3. // writer,稍微等待,然后制造一个调用Lock的场景
    4. go func() {
    5. time.Sleep(200 * time.Millisecond)
    6. mu.Lock()
    7. fmt.Println("Lock")
    8. time.Sleep(100 * time.Millisecond)
    9. mu.Unlock()
    10. fmt.Println("Unlock")
    11. }()
    12. go func() {
    13. factorial(&mu, 10) // 计算10的阶乘, 10!
    14. }()
    15. select {}
    16. }
    17. // 递归调用计算阶乘
    18. func factorial(m *sync.RWMutex, n int) int {
    19. if n < 1 { // 阶乘退出条件
    20. return 0
    21. }
    22. fmt.Println("RLock")
    23. m.RLock()
    24. defer func() {
    25. fmt.Println("RUnlock")
    26. m.RUnlock()
    27. }()
    28. time.Sleep(100 * time.Millisecond)
    29. return factorial(m, n-1) * n // 递归调用
    30. }

    坑点 3:释放未加锁的 RWMutex
    场景:我就要实现一个线程安全的 map,那么,一开始你就可以考虑使用读写锁。
    我们也可以扩展 RWMutex,不过实现方法和互斥锁 Mutex 差不多,在技术上是一样的,都是通过 unsafe 来实现。
    请你写一个扩展的读写锁,比如提供 TryLock,查询当前是否有 writer、reader 的数量等方法。

    1. 获取两个关键变量,大致思路是根据 起始地址+偏移量,
    2. // readerCount 这个成员变量前有1个mutex+2个uint32
    3. readerCount := atomic.LoadInt32((*int32)(unsafe.Pointer(uintptr(unsafe.Pointer(&m)) + unsafe.Alignof(sync.Mutex{}) + 2*unsafe.Alignof(uint32(0)))))
    4. // readerWait 这个成员变量前有1个mutex+2个uint32+1个int32
    5. readerWait := atomic.LoadInt32((*int32)(unsafe.Pointer(uintptr(unsafe.Pointer(&m)) + unsafe.Alignof(sync.Mutex{}) + 2*unsafe.Alignof(uint32(0)) + unsafe.Alignof(int32(0)))))
    6. 剩下的,大量借鉴Mutex那块TryLock的实现了,大量地使用atomic原子操作:
    7. TryLock: 读取readerCount,小于0则返回false,否则尝试Lock
    8. 是否有writer:读取readerCount,小于0则有writer,否则没有。
    9. reader的数量:读取readerCount,小于0则加上rwmutexMaxReaders,结果即为reader数量。
    10. 另外还可以通过readerWait,查询当前Lock被多少个RLock阻塞着。
    11. 再谈一个感受,RWMutex的实现依赖于Mutex,这最大的好处是简化了代码,但同时,在读少写多的情况下,由于额外维护了4个变量,性能不如直接调用Mutex。这个读写比例的临界值,找个时间自己测试测试。 :)

    WaitGroup场景
    一是,性能比较低,因为三个小任务可能早就完成了,却要等很长时间才被轮询到;
    二是,会有很多无谓的轮询,空耗 CPU 资源。
    Linux 中的 barrier、Pthread(POSIX 线程)中的 barrier、C++ 中的 std::barrier、Java 中的 CyclicBarrier 和 CountDownLatch

    1. func (wg *WaitGroup) Add(delta int)
    2. func (wg *WaitGroup) Done()
    3. func (wg *WaitGroup) Wait()
    4. Add,用来设置 WaitGroup 的计数值;
    5. Done,用来将 WaitGroup 的计数值减 1,其实就是调用了 Add(-1);
    6. Wait,调用这个方法的 goroutine 会一直阻塞,直到 WaitGroup 的计数值变为 0
    1. // 线程安全的计数器
    2. type Counter struct {
    3. mu sync.Mutex
    4. count uint64
    5. }
    6. // 对计数值加一
    7. func (c *Counter) Incr() {
    8. c.mu.Lock()
    9. c.count++
    10. c.mu.Unlock()
    11. }
    12. // 获取当前的计数值
    13. func (c *Counter) Count() uint64 {
    14. c.mu.Lock()
    15. defer c.mu.Unlock()
    16. return c.count
    17. }
    18. // sleep 1秒,然后计数值加1
    19. func worker(c *Counter, wg *sync.WaitGroup) {
    20. defer wg.Done()
    21. time.Sleep(time.Second)
    22. c.Incr()
    23. }
    24. func main() {
    25. var counter Counter
    26. var wg sync.WaitGroup
    27. wg.Add(10) // WaitGroup的值设置为10
    28. for i := 0; i < 10; i++ { // 启动10个goroutine执行加1任务
    29. go worker(&counter, &wg)
    30. }
    31. // 检查点,等待goroutine都完成任务
    32. wg.Wait()
    33. // 输出当前计数器的值
    34. fmt.Println(counter.Count())
    35. }

    WaitGroup实现

    1. type WaitGroup struct {
    2. // 避免复制使用的一个技巧,可以告诉vet工具违反了复制使用的规则
    3. noCopy noCopy
    4. // 64bit(8bytes)的值分成两段,高32bit是计数值,低32bit是waiter的计数
    5. // 另外32bit是用作信号量的
    6. // 因为64bit值的原子操作需要64bit对齐,但是32bit编译器不支持,所以数组中的元素在不同的架构中不一样,具体处理看下面的方法
    7. // 总之,会找到对齐的那64bit作为state,其余的32bit做信号量
    8. state1 [3]uint32
    9. }
    10. // 得到state的地址和信号量的地址
    11. func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    12. if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
    13. // 如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量
    14. return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
    15. } else {
    16. // 如果地址是32bit对齐的,数组后两个元素用来做state,它可以用来做64bit的原子操作,第一个元素32bit用来做信号量
    17. return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
    18. }
    19. }

    Add方法的逻辑

    1. func (wg *WaitGroup) Add(delta int) {
    2. statep, semap := wg.state()
    3. // 高32bit是计数值v,所以把delta左移32,增加到计数上
    4. state := atomic.AddUint64(statep, uint64(delta)<<32)
    5. v := int32(state >> 32) // 当前计数值
    6. w := uint32(state) // waiter count
    7. if v > 0 || w == 0 {
    8. return
    9. }
    10. // 如果计数值v为0并且waiter的数量w不为0,那么state的值就是waiter的数量
    11. // 将waiter的数量设置为0,因为计数值v也是0,所以它们俩的组合*statep直接设置为0即可。此时需要并唤醒所有的waiter
    12. *statep = 0
    13. for ; w != 0; w-- {
    14. runtime_Semrelease(semap, false, 0)
    15. }
    16. }
    17. // Done方法实际就是计数器减1
    18. func (wg *WaitGroup) Done() {
    19. wg.Add(-1)
    20. }

    Wait方法的逻辑

    1. func (wg *WaitGroup) Wait() {
    2. statep, semap := wg.state()
    3. for {
    4. state := atomic.LoadUint64(statep)
    5. v := int32(state >> 32) // 当前计数值
    6. w := uint32(state) // waiter的数量
    7. if v == 0 {
    8. // 如果计数值为0, 调用这个方法的goroutine不必再等待,继续执行它后面的逻辑即可
    9. return
    10. }
    11. // 否则把waiter数量加1。期间可能有并发调用Wait的情况,所以最外层使用了一个for循环
    12. if atomic.CompareAndSwapUint64(statep, state, state+1) {
    13. // 阻塞休眠等待
    14. runtime_Semacquire(semap)
    15. // 被唤醒,不再阻塞,返回
    16. return
    17. }
    18. }
    19. }

    WaitGroup常见问题
    常见问题一:计数器设置为负值
    第一种方法是:调用 Add 的时候传递一个负数。如果你能保证当前的计数器加上这个负数后还是大于等于 0 的话,也没有问题,否则就会导致 panic。
    第二个方法是:调用 Done 方法的次数过多,超过了 WaitGroup 的计数值。
    使用 WaitGroup 的正确姿势是,预先确定好 WaitGroup 的计数值,然后调用相同次数的 Done 完成相应的任务。
    比如,在 WaitGroup 变量声明之后,就立即设置它的计数值,或者在 goroutine 启动之前增加 1,然后在 goroutine 中调用 Done
    常见问题二:不期望的 Add 时机
    等所有的 Add 方法调用之后再调用 Wait,否则就可能导致 panic 或者不期望的结果。
    没有遵循先完成所有的 Add 之后才 Wait。
    常见问题三:前一个 Wait 还没结束就重用 WaitGroup

    1. func main() {
    2. var wg sync.WaitGroup
    3. wg.Add(10)
    4. wg.Add(-10)//将-10作为参数调用Add,计数值被设置为0
    5. wg.Add(-1)//将-1作为参数调用Add,如果加上-1计数值就会变为负数。这是不对的,所以会触发panic
    6. }
    1. func main() {
    2. var wg sync.WaitGroup
    3. wg.Add(1)
    4. wg.Done()
    5. wg.Done()
    6. }

    WaitGroup 虽然可以重用,但是是有一个前提的,那就是必须等到上一轮的 Wait 完成之后,才能重用 WaitGroup 执行下一轮的 Add/Wait,如果你在 Wait 还没执行完的时候就调用下一轮 Add 方法,就有可能出现 panic。
    noCopy:辅助 vet 检查
    如果你想要自己定义的数据结构不被复制使用,或者说,不能通过 vet 工具检查出复制使用的报警,就可以通过嵌入 noCopy 这个数据类型来实现。

    1. 不重用 WaitGroup。新建一个 WaitGroup 不会带来多大的资源开销,重用反而更容易出错。
    2. 保证所有的 Add 方法调用都在 Wait 之前。
    3. 不传递负数给 Add 方法,只通过 Done 来给计数值减 1
    4. 不做多余的 Done 方法调用,保证 Add 的计数值和 Done 方法调用的数量是一样的。
    5. 不遗漏 Done 方法的调用,否则会导致 Wait hang 住无法返回。

    通常我们可以把 WaitGroup 的计数值,理解为等待要完成的 waiter 的数量。你可以试着扩展下 WaitGroup,来查询 WaitGroup 的当前的计数值吗?
    实现:
    func getStateAndWait(wgp sync.WaitGroup) (uint32, uint32) {
    var statep
    uint64
    if uintptr(unsafe.Pointer(wgp))%8 == 0 {
    statep = (uint64)(unsafe.Pointer(wgp))
    } else {
    statep = (
    uint64)(unsafe.Pointer(uintptr(unsafe.Pointer(wgp)) + unsafe.Sizeof(uint32(0))))
    }
    return uint32(statep >> 32), uint32(statep)
    }

    注意点:
    1. 这里用了一个函数来实现,更常见的可以自己封一个类。用函数实现时注意用指针传递wg
    2. 返回的两个值分别是state和wait,state是要完成的waiter计数值(即等待多少个goroutine完成);wait是指有多少个sync.Wait在等待(和前面的waiter不是一个概念)。

    Cond和Channel
    实现

    1. type Cond
    2. func NeWCond(l Locker) *Cond
    3. func (c *Cond) Broadcast()
    4. func (c *Cond) Signal()
    5. func (c *Cond) Wait()

    Signal 方法,允许调用者 Caller 唤醒一个等待此 Cond 的 goroutine。如果此时没有等待的 goroutine,显然无需通知 waiter;如果 Cond 等待队列中有一个或者多个等待的 goroutine,则需要从等待队列中移除第一个 goroutine 并把它唤醒。在其他编程语言中,比如 Java 语言中,Signal 方法也被叫做 notify 方法。
    Broadcast 方法,允许调用者 Caller 唤醒所有等待此 Cond 的 goroutine。如果此时没有等待的 goroutine,显然无需通知 waiter;如果 Cond 等待队列中有一个或者多个等待的 goroutine,则清空所有等待的 goroutine,并全部唤醒。在其他编程语言中,比如 Java 语言中,Broadcast 方法也被叫做 notifyAll 方法。
    Wait 方法,会把调用者 Caller 放入 Cond 的等待队列中并阻塞,直到被 Signal 或者 Broadcast 的方法从等待队列中移除并唤醒。调用 Wait 方法时必须要持有 c.L 的锁。

    1. func main() {
    2. c := sync.NewCond(&sync.Mutex{})
    3. var ready int
    4. for i := 0; i < 10; i++ {
    5. go func(i int) {
    6. time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)
    7. // 加锁更改等待条件
    8. c.L.Lock()
    9. ready++
    10. c.L.Unlock()
    11. log.Printf("运动员#%d 已准备就绪\n", i)
    12. // 广播唤醒所有的等待者
    13. c.Broadcast()
    14. }(i)
    15. }
    16. c.L.Lock()
    17. for ready != 10 {
    18. c.Wait()
    19. log.Println("裁判员被唤醒一次")
    20. }
    21. c.L.Unlock()
    22. //所有的运动员是否就绪
    23. log.Println("所有运动员都准备就绪。比赛开始,3,2,1, ......")
    24. }

    Cond 的使用其实没那么简单。它的复杂在于:一,这段代码有时候需要加锁,有时候可以不加;二,Wait 唤醒后需要检查条件;三,条件变量的更改,其实是需要原子操作或者互斥锁保护的。所以,有的开发者会认为,Cond 是唯一难以掌握的 Go 并发原语。
    Cond的实现原理

    1. type Cond struct {
    2. noCopy noCopy
    3. // 当观察或者修改等待条件的时候需要加锁
    4. L Locker
    5. // 等待队列
    6. notify notifyList
    7. checker copyChecker
    8. }
    9. func NewCond(l Locker) *Cond {
    10. return &Cond{L: l}
    11. }
    12. func (c *Cond) Wait() {
    13. c.checker.check()
    14. // 增加到等待队列中
    15. t := runtime_notifyListAdd(&c.notify)
    16. c.L.Unlock()
    17. // 阻塞休眠直到被唤醒
    18. runtime_notifyListWait(&c.notify, t)
    19. c.L.Lock()
    20. }
    21. func (c *Cond) Signal() {
    22. c.checker.check()
    23. runtime_notifyListNotifyOne(&c.notify)
    24. }
    25. func (c *Cond) Broadcast() {
    26. c.checker.check()
    27. runtime_notifyListNotifyAll(&c.notify
    28. }

    Cond 常见的两个错误,一个是调用 Wait 的时候没有加锁,另一个是没有检查条件是否满足程序就继续执行了。
    Cond的使用
    第一,同样的场景我们会使用其他的并发原语来替代。Go 特有的 Channel 类型,有一个应用很广泛的模式就是通知机制,这个模式使用起来也特别简单。所以很多情况下,我们会使用 Channel 而不是 Cond 实现 wait/notify 机制。
    第二,对于简单的 wait/notify 场景,比如等待一组 goroutine 完成之后继续执行余下的代码,我们会使用 WaitGroup 来实现。因为 WaitGroup 的使用方法更简单,而且不容易出错。比如,上面百米赛跑的问题,就可以很方便地使用 WaitGroup 来实现。

    一个 Cond 的 waiter 被唤醒的时候,为什么需要再检查等待条件,而不是唤醒后进行下一步?

    1. 唤醒的方式有broadcast,第N个waiter被唤醒后需要检查等待条件,因为不知道前N-1个被唤醒的waiter所作的修改是否使等待条件再次成立。

    你能否利用 Cond 实现一个容量有限的 queue?

    1. package main
    2. improt(
    3. "fmt"
    4. "math/rand"
    5. "strings"
    6. "sync"
    7. )
    8. tyep Queue struct{
    9. cond *sync.cond
    10. data []interface{}
    11. capc int
    12. logs []string
    13. }
    14. func NewQueue(capacity int) *Queue {
    15. return &Queue{cond: &sync.Cond{L: &sync.Mutex{}}, data: make([]interface{}, 0), capc: capacity, logs: make([]string, 0)}
    16. }
    17. func (q *Queue) Enqueue(d interface{}) {
    18. q.cond.L.Lock()
    19. defer q.cond.L.Unlock()
    20. for len(q.data) == q.capc {
    21. q.cond.Wait()
    22. }
    23. // FIFO入队
    24. q.data = append(q.data, d)
    25. // 记录操作日志
    26. q.logs = append(q.logs, fmt.Sprintf("En %v\n", d))
    27. // 通知其他waiter进行Dequeue或Enqueue操作
    28. q.cond.Broadcast()
    29. }
    30. func (q *Queue) Dequeue() (d interface{}) {
    31. q.cond.L.Lock()
    32. defer q.cond.L.Unlock()
    33. for len(q.data) == 0 {
    34. q.cond.Wait()
    35. }
    36. // FIFO出队
    37. d = q.data[0]
    38. q.data = q.data[1:]
    39. // 记录操作日志
    40. q.logs = append(q.logs, fmt.Sprintf("De %v\n", d))
    41. // 通知其他waiter进行Dequeue或Enqueue操作
    42. q.cond.Broadcast()
    43. return
    44. }
    45. func (q *Queue) Len() int {
    46. q.cond.L.Lock()
    47. defer q.cond.L.Unlock()
    48. return len(q.data)
    49. }
    50. func (q *Queue) String() string {
    51. var b strings.Builder
    52. for _, log := range q.logs {
    53. //fmt.Fprint(&b, log)
    54. b.WriteString(log)
    55. }
    56. return b.String()
    57. }

    Once
    初始化操作

    1. package abc
    2. import time
    3. var startTime = time.Now()
    1. package abc
    2. var startTime time.Time
    3. func init() {
    4. startTime = time.Now()
    5. }
    1. package abc
    2. var startTime time.Tim
    3. func initApp() {
    4. startTime = time.Now()
    5. }
    6. func main() {
    7. initApp()
    8. }

    单例资源的初始化

    1. package main
    2. import (
    3. "net"
    4. "sync"
    5. "time"
    6. )
    7. // 使用互斥锁保证线程(goroutine)安全
    8. var connMu sync.Mutex
    9. var conn net.Conn
    10. func getConn() net.Conn {
    11. connMu.Lock()
    12. defer connMu.Unlock()
    13. // 返回已创建好的连接
    14. if conn != nil {
    15. return conn
    16. }
    17. // 创建连接
    18. conn, _ = net.DialTimeout("tcp", "baidu.com:80", 10*time.Second)
    19. return conn
    20. }
    21. // 使用连接
    22. func main() {
    23. conn := getConn()
    24. if conn == nil {
    25. panic("conn is nil")
    26. }
    27. }
    1. package main
    2. import (
    3. "fmt"
    4. "sync"
    5. )
    6. func main() {
    7. var once sync.Once
    8. // 第一个初始化函数
    9. f1 := func() {
    10. fmt.Println("in f1")
    11. }
    12. once.Do(f1) // 打印出 in f1
    13. // 第二个初始化函数
    14. f2 := func() {
    15. fmt.Println("in f2")
    16. }
    17. once.Do(f2) // 无输出
    18. }

    Once 常常用来初始化单例资源,或者并发访问只需初始化一次的共享资源,或者在测试的时候初始化一次测试资源。

    1. // 值是3.0或者0.0的一个数据结构
    2. var threeOnce struct {
    3. sync.Once
    4. v *Float
    5. }
    6. // 返回此数据结构的值,如果还没有初始化为3.0,则初始化
    7. func three() *Float {
    8. threeOnce.Do(func() { // 使用Once初始化
    9. threeOnce.v = NewFloat(3.0)
    10. })
    11. return threeOnce.v
    12. }

    Once 实现
    一个正确的 Once 实现要使用一个互斥锁,这样初始化的时候如果有并发的 goroutine,就会进入doSlow 方法。互斥锁的机制保证只有一个 goroutine 进行初始化,同时利用双检查的机制(double-checking),再次判断 o.done 是否为 0,如果为 0,则是第一次执行,执行完毕后,就将 o.done 设置为 1,然后释放锁。

    1. type Once struct {
    2. done uint32
    3. m Mutex
    4. }
    5. func (o *Once) Do(f func()) {
    6. if atomic.LoadUint32(&o.done) == 0 {
    7. o.doSlow(f)
    8. }
    9. }
    10. func (o *Once) doSlow(f func()) {
    11. o.m.Lock()
    12. defer o.m.Unlock()
    13. // 双检查
    14. if o.done == 0 {
    15. defer atomic.StoreUint32(&o.done, 1)
    16. f()
    17. }
    18. }

    Once 可能出现的 2 种错误
    第一种错误:死锁你已经知道了 Do 方法会执行一次 f,但是如果 f 中再次调用这个 Once 的 Do 方法的话,就会导致死锁的情况出现。这还不是无限递归的情况,而是的的确确的 Lock 的递归调用导致的死锁。

    1. func main() {
    2. var once sync.Once
    3. var googleConn net.Conn // 到Google网站的一个连接
    4. once.Do(func() {
    5. // 建立到google.com的连接,有可能因为网络的原因,googleConn并没有建立成功,此时它的值为nil
    6. googleConn, _ = net.Dial("tcp", "google.com:80")
    7. })
    8. // 发送http请求
    9. googleConn.Write([]byte("GET / HTTP/1.1\r\nHost: google.com\r\n Accept: */*\r\n\r\n"))
    10. io.Copy(os.Stdout, googleConn)
    11. }

    第二种错误:未初始化
    由于一些防火墙的原因,googleConn 并没有被正确的初始化,后面如果想当然认为既然执行了 Do 方法 googleConn 就已经初始化的话,会抛出空指针的错误

    1. // 一个功能更加强大的Once
    2. type Once struct {
    3. m sync.Mutex
    4. done uint32
    5. }
    6. // 传入的函数f有返回值error,如果初始化失败,需要返回失败的error
    7. // Do方法会把这个error返回给调用者
    8. func (o *Once) Do(f func() error) error {
    9. if atomic.LoadUint32(&o.done) == 1 { //fast path
    10. return nil
    11. }
    12. return o.slowDo(f)
    13. }
    14. // 如果还没有初始化
    15. func (o *Once) slowDo(f func() error) error {
    16. o.m.Lock()
    17. defer o.m.Unlock()
    18. var err error
    19. if o.done == 0 { // 双检查,还没有初始化
    20. err = f()
    21. if err == nil { // 初始化成功才将标记置为已初始化
    22. atomic.StoreUint32(&o.done, 1)
    23. }
    24. }
    25. return err
    26. }

    重写Once

    1. // 一个功能更加强大的Once
    2. type Once struct {
    3. m sync.Mutex
    4. done uint32
    5. }
    6. // 传入的函数f有返回值error,如果初始化失败,需要返回失败的error
    7. // Do方法会把这个error返回给调用者
    8. func (o *Once) Do(f func() error) error {
    9. if atomic.LoadUint32(&o.done) == 1 { //fast path
    10. return nil
    11. }
    12. return o.slowDo(f)
    13. }
    14. // 如果还没有初始化
    15. func (o *Once) slowDo(f func() error) error {
    16. o.m.Lock()
    17. defer o.m.Unlock()
    18. var err error
    19. if o.done == 0 { // 双检查,还没有初始化
    20. err = f()
    21. if err == nil { // 初始化成功才将标记置为已初始化
    22. atomic.StoreUint32(&o.done, 1)
    23. }
    24. }
    25. return err
    26. }
    1. // Once 是一个扩展的sync.Once类型,提供了一个Done方法
    2. type Once struct {
    3. sync.Once
    4. }
    5. // Done 返回此Once是否执行过
    6. // 如果执行过则返回true
    7. // 如果没有执行过或者正在执行,返回false
    8. func (o *Once) Done() bool {
    9. return atomic.LoadUint32((*uint32)(unsafe.Pointer(&o.Once))) == 1
    10. }
    11. func main() {
    12. var flag Once
    13. fmt.Println(flag.Done()) //false
    14. flag.Do(func() {
    15. time.Sleep(time.Second)
    16. })
    17. fmt.Println(flag.Done()) //true
    18. }

    我已经分析了几个并发原语的实现,你可能注意到总是有些 slowXXXX 的方法,从 XXXX 方法中单独抽取出来,你明白为什么要这么做吗,有什么好处?
    分离固定内容和非固定内容,使得固定的内容能被内联调用,从而优化执行过程。
    Once 在第一次使用之后,还能复制给其它变量使用吗?
    Once被拷贝的过程中内部的已执行状态不会改变,所以Once不能通过拷贝多次执行。
    Map
    注意,如果使用 struct 类型做 key 其实是有坑的,因为如果 struct 的某个字段值修改了,查询 map 时无法获取它 add 进去的值

    1. type mapKey struct {
    2. key int
    3. }
    4. func main() {
    5. var m = make(map[mapKey]string)
    6. var key = mapKey{10}
    7. m[key] = "hello"
    8. fmt.Printf("m[key]=%s\n", m[key])
    9. // 修改key的字段的值后再次查询map,无法获取刚才add进去的值
    10. key.key = 100
    11. fmt.Printf("再次查询m[key]=%s\n", m[key])
    12. }
    1. func main() {
    2. var m = make(map[string]int)
    3. m["a"] = 0
    4. fmt.Printf("a=%d; b=%d\n", m["a"], m["b"])
    5. av, aexisted := m["a"]
    6. bv, bexisted := m["b"]
    7. fmt.Printf("a=%d, existed: %t; b=%d, existed: %t\n", av, aexisted, bv, bexisted)
    8. }

    orderedmap:https://github.com/elliotchance/orderedmap
    使用 map 的 2 种常见错误
    常见错误一:
    未初始化和 slice 或者 Mutex、RWmutex 等 struct 类型不同,map 对象必须在使用之前初始化。如果不初始化就直接赋值的话,会出现 panic 异常,比如下面的例子,m 实例还没有初始化就直接进行操作会导致 panic

    1. func main() {
    2. var m map[int]int
    3. m[100] = 100
    4. }

    解决办法就是在第 2 行初始化这个实例(m := make(map[int]int))。从一个 nil 的 map 对象中获取值不会 panic,而是会得到零值,所以下面的代码不会报错:

    1. func main() {
    2. var m map[int]int
    3. fmt.Println(m[100])
    4. }

    常见错误二:
    并发读写,对于 map 类型,另一个很容易犯的错误就是并发访问问题。这个易错点,相当令人讨厌,如果没有注意到并发问题,程序在运行的时候就有可能出现并发读写导致的 panic。Go 内建的 map 对象不是线程(goroutine)安全的,并发读写的时候运行时会有检查,遇到并发问题就会导致 panic

    1. func main() {
    2. var m = make(map[int]int,10) // 初始化一个map
    3. go func() {
    4. for {
    5. m[1] = 1 //设置key
    6. }
    7. }()
    8. go func() {
    9. for {
    10. _ = m[2] //访问这个map
    11. }
    12. }()
    13. select {}
    14. }

    如何实现线程安全的 map 类型
    加读写锁:扩展 map,支持并发读写

    1. type RWMap struct { // 一个读写锁保护的线程安全的map
    2. sync.RWMutex // 读写锁保护下面的map字段
    3. m map[int]int
    4. }
    5. // 新建一个RWMap
    6. func NewRWMap(n int) *RWMap {
    7. return &RWMap{
    8. m: make(map[int]int, n),
    9. }
    10. }
    11. func (m *RWMap) Get(k int) (int, bool) { //从map中读取一个值
    12. m.RLock()
    13. defer m.RUnlock()
    14. v, existed := m.m[k] // 在锁的保护下从map中读取
    15. return v, existed
    16. }
    17. func (m *RWMap) Set(k int, v int) { // 设置一个键值对
    18. m.Lock() // 锁保护
    19. defer m.Unlock()
    20. m.m[k] = v
    21. }
    22. func (m *RWMap) Delete(k int) { //删除一个键
    23. m.Lock() // 锁保护
    24. defer m.Unlock()
    25. delete(m.m, k)
    26. }
    27. func (m *RWMap) Len() int { // map的长度
    28. m.RLock() // 锁保护
    29. defer m.RUnlock()
    30. return len(m.m)
    31. }
    32. func (m *RWMap) Each(f func(k, v int) bool) { // 遍历map
    33. m.RLock() //遍历期间一直持有读锁
    34. defer m.RUnlock()
    35. for k, v := range m.m {
    36. if !f(k, v) {
    37. return
    38. }
    39. }
    40. }

    分片加锁:更高效的并发 map
    orcaman/concurrent-map:https://github.com/orcaman/concurrent-map

    1. var SHARD_COUNT = 32
    2. // 分成SHARD_COUNT个分片的map
    3. type ConcurrentMap []*ConcurrentMapShared
    4. // 通过RWMutex保护的线程安全的分片,包含一个map
    5. type ConcurrentMapShared struct {
    6. items map[string]interface{}
    7. sync.RWMutex // Read Write mutex, guards access to internal map.
    8. }
    9. // 创建并发map
    10. func New() ConcurrentMap {
    11. m := make(ConcurrentMap, SHARD_COUNT)
    12. for i := 0; i < SHARD_COUNT; i++ {
    13. m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
    14. }
    15. return m
    16. }
    17. // 根据key计算分片索引
    18. func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
    19. return m[uint(fnv32(key))%uint(SHARD_COUNT)]
    20. }

    增加或者查询的时候,首先根据分片索引得到分片对象,然后对分片对象加锁进行操作

    1. func (m ConcurrentMap) Set(key string, value interface{}) {
    2. // 根据key计算出对应的分片
    3. shard := m.GetShard(key)
    4. shard.Lock() //对这个分片加锁,执行业务操作
    5. shard.items[key] = value
    6. shard.Unlock()
    7. }
    8. func (m ConcurrentMap) Get(key string) (interface{}, bool) {
    9. // 根据key计算出对应的分片
    10. shard := m.GetShard(key)
    11. shard.RLock()
    12. // 从这个分片读取key的值
    13. val, ok := shard.items[key]
    14. shard.RUnlock()
    15. return val, ok
    16. }

    在我个人使用并发 map 的过程中,加锁和分片加锁这两种方案都比较常用,如果是追求更高的性能,显然是分片加锁更好,因为它可以降低锁的粒度,进而提高访问此 map 对象的吞吐。如果并发性能要求不是那么高的场景,简单加锁方式更简单。
    sync.Map的实现
    1、只会增长的缓存系统中,一个 key 只写入一次而被读很多次;
    2、多个 goroutine 为不相交的键集读、写和重写键值对。
    优点
    空间换时间。通过冗余的两个数据结构(只读的 read 字段、可写的 dirty),来减少加锁对性能的影响。对只读字段(read)的操作不需要加锁。优先从 read 字段读取、更新、删除,因为对 read 字段的读取不需要锁。动态调整。miss 次数多了之后,将 dirty 数据提升为 read,避免总是从 dirty 中加锁读取。double-checking。加锁之后先还要再检查 read 字段,确定真的不存在才操作 dirty 字段。延迟删除。删除一个键值只是打标记,只有在提升 dirty 字段为 read 字段的时候才清理删除的数据。
    map数据结构

    1. type Map struct {
    2. mu Mutex
    3. // 基本上你可以把它看成一个安全的只读的map
    4. // 它包含的元素其实也是通过原子操作更新的,但是已删除的entry就需要加锁操作了
    5. read atomic.Value // readOnly
    6. // 包含需要加锁才能访问的元素
    7. // 包括所有在read字段中但未被expunged(删除)的元素以及新加的元素
    8. dirty map[interface{}]*entry
    9. // 记录从read中读取miss的次数,一旦miss数和dirty长度一样了,就会把dirty提升为read,并把dirty置空
    10. misses int
    11. }
    12. type readOnly struct {
    13. m map[interface{}]*entry
    14. amended bool // 当dirty中包含read没有的数据时为true,比如新增一条数据
    15. }
    16. // expunged是用来标识此项已经删掉的指针
    17. // 当map中的一个项目被删除了,只是把它的值标记为expunged,以后才有机会真正删除此项
    18. var expunged = unsafe.Pointer(new(interface{}))
    19. // entry代表一个值
    20. type entry struct {
    21. p unsafe.Pointer // *interface{}
    22. }

    Store 方法

    1. func (m *Map) Store(key, value interface{}) {
    2. read, _ := m.read.Load().(readOnly)
    3. // 如果read字段包含这个项,说明是更新,cas更新项目的值即可
    4. if e, ok := read.m[key]; ok && e.tryStore(&value) {
    5. return
    6. }
    7. // read中不存在,或者cas更新失败,就需要加锁访问dirty了
    8. m.mu.Lock()
    9. read, _ = m.read.Load().(readOnly)
    10. if e, ok := read.m[key]; ok { // 双检查,看看read是否已经存在了
    11. if e.unexpungeLocked() {
    12. // 此项目先前已经被删除了,通过将它的值设置为nil,标记为unexpunged
    13. m.dirty[key] = e
    14. }
    15. e.storeLocked(&value) // 更新
    16. } else if e, ok := m.dirty[key]; ok { // 如果dirty中有此项
    17. e.storeLocked(&value) // 直接更新
    18. } else { // 否则就是一个新的key
    19. if !read.amended { //如果dirty为nil
    20. // 需要创建dirty对象,并且标记read的amended为true,
    21. // 说明有元素它不包含而dirty包含
    22. m.dirtyLocked()
    23. m.read.Store(readOnly{m: read.m, amended: true})
    24. }
    25. m.dirty[key] = newEntry(value) //将新值增加到dirty对象中
    26. }
    27. m.mu.Unlock()
    28. }

    dirty方法

    1. func (m *Map) dirtyLocked() {
    2. if m.dirty != nil { // 如果dirty字段已经存在,不需要创建了
    3. return
    4. }
    5. read, _ := m.read.Load().(readOnly) // 获取read字段
    6. m.dirty = make(map[interface{}]*entry, len(read.m))
    7. for k, e := range read.m { // 遍历read字段
    8. if !e.tryExpungeLocked() { // 把非punged的键值对复制到dirty中
    9. m.dirty[k] = e
    10. }
    11. }
    12. }

    Load 方法

    1. func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    2. // 首先从read处理
    3. read, _ := m.read.Load().(readOnly)
    4. e, ok := read.m[key]
    5. if !ok && read.amended { // 如果不存在并且dirty不为nil(有新的元素)
    6. m.mu.Lock()
    7. // 双检查,看看read中现在是否存在此key
    8. read, _ = m.read.Load().(readOnly)
    9. e, ok = read.m[key]
    10. if !ok && read.amended {//依然不存在,并且dirty不为nil
    11. e, ok = m.dirty[key]// 从dirty中读取
    12. // 不管dirty中存不存在,miss数都加1
    13. m.missLocked()
    14. }
    15. m.mu.Unlock()
    16. }
    17. if !ok {
    18. return nil, false
    19. }
    20. return e.load() //返回读取的对象,e既可能是从read中获得的,也可能是从dirty中获得的
    21. }

    missLocked方法

    1. func (m *Map) missLocked() {
    2. m.misses++ // misses计数加一
    3. if m.misses < len(m.dirty) { // 如果没达到阈值(dirty字段的长度),返回
    4. return
    5. }
    6. m.read.Store(readOnly{m: m.dirty}) //把dirty字段的内存提升为read字段
    7. m.dirty = nil // 清空dirty
    8. m.misses = 0 // misses数重置为0
    9. }

    Delete 方法

    1. func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
    2. read, _ := m.read.Load().(readOnly)
    3. e, ok := read.m[key]
    4. if !ok && read.amended {
    5. m.mu.Lock()
    6. // 双检查
    7. read, _ = m.read.Load().(readOnly)
    8. e, ok = read.m[key]
    9. if !ok && read.amended {
    10. e, ok = m.dirty[key]
    11. // 这一行长坤在1.15中实现的时候忘记加上了,导致在特殊的场景下有些key总是没有被回收
    12. delete(m.dirty, key)
    13. // miss数加1
    14. m.missLocked()
    15. }
    16. m.mu.Unlock()
    17. }
    18. if ok {
    19. return e.delete()
    20. }
    21. return nil, false
    22. }
    23. func (m *Map) Delete(key interface{}) {
    24. m.LoadAndDelete(key)
    25. }
    26. func (e *entry) delete() (value interface{}, ok bool) {
    27. for {
    28. p := atomic.LoadPointer(&e.p)
    29. if p == nil || p == expunged {
    30. return nil, false
    31. }
    32. if atomic.CompareAndSwapPointer(&e.p, p, nil) {
    33. return *(*interface{})(p), true
    34. }
    35. }
    36. }

    timedmap:https://github.com/zekroTJA/timedmap
    treemap:https://godoc.org/github.com/emirpasic/gods/maps/treemap
    为什么 sync.Map 中的集合核心方法的实现中,如果 read 中项目不存在,加锁后还要双检查,再检查一次 read?
    第一次先用CAS快速尝试,失败后进行加锁,然后进行第二次CAS检查,再进行修改;
    在高并发的情况下,存在多个goroutine在修改同一个Key,第一次CAS都失败了,在竞争锁;如果不进行第二次CAS检查就直接修改,这个Key就会被多次修改
    你看到 sync.map 元素删除的时候只是把它的值设置为 nil,那么什么时候这个 key 才会真正从 map 对象中删除?
    真正删除key的操作是在数据从read往dirty迁移的过程中(往dirty写数据时,发现dirty没有数据,就会触发迁移),只迁移没有被标记为删除的KV
    read 中 key 被删除会有两个状态:nil 和 expunged。我会有些不明白,要么都用 nil 或者都用 expunged,这样会不会更好一些?
    nil和expunged都代表元素被删除了,只不过expunged比较特殊,如果被删除的元素是expunged,代表它只存在于readonly之中,不存在于dirty中。这样如果重新设置这个key的话,需要往dirty增加key

    sync.Pool
    三色并发标记算法标记对象并回收 https://blog.golang.org/ismmkeynote
    Go 的自动垃圾回收机制还是有一个 STW(stop-the-world,程序暂停)的时间,而且,大量地创建在堆上的对象,也会影响垃圾回收标记的时间。
    1、sync.Pool 本身就是线程安全的,多个 goroutine 可以并发地调用它的方法存取对象;
    2、sync.Pool 不可在使用之后再复制使用。

    1. func (p *Pool) Get() interface{} {
    2. // 把当前goroutine固定在当前的P上
    3. l, pid := p.pin()
    4. x := l.private // 优先从local的private字段取,快速
    5. l.private = nil
    6. if x == nil {
    7. // 从当前的local.shared弹出一个,注意是从head读取并移除
    8. x, _ = l.shared.popHead()
    9. if x == nil { // 如果没有,则去偷一个
    10. x = p.getSlow(pid)
    11. }
    12. }
    13. runtime_procUnpin()
    14. // 如果没有获取到,尝试使用New函数生成一个新的
    15. if x == nil && p.New != nil {
    16. x = p.New()
    17. }
    18. return x
    19. }
    1. func (p *Pool) getSlow(pid int) interface{} {
    2. size := atomic.LoadUintptr(&p.localSize)
    3. locals := p.local
    4. // 从其它proc中尝试偷取一个元素
    5. for i := 0; i < int(size); i++ {
    6. l := indexLocal(locals, (pid+i+1)%int(size))
    7. if x, _ := l.shared.popTail(); x != nil {
    8. return x
    9. }
    10. }
    11. // 如果其它proc也没有可用元素,那么尝试从vintim中获取
    12. size = atomic.LoadUintptr(&p.victimSize)
    13. if uintptr(pid) >= size {
    14. return nil
    15. }
    16. locals = p.victim
    17. l := indexLocal(locals, pid)
    18. if x := l.private; x != nil { // 同样的逻辑,先从vintim中的local private获取
    19. l.private = nil
    20. return x
    21. }
    22. for i := 0; i < int(size); i++ { // 从vintim其它proc尝试偷取
    23. l := indexLocal(locals, (pid+i)%int(size))
    24. if x, _ := l.shared.popTail(); x != nil {
    25. return x
    26. }
    27. }
    28. // 如果victim中都没有,则把这个victim标记为空,以后的查找可以快速跳过了
    29. atomic.StoreUintptr(&p.victimSize, 0)
    30. return nil
    31. }

    New方法
    Pool struct 包含一个 New 字段,这个字段的类型是函数 func() interface{}。当调用 Pool 的 Get 方法从池中获取元素,没有更多的空闲元素可返回时,就会调用这个 New 方法来创建新的元素。如果你没有设置 New 字段,没有更多的空闲元素可返回时,Get 方法将返回 nil,表明当前没有可用的元素。有趣的是,New 是可变的字段。这就意味着,你可以在程序运行的时候改变创建元素的方法。当然,很少有人会这么做,因为一般我们创建元素的逻辑都是一致的,要创建的也是同一类的元素,所以你在使用 Pool 的时候也没必要玩一些“花活”,在程序运行时更改 New 的值。

    1. func (p *Pool) Put(x interface{}) {
    2. if x == nil { // nil值直接丢弃
    3. return
    4. }
    5. l, _ := p.pin()
    6. if l.private == nil { // 如果本地private没有值,直接设置这个值即可
    7. l.private = x
    8. x = nil
    9. }
    10. if x != nil { // 否则加入到本地队列中
    11. l.shared.pushHead(x)
    12. }
    13. runtime_procUnpin()
    14. }

    Get方法
    如果调用这个方法,就会从 Pool取走一个元素,这也就意味着,这个元素会从 Pool 中移除,返回给调用者。不过,除了返回值是正常实例化的元素,Get 方法的返回值还可能会是一个 nil(Pool.New 字段没有设置,又没有空闲元素可以返回),所以你在使用的时候,可能需要判断。
    Put方法
    这个方法用于将一个元素返还给 Pool,Pool 会把这个元素保存到池中,并且可以复用。但如果 Put 一个 nil 值,Pool 就会忽略这个值。
    bufpool:https://github.com/gohugoio/hugo/blob/master/bufferpool/bufpool.go

    1. var buffers = sync.Pool{
    2. New: func() interface{} {
    3. return new(bytes.Buffer)
    4. },
    5. }
    6. func GetBuffer() *bytes.Buffer {
    7. return buffers.Get().(*bytes.Buffer)
    8. }
    9. func PutBuffer(buf *bytes.Buffer) {
    10. buf.Reset()
    11. buffers.Put(buf)
    12. }

    除了 Hugo,这段 buffer 池的代码非常常用。很可能你在阅读其它项目的代码的时候就碰到过,或者是你自己实现 buffer 池的时候也会这么去实现,但是请你注意了,这段代码是有问题的,你一定不要将上面的代码应用到实际的产品中。它可能会有内存泄漏的问题。
    sync.Pool实现原理
    1. 每次 GC 都会回收创建的对象。
    2. 底层实现使用了 Mutex,对这个锁并发请求竞争激烈的时候,会导致性能的下降。
    Go 对 Pool 的优化就是避免使用锁,同时将加锁的 queue 改成 lock-free 的 queue 的实现,给即将移除的元素再多一次“复活”的机会。

    1. func poolCleanup() {
    2. // 丢弃当前victim, STW所以不用加锁
    3. for _, p := range oldPools {
    4. p.victim = nil
    5. p.victimSize = 0
    6. }
    7. // 将local复制给victim, 并将原local置为nil
    8. for _, p := range allPools {
    9. p.victim = p.local
    10. p.victimSize = p.localSize
    11. p.local = nil
    12. p.localSize = 0
    13. }
    14. oldPools, allPools = allPools, nil
    15. }

    sync.Pool 的坑
    1、内存泄漏
    取出来的 bytes.Buffer 在使用的时候,我们可以往这个元素中增加大量的 byte 数据,这会导致底层的 byte slice 的容量可能会变得很大。这个时候,即使 Reset 再放回到池子中,这些 byte slice 的容量不会改变,所占的空间依然很大。而且,因为 Pool 回收的机制,这些大的 Buffer 可能不被回收,而是会一直占用很大的空间,这属于内存泄漏的问题。
    package fmt 中也有这个问题,修改方法是一样的,超过一定大小的 buffer,就直接丢弃了
    在使用 sync.Pool 回收 buffer 的时候,一定要检查回收的对象的大小。
    2、内存浪费
    是池子中的 buffer 都比较大,但在实际使用的时候,很多时候只需要一个小的 buffer,这也是一种浪费现象。
    我们可以将 buffer 池分成几层。首先,小于 512 byte 的元素的 buffer 占一个池子;其次,小于 1K byte 大小的元素占一个池子;再次,小于 4K byte 大小的元素占一个池子。这样分成几个池子以后,就可以根据需要,到所需大小的池子中获取 buffer 了。
    bucketpool:https://github.com/vitessio/vitess/blob/master/go/bucketpool/bucketpool.go
    net/http/server.go:https://github.com/golang/go/blob/617f2c3e35cdc8483b950aa3ef18d92965d63197/src/net/http/server.go
    bytebufferpool:https://github.com/valyala/bytebufferpool
    oxtoacart/bpool:https://github.com/oxtoacart/bpool
    bpool 是基于 Channel 实现的,不像 sync.Pool 为了提高性能而做了很多优化,所以,在性能上比不过 sync.Pool。不过,它提供了限制 Pool 容量的功能,所以,如果你想控制 Pool 的容量的话,可以考虑这个库。
    fatih/pool:https://github.com/fatih/pool

    1. // 工厂模式,提供创建连接的工厂方法
    2. factory := func() (net.Conn, error) { return net.Dial("tcp", "127.0.0.1:4000") }
    3. // 创建一个tcp池,提供初始容量和最大容量以及工厂方法
    4. p, err := pool.NewChannelPool(5, 30, factory)
    5. // 获取一个连接
    6. conn, err := p.Get()
    7. // Close并不会真正关闭这个连接,而是把它放回池子,所以你不必显式地Put这个对象到池子中
    8. conn.Close()
    9. // 通过调用MarkUnusable, Close的时候就会真正关闭底层的tcp的连接了
    10. if pc, ok := conn.(*pool.PoolConn); ok {
    11. pc.MarkUnusable()
    12. pc.Close()
    13. }
    14. // 关闭池子就会关闭=池子中的所有的tcp连接
    15. p.Close()
    16. // 当前池子中的连接的数量
    17. current := p.Len()

    采用 Mutex+Slice 实现 Pool

    1. // 放回一个待重用的连接
    2. func (c *Client) putFreeConn(addr net.Addr, cn *conn) {
    3. c.lk.Lock()
    4. defer c.lk.Unlock()
    5. if c.freeconn == nil { // 如果对象为空,创建一个map对象
    6. c.freeconn = make(map[string][]*conn)
    7. }
    8. freelist := c.freeconn[addr.String()] //得到此地址的连接列表
    9. if len(freelist) >= c.maxIdleConns() {//如果连接已满,关闭,不再放入
    10. cn.nc.Close()
    11. return
    12. }
    13. c.freeconn[addr.String()] = append(freelist, cn) // 加入到空闲列表中
    14. }
    15. // 得到一个空闲连接
    16. func (c *Client) getFreeConn(addr net.Addr) (cn *conn, ok bool) {
    17. c.lk.Lock()
    18. defer c.lk.Unlock()
    19. if c.freeconn == nil {
    20. return nil, false
    21. }
    22. freelist, ok := c.freeconn[addr.String()]
    23. if !ok || len(freelist) == 0 { // 没有此地址的空闲列表,或者列表为空
    24. return nil, false
    25. }
    26. cn = freelist[len(freelist)-1] // 取出尾部的空闲连接
    27. c.freeconn[addr.String()] = freelist[:len(freelist)-1]
    28. return cn, true
    29. }

    Worker Pool:https://github.com/valyala/fasthttp/blob/9f11af296864153ee45341d3f2fe0f5178fd6210/workerpool.go#L16
    gammazero/workerpool:https://godoc.org/github.com/gammazero/workerpool
    gammazero/workerpool 可以无限制地提交任务,提供了更便利的 Submit 和 SubmitWait 方法提交任务,还可以提供当前的 worker 数和任务数以及关闭 Pool 的功能。
    ivpusic/grpool:https://godoc.org/github.com/ivpusic/grpool
    grpool 创建 Pool 的时候需要提供 Worker 的数量和等待执行的任务的最大数量,任务的提交是直接往 Channel 放入任务。
    dpaks/goworkers:https://pkg.go.dev/github.com/dpaks/goworkers?utm_source=godoc
    dpaks/goworkers 提供了更便利的 Submit 方法提交任务以及 Worker 数、任务数等查询方法、关闭 Pool 的方法。它的任务的执行结果需要在 ResultChan 和 ErrChan 中去获取,没有提供阻塞的方法,但是它可以在初始化的时候设置 Worker 的数量和任务数。
    类似的 Worker Pool 的实现非常多,比如还有panjf2000/ants、Jeffail/tunny 、benmanns/goworker、go-playground/pool、Sherifabdlnaby/gpool等第三方库。pond也是一个非常不错的 Worker Pool,关注度目前不是很高,但是功能非常齐全
    在标准库 net/rpc 包中,Server 端需要解析大量客户端的请求(Request),这些短暂使用的 Request 是可以重用的。

    Context
    服务端接收到客户端的 HTTP 请求之后,可以把客户端的 IP 地址和端口、客户端的身份信息、请求接收的时间、Trace ID 等信息放入到上下文中,这个上下文可以在后端的方法调用中传递,后端的业务方法除了利用正常的参数做一些业务处理(如订单处理)之外,还可以从上下文读取到消息请求的时间、Trace ID 等信息,把服务处理的时间推送到 Trace 服务中。Trace 服务可以把同一 Trace ID 的不同方法的调用顺序和调用时间展示成流程图,方便跟踪。
    Go 标准库中的 Context 功能还不止于此,它还提供了超时(Timeout)和取消(Cancel)的机制。
    Context的问题:
    Context 包名导致使用的时候重复 ctx context.Context;
    Context.WithValue 可以接受任何类型的值,非类型安全;
    Context 包名容易误导人,实际上,Context 最主要的功能是取消 goroutine 的执行;
    Context 漫天飞,函数污染。
    Context使用场景:
    上下文信息传递 (request-scoped),比如处理 http 请求、在请求处理链路上传递信息;
    控制子 goroutine 的运行;
    超时控制的方法调用;
    可以取消的方法调用。
    Context使用方法

    1. type Context interface {
    2. Deadline() (deadline time.Time, ok bool)
    3. Done() <-chan struct{}
    4. Err() error
    5. Value(key interface{}) interface{}
    6. }

    Deadline 方法会返回这个 Context 被取消的截止日期。如果没有设置截止日期,ok 的值是 false。后续每次调用这个对象的 Deadline 方法时,都会返回和第一次调用相同的结果。
    Done 方法返回一个 Channel 对象。在 Context 被取消时,此 Channel 会被 close,如果没被取消,可能会返回 nil。后续的 Done 调用总是返回相同的结果。当 Done 被 close 的时候,你可以通过 ctx.Err 获取错误信息。Done 这个方法名其实起得并不好,因为名字太过笼统,不能明确反映 Done 被 close 的原因,因为 cancel、timeout、deadline 都可能导致 Done 被 close,不过,目前还没有一个更合适的方法名称。关于 Done 方法,你必须要记住的知识点就是:如果 Done 没有被 close,Err 方法返回 nil;如果 Done 被 close,Err 方法会返回 Done 被 close 的原因。
    Value 返回此 ctx 中和指定的 key 相关联的 value。
    context.Background():返回一个非 nil 的、空的 Context,没有任何值,不会被 cancel,不会超时,没有截止日期。一般用在主函数、初始化、测试以及创建根 Context 的时候。
    context.TODO():返回一个非 nil 的、空的 Context,没有任何值,不会被 cancel,不会超时,没有截止日期。当你不清楚是否该用 Context,或者目前还不知道要传递一些什么上下文信息的时候,就可以使用这个方法。
    Context 的时候,有一些约定俗成的规则:
    1、一般函数使用 Context 的时候,会把这个参数放在第一个参数的位置。
    2、从来不把 nil 当做 Context 类型的参数值,可以使用 context.Background() 创建一个空的上下文对象,也不要使用 nil。
    3、Context 只用来临时做函数之间的上下文透传,不能持久化 Context 或者把 Context 长久保存。把 Context 持久化到数据库、本地文件或者全局变量、缓存中都是错误的用法。
    4、key 的类型不应该是字符串类型或者其它内建类型,否则容易在包之间使用 Context 时候产生冲突。使用 WithValue 时,key 的类型应该是自己定义的类型。
    5、常常使用 struct{}作为底层类型定义 key 的类型。对于 exported key 的静态类型,常常是接口或者指针。这样可以尽量减少内存分配。
    我们经常使用 Context 来取消一个 goroutine 的运行,这是 Context 最常用的场景之一,Context 也被称为 goroutine 生命周期范围(goroutine-scoped)的 Context,把 Context 传递给 goroutine。但是,goroutine 需要尝试检查 Context 的 Done 是否关闭了。
    WithValue方法
    WithValue 基于 parent Context 生成一个新的 Context,保存了一个 key-value 键值对。它常常用来传递上下文。

    1. type valueCtx struct {
    2. Context
    3. key, val interface{}
    4. }
    5. ctx = context.TODO()
    6. ctx = context.WithValue(ctx, "key1", "0001")
    7. ctx = context.WithValue(ctx, "key2", "0001")
    8. ctx = context.WithValue(ctx, "key3", "0001")
    9. ctx = context.WithValue(ctx, "key4", "0004")
    10. fmt.Println(ctx.Value("key1"))

    WithCancel方法
    WithCancel 方法返回 parent 的副本,只是副本中的 Done Channel 是新建的对象,它的类型是 cancelCtx。

    1. func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    2. c := newCancelCtx(parent)
    3. propagateCancel(parent, &c)// 把c朝上传播
    4. return &c, func() { c.cancel(true, Canceled) }
    5. }
    6. // newCancelCtx returns an initialized cancelCtx.
    7. func newCancelCtx(parent Context) cancelCtx {
    8. return cancelCtx{Context: parent}
    9. }

    WithTimeout方法

    1. func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
    2. // 当前时间+timeout就是deadline
    3. return WithDeadline(parent, time.Now().Add(timeout))
    4. }

    WithDeadline方法

    1. func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    2. // 如果parent的截止时间更早,直接返回一个cancelCtx即可
    3. if cur, ok := parent.Deadline(); ok && cur.Before(d) {
    4. return WithCancel(parent)
    5. }
    6. c := &timerCtx{
    7. cancelCtx: newCancelCtx(parent),
    8. deadline: d,
    9. }
    10. propagateCancel(parent, c) // 同cancelCtx的处理逻辑
    11. dur := time.Until(d)
    12. if dur <= 0 { //当前时间已经超过了截止时间,直接cancel
    13. c.cancel(true, DeadlineExceeded)
    14. return c, func() { c.cancel(false, Canceled) }
    15. }
    16. c.mu.Lock()
    17. defer c.mu.Unlock()
    18. if c.err == nil {
    19. // 设置一个定时器,到截止时间后取消
    20. c.timer = time.AfterFunc(dur, func() {
    21. c.cancel(true, DeadlineExceeded)
    22. })
    23. }
    24. return c, func() { c.cancel(true, Canceled) }
    25. }
    1. func slowOperationWithTimeout(ctx context.Context) (Result, error) {
    2. ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
    3. defer cancel() // 一旦慢操作完成就立马调用cancel
    4. return slowOperation(ctx)
    5. }
    1. func main() {
    2. ctx, cancel := context.WithCancel(context.Background())
    3. go func() {
    4. defer func() {
    5. fmt.Println("goroutine exit")
    6. }()
    7. for {
    8. select {
    9. case <-ctx.Done():
    10. return
    11. default:
    12. time.Sleep(time.Second)
    13. }
    14. }
    15. }()
    16. time.Sleep(time.Second)
    17. cancel()
    18. time.Sleep(2 * time.Second)
    19. }

    如果你要为 Context 实现一个带超时功能的调用,比如访问远程的一个微服务,超时并不意味着你会通知远程微服务已经取消了这次调用,大概率的实现只是避免客户端的长时间等待,远程的服务器依然还执行着你的请求。所以,有时候,Context 并不会减少对服务器的请求负担。如果在 Context 被 cancel 的时候,你能关闭和服务器的连接,中断和数据库服务器的通讯、停止对本地文件的读写,那么,这样的超时处理,同时能减少对服务调用的压力,但是这依赖于你对超时的底层处理机制。
    使用 WithCancel 和 WithValue 写一个级联的使用 Context 的例子

    1. package test
    2. import (
    3. "context"
    4. "fmt"
    5. "time"
    6. )
    7. func main() {
    8. parent := context.Background()
    9. ctx, cancel := context.WithCancel(parent)
    10. child := context.WithValue(ctx, "name", "wuqq")
    11. go func() {
    12. for {
    13. select {
    14. case <-child.Done():
    15. fmt.Println("it's over")
    16. return
    17. default:
    18. res := child.Value("name")
    19. fmt.Println("name:", res)
    20. time.Sleep(1 * time.Second)
    21. }
    22. }
    23. }()
    24. go func() {
    25. time.Sleep(3 * time.Second)
    26. cancel()
    27. }()
    28. time.Sleep(5 * time.Second)
    29. }

    Go标准库 http 与 fasthttp 服务端性能比较https://mp.weixin.qq.com/s/67uobPK10n-3xWCYUtgtFg
    k8s技术学习,实验笔记整理、分享https://www.yuque.com/swcloud/k8s
    Go中的map有缩容机制吗?https://mp.weixin.qq.com/s/4OP2zMERTQGcpvZB36Otkw
    使用uptimed命令监视Linux系统正常运行时间https://mp.weixin.qq.com/s/Ipk-9J0TNoXnBSLqcySGOg
    历史归档 : https://iofunc.cn/index.php/category/DS/
    关于LeetCode的归档地址: https://iofunc.com/index.php/category/daykill/