“高并发 高性能 高可用” 一直以来作为搬砖界用力搬砖的口号。由于 CPU 一次读取存储数据的长度有限,比如 32bit 的平台修改 int64 需要被拆分成两次写操作,更何况对于结构体的赋值,那么对于高并发场景下我们怎么才能保证数据的完整性和一致性呢?

所以今天我们来聊聊 Go 的 atomic 包,它提供了低级别原子内存原语,对于实现同步算法起到很大作用。可以说是 Go 并发编程的基石,比如 MutexRWMutexWaitGroupOnce 等实现都依赖于 atomic 。当然其提供的功能需要格外小心才能正确使用, atomic 大致提供了 5 类原子操作,因为不会被 CPU 中断所以在多个 goroutine 之间访问是安全的。

  1. SwapT 函数实现的交换操作, 在原子上等价于
    old = *addr *addr = new return old

  2. CompareAndSwapT 函数实现的比较并交换操作, 在原子上等价于
    if *addr == old { *addr = new return true } return false

  3. AddT 函数实现的加法操作,原子上等价于
    *addr += delta return *addr

    1. LoadT
    2. StoreT
    3. return \*addr
    4. \*addr=val
    1. atomic.Value
    2. int32
    3. int64
    4. uint32
    5. uint64
    6. uintptr
    7. unsafe.Pointer

atomic 源码分析

然而我们在 atomic 包下只能看到类似如下的函数定义

  1. `// SwapInt32 atomically stores new into *addr and returns the previous *addr value.
  2. func SwapInt32(addr *int32, new int32) (old int32)
  3. // SwapInt64 atomically stores new into *addr and returns the previous *addr value.
  4. func SwapInt64(addr *int64, new int64) (old int64)
  5. // SwapUint32 atomically stores new into *addr and returns the previous *addr value.
  6. func SwapUint32(addr *uint32, new uint32) (old uint32)
  7. ... ...
  8. `

并没有发现函数实现部分,但是能够找到相应汇编代码

  1. `TEXT ·SwapInt32(SB),NOSPLIT,$0
  2. JMP runtime∕internal∕atomic·Xchg(SB)
  3. TEXT ·SwapUint32(SB),NOSPLIT,$0
  4. JMP runtime∕internal∕atomic·Xchg(SB)
  5. TEXT ·SwapInt64(SB),NOSPLIT,$0
  6. JMP runtime∕internal∕atomic·Xchg64(SB)
  7. ... ...
  8. `

可见它们最终都是基于运行时中 runtime/internal/atomic 的实现。

提前剧透一下,原子操作其实最终依赖硬件指令的支持,但是因为原子操作可能会导致 goroutine 阻塞,所以它同时还需要运行时调度器的配合。

案例分析

atomic.CompareAndSwapPointer 为例,它只有函数定义没有函数体

通过查找,发现其本身由运行时实现

  1. `//go:linkname sync_atomic_CompareAndSwapPointer sync/atomic.CompareAndSwapPointer
  2. //go:nosplit
  3. func sync_atomic_CompareAndSwapPointer(ptr *unsafe.Pointer, old, new unsafe.Pointer) bool {
  4. if writeBarrier.enabled {
  5. atomicwb(ptr, new)
  6. }
  7. return sync_atomic_CompareAndSwapUintptr((*uintptr)(noescape(unsafe.Pointer(ptr))), uintptr(old), uintptr(new))
  8. }
  9. `

可以看到其最终调用了 sync_atomic_CompareAndSwapUintptr ,且 sync_atomic_CompareAndSwapUintptr 也只有函数定义没有函数体,而且也找不到运行时的实现,说明它是由编译器完成。那么来通过一个栗子一窥究竟吧

  1. `package main
  2. import (
  3. "fmt"
  4. "sync/atomic"
  5. "unsafe"
  6. )
  7. func main() {
  8. var p unsafe.Pointer
  9. newP := 23
  10. atomic.CompareAndSwapPointer(&p, nil, unsafe.Pointer(&newP))
  11. v := (*int)(p)
  12. fmt.Println(*v)
  13. }
  14. `

先执行编译命令 go build -gcflags="-N -l -m" -o atomic atomic.go 得到二进制文件 atomic`

然后执行 go tool objdump -s “main.main” atomic 查看下 main.main`编译结果

  1. `TEXT main.main(SB) /Users/shangyindong/mywork/workspace/workspace_github/go-snippets/atomic/atomic.go
  2. .....
  3. atomic.go:12 0x10a6ff3 e82869fbff CALL sync/atomic.CompareAndSwapPointer(SB)
  4. ......
  5. `

go tool objdump -s "sync/atomic.CompareAndSwapPointer" atomic 接着看 CompareAndSwapPointer

  1. `TEXT sync/atomic.CompareAndSwapPointer(SB) /usr/local/go/src/runtime/atomic_pointer.go
  2. ......
  3. atomic_pointer.go:76 0x105d960 e85b8b0000 CALL sync/atomic.CompareAndSwapUintptr(SB)
  4. ......
  5. `

可以看到 CompareAndSwapPointer 实际调用了 CompareAndSwapUintptr 接着看 go tool objdump -s "sync/atomic.CompareAndSwapUintptr" atomic

  1. `TEXT sync/atomic.CompareAndSwapUintptr(SB) /usr/local/go/src/sync/atomic/asm.s
  2. asm.s:31 0x10664c0 e93bbaf9ff JMP runtime/internal/atomic.Casuintptr(SB)
  3. ......
  4. `

最终 JMP 跳转到了``, 这个方法为内置汇编,看下 asm_amd64.s 吧

  1. `TEXT runtime∕internal∕atomic·Casuintptr(SB), NOSPLIT, $0-25
  2. JMP runtime∕internal∕atomic·Cas64(SB)
  3. `

进而调整到 runtime/internal/atomic.Cas64

  1. `// bool runtime∕internal∕atomic·Cas64(uint64 *val, uint64 old, uint64 new)
  2. // Atomically:
  3. // if(*val == *old){
  4. // *val = new;
  5. // return 1;
  6. // } else {
  7. // return 0;
  8. // }
  9. TEXT runtime∕internal∕atomic·Cas64(SB), NOSPLIT, $0-25
  10. MOVQ ptr+0(FP), BX
  11. MOVQ old+8(FP), AX
  12. MOVQ new+16(FP), CX
  13. LOCK
  14. CMPXCHGQ CX, 0(BX)
  15. SETEQ ret+24(FP)
  16. RET
  17. `

到这里我们能够很清晰看到,本质上原子操作最终还是依赖于 CPU 的 Lock + CMPXCHGQ 指令, Cas64(SB) 总共包含 7 条指令

第一条指令:将 ptr 的值放入 BX

第二条指令:将假设的旧值放入 AX

第三条指令:将要比较的新值放入 CX

第四条指令: LOCK 并不是指令,而是作为指令前缀用来修饰 CMPXCHGQ CX, 0(BX)

大致有五类指令可强制使用 LOCK 语义,但当 LOCK 前缀被置于其他指令之前或者指令没有对内存进行写操作(目标操作数可能在寄存器中)时,会抛出一个 invalid-opcode 异常

  • 位测试和修改指令 (BTS,BTR,BTC)
  • 交换指令 (XADD,CMPXCHG,CMPXCHG8B)
  • XCHG 指令自动使用 LOCK 前缀
  • 单操作数算术和逻辑指令:INC,DEC,NOT,NEG
  • 双操作数算术和逻辑指令:ADD,ADC,SUB,SBB,AND,OR,XOR

对于 Intel486 和 Pentium 处理器,在进行加锁操作时,LOCK# 信号总是在总线上发出,甚至锁定的内存区域已经缓存在处理器中。这种通过封锁总线来禁止其他 CPU 对内存修改进而达到原子性的效果,显然锁的力度过于粗糙。

所以在 Pentium4,Intel Xeon,P6 系列已经最近的处理器,如果加锁的内存区域已经缓存在处理器中,处理器可能并不对总线发出 LOCK# 信号,而是仅仅修改缓存中的数据,然后依赖缓存一致性协议 (MESI 详见《手摸手 Go 深入剖析 sync.Pool》有讲解) 来保证加锁操作的自动执行。缓存一致性协议会自动阻止两个或多个缓存了同一区域内存的处理器同时修改数据。

感兴趣的可以详细研究下 intel 开发手册卷 3

第五条指令:调用 CMPXCHGQ ,将指令第二个操作数与累加器 AX 比较 ,如果相等, CX 更新到 BX ,否则 BX 更新到 AX`

第六条指令: AXCX 相等时将 1 写进 ret+16(FP) 否则写入 0

第七条指令:函数返回结束。

特殊的 atomic.Value

atomic.Value 是 Go 语言 1.4 版本的时候加入的,它相当于一个容器,可以原子的 StoreLoad 任意类型的值。是对 int32int64uint32uint64uintptrunsafe.Pointer 类型原子操作的补充。但它的实现不是通过汇编来完成,而是基于已有的 atomic 包。

看下它的基本结构

  1. `// Value的零值为nil
  2. // 使用后禁止拷贝
  3. type Value struct {
  4. v interface{}
  5. }
  6. // ifaceWords is interface{} internal representation.
  7. type ifaceWords struct {
  8. typ unsafe.Pointer
  9. data unsafe.Pointer
  10. }
  11. `

因为 atomic.Value 被设计为存储任意类型的值,所以它内部只有一个 interface{} 类型的字段。并且在 atomic/value.go 文件中还定义了一个 ifaceWords ,之前我们讲过 Go 的接口结构,是不是跟 eface 很像

  1. `type eface struct {
  2. _type *_type
  3. data unsafe.Pointer
  4. }
  5. `

其实 atomic.Value 的实现原理就是将 interface{} 类型分解,得到类型和数据这两个 unsafe.Pointer 类型字段,在针对它们进行原子操作来达到 interface{} 类型原子操作的目的。

unsafe.Pointer

我们知道 Go 语言的编译器会使用静态类型检查来保证程序运行的类型安全。但它的标准库中又提供了 unsafe.Pointer , 可以让程序灵活的操作内存并且可以绕过 Go 语言的类型检查,从而可以跟任意的指针类型相互转换。

例如 字符串和 byte 切片之前的零拷贝转换

  1. `type StringHeader struct {
  2. Data uintptr
  3. Len int
  4. }
  5. type SliceHeader struct {
  6. Data uintptr
  7. Len int
  8. Cap int
  9. }
  10. `

我们看到 slice 和 string 的底层数据结构基本一样,虽然 Go 语言的类型检查禁止了它们之间相互转换。但是拥有了 unsafe.Pointer 这个黑魔法,你就可以零拷贝实现[]byte 和 string 之间的转换,只需共享底层的 Data 和 Len 即可

  1. `func string2bytes(s string) []byte {
  2. return *(*[]byte)(unsafe.Pointer(&s))
  3. }
  4. func bytes2string(b []byte) string{
  5. return *(*string)(unsafe.Pointer(&b))
  6. }
  7. `

如果你搞清楚了 unsafe.Pointer , 那么接下来 atomic.Value 的神秘面纱也就很好揭开了。

atomic.Value 提供了 LoadStore 两个操作,完成数据的读取和存储。

写操作 Store

Store 大致逻辑:

  1. 将待存储的数据和当前的值分别转换为 *ifaceWords

  2. 进入一个无限 for 循环
    2.1 先检查现有值的 typ ,如果为 nil 表示这是第一次存储,则先调用 runtime_procPin() 通过修改当前 g 关联 m 的 locks 属性来禁止 P 被抢占
    2.2 尝试使用 CompareAndSwapPointer 将现有值的 typ 设置为 unsafe.Pointer(^uintptr(0)) 方便 Load 操作时判断当前状态,如果失败则解除抢占回到 for 循环开始位置继续执行
    2.3 如果设置成功,则可以完成第一次的数据存储

  3. 自旋等待中的 gorountine 如果发现 uintptr(typ) == ^uintptr(0) 表明第一次存储尚未完成则继续自旋等待

  4. 到这里说明第一次存储已经完成,则检查 Value 从始至终是否都是保存同一类型数据,不是则 panic

  5. 非第一次存储,则更新数据
    // Store 将Value的值设置为x // 给定值的所有Store调用都必须使用相同的具体类型否则会像存储nil值一样会发生panic func (v *Value) Store(x interface{}) { if x == nil { panic("sync/atomic: store of nil value into Value") } vp := (*ifaceWords)(unsafe.Pointer(v)) xp := (*ifaceWords)(unsafe.Pointer(&x)) for { typ := LoadPointer(&vp.typ) if typ == nil { // 尝试开始第一次存储 //禁止P被抢占 以便其他goroutine可以使用主动自旋等待来等待完成 //GC也互补偶然看到伪造的类型 runtime_procPin() // 加了个乐观锁 if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) { //没抢到机会 则退出 runtime_procUnpin() continue } // 完成第一次存储 StorePointer(&vp.data, xp.data) StorePointer(&vp.typ, xp.typ) runtime_procUnpin() return } if uintptr(typ) == ^uintptr(0) { // 第一次存储正在进行 继续等待 // 因为我们已经禁止了抢占所以我们可以继续自旋等待 continue } // 第一次存储完成 检查类型并且覆盖数据 if typ != xp.typ { panic("sync/atomic: store of inconsistently typed value into Value") } StorePointer(&vp.data, xp.data) return } }

读操作 Load

读取操作相对简单就不赘述。

  1. `// Load 返回最近一次Store存储的数据
  2. // 如果没有调用Store存储数据则返回nil
  3. func (v *Value) Load() (x interface{}) {
  4. vp := (*ifaceWords)(unsafe.Pointer(v))
  5. typ := LoadPointer(&vp.typ)
  6. if typ == nil || uintptr(typ) == ^uintptr(0) { 未调用Store或第一次存储尚未完成 直接返回nil
  7. // 第一次存储尚未完成
  8. return nil
  9. }
  10. data := LoadPointer(&vp.data)
  11. xp := (*ifaceWords)(unsafe.Pointer(&x))
  12. xp.typ = typ
  13. xp.data = data
  14. return
  15. }`

关于 bug

atomic 包中有一段这样的注释

  1. BUG(rsc): On 386, the 64\-bit functions use instructions unavailable before the Pentium MMX. On non-Linux ARM, the 64\-bit functions use instructions unavailable before the ARMv6k core. On ARM, 386, and 32\-bit MIPS, it is the caller's responsibility to arrange for 64\-bit alignment of 64\-bit words accessed atomically. The first word in a variable or in an allocated struct, array, or slice can be relied upon to be 64\-bit aligned.

在《手摸手 Go 你的内存对齐了吗?》中有聊到过内存对齐的内容。(不同硬件平台并不是都可以在任意地址上访问任意数据;而且如果数据没有内存对齐可能会导致 CPU 访问两次内存才能拿到数据,如果内存对齐一次就能完成数据读取。)

这里大概是说在 ARM,386,和 32 位 MIPS,调用者有责任安排原子访问的 64 位字按照 8 字节对齐,否则程序会 panic。因为不同平台上的编译器有自己的对齐系数,32bit 平台上一般是 4 字节对齐,而在 64bit 平台上一般是 8 字节对齐。所以 32bit 平台上 8 字节数字可能会因为内存对齐拆分成 2 个 4 字节分布。

举个栗子

  1. `package main
  2. import (
  3. "fmt"
  4. "sync/atomic"
  5. )
  6. type M struct {
  7. x int64
  8. u uint32
  9. v int64
  10. }
  11. func main() {
  12. m := M{}
  13. result := atomic.AddInt64(&m.v, 1)
  14. fmt.Println(result)
  15. }
  16. `

GOARCH=amd64 go build pointer.go && ./pointer 执行正常

但是在 386 上 GOARCH=386 go build pointer.go && ./pointer 程序发生 panic

  1. `panic: runtime error: invalid memory address or nil pointer dereference
  2. [signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x2f7c]
  3. goroutine 1 [running]:
  4. runtime/internal/atomic.Xadd64(0x1141612c, 0x1, 0x0, 0x53f8, 0x1141a230)
  5. /usr/local/go/src/runtime/internal/atomic/asm_386.s:105 +0xc
  6. main.main()
  7. /Users/mywork/workspace/workspace_go/godemo/pointer/pointer.go:16 +0x40
  8. `

总结

通过阅读源码,很显然 atomic 包中的原子操作均为底层硬件指令的协助完成,不需要加锁和解锁过程,所以对于单一变量更新保护,原子操作用起来更高效。文章篇幅关系我们这里只分析 CompareAndSwapPointer , 至于其他原子操作具体底层的硬件指令感兴趣的童鞋可以继续探索。
https://www.tuicool.com/articles/u6FNjqe