“高并发 高性能 高可用” 一直以来作为搬砖界用力搬砖的口号。由于 CPU 一次读取存储数据的长度有限,比如 32bit 的平台修改 int64 需要被拆分成两次写操作,更何况对于结构体的赋值,那么对于高并发场景下我们怎么才能保证数据的完整性和一致性呢?
所以今天我们来聊聊 Go 的 atomic
包,它提供了低级别原子内存原语,对于实现同步算法起到很大作用。可以说是 Go 并发编程的基石,比如 Mutex
、 RWMutex
、 WaitGroup
、 Once
等实现都依赖于 atomic
。当然其提供的功能需要格外小心才能正确使用, atomic
大致提供了 5 类原子操作,因为不会被 CPU 中断所以在多个 goroutine
之间访问是安全的。
由
SwapT
函数实现的交换操作, 在原子上等价于old = *addr *addr = new return old
由
CompareAndSwapT
函数实现的比较并交换操作, 在原子上等价于if *addr == old { *addr = new return true } return false
由
AddT
函数实现的加法操作,原子上等价于*addr += delta return *addr
LoadT
StoreT
return \*addr
\*addr=val
atomic.Value
int32
int64
uint32
uint64
uintptr
unsafe.Pointer
atomic 源码分析
然而我们在 atomic
包下只能看到类似如下的函数定义
`// SwapInt32 atomically stores new into *addr and returns the previous *addr value.
func SwapInt32(addr *int32, new int32) (old int32)
// SwapInt64 atomically stores new into *addr and returns the previous *addr value.
func SwapInt64(addr *int64, new int64) (old int64)
// SwapUint32 atomically stores new into *addr and returns the previous *addr value.
func SwapUint32(addr *uint32, new uint32) (old uint32)
... ...
`
并没有发现函数实现部分,但是能够找到相应汇编代码
`TEXT ·SwapInt32(SB),NOSPLIT,$0
JMP runtime∕internal∕atomic·Xchg(SB)
TEXT ·SwapUint32(SB),NOSPLIT,$0
JMP runtime∕internal∕atomic·Xchg(SB)
TEXT ·SwapInt64(SB),NOSPLIT,$0
JMP runtime∕internal∕atomic·Xchg64(SB)
... ...
`
可见它们最终都是基于运行时中 runtime/internal/atomic
的实现。
提前剧透一下,原子操作其实最终依赖硬件指令的支持,但是因为原子操作可能会导致 goroutine
阻塞,所以它同时还需要运行时调度器的配合。
案例分析
以 atomic.CompareAndSwapPointer
为例,它只有函数定义没有函数体
通过查找,发现其本身由运行时实现
`//go:linkname sync_atomic_CompareAndSwapPointer sync/atomic.CompareAndSwapPointer
//go:nosplit
func sync_atomic_CompareAndSwapPointer(ptr *unsafe.Pointer, old, new unsafe.Pointer) bool {
if writeBarrier.enabled {
atomicwb(ptr, new)
}
return sync_atomic_CompareAndSwapUintptr((*uintptr)(noescape(unsafe.Pointer(ptr))), uintptr(old), uintptr(new))
}
`
可以看到其最终调用了 sync_atomic_CompareAndSwapUintptr
,且 sync_atomic_CompareAndSwapUintptr
也只有函数定义没有函数体,而且也找不到运行时的实现,说明它是由编译器完成。那么来通过一个栗子一窥究竟吧
`package main
import (
"fmt"
"sync/atomic"
"unsafe"
)
func main() {
var p unsafe.Pointer
newP := 23
atomic.CompareAndSwapPointer(&p, nil, unsafe.Pointer(&newP))
v := (*int)(p)
fmt.Println(*v)
}
`
先执行编译命令 go build -gcflags="-N -l -m" -o atomic atomic.go 得到二进制文件
atomic`
然后执行 go tool objdump -s “main.main” atomic 查看下
main.main`编译结果
`TEXT main.main(SB) /Users/shangyindong/mywork/workspace/workspace_github/go-snippets/atomic/atomic.go
.....
atomic.go:12 0x10a6ff3 e82869fbff CALL sync/atomic.CompareAndSwapPointer(SB)
......
`
go tool objdump -s "sync/atomic.CompareAndSwapPointer" atomic
接着看 CompareAndSwapPointer
`TEXT sync/atomic.CompareAndSwapPointer(SB) /usr/local/go/src/runtime/atomic_pointer.go
......
atomic_pointer.go:76 0x105d960 e85b8b0000 CALL sync/atomic.CompareAndSwapUintptr(SB)
......
`
可以看到 CompareAndSwapPointer
实际调用了 CompareAndSwapUintptr
接着看 go tool objdump -s "sync/atomic.CompareAndSwapUintptr" atomic
`TEXT sync/atomic.CompareAndSwapUintptr(SB) /usr/local/go/src/sync/atomic/asm.s
asm.s:31 0x10664c0 e93bbaf9ff JMP runtime/internal/atomic.Casuintptr(SB)
......
`
最终 JMP 跳转到了``, 这个方法为内置汇编,看下 asm_amd64.s 吧
`TEXT runtime∕internal∕atomic·Casuintptr(SB), NOSPLIT, $0-25
JMP runtime∕internal∕atomic·Cas64(SB)
`
进而调整到 runtime/internal/atomic.Cas64
`// bool runtime∕internal∕atomic·Cas64(uint64 *val, uint64 old, uint64 new)
// Atomically:
// if(*val == *old){
// *val = new;
// return 1;
// } else {
// return 0;
// }
TEXT runtime∕internal∕atomic·Cas64(SB), NOSPLIT, $0-25
MOVQ ptr+0(FP), BX
MOVQ old+8(FP), AX
MOVQ new+16(FP), CX
LOCK
CMPXCHGQ CX, 0(BX)
SETEQ ret+24(FP)
RET
`
到这里我们能够很清晰看到,本质上原子操作最终还是依赖于 CPU 的 Lock
+ CMPXCHGQ
指令, Cas64(SB)
总共包含 7 条指令
第一条指令:将 ptr 的值放入 BX
第二条指令:将假设的旧值放入 AX
第三条指令:将要比较的新值放入 CX
第四条指令: LOCK
并不是指令,而是作为指令前缀用来修饰 CMPXCHGQ CX, 0(BX)
的
大致有五类指令可强制使用 LOCK 语义,但当 LOCK 前缀被置于其他指令之前或者指令没有对内存进行写操作(目标操作数可能在寄存器中)时,会抛出一个 invalid-opcode 异常
- 位测试和修改指令 (BTS,BTR,BTC)
- 交换指令 (XADD,CMPXCHG,CMPXCHG8B)
- XCHG 指令自动使用 LOCK 前缀
- 单操作数算术和逻辑指令:INC,DEC,NOT,NEG
- 双操作数算术和逻辑指令:ADD,ADC,SUB,SBB,AND,OR,XOR
对于 Intel486 和 Pentium 处理器,在进行加锁操作时,LOCK# 信号总是在总线上发出,甚至锁定的内存区域已经缓存在处理器中。这种通过封锁总线来禁止其他 CPU 对内存修改进而达到原子性的效果,显然锁的力度过于粗糙。
所以在 Pentium4,Intel Xeon,P6 系列已经最近的处理器,如果加锁的内存区域已经缓存在处理器中,处理器可能并不对总线发出 LOCK# 信号,而是仅仅修改缓存中的数据,然后依赖缓存一致性协议 (MESI 详见《手摸手 Go 深入剖析 sync.Pool》有讲解) 来保证加锁操作的自动执行。缓存一致性协议会自动阻止两个或多个缓存了同一区域内存的处理器同时修改数据。
感兴趣的可以详细研究下 intel 开发手册卷 3
第五条指令:调用 CMPXCHGQ
,将指令第二个操作数与累加器 AX
比较 ,如果相等, CX
更新到 BX
,否则 BX 更新到
AX`
第六条指令: AX
和 CX
相等时将 1 写进 ret+16(FP)
否则写入 0
第七条指令:函数返回结束。
特殊的 atomic.Value
atomic.Value
是 Go 语言 1.4 版本的时候加入的,它相当于一个容器,可以原子的 Store
和 Load
任意类型的值。是对 int32
、 int64
、 uint32
、 uint64
、 uintptr
和 unsafe.Pointer
类型原子操作的补充。但它的实现不是通过汇编来完成,而是基于已有的 atomic
包。
看下它的基本结构
`// Value的零值为nil
// 使用后禁止拷贝
type Value struct {
v interface{}
}
// ifaceWords is interface{} internal representation.
type ifaceWords struct {
typ unsafe.Pointer
data unsafe.Pointer
}
`
因为 atomic.Value
被设计为存储任意类型的值,所以它内部只有一个 interface{}
类型的字段。并且在 atomic/value.go
文件中还定义了一个 ifaceWords
,之前我们讲过 Go 的接口结构,是不是跟 eface
很像
`type eface struct {
_type *_type
data unsafe.Pointer
}
`
其实 atomic.Value
的实现原理就是将 interface{}
类型分解,得到类型和数据这两个 unsafe.Pointer
类型字段,在针对它们进行原子操作来达到 interface{}
类型原子操作的目的。
unsafe.Pointer
我们知道 Go 语言的编译器会使用静态类型检查来保证程序运行的类型安全。但它的标准库中又提供了 unsafe.Pointer
, 可以让程序灵活的操作内存并且可以绕过 Go 语言的类型检查,从而可以跟任意的指针类型相互转换。
例如 字符串和 byte 切片之前的零拷贝转换
`type StringHeader struct {
Data uintptr
Len int
}
type SliceHeader struct {
Data uintptr
Len int
Cap int
}
`
我们看到 slice 和 string 的底层数据结构基本一样,虽然 Go 语言的类型检查禁止了它们之间相互转换。但是拥有了 unsafe.Pointer
这个黑魔法,你就可以零拷贝实现[]byte 和 string 之间的转换,只需共享底层的 Data 和 Len 即可
`func string2bytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(&s))
}
func bytes2string(b []byte) string{
return *(*string)(unsafe.Pointer(&b))
}
`
如果你搞清楚了 unsafe.Pointer
, 那么接下来 atomic.Value
的神秘面纱也就很好揭开了。
atomic.Value
提供了 Load
和 Store
两个操作,完成数据的读取和存储。
写操作 Store
Store 大致逻辑:
将待存储的数据和当前的值分别转换为
*ifaceWords
进入一个无限 for 循环
2.1 先检查现有值的typ
,如果为 nil 表示这是第一次存储,则先调用runtime_procPin()
通过修改当前 g 关联 m 的 locks 属性来禁止 P 被抢占
2.2 尝试使用CompareAndSwapPointer
将现有值的typ
设置为unsafe.Pointer(^uintptr(0))
方便Load
操作时判断当前状态,如果失败则解除抢占回到 for 循环开始位置继续执行
2.3 如果设置成功,则可以完成第一次的数据存储自旋等待中的
gorountine
如果发现uintptr(typ) == ^uintptr(0)
表明第一次存储尚未完成则继续自旋等待到这里说明第一次存储已经完成,则检查 Value 从始至终是否都是保存同一类型数据,不是则 panic
非第一次存储,则更新数据
// Store 将Value的值设置为x // 给定值的所有Store调用都必须使用相同的具体类型否则会像存储nil值一样会发生panic func (v *Value) Store(x interface{}) { if x == nil { panic("sync/atomic: store of nil value into Value") } vp := (*ifaceWords)(unsafe.Pointer(v)) xp := (*ifaceWords)(unsafe.Pointer(&x)) for { typ := LoadPointer(&vp.typ) if typ == nil { // 尝试开始第一次存储 //禁止P被抢占 以便其他goroutine可以使用主动自旋等待来等待完成 //GC也互补偶然看到伪造的类型 runtime_procPin() // 加了个乐观锁 if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) { //没抢到机会 则退出 runtime_procUnpin() continue } // 完成第一次存储 StorePointer(&vp.data, xp.data) StorePointer(&vp.typ, xp.typ) runtime_procUnpin() return } if uintptr(typ) == ^uintptr(0) { // 第一次存储正在进行 继续等待 // 因为我们已经禁止了抢占所以我们可以继续自旋等待 continue } // 第一次存储完成 检查类型并且覆盖数据 if typ != xp.typ { panic("sync/atomic: store of inconsistently typed value into Value") } StorePointer(&vp.data, xp.data) return } }
读操作 Load
读取操作相对简单就不赘述。
`// Load 返回最近一次Store存储的数据
// 如果没有调用Store存储数据则返回nil
func (v *Value) Load() (x interface{}) {
vp := (*ifaceWords)(unsafe.Pointer(v))
typ := LoadPointer(&vp.typ)
if typ == nil || uintptr(typ) == ^uintptr(0) { 未调用Store或第一次存储尚未完成 直接返回nil
// 第一次存储尚未完成
return nil
}
data := LoadPointer(&vp.data)
xp := (*ifaceWords)(unsafe.Pointer(&x))
xp.typ = typ
xp.data = data
return
}`
关于 bug
atomic
包中有一段这样的注释
BUG(rsc): On 386, the 64\-bit functions use instructions unavailable before the Pentium MMX. On non-Linux ARM, the 64\-bit functions use instructions unavailable before the ARMv6k core. On ARM, 386, and 32\-bit MIPS, it is the caller's responsibility to arrange for 64\-bit alignment of 64\-bit words accessed atomically. The first word in a variable or in an allocated struct, array, or slice can be relied upon to be 64\-bit aligned.
在《手摸手 Go 你的内存对齐了吗?》中有聊到过内存对齐的内容。(不同硬件平台并不是都可以在任意地址上访问任意数据;而且如果数据没有内存对齐可能会导致 CPU 访问两次内存才能拿到数据,如果内存对齐一次就能完成数据读取。)
这里大概是说在 ARM,386,和 32 位 MIPS,调用者有责任安排原子访问的 64 位字按照 8 字节对齐,否则程序会 panic。因为不同平台上的编译器有自己的对齐系数,32bit 平台上一般是 4 字节对齐,而在 64bit 平台上一般是 8 字节对齐。所以 32bit 平台上 8 字节数字可能会因为内存对齐拆分成 2 个 4 字节分布。
举个栗子
`package main
import (
"fmt"
"sync/atomic"
)
type M struct {
x int64
u uint32
v int64
}
func main() {
m := M{}
result := atomic.AddInt64(&m.v, 1)
fmt.Println(result)
}
`
GOARCH=amd64 go build pointer.go && ./pointer
执行正常
但是在 386 上 GOARCH=386 go build pointer.go && ./pointer
程序发生 panic
`panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x2f7c]
goroutine 1 [running]:
runtime/internal/atomic.Xadd64(0x1141612c, 0x1, 0x0, 0x53f8, 0x1141a230)
/usr/local/go/src/runtime/internal/atomic/asm_386.s:105 +0xc
main.main()
/Users/mywork/workspace/workspace_go/godemo/pointer/pointer.go:16 +0x40
`
总结
通过阅读源码,很显然 atomic
包中的原子操作均为底层硬件指令的协助完成,不需要加锁和解锁过程,所以对于单一变量更新保护,原子操作用起来更高效。文章篇幅关系我们这里只分析 CompareAndSwapPointer
, 至于其他原子操作具体底层的硬件指令感兴趣的童鞋可以继续探索。
https://www.tuicool.com/articles/u6FNjqe