golang 在1.6版本以后引入了并发安全的 map 类型 Sync Map,这里对这个可以支持并发安全的 map 做一个分析和源码解读。
原理
通过引入两个map,将读写分离到不同的map,其中read map只提供读,而dirty map则负责写. 这样read map就可以在不加锁的情况下进行并发读取,当read map中没有读取到值时,再加锁进行后续读取,并累加未命中数,当未命中数到达一定数量后,将dirty map上升为read map.
另外,虽然引入了两个map,但是底层数据存储的是指针,指向的是同一份值.
具体流程: 如插入key 1,2,3时均插入了dirty map中,此时read map没有key值,读取时从dirty map中读取,并记录miss数
当累积的 miss 数量大于 dirty map 的长度的时候,会将 dirty map 直接升级为 read map
当有新的key 4插入时,将read map中的key值拷贝到dirty map中,这样dirty map就含有所有的值,下次升级为read map时直接进行地址拷贝,这块逻辑的源码会在新元素存储的时候体现
源码分析
entry结构,用于保存value的interface指针,通过atomic进行原子操作.
type entry struct {
p unsafe.Pointer // *interface{}
}
Map结构, 主结构,提供对外的方法,以及数据存储.
type Map struct {
mu Mutex
//存储readOnly,不加锁的情况下,对其进行并发读取
read atomic.Value // readOnly
//dirty map用于存储写入的数据,能直接升级成read map.
dirty map[interface{}]*entry
//misses 主要记录read读取不到数据加锁读取read map以及dirty map的次数.
misses int
}
readOnly 结构, 主要用于存储
// readOnly 通过原子操作存储在Map.read中,
type readOnly struct {
m map[interface{}]*entry
amended bool // true if the dirty map contains some key not in m.
}
Load方法
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
//加锁,然后再读取一遍read map中内容,主要防止在加锁的过程中,dirty map转换成read map,从而导致读取不到数据.
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
//记录miss数, 在dirty map提升为read map之前,
//这个key值都必须在加锁的情况下在dirty map中读取到.
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
Store方法
// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
//如果在read map读取到值,则尝试使用原子操作直接对值进行更新,更新成功则返回
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
//如果未在read map中读取到值或读取到值进行更新时更新失败,则加锁进行后续处理
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
//在检查一遍read,如果读取到的值处于删除状态,将值写入dirty map中
if e.unexpungeLocked() {
m.dirty[key] = e
}
//使用原子操作更新key对应的值
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
//如果在dirty map中读取到值,则直接使用原子操作更新值
e.storeLocked(&value)
} else {
//如果dirty map中不含有值,则说明dirty map已经升级为read map,或者第一次进入
//需要初始化dirty map,并将read map的key添加到新创建的dirty map中.
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
LoadOrStore方法
代码逻辑和Store类似
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
// 不加锁的情况下读取read map
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
//如果读取到值则尝试对值进行更新或读取
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
// 在加锁的请求下在确定一次read map
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked()
} else {
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()
return actual, loaded
}
Range 方法
func (m *Map) Range(f func(key, value interface{}) bool) {
//先获取read map中值
read, _ := m.read.Load().(readOnly)
//如果dirty map中还有值,则进行加锁检测
if read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
//将dirty map中赋给read,因为dirty map包含了所有的值
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
//进行遍历
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
Delete 方法
func (m *Map) Delete(key interface{}) {
//首先获取read map
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
//加锁二次检测
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
//没有在read map中获取到值,到dirty map中删除
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete()
}
}
局限性
从以上的源码可知,sync.map并不适合同时存在大量读写的场景,大量的写会导致read map读取不到数据从而加锁进行进一步读取,同时dirty map不断升级为read map. 从而导致整体性能较低,特别是针对cache场景.针对append-only以及大量读,少量写场景使用sync.map则相对比较合适