atomic
atomic read-modify-write : https://preshing.com/20150402/you-can-do-any-kind-of-atomic-read-modify-write-operation/
Load 和 Store:https://preshing.com/20130618/atomic-vs-non-atomic-operations/

假设你想在程序中使用一个标志(flag,比如一个 bool 类型的变量),来标识一个定时任务是否已经启动执行了,你会怎么做呢?

1.使用加锁方式(Mutex和RWMutex)。
2.atomic (你可以使用一个 uint32 类型的变量,如果这个变量的值是 0,就标识没有任务在执行,如果它的值是 1,就标识已经有任务在完成了)。
3.atomic 实现自己定义的基本并发原语:CondMutex、Mutex.LockContext、WaitGroup.Go
4.atomic 原子操作还是实现 lock-free 数据结构的基石。
lock-free:https://docs.microsoft.com/zh-cn/windows/win32/dxtecharts/lockless-programming
5.atomic 为了支持 int32、int64、uint32、uint64、uintptr、Pointer(Add 方法不支持)类型,分别提供了 AddXXX、CompareAndSwapXXX、SwapXXX、LoadXXX、StoreXXX 等方法。
6.atomic 操作的对象是一个地址,你需要把可寻址的变量的地址作为参数传递给方法,而不是把变量的值传递给方法。
7.uint32 = AddUint32(&x, ^uint32(c-1))和uint64 = AddUint32(&x, ^uint32(0))
8.CAS = func CompareAndSwapInt32(addr int32, old, new int32) (swapped bool)
9.Swap = (old =
addr *addr = new return old)
Value

  1. type Config struct {
  2. NodeName string
  3. Addr string
  4. Count int32
  5. }
  6. func loadNewConfig() Config {
  7. return Config{
  8. NodeName: "北京",
  9. Addr: "10.77.95.27",
  10. Count: rand.Int31(),
  11. }
  12. }
  13. func main() {
  14. var config atomic.Value
  15. config.Store(loadNewConfig())
  16. var cond = sync.NewCond(&sync.Mutex{})
  17. // 设置新的config
  18. go func() {
  19. for {
  20. time.Sleep(time.Duration(5+rand.Int63n(5)) * time.Second)
  21. config.Store(loadNewConfig())
  22. cond.Broadcast() // 通知等待着配置已变更
  23. }
  24. }()
  25. go func() {
  26. for {
  27. cond.L.Lock()
  28. cond.Wait() // 等待变更信号
  29. c := config.Load().(Config) // 读取新的配置
  30. fmt.Printf("new config: %+v\n", c)
  31. cond.L.Unlock()
  32. }
  33. }()
  34. select {}
  35. }

uber-go/atomic:https://github.com/uber-go/atomic
Lock-Free queue

  1. package queue
  2. import (
  3. "sync/atomic"
  4. "unsafe"
  5. )
  6. // lock-free的queue
  7. type LKQueue struct {
  8. head unsafe.Pointer
  9. tail unsafe.Pointer
  10. }
  11. // 通过链表实现,这个数据结构代表链表中的节点
  12. type node struct {
  13. value interface{}
  14. next unsafe.Pointer
  15. }
  16. func NewLKQueue() *LKQueue {
  17. n := unsafe.Pointer(&node{})
  18. return &LKQueue{head: n, tail: n}
  19. }
  20. // 入队
  21. func (q *LKQueue) Enqueue(v interface{}) {
  22. n := &node{value: v}
  23. for {
  24. tail := load(&q.tail)
  25. next := load(&tail.next)
  26. if tail == load(&q.tail) { // 尾还是尾
  27. if next == nil { // 还没有新数据入队
  28. if cas(&tail.next, next, n) { //增加到队尾
  29. cas(&q.tail, tail, n) //入队成功,移动尾巴指针
  30. return
  31. }
  32. } else { // 已有新数据加到队列后面,需要移动尾指针
  33. cas(&q.tail, tail, next)
  34. }
  35. }
  36. }
  37. }
  38. // 出队,没有元素则返回nil
  39. func (q *LKQueue) Dequeue() interface{} {
  40. for {
  41. head := load(&q.head)
  42. tail := load(&q.tail)
  43. next := load(&head.next)
  44. if head == load(&q.head) { // head还是那个head
  45. if head == tail { // head和tail一样
  46. if next == nil { // 说明是空队列
  47. return nil
  48. }
  49. // 只是尾指针还没有调整,尝试调整它指向下一个
  50. cas(&q.tail, tail, next)
  51. } else {
  52. // 读取出队的数据
  53. v := next.value
  54. // 既然要出队了,头指针移动到下一个
  55. if cas(&q.head, head, next) {
  56. return v // Dequeue is done. return
  57. }
  58. }
  59. }
  60. }
  61. }
  62. // 将unsafe.Pointer原子加载转换成node
  63. func load(p *unsafe.Pointer) (n *node) {
  64. return (*node)(atomic.LoadPointer(p))
  65. }
  66. // 封装CAS,避免直接将*node转换成unsafe.Pointer
  67. func cas(p *unsafe.Pointer, old, new *node) (ok bool) {
  68. return atomic.CompareAndSwapPointer(
  69. p, unsafe.Pointer(old), unsafe.Pointer(new))
  70. }

参考:https://github.com/golang/go/issues/39351
参考:https://dave.cheney.net/2018/01/06/if-aligned-memory-writes-are-atomic-why-do-we-need-the-sync-atomic-package
恰好老婆大人是做芯片MMU相关工作的,咨询了一下她,她告诉我现代的CPU基本上都在硬件层面保证了多核之间数据视图的一致性,也就是说普通的LOAD/STORE命令在硬件层面处理器就可以保证cache的一致性。如果是这样的话,那是不是可以理解为atomic包对指针的作用,主要是防止编译器做指令重排呢?因为编译器在这些现代架构上没必要使用特殊的指令了。

Channel
https://github.com/docker/libchan
https://github.com/tylertreat/chan
论文:https://www.cs.cmu.edu/~crary/819-f09/Hoare78.pdf
历史:https://swtch.com/~rsc/thread/
1.CSP 允许使用进程组件来描述系统,它们独立运行,并且只通过消息传递的方式通信。
2.Channel 类型是 Go 语言内置的类型,你无需引入某个包,就能使用它。
3.执行业务处理的 goroutine 不要通过共享内存的方式通信,而是要通过 Channel 通信的方式分享数据。
4.数据交流:当作并发的 buffer 或者 queue,解决生产者 - 消费者问题。多个 goroutine 可以并发当作生产者(Producer)和消费者(Consumer)。
5.数据传递:一个 goroutine 将数据交给另一个 goroutine,相当于把数据的拥有权 (引用) 托付出去。
6.信号通知:一个 goroutine 可以将信号 (closing、closed、data ready 等) 传递给另一个或者另一组 goroutine 。
7.任务编排:可以让一组 goroutine 按照一定的顺序并发或者串行的执行,这就是编排的功能。
8.锁:利用 Channel 也可以实现互斥锁的机制。
9.(为了说起来方便,我们下面都把 Channel 叫做 chan)分为只能接收、只能发送、既可以接收又可以发送三种类型( ChannelType = ( “chan” | “chan” “<-“ | “<-“ “chan” ) ElementType )
10.这个箭头总是射向左边的,元素类型总在最右边。如果箭头指向 chan,就表示可以往 chan 中塞数据;如果箭头远离 chan,就表示 chan 会往外吐数据。
11.“<-”有个规则,总是尽量和左边的 chan 结合(The <- operator associates with the leftmost chan possible:)

  1. chan<- chan int // <- 和第一个chan结合
  2. chan<- (<-chan int // 第一个<-和最左边的chan结合,第二个<-和左边第二个chan结合
  3. <-chan (<-chan int // 第一个<-和最左边的chan结合,第二个<-和左边第二个chan结合
  4. chan (<-chan int) // 因为括号的原因,<-和括号内第一个chan结合

12.nil 是 chan 的零值,是一种特殊的 chan,对值是 nil 的 chan 的发送接收调用者总是会阻塞。
13.发送数据( ch <- 2000)
14.接收数据( x := <-ch // 把接收的一条数据赋值给变量x ;foo(<-ch) // 把接收的一个的数据作为参数传给函数 ;<-ch // 丢弃接收的一条数据)
15.send 和 recv 都可以作为 select 语句的 case clause

  1. func main() {
  2. var ch = make(chan int, 10)
  3. for i := 0; i < 10; i++ {
  4. select {
  5. case ch <- i:
  6. case v := <-ch:
  7. fmt.Println(v)
  8. }
  9. }
  10. }

16.chan 还可以应用于 for-range

  1. for v := range ch {
  2. fmt.Println(v)
  3. }

17.或者是忽略读取的值,只是清空 chan

  1. for range ch {
  2. }

Channel 的实现原理

  1. qcount:代表 chan 中已经接收但还没被取走的元素的个数。内建函数 len 可以返回这个字段的值。
  2. dataqsiz:队列的大小。chan 使用一个循环队列来存放元素,循环队列很适合这种生产者 - 消费者的场景(我很好奇为什么这个字段省略 size 中的 e)。
  3. buf:存放元素的循环队列的 buffer
  4. elemtype elemsizechan 中元素的类型和 size。因为 chan 一旦声明,它的元素类型是固定的,即普通类型或者指针类型,所以元素大小也是固定的。
  5. sendx:处理发送数据的指针在 buf 中的位置。一旦接收了新的数据,指针就会加上 elemsize,移向下一个位置。buf 的总大小是 elemsize 的整数倍,而且 buf 是一个循环列表。
  6. recvx:处理接收请求时的指针在 buf 中的位置。一旦取出数据,此指针会移动到下一个位置。
  7. recvqchan 是多生产者多消费者的模式,如果消费者因为没有数据可读而被阻塞了,就会被加入到 recvq 队列中。
  8. sendq:如果生产者因为 buf 满了而阻塞,会被加入到 sendq 队列中。

makechan

  1. func makechan(t *chantype, size int) *hchan {
  2. elem := t.elem
  3. // 略去检查代码
  4. mem, overflow := math.MulUintptr(elem.size, uintptr(size))
  5. //
  6. var c *hchan
  7. switch {
  8. case mem == 0:
  9. // chan的size或者元素的size是0,不必创建buf
  10. c = (*hchan)(mallocgc(hchanSize, nil, true))
  11. c.buf = c.raceaddr()
  12. case elem.ptrdata == 0:
  13. // 元素不是指针,分配一块连续的内存给hchan数据结构和buf
  14. c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
  15. // hchan数据结构后面紧接着就是buf
  16. c.buf = add(unsafe.Pointer(c), hchanSize)
  17. default:
  18. // 元素包含指针,那么单独分配buf
  19. c = new(hchan)
  20. c.buf = mallocgc(mem, elem, true)
  21. }
  22. // 元素大小、类型、容量都记录下来
  23. c.elemsize = uint16(elem.size)
  24. c.elemtype = elem
  25. c.dataqsiz = uint(size)
  26. lockInit(&c.lock, lockRankHchan)
  27. return c
  28. }

send

  1. func chansend1(c *hchan, elem unsafe.Pointer) {
  2. chansend(c, elem, true, getcallerpc())
  3. }
  4. func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  5. // 第一部分
  6. if c == nil {
  7. if !block {
  8. return false
  9. }
  10. gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
  11. throw("unreachable")
  12. }
  13. ......
  14. }

recv

  1. func chanrecv1(c *hchan, elem unsafe.Pointer) {
  2. chanrecv(c, elem, true)
  3. }
  4. func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
  5. _, received = chanrecv(c, elem, true)
  6. return
  7. }
  8. func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  9. // 第一部分,chan为nil
  10. if c == nil {
  11. if !block {
  12. return
  13. }
  14. gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
  15. throw("unreachable")
  16. }

close

  1. func closechan(c *hchan) {
  2. if c == nil { // chan为nil, panic
  3. panic(plainError("close of nil channel"))
  4. }
  5. lock(&c.lock)
  6. if c.closed != 0 {// chan已经closed, panic
  7. unlock(&c.lock)
  8. panic(plainError("close of closed channel"))
  9. }
  10. c.closed = 1
  11. var glist gList
  12. // 释放所有的reader
  13. for {
  14. sg := c.recvq.dequeue()
  15. ......
  16. gp := sg.g
  17. ......
  18. glist.push(gp)
  19. }
  20. // 释放所有的writer (它们会panic)
  21. for {
  22. sg := c.sendq.dequeue()
  23. ......
  24. gp := sg.g
  25. ......
  26. glist.push(gp)
  27. }
  28. unlock(&c.lock)
  29. for !glist.empty() {
  30. gp := glist.pop()
  31. gp.schedlink = 0
  32. goready(gp, 3)
  33. }
  34. }

论文:https://songlh.github.io/paper/go-study.pdf
使用 Channel 最常见的错误是 panic 和 goroutine 泄漏。
1.close 为 nil 的 chan;
2.send 已经 close 的 chan;
3.close 已经 close 的 chan。
并发:

  1. 共享资源的并发访问使用传统并发原语;
  2. 复杂的任务编排和消息传递使用 Channel
  3. 消息通知机制使用 Channel,除非只想 signal 一个 goroutine,才使用 Cond
  4. 简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;
  5. 需要和 Select 语句结合,使用 Channel
  6. 需要和超时配合时,使用 Channel Context

image.png
注意:只要一个 chan 还有未读的数据,即使把它 close 掉,你还是可以继续把这些未读的数据消费完,之后才是读取零值数据。

问:有一道经典的使用 Channel 进行任务编排的题,你可以尝试做一下:有四个 goroutine,编号为 1、2、3、4。每秒钟会有一个 goroutine 打印出它自己的编号,要求你编写一个程序,让输出的编号总是按照 1、2、3、4、1、2、3、4、……的顺序打印出来。

  1. func main() {
  2. ch := make(chan struct{})
  3. for i := 1; i <= 4; i++ {
  4. go func(index int) {
  5. time.Sleep(time.Duration(index*10) * time.Millisecond)
  6. for {
  7. <-ch
  8. fmt.Printf("I am No %d Goroutine\n", index)
  9. time.Sleep(time.Second)
  10. ch <- struct{}{}
  11. }
  12. }(i)
  13. }
  14. ch <- struct{}{}
  15. time.Sleep(time.Minute)
  16. }
  1. func main() {
  2. ch1 := make(chan int)
  3. ch2 := make(chan int)
  4. ch3 := make(chan int)
  5. ch4 := make(chan int)
  6. go func() {
  7. for {
  8. fmt.Println("I'm goroutine 1")
  9. time.Sleep(1 * time.Second)
  10. ch2 <-1 //I'm done, you turn
  11. <-ch1
  12. }
  13. }()
  14. go func() {
  15. for {
  16. <-ch2
  17. fmt.Println("I'm goroutine 2")
  18. time.Sleep(1 * time.Second)
  19. ch3 <-1
  20. }
  21. }()
  22. go func() {
  23. for {
  24. <-ch3
  25. fmt.Println("I'm goroutine 3")
  26. time.Sleep(1 * time.Second)
  27. ch4 <-1
  28. }
  29. }()
  30. go func() {
  31. for {
  32. <-ch4
  33. fmt.Println("I'm goroutine 4")
  34. time.Sleep(1 * time.Second)
  35. ch1 <-1
  36. }
  37. }()
  38. select {}
  39. }

问:chan T 是否可以给 <- chan T 和 chan<- T 类型的变量赋值?反过来呢?
答:双向通道可以赋值给单向,反过来不可以.

反射Channel

  1. select {
  2. case v := <-ch1:
  3. fmt.Println(v)
  4. case v := <-ch2:
  5. fmt.Println(v)
  6. }

Select 的方法

  1. func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

处理不定数据的Channel

  1. func main() {
  2. var ch1 = make(chan int, 10)
  3. var ch2 = make(chan int, 10)
  4. // 创建SelectCase
  5. var cases = createCases(ch1, ch2)
  6. // 执行10次select
  7. for i := 0; i < 10; i++ {
  8. chosen, recv, ok := reflect.Select(cases)
  9. if recv.IsValid() { // recv case
  10. fmt.Println("recv:", cases[chosen].Dir, recv, ok)
  11. } else { // send case
  12. fmt.Println("send:", cases[chosen].Dir, ok)
  13. }
  14. }
  15. }
  16. func createCases(chs ...chan int) []reflect.SelectCase {
  17. var cases []reflect.SelectCase
  18. // 创建recv case
  19. for _, ch := range chs {
  20. cases = append(cases, reflect.SelectCase{
  21. Dir: reflect.SelectRecv,
  22. Chan: reflect.ValueOf(ch),
  23. })
  24. }
  25. // 创建send case
  26. for i, ch := range chs {
  27. v := reflect.ValueOf(i)
  28. cases = append(cases, reflect.SelectCase{
  29. Dir: reflect.SelectSend,
  30. Chan: reflect.ValueOf(ch),
  31. Send: v,
  32. })
  33. }
  34. return cases
  35. }

worker 池:http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
“击鼓传花”

  1. type Token struct{}
  2. func newWorker(id int, ch chan Token, nextCh chan Token) {
  3. for {
  4. token := <-ch // 取得令牌
  5. fmt.Println((id + 1)) // id从1开始
  6. time.Sleep(time.Second)
  7. nextCh <- token
  8. }
  9. }
  10. func main() {
  11. chs := []chan Token{make(chan Token), make(chan Token), make(chan Token), make(chan Token)}
  12. // 创建4个worker
  13. for i := 0; i < 4; i++ {
  14. go newWorker(i, chs[i], chs[(i+1)%4])
  15. }
  16. //首先把令牌交给第一个worker
  17. chs[0] <- struct{}{}
  18. select {}
  19. }

信号通知

  1. func main() {
  2. go func() {
  3. ...... // 执行业务处理
  4. }()
  5. // 处理CTRL+C等中断信号
  6. termChan := make(chan os.Signal)
  7. signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
  8. <-termChan
  9. // 执行退出之前的清理动作
  10. doCleanup()
  11. fmt.Println("优雅退出")
  12. }

程序退出
1.closing,代表程序退出,但是清理工作还没做;
2.closed,代表清理工作已经做完。

  1. func main() {
  2. var closing = make(chan struct{})
  3. var closed = make(chan struct{})
  4. go func() {
  5. // 模拟业务处理
  6. for {
  7. select {
  8. case <-closing:
  9. return
  10. default:
  11. // ....... 业务计算
  12. time.Sleep(100 * time.Millisecond)
  13. }
  14. }
  15. }()
  16. // 处理CTRL+C等中断信号
  17. termChan := make(chan os.Signal)
  18. signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
  19. <-termChan
  20. close(closing)
  21. // 执行退出之前的清理动作
  22. go doCleanup(closed)
  23. select {
  24. case <-closed:
  25. case <-time.After(time.Second):
  26. fmt.Println("清理超时,不等了")
  27. }
  28. fmt.Println("优雅退出")
  29. }
  30. func doCleanup(closed chan struct{}) {
  31. time.Sleep((time.Minute))
  32. close(closed)
  33. }


一种方式是先初始化一个 capacity 等于 1 的 Channel,然后再放入一个元素。这个元素就代表锁,谁取得了这个元素,就相当于获取了这把锁。另一种方式是,先初始化一个 capacity 等于 1 的 Channel,它的“空槽”代表锁,谁能成功地把元素发送到这个 Channel,谁就获取了这把锁。

  1. // 使用chan实现互斥锁
  2. type Mutex struct {
  3. ch chan struct{}
  4. }
  5. // 使用锁需要初始化
  6. func NewMutex() *Mutex {
  7. mu := &Mutex{make(chan struct{}, 1)}
  8. mu.ch <- struct{}{}
  9. return mu
  10. }
  11. // 请求锁,直到获取到
  12. func (m *Mutex) Lock() {
  13. <-m.ch
  14. }
  15. // 解锁
  16. func (m *Mutex) Unlock() {
  17. select {
  18. case m.ch <- struct{}{}:
  19. default:
  20. panic("unlock of unlocked mutex")
  21. }
  22. }
  23. // 尝试获取锁
  24. func (m *Mutex) TryLock() bool {
  25. select {
  26. case <-m.ch:
  27. return true
  28. default:
  29. }
  30. return false
  31. }
  32. // 加入一个超时的设置
  33. func (m *Mutex) LockTimeout(timeout time.Duration) bool {
  34. timer := time.NewTimer(timeout)
  35. select {
  36. case <-m.ch:
  37. timer.Stop()
  38. return true
  39. case <-timer.C:
  40. }
  41. return false
  42. }
  43. // 锁是否已被持有
  44. func (m *Mutex) IsLocked() bool {
  45. return len(m.ch) == 0
  46. }
  47. func main() {
  48. m := NewMutex()
  49. ok := m.TryLock()
  50. fmt.Printf("locked v %v\n", ok)
  51. ok = m.TryLock()
  52. fmt.Printf("locked %v\n", ok)
  53. }

你可以用 buffer 等于 1 的 chan 实现互斥锁,在初始化这个锁的时候往 Channel 中先塞入一个元素,谁把这个元素取走,谁就获取了这把锁,把元素放回去,就是释放了锁。元素在放回到 chan 之前,不会有 goroutine 能从 chan 中取出元素的,这就保证了互斥性。
利用 select+chan 的方式,很容易实现 TryLock、Timeout 的功能。具体来说就是,在 select 语句中,我们可以使用 default 实现 TryLock,使用一个 Timer 来实现 Timeout 的功能。

任务编排(总共 5 种,分别是 Or-Done 模式、扇入模式、扇出模式、Stream 和 map-reduce)

Or-Done 模式(你发送同一个请求到多个微服务节点,只要任意一个微服务节点返回结果,就算成功)

  1. func or(channels ...<-chan interface{}) <-chan interface{} {
  2. // 特殊情况,只有零个或者1个chan
  3. switch len(channels) {
  4. case 0:
  5. return nil
  6. case 1:
  7. return channels[0]
  8. }
  9. orDone := make(chan interface{})
  10. go func() {
  11. defer close(orDone)
  12. switch len(channels) {
  13. case 2: // 2个也是一种特殊情况
  14. select {
  15. case <-channels[0]:
  16. case <-channels[1]:
  17. }
  18. default: //超过两个,二分法递归处理
  19. m := len(channels) / 2
  20. select {
  21. case <-or(channels[:m]...):
  22. case <-or(channels[m:]...):
  23. }
  24. }
  25. }()
  26. return orDone
  27. }

当 chan 的数量大于 2 时,使用递归的方式等待信号。

  1. func or(channels ...<-chan interface{}) <-chan interface{} {
  2. //特殊情况,只有0个或者1个
  3. switch len(channels) {
  4. case 0:
  5. return nil
  6. case 1:
  7. return channels[0]
  8. }
  9. orDone := make(chan interface{})
  10. go func() {
  11. defer close(orDone)
  12. // 利用反射构建SelectCase
  13. var cases []reflect.SelectCase
  14. for _, c := range channels {
  15. cases = append(cases, reflect.SelectCase{
  16. Dir: reflect.SelectRecv,
  17. Chan: reflect.ValueOf(c),
  18. })
  19. }
  20. // 随机选择一个可用的case
  21. reflect.Select(cases)
  22. }()
  23. return orDone
  24. }

扇入模式(扇入借鉴了数字电路的概念,它定义了单个逻辑门能够接受的数字信号输入最大量的术语。一个逻辑门可以有多个输入,一个输出。)
(扇入模式也可以使用反射、递归,或者是用最笨的每个 goroutine 处理一个 Channel 的方式来实现。)

  1. func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
  2. out := make(chan interface{})
  3. go func() {
  4. defer close(out)
  5. // 构造SelectCase slice
  6. var cases []reflect.SelectCase
  7. for _, c := range chans {
  8. cases = append(cases, reflect.SelectCase{
  9. Dir: reflect.SelectRecv,
  10. Chan: reflect.ValueOf(c),
  11. })
  12. }
  13. // 循环,从cases中选择一个可用的
  14. for len(cases) > 0 {
  15. i, v, ok := reflect.Select(cases)
  16. if !ok { // 此channel已经close
  17. cases = append(cases[:i], cases[i+1:]...)
  18. continue
  19. }
  20. out <- v.Interface()
  21. }
  22. }()
  23. return out
  24. }

递归模式也是在 Channel 大于 2 时,采用二分法递归 merge。

  1. func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
  2. switch len(chans) {
  3. case 0:
  4. c := make(chan interface{})
  5. close(c)
  6. return c
  7. case 1:
  8. return chans[0]
  9. case 2:
  10. return mergeTwo(chans[0], chans[1])
  11. default:
  12. m := len(chans) / 2
  13. return mergeTwo(
  14. fanInRec(chans[:m]...),
  15. fanInRec(chans[m:]...))
  16. }
  17. }

这里有一个 mergeTwo 的方法,是将两个 Channel 合并成一个 Channel,是扇入形式的一种特例(只处理两个 Channel)

  1. func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
  2. c := make(chan interface{})
  3. go func() {
  4. defer close(c)
  5. for a != nil || b != nil { //只要还有可读的chan
  6. select {
  7. case v, ok := <-a:
  8. if !ok { // a 已关闭,设置为nil
  9. a = nil
  10. continue
  11. }
  12. c <- v
  13. case v, ok := <-b:
  14. if !ok { // b 已关闭,设置为nil
  15. b = nil
  16. continue
  17. }
  18. c <- v
  19. }
  20. }
  21. }()
  22. return c
  23. }

扇出模式(有扇入模式,就有扇出模式,扇出模式是和扇入模式相反的)观察者模式
从源 Channel 取出一个数据后,依次发送给目标 Channel。在发送给目标 Channel 的时候,可以同步发送,也可以异步发送:

  1. func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
  2. go func() {
  3. defer func() { //退出时关闭所有的输出chan
  4. for i := 0; i < len(out); i++ {
  5. close(out[i])
  6. }
  7. }()
  8. for v := range ch { // 从输入chan中读取数据
  9. v := v
  10. for i := 0; i < len(out); i++ {
  11. i := i
  12. if async { //异步
  13. go func() {
  14. out[i] <- v // 放入到输出chan中,异步方式
  15. }()
  16. } else {
  17. out[i] <- v // 放入到输出chan中,同步方式
  18. }
  19. }
  20. }
  21. }()
  22. }

Stream

  1. func asStream(done <-chan struct{}, values ...interface{}) <-chan interface{} {
  2. s := make(chan interface{}) //创建一个unbuffered的channel
  3. go func() { // 启动一个goroutine,往s中塞数据
  4. defer close(s) // 退出时关闭chan
  5. for _, v := range values { // 遍历数组
  6. select {
  7. case <-done:
  8. return
  9. case s <- v: // 将数组元素塞入到chan中
  10. }
  11. }
  12. }()
  13. return s
  14. }
  1. takeN:只取流中的前 n 个数据;
  2. takeFn:筛选流中的数据,只保留满足条件的数据;
  3. takeWhile:只取前面满足条件的数据,一旦不满足条件,就不再取;
  4. skipN:跳过流中前几个数据;
  5. skipFn:跳过满足条件的数据;
  6. skipWhile:跳过前面满足条件的数据,一旦不满足条件,当前这个元素和以后的元素都会输出给 Channel receiver

takeN

  1. func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
  2. takeStream := make(chan interface{}) // 创建输出流
  3. go func() {
  4. defer close(takeStream)
  5. for i := 0; i < num; i++ { // 只读取前num个元素
  6. select {
  7. case <-done:
  8. return
  9. case takeStream <- <-valueStream: //从输入流中读取元素
  10. }
  11. }
  12. }()
  13. return takeStream
  14. }

map-reduce
map-reduce 分为两个步骤,第一步是映射(map),处理队列中的数据,第二步是规约(reduce),把列表中的每一个元素按照一定的处理方式处理成结果,放入到结果队列中。
就像做汉堡一样,map 就是单独处理每一种食材,reduce 就是从每一份食材中取一部分,做成一个汉堡。
map

  1. func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
  2. out := make(chan interface{}) //创建一个输出chan
  3. if in == nil { // 异常检查
  4. close(out)
  5. return out
  6. }
  7. go func() { // 启动一个goroutine,实现map的主要逻辑
  8. defer close(out)
  9. for v := range in { // 从输入chan读取数据,执行业务操作,也就是map操作
  10. out <- fn(v)
  11. }
  12. }()
  13. return out
  14. }

reduce

  1. func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
  2. if in == nil { // 异常检查
  3. return nil
  4. }
  5. out := <-in // 先读取第一个元素
  6. for v := range in { // 实现reduce的主要逻辑
  7. out = fn(out, v)
  8. }
  9. return out
  10. }

这个程序使用 map-reduce 模式处理一组整数,map 函数就是为每个整数乘以 10,reduce 函数就是把 map 处理的结果累加起来

  1. // 生成一个数据流
  2. func asStream(done <-chan struct{}) <-chan interface{} {
  3. s := make(chan interface{})
  4. values := []int{1, 2, 3, 4, 5}
  5. go func() {
  6. defer close(s)
  7. for _, v := range values { // 从数组生成
  8. select {
  9. case <-done:
  10. return
  11. case s <- v:
  12. }
  13. }
  14. }()
  15. return s
  16. }
  17. func main() {
  18. in := asStream(nil)
  19. // map操作: 乘以10
  20. mapFn := func(v interface{}) interface{} {
  21. return v.(int) * 10
  22. }
  23. // reduce操作: 对map的结果进行累加
  24. reduceFn := func(r, v interface{}) interface{} {
  25. return r.(int) + v.(int)
  26. }
  27. sum := reduce(mapChan(in, mapFn), reduceFn) //返回累加结果
  28. fmt.Println(sum)
  29. }