atomic
atomic read-modify-write : https://preshing.com/20150402/you-can-do-any-kind-of-atomic-read-modify-write-operation/
Load 和 Store:https://preshing.com/20130618/atomic-vs-non-atomic-operations/
假设你想在程序中使用一个标志(flag,比如一个 bool 类型的变量),来标识一个定时任务是否已经启动执行了,你会怎么做呢?
1.使用加锁方式(Mutex和RWMutex)。
2.atomic (你可以使用一个 uint32 类型的变量,如果这个变量的值是 0,就标识没有任务在执行,如果它的值是 1,就标识已经有任务在完成了)。
3.atomic 实现自己定义的基本并发原语:CondMutex、Mutex.LockContext、WaitGroup.Go
4.atomic 原子操作还是实现 lock-free 数据结构的基石。
lock-free:https://docs.microsoft.com/zh-cn/windows/win32/dxtecharts/lockless-programming
5.atomic 为了支持 int32、int64、uint32、uint64、uintptr、Pointer(Add 方法不支持)类型,分别提供了 AddXXX、CompareAndSwapXXX、SwapXXX、LoadXXX、StoreXXX 等方法。
6.atomic 操作的对象是一个地址,你需要把可寻址的变量的地址作为参数传递给方法,而不是把变量的值传递给方法。
7.uint32 = AddUint32(&x, ^uint32(c-1))和uint64 = AddUint32(&x, ^uint32(0))
8.CAS = func CompareAndSwapInt32(addr int32, old, new int32) (swapped bool)
9.Swap = (old = addr *addr = new return old)
Value
type Config struct {NodeName stringAddr stringCount int32}func loadNewConfig() Config {return Config{NodeName: "北京",Addr: "10.77.95.27",Count: rand.Int31(),}}func main() {var config atomic.Valueconfig.Store(loadNewConfig())var cond = sync.NewCond(&sync.Mutex{})// 设置新的configgo func() {for {time.Sleep(time.Duration(5+rand.Int63n(5)) * time.Second)config.Store(loadNewConfig())cond.Broadcast() // 通知等待着配置已变更}}()go func() {for {cond.L.Lock()cond.Wait() // 等待变更信号c := config.Load().(Config) // 读取新的配置fmt.Printf("new config: %+v\n", c)cond.L.Unlock()}}()select {}}
uber-go/atomic:https://github.com/uber-go/atomic
Lock-Free queue
package queueimport ("sync/atomic""unsafe")// lock-free的queuetype LKQueue struct {head unsafe.Pointertail unsafe.Pointer}// 通过链表实现,这个数据结构代表链表中的节点type node struct {value interface{}next unsafe.Pointer}func NewLKQueue() *LKQueue {n := unsafe.Pointer(&node{})return &LKQueue{head: n, tail: n}}// 入队func (q *LKQueue) Enqueue(v interface{}) {n := &node{value: v}for {tail := load(&q.tail)next := load(&tail.next)if tail == load(&q.tail) { // 尾还是尾if next == nil { // 还没有新数据入队if cas(&tail.next, next, n) { //增加到队尾cas(&q.tail, tail, n) //入队成功,移动尾巴指针return}} else { // 已有新数据加到队列后面,需要移动尾指针cas(&q.tail, tail, next)}}}}// 出队,没有元素则返回nilfunc (q *LKQueue) Dequeue() interface{} {for {head := load(&q.head)tail := load(&q.tail)next := load(&head.next)if head == load(&q.head) { // head还是那个headif head == tail { // head和tail一样if next == nil { // 说明是空队列return nil}// 只是尾指针还没有调整,尝试调整它指向下一个cas(&q.tail, tail, next)} else {// 读取出队的数据v := next.value// 既然要出队了,头指针移动到下一个if cas(&q.head, head, next) {return v // Dequeue is done. return}}}}}// 将unsafe.Pointer原子加载转换成nodefunc load(p *unsafe.Pointer) (n *node) {return (*node)(atomic.LoadPointer(p))}// 封装CAS,避免直接将*node转换成unsafe.Pointerfunc cas(p *unsafe.Pointer, old, new *node) (ok bool) {return atomic.CompareAndSwapPointer(p, unsafe.Pointer(old), unsafe.Pointer(new))}
参考:https://github.com/golang/go/issues/39351
参考:https://dave.cheney.net/2018/01/06/if-aligned-memory-writes-are-atomic-why-do-we-need-the-sync-atomic-package
恰好老婆大人是做芯片MMU相关工作的,咨询了一下她,她告诉我现代的CPU基本上都在硬件层面保证了多核之间数据视图的一致性,也就是说普通的LOAD/STORE命令在硬件层面处理器就可以保证cache的一致性。如果是这样的话,那是不是可以理解为atomic包对指针的作用,主要是防止编译器做指令重排呢?因为编译器在这些现代架构上没必要使用特殊的指令了。
Channel
https://github.com/docker/libchan
https://github.com/tylertreat/chan
论文:https://www.cs.cmu.edu/~crary/819-f09/Hoare78.pdf
历史:https://swtch.com/~rsc/thread/
1.CSP 允许使用进程组件来描述系统,它们独立运行,并且只通过消息传递的方式通信。
2.Channel 类型是 Go 语言内置的类型,你无需引入某个包,就能使用它。
3.执行业务处理的 goroutine 不要通过共享内存的方式通信,而是要通过 Channel 通信的方式分享数据。
4.数据交流:当作并发的 buffer 或者 queue,解决生产者 - 消费者问题。多个 goroutine 可以并发当作生产者(Producer)和消费者(Consumer)。
5.数据传递:一个 goroutine 将数据交给另一个 goroutine,相当于把数据的拥有权 (引用) 托付出去。
6.信号通知:一个 goroutine 可以将信号 (closing、closed、data ready 等) 传递给另一个或者另一组 goroutine 。
7.任务编排:可以让一组 goroutine 按照一定的顺序并发或者串行的执行,这就是编排的功能。
8.锁:利用 Channel 也可以实现互斥锁的机制。
9.(为了说起来方便,我们下面都把 Channel 叫做 chan)分为只能接收、只能发送、既可以接收又可以发送三种类型(
ChannelType = ( “chan” | “chan” “<-“ | “<-“ “chan” ) ElementType )
10.这个箭头总是射向左边的,元素类型总在最右边。如果箭头指向 chan,就表示可以往 chan 中塞数据;如果箭头远离 chan,就表示 chan 会往外吐数据。
11.“<-”有个规则,总是尽量和左边的 chan 结合(The <- operator associates with the leftmost chan possible:)
chan<- (chan int) // <- 和第一个chan结合chan<- (<-chan int) // 第一个<-和最左边的chan结合,第二个<-和左边第二个chan结合<-chan (<-chan int) // 第一个<-和最左边的chan结合,第二个<-和左边第二个chan结合chan (<-chan int) // 因为括号的原因,<-和括号内第一个chan结合
12.nil 是 chan 的零值,是一种特殊的 chan,对值是 nil 的 chan 的发送接收调用者总是会阻塞。
13.发送数据(
ch <- 2000)
14.接收数据(
x := <-ch // 把接收的一条数据赋值给变量x
;foo(<-ch) // 把接收的一个的数据作为参数传给函数
;<-ch // 丢弃接收的一条数据)
15.send 和 recv 都可以作为 select 语句的 case clause
func main() {var ch = make(chan int, 10)for i := 0; i < 10; i++ {select {case ch <- i:case v := <-ch:fmt.Println(v)}}}
16.chan 还可以应用于 for-range
for v := range ch {fmt.Println(v)}
17.或者是忽略读取的值,只是清空 chan
for range ch {}
Channel 的实现原理
qcount:代表 chan 中已经接收但还没被取走的元素的个数。内建函数 len 可以返回这个字段的值。dataqsiz:队列的大小。chan 使用一个循环队列来存放元素,循环队列很适合这种生产者 - 消费者的场景(我很好奇为什么这个字段省略 size 中的 e)。buf:存放元素的循环队列的 buffer。elemtype 和 elemsize:chan 中元素的类型和 size。因为 chan 一旦声明,它的元素类型是固定的,即普通类型或者指针类型,所以元素大小也是固定的。sendx:处理发送数据的指针在 buf 中的位置。一旦接收了新的数据,指针就会加上 elemsize,移向下一个位置。buf 的总大小是 elemsize 的整数倍,而且 buf 是一个循环列表。recvx:处理接收请求时的指针在 buf 中的位置。一旦取出数据,此指针会移动到下一个位置。recvq:chan 是多生产者多消费者的模式,如果消费者因为没有数据可读而被阻塞了,就会被加入到 recvq 队列中。sendq:如果生产者因为 buf 满了而阻塞,会被加入到 sendq 队列中。
makechan
func makechan(t *chantype, size int) *hchan {elem := t.elem// 略去检查代码mem, overflow := math.MulUintptr(elem.size, uintptr(size))//var c *hchanswitch {case mem == 0:// chan的size或者元素的size是0,不必创建bufc = (*hchan)(mallocgc(hchanSize, nil, true))c.buf = c.raceaddr()case elem.ptrdata == 0:// 元素不是指针,分配一块连续的内存给hchan数据结构和bufc = (*hchan)(mallocgc(hchanSize+mem, nil, true))// hchan数据结构后面紧接着就是bufc.buf = add(unsafe.Pointer(c), hchanSize)default:// 元素包含指针,那么单独分配bufc = new(hchan)c.buf = mallocgc(mem, elem, true)}// 元素大小、类型、容量都记录下来c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)lockInit(&c.lock, lockRankHchan)return c}
send
func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())}func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 第一部分if c == nil {if !block {return false}gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)throw("unreachable")}......}
recv
func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)}func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {_, received = chanrecv(c, elem, true)return}func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// 第一部分,chan为nilif c == nil {if !block {return}gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)throw("unreachable")}
close
func closechan(c *hchan) {if c == nil { // chan为nil, panicpanic(plainError("close of nil channel"))}lock(&c.lock)if c.closed != 0 {// chan已经closed, panicunlock(&c.lock)panic(plainError("close of closed channel"))}c.closed = 1var glist gList// 释放所有的readerfor {sg := c.recvq.dequeue()......gp := sg.g......glist.push(gp)}// 释放所有的writer (它们会panic)for {sg := c.sendq.dequeue()......gp := sg.g......glist.push(gp)}unlock(&c.lock)for !glist.empty() {gp := glist.pop()gp.schedlink = 0goready(gp, 3)}}
论文:https://songlh.github.io/paper/go-study.pdf
使用 Channel 最常见的错误是 panic 和 goroutine 泄漏。
1.close 为 nil 的 chan;
2.send 已经 close 的 chan;
3.close 已经 close 的 chan。
并发:
共享资源的并发访问使用传统并发原语;复杂的任务编排和消息传递使用 Channel;消息通知机制使用 Channel,除非只想 signal 一个 goroutine,才使用 Cond;简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;需要和 Select 语句结合,使用 Channel;需要和超时配合时,使用 Channel 和 Context。

注意:只要一个 chan 还有未读的数据,即使把它 close 掉,你还是可以继续把这些未读的数据消费完,之后才是读取零值数据。
问:有一道经典的使用 Channel 进行任务编排的题,你可以尝试做一下:有四个 goroutine,编号为 1、2、3、4。每秒钟会有一个 goroutine 打印出它自己的编号,要求你编写一个程序,让输出的编号总是按照 1、2、3、4、1、2、3、4、……的顺序打印出来。
func main() {ch := make(chan struct{})for i := 1; i <= 4; i++ {go func(index int) {time.Sleep(time.Duration(index*10) * time.Millisecond)for {<-chfmt.Printf("I am No %d Goroutine\n", index)time.Sleep(time.Second)ch <- struct{}{}}}(i)}ch <- struct{}{}time.Sleep(time.Minute)}
func main() {ch1 := make(chan int)ch2 := make(chan int)ch3 := make(chan int)ch4 := make(chan int)go func() {for {fmt.Println("I'm goroutine 1")time.Sleep(1 * time.Second)ch2 <-1 //I'm done, you turn<-ch1}}()go func() {for {<-ch2fmt.Println("I'm goroutine 2")time.Sleep(1 * time.Second)ch3 <-1}}()go func() {for {<-ch3fmt.Println("I'm goroutine 3")time.Sleep(1 * time.Second)ch4 <-1}}()go func() {for {<-ch4fmt.Println("I'm goroutine 4")time.Sleep(1 * time.Second)ch1 <-1}}()select {}}
问:chan T 是否可以给 <- chan T 和 chan<- T 类型的变量赋值?反过来呢?
答:双向通道可以赋值给单向,反过来不可以.
反射Channel
select {case v := <-ch1:fmt.Println(v)case v := <-ch2:fmt.Println(v)}
Select 的方法
func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)
处理不定数据的Channel
func main() {var ch1 = make(chan int, 10)var ch2 = make(chan int, 10)// 创建SelectCasevar cases = createCases(ch1, ch2)// 执行10次selectfor i := 0; i < 10; i++ {chosen, recv, ok := reflect.Select(cases)if recv.IsValid() { // recv casefmt.Println("recv:", cases[chosen].Dir, recv, ok)} else { // send casefmt.Println("send:", cases[chosen].Dir, ok)}}}func createCases(chs ...chan int) []reflect.SelectCase {var cases []reflect.SelectCase// 创建recv casefor _, ch := range chs {cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv,Chan: reflect.ValueOf(ch),})}// 创建send casefor i, ch := range chs {v := reflect.ValueOf(i)cases = append(cases, reflect.SelectCase{Dir: reflect.SelectSend,Chan: reflect.ValueOf(ch),Send: v,})}return cases}
worker 池:http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
“击鼓传花”
type Token struct{}func newWorker(id int, ch chan Token, nextCh chan Token) {for {token := <-ch // 取得令牌fmt.Println((id + 1)) // id从1开始time.Sleep(time.Second)nextCh <- token}}func main() {chs := []chan Token{make(chan Token), make(chan Token), make(chan Token), make(chan Token)}// 创建4个workerfor i := 0; i < 4; i++ {go newWorker(i, chs[i], chs[(i+1)%4])}//首先把令牌交给第一个workerchs[0] <- struct{}{}select {}}
信号通知
func main() {go func() {...... // 执行业务处理}()// 处理CTRL+C等中断信号termChan := make(chan os.Signal)signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)<-termChan// 执行退出之前的清理动作doCleanup()fmt.Println("优雅退出")}
程序退出
1.closing,代表程序退出,但是清理工作还没做;
2.closed,代表清理工作已经做完。
func main() {var closing = make(chan struct{})var closed = make(chan struct{})go func() {// 模拟业务处理for {select {case <-closing:returndefault:// ....... 业务计算time.Sleep(100 * time.Millisecond)}}}()// 处理CTRL+C等中断信号termChan := make(chan os.Signal)signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)<-termChanclose(closing)// 执行退出之前的清理动作go doCleanup(closed)select {case <-closed:case <-time.After(time.Second):fmt.Println("清理超时,不等了")}fmt.Println("优雅退出")}func doCleanup(closed chan struct{}) {time.Sleep((time.Minute))close(closed)}
锁
一种方式是先初始化一个 capacity 等于 1 的 Channel,然后再放入一个元素。这个元素就代表锁,谁取得了这个元素,就相当于获取了这把锁。另一种方式是,先初始化一个 capacity 等于 1 的 Channel,它的“空槽”代表锁,谁能成功地把元素发送到这个 Channel,谁就获取了这把锁。
// 使用chan实现互斥锁type Mutex struct {ch chan struct{}}// 使用锁需要初始化func NewMutex() *Mutex {mu := &Mutex{make(chan struct{}, 1)}mu.ch <- struct{}{}return mu}// 请求锁,直到获取到func (m *Mutex) Lock() {<-m.ch}// 解锁func (m *Mutex) Unlock() {select {case m.ch <- struct{}{}:default:panic("unlock of unlocked mutex")}}// 尝试获取锁func (m *Mutex) TryLock() bool {select {case <-m.ch:return truedefault:}return false}// 加入一个超时的设置func (m *Mutex) LockTimeout(timeout time.Duration) bool {timer := time.NewTimer(timeout)select {case <-m.ch:timer.Stop()return truecase <-timer.C:}return false}// 锁是否已被持有func (m *Mutex) IsLocked() bool {return len(m.ch) == 0}func main() {m := NewMutex()ok := m.TryLock()fmt.Printf("locked v %v\n", ok)ok = m.TryLock()fmt.Printf("locked %v\n", ok)}
你可以用 buffer 等于 1 的 chan 实现互斥锁,在初始化这个锁的时候往 Channel 中先塞入一个元素,谁把这个元素取走,谁就获取了这把锁,把元素放回去,就是释放了锁。元素在放回到 chan 之前,不会有 goroutine 能从 chan 中取出元素的,这就保证了互斥性。
利用 select+chan 的方式,很容易实现 TryLock、Timeout 的功能。具体来说就是,在 select 语句中,我们可以使用 default 实现 TryLock,使用一个 Timer 来实现 Timeout 的功能。
任务编排(总共 5 种,分别是 Or-Done 模式、扇入模式、扇出模式、Stream 和 map-reduce)
Or-Done 模式(你发送同一个请求到多个微服务节点,只要任意一个微服务节点返回结果,就算成功)
func or(channels ...<-chan interface{}) <-chan interface{} {// 特殊情况,只有零个或者1个chanswitch len(channels) {case 0:return nilcase 1:return channels[0]}orDone := make(chan interface{})go func() {defer close(orDone)switch len(channels) {case 2: // 2个也是一种特殊情况select {case <-channels[0]:case <-channels[1]:}default: //超过两个,二分法递归处理m := len(channels) / 2select {case <-or(channels[:m]...):case <-or(channels[m:]...):}}}()return orDone}
当 chan 的数量大于 2 时,使用递归的方式等待信号。
func or(channels ...<-chan interface{}) <-chan interface{} {//特殊情况,只有0个或者1个switch len(channels) {case 0:return nilcase 1:return channels[0]}orDone := make(chan interface{})go func() {defer close(orDone)// 利用反射构建SelectCasevar cases []reflect.SelectCasefor _, c := range channels {cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv,Chan: reflect.ValueOf(c),})}// 随机选择一个可用的casereflect.Select(cases)}()return orDone}
扇入模式(扇入借鉴了数字电路的概念,它定义了单个逻辑门能够接受的数字信号输入最大量的术语。一个逻辑门可以有多个输入,一个输出。)
(扇入模式也可以使用反射、递归,或者是用最笨的每个 goroutine 处理一个 Channel 的方式来实现。)
func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {out := make(chan interface{})go func() {defer close(out)// 构造SelectCase slicevar cases []reflect.SelectCasefor _, c := range chans {cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv,Chan: reflect.ValueOf(c),})}// 循环,从cases中选择一个可用的for len(cases) > 0 {i, v, ok := reflect.Select(cases)if !ok { // 此channel已经closecases = append(cases[:i], cases[i+1:]...)continue}out <- v.Interface()}}()return out}
递归模式也是在 Channel 大于 2 时,采用二分法递归 merge。
func fanInRec(chans ...<-chan interface{}) <-chan interface{} {switch len(chans) {case 0:c := make(chan interface{})close(c)return ccase 1:return chans[0]case 2:return mergeTwo(chans[0], chans[1])default:m := len(chans) / 2return mergeTwo(fanInRec(chans[:m]...),fanInRec(chans[m:]...))}}
这里有一个 mergeTwo 的方法,是将两个 Channel 合并成一个 Channel,是扇入形式的一种特例(只处理两个 Channel)
func mergeTwo(a, b <-chan interface{}) <-chan interface{} {c := make(chan interface{})go func() {defer close(c)for a != nil || b != nil { //只要还有可读的chanselect {case v, ok := <-a:if !ok { // a 已关闭,设置为nila = nilcontinue}c <- vcase v, ok := <-b:if !ok { // b 已关闭,设置为nilb = nilcontinue}c <- v}}}()return c}
扇出模式(有扇入模式,就有扇出模式,扇出模式是和扇入模式相反的)观察者模式
从源 Channel 取出一个数据后,依次发送给目标 Channel。在发送给目标 Channel 的时候,可以同步发送,也可以异步发送:
func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {go func() {defer func() { //退出时关闭所有的输出chanfor i := 0; i < len(out); i++ {close(out[i])}}()for v := range ch { // 从输入chan中读取数据v := vfor i := 0; i < len(out); i++ {i := iif async { //异步go func() {out[i] <- v // 放入到输出chan中,异步方式}()} else {out[i] <- v // 放入到输出chan中,同步方式}}}}()}
Stream
func asStream(done <-chan struct{}, values ...interface{}) <-chan interface{} {s := make(chan interface{}) //创建一个unbuffered的channelgo func() { // 启动一个goroutine,往s中塞数据defer close(s) // 退出时关闭chanfor _, v := range values { // 遍历数组select {case <-done:returncase s <- v: // 将数组元素塞入到chan中}}}()return s}
takeN:只取流中的前 n 个数据;takeFn:筛选流中的数据,只保留满足条件的数据;takeWhile:只取前面满足条件的数据,一旦不满足条件,就不再取;skipN:跳过流中前几个数据;skipFn:跳过满足条件的数据;skipWhile:跳过前面满足条件的数据,一旦不满足条件,当前这个元素和以后的元素都会输出给 Channel 的 receiver。
takeN
func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {takeStream := make(chan interface{}) // 创建输出流go func() {defer close(takeStream)for i := 0; i < num; i++ { // 只读取前num个元素select {case <-done:returncase takeStream <- <-valueStream: //从输入流中读取元素}}}()return takeStream}
map-reduce
map-reduce 分为两个步骤,第一步是映射(map),处理队列中的数据,第二步是规约(reduce),把列表中的每一个元素按照一定的处理方式处理成结果,放入到结果队列中。
就像做汉堡一样,map 就是单独处理每一种食材,reduce 就是从每一份食材中取一部分,做成一个汉堡。
map
func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {out := make(chan interface{}) //创建一个输出chanif in == nil { // 异常检查close(out)return out}go func() { // 启动一个goroutine,实现map的主要逻辑defer close(out)for v := range in { // 从输入chan读取数据,执行业务操作,也就是map操作out <- fn(v)}}()return out}
reduce
func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {if in == nil { // 异常检查return nil}out := <-in // 先读取第一个元素for v := range in { // 实现reduce的主要逻辑out = fn(out, v)}return out}
这个程序使用 map-reduce 模式处理一组整数,map 函数就是为每个整数乘以 10,reduce 函数就是把 map 处理的结果累加起来
// 生成一个数据流func asStream(done <-chan struct{}) <-chan interface{} {s := make(chan interface{})values := []int{1, 2, 3, 4, 5}go func() {defer close(s)for _, v := range values { // 从数组生成select {case <-done:returncase s <- v:}}}()return s}func main() {in := asStream(nil)// map操作: 乘以10mapFn := func(v interface{}) interface{} {return v.(int) * 10}// reduce操作: 对map的结果进行累加reduceFn := func(r, v interface{}) interface{} {return r.(int) + v.(int)}sum := reduce(mapChan(in, mapFn), reduceFn) //返回累加结果fmt.Println(sum)}
