atomic
atomic read-modify-write : https://preshing.com/20150402/you-can-do-any-kind-of-atomic-read-modify-write-operation/
Load 和 Store:https://preshing.com/20130618/atomic-vs-non-atomic-operations/
假设你想在程序中使用一个标志(flag,比如一个 bool 类型的变量),来标识一个定时任务是否已经启动执行了,你会怎么做呢?
1.使用加锁方式(Mutex和RWMutex)。
2.atomic (你可以使用一个 uint32 类型的变量,如果这个变量的值是 0,就标识没有任务在执行,如果它的值是 1,就标识已经有任务在完成了)。
3.atomic 实现自己定义的基本并发原语:CondMutex、Mutex.LockContext、WaitGroup.Go
4.atomic 原子操作还是实现 lock-free 数据结构的基石。
lock-free:https://docs.microsoft.com/zh-cn/windows/win32/dxtecharts/lockless-programming
5.atomic 为了支持 int32、int64、uint32、uint64、uintptr、Pointer(Add 方法不支持)类型,分别提供了 AddXXX、CompareAndSwapXXX、SwapXXX、LoadXXX、StoreXXX 等方法。
6.atomic 操作的对象是一个地址,你需要把可寻址的变量的地址作为参数传递给方法,而不是把变量的值传递给方法。
7.uint32 = AddUint32(&x, ^uint32(c-1))和uint64 = AddUint32(&x, ^uint32(0))
8.CAS = func CompareAndSwapInt32(addr int32, old, new int32) (swapped bool)
9.Swap = (old = addr *addr = new return old)
Value
type Config struct {
NodeName string
Addr string
Count int32
}
func loadNewConfig() Config {
return Config{
NodeName: "北京",
Addr: "10.77.95.27",
Count: rand.Int31(),
}
}
func main() {
var config atomic.Value
config.Store(loadNewConfig())
var cond = sync.NewCond(&sync.Mutex{})
// 设置新的config
go func() {
for {
time.Sleep(time.Duration(5+rand.Int63n(5)) * time.Second)
config.Store(loadNewConfig())
cond.Broadcast() // 通知等待着配置已变更
}
}()
go func() {
for {
cond.L.Lock()
cond.Wait() // 等待变更信号
c := config.Load().(Config) // 读取新的配置
fmt.Printf("new config: %+v\n", c)
cond.L.Unlock()
}
}()
select {}
}
uber-go/atomic:https://github.com/uber-go/atomic
Lock-Free queue
package queue
import (
"sync/atomic"
"unsafe"
)
// lock-free的queue
type LKQueue struct {
head unsafe.Pointer
tail unsafe.Pointer
}
// 通过链表实现,这个数据结构代表链表中的节点
type node struct {
value interface{}
next unsafe.Pointer
}
func NewLKQueue() *LKQueue {
n := unsafe.Pointer(&node{})
return &LKQueue{head: n, tail: n}
}
// 入队
func (q *LKQueue) Enqueue(v interface{}) {
n := &node{value: v}
for {
tail := load(&q.tail)
next := load(&tail.next)
if tail == load(&q.tail) { // 尾还是尾
if next == nil { // 还没有新数据入队
if cas(&tail.next, next, n) { //增加到队尾
cas(&q.tail, tail, n) //入队成功,移动尾巴指针
return
}
} else { // 已有新数据加到队列后面,需要移动尾指针
cas(&q.tail, tail, next)
}
}
}
}
// 出队,没有元素则返回nil
func (q *LKQueue) Dequeue() interface{} {
for {
head := load(&q.head)
tail := load(&q.tail)
next := load(&head.next)
if head == load(&q.head) { // head还是那个head
if head == tail { // head和tail一样
if next == nil { // 说明是空队列
return nil
}
// 只是尾指针还没有调整,尝试调整它指向下一个
cas(&q.tail, tail, next)
} else {
// 读取出队的数据
v := next.value
// 既然要出队了,头指针移动到下一个
if cas(&q.head, head, next) {
return v // Dequeue is done. return
}
}
}
}
}
// 将unsafe.Pointer原子加载转换成node
func load(p *unsafe.Pointer) (n *node) {
return (*node)(atomic.LoadPointer(p))
}
// 封装CAS,避免直接将*node转换成unsafe.Pointer
func cas(p *unsafe.Pointer, old, new *node) (ok bool) {
return atomic.CompareAndSwapPointer(
p, unsafe.Pointer(old), unsafe.Pointer(new))
}
参考:https://github.com/golang/go/issues/39351
参考:https://dave.cheney.net/2018/01/06/if-aligned-memory-writes-are-atomic-why-do-we-need-the-sync-atomic-package
恰好老婆大人是做芯片MMU相关工作的,咨询了一下她,她告诉我现代的CPU基本上都在硬件层面保证了多核之间数据视图的一致性,也就是说普通的LOAD/STORE命令在硬件层面处理器就可以保证cache的一致性。如果是这样的话,那是不是可以理解为atomic包对指针的作用,主要是防止编译器做指令重排呢?因为编译器在这些现代架构上没必要使用特殊的指令了。
Channel
https://github.com/docker/libchan
https://github.com/tylertreat/chan
论文:https://www.cs.cmu.edu/~crary/819-f09/Hoare78.pdf
历史:https://swtch.com/~rsc/thread/
1.CSP 允许使用进程组件来描述系统,它们独立运行,并且只通过消息传递的方式通信。
2.Channel 类型是 Go 语言内置的类型,你无需引入某个包,就能使用它。
3.执行业务处理的 goroutine 不要通过共享内存的方式通信,而是要通过 Channel 通信的方式分享数据。
4.数据交流:当作并发的 buffer 或者 queue,解决生产者 - 消费者问题。多个 goroutine 可以并发当作生产者(Producer)和消费者(Consumer)。
5.数据传递:一个 goroutine 将数据交给另一个 goroutine,相当于把数据的拥有权 (引用) 托付出去。
6.信号通知:一个 goroutine 可以将信号 (closing、closed、data ready 等) 传递给另一个或者另一组 goroutine 。
7.任务编排:可以让一组 goroutine 按照一定的顺序并发或者串行的执行,这就是编排的功能。
8.锁:利用 Channel 也可以实现互斥锁的机制。
9.(为了说起来方便,我们下面都把 Channel 叫做 chan)分为只能接收、只能发送、既可以接收又可以发送三种类型(
ChannelType = ( “chan” | “chan” “<-“ | “<-“ “chan” ) ElementType )
10.这个箭头总是射向左边的,元素类型总在最右边。如果箭头指向 chan,就表示可以往 chan 中塞数据;如果箭头远离 chan,就表示 chan 会往外吐数据。
11.“<-”有个规则,总是尽量和左边的 chan 结合(The <- operator associates with the leftmost chan possible:)
chan<- (chan int) // <- 和第一个chan结合
chan<- (<-chan int) // 第一个<-和最左边的chan结合,第二个<-和左边第二个chan结合
<-chan (<-chan int) // 第一个<-和最左边的chan结合,第二个<-和左边第二个chan结合
chan (<-chan int) // 因为括号的原因,<-和括号内第一个chan结合
12.nil 是 chan 的零值,是一种特殊的 chan,对值是 nil 的 chan 的发送接收调用者总是会阻塞。
13.发送数据(
ch <- 2000)
14.接收数据(
x := <-ch // 把接收的一条数据赋值给变量x
;foo(<-ch) // 把接收的一个的数据作为参数传给函数
;<-ch // 丢弃接收的一条数据)
15.send 和 recv 都可以作为 select 语句的 case clause
func main() {
var ch = make(chan int, 10)
for i := 0; i < 10; i++ {
select {
case ch <- i:
case v := <-ch:
fmt.Println(v)
}
}
}
16.chan 还可以应用于 for-range
for v := range ch {
fmt.Println(v)
}
17.或者是忽略读取的值,只是清空 chan
for range ch {
}
Channel 的实现原理
qcount:代表 chan 中已经接收但还没被取走的元素的个数。内建函数 len 可以返回这个字段的值。
dataqsiz:队列的大小。chan 使用一个循环队列来存放元素,循环队列很适合这种生产者 - 消费者的场景(我很好奇为什么这个字段省略 size 中的 e)。
buf:存放元素的循环队列的 buffer。
elemtype 和 elemsize:chan 中元素的类型和 size。因为 chan 一旦声明,它的元素类型是固定的,即普通类型或者指针类型,所以元素大小也是固定的。
sendx:处理发送数据的指针在 buf 中的位置。一旦接收了新的数据,指针就会加上 elemsize,移向下一个位置。buf 的总大小是 elemsize 的整数倍,而且 buf 是一个循环列表。
recvx:处理接收请求时的指针在 buf 中的位置。一旦取出数据,此指针会移动到下一个位置。
recvq:chan 是多生产者多消费者的模式,如果消费者因为没有数据可读而被阻塞了,就会被加入到 recvq 队列中。
sendq:如果生产者因为 buf 满了而阻塞,会被加入到 sendq 队列中。
makechan
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 略去检查代码
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
//
var c *hchan
switch {
case mem == 0:
// chan的size或者元素的size是0,不必创建buf
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 元素不是指针,分配一块连续的内存给hchan数据结构和buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// hchan数据结构后面紧接着就是buf
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素包含指针,那么单独分配buf
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 元素大小、类型、容量都记录下来
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
return c
}
send
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 第一部分
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
......
}
recv
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 第一部分,chan为nil
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
close
func closechan(c *hchan) {
if c == nil { // chan为nil, panic
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {// chan已经closed, panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
var glist gList
// 释放所有的reader
for {
sg := c.recvq.dequeue()
......
gp := sg.g
......
glist.push(gp)
}
// 释放所有的writer (它们会panic)
for {
sg := c.sendq.dequeue()
......
gp := sg.g
......
glist.push(gp)
}
unlock(&c.lock)
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
论文:https://songlh.github.io/paper/go-study.pdf
使用 Channel 最常见的错误是 panic 和 goroutine 泄漏。
1.close 为 nil 的 chan;
2.send 已经 close 的 chan;
3.close 已经 close 的 chan。
并发:
共享资源的并发访问使用传统并发原语;
复杂的任务编排和消息传递使用 Channel;
消息通知机制使用 Channel,除非只想 signal 一个 goroutine,才使用 Cond;
简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;
需要和 Select 语句结合,使用 Channel;
需要和超时配合时,使用 Channel 和 Context。
注意:只要一个 chan 还有未读的数据,即使把它 close 掉,你还是可以继续把这些未读的数据消费完,之后才是读取零值数据。
问:有一道经典的使用 Channel 进行任务编排的题,你可以尝试做一下:有四个 goroutine,编号为 1、2、3、4。每秒钟会有一个 goroutine 打印出它自己的编号,要求你编写一个程序,让输出的编号总是按照 1、2、3、4、1、2、3、4、……的顺序打印出来。
func main() {
ch := make(chan struct{})
for i := 1; i <= 4; i++ {
go func(index int) {
time.Sleep(time.Duration(index*10) * time.Millisecond)
for {
<-ch
fmt.Printf("I am No %d Goroutine\n", index)
time.Sleep(time.Second)
ch <- struct{}{}
}
}(i)
}
ch <- struct{}{}
time.Sleep(time.Minute)
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
ch4 := make(chan int)
go func() {
for {
fmt.Println("I'm goroutine 1")
time.Sleep(1 * time.Second)
ch2 <-1 //I'm done, you turn
<-ch1
}
}()
go func() {
for {
<-ch2
fmt.Println("I'm goroutine 2")
time.Sleep(1 * time.Second)
ch3 <-1
}
}()
go func() {
for {
<-ch3
fmt.Println("I'm goroutine 3")
time.Sleep(1 * time.Second)
ch4 <-1
}
}()
go func() {
for {
<-ch4
fmt.Println("I'm goroutine 4")
time.Sleep(1 * time.Second)
ch1 <-1
}
}()
select {}
}
问:chan T 是否可以给 <- chan T 和 chan<- T 类型的变量赋值?反过来呢?
答:双向通道可以赋值给单向,反过来不可以.
反射Channel
select {
case v := <-ch1:
fmt.Println(v)
case v := <-ch2:
fmt.Println(v)
}
Select 的方法
func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)
处理不定数据的Channel
func main() {
var ch1 = make(chan int, 10)
var ch2 = make(chan int, 10)
// 创建SelectCase
var cases = createCases(ch1, ch2)
// 执行10次select
for i := 0; i < 10; i++ {
chosen, recv, ok := reflect.Select(cases)
if recv.IsValid() { // recv case
fmt.Println("recv:", cases[chosen].Dir, recv, ok)
} else { // send case
fmt.Println("send:", cases[chosen].Dir, ok)
}
}
}
func createCases(chs ...chan int) []reflect.SelectCase {
var cases []reflect.SelectCase
// 创建recv case
for _, ch := range chs {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
}
// 创建send case
for i, ch := range chs {
v := reflect.ValueOf(i)
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectSend,
Chan: reflect.ValueOf(ch),
Send: v,
})
}
return cases
}
worker 池:http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
“击鼓传花”
type Token struct{}
func newWorker(id int, ch chan Token, nextCh chan Token) {
for {
token := <-ch // 取得令牌
fmt.Println((id + 1)) // id从1开始
time.Sleep(time.Second)
nextCh <- token
}
}
func main() {
chs := []chan Token{make(chan Token), make(chan Token), make(chan Token), make(chan Token)}
// 创建4个worker
for i := 0; i < 4; i++ {
go newWorker(i, chs[i], chs[(i+1)%4])
}
//首先把令牌交给第一个worker
chs[0] <- struct{}{}
select {}
}
信号通知
func main() {
go func() {
...... // 执行业务处理
}()
// 处理CTRL+C等中断信号
termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan
// 执行退出之前的清理动作
doCleanup()
fmt.Println("优雅退出")
}
程序退出
1.closing,代表程序退出,但是清理工作还没做;
2.closed,代表清理工作已经做完。
func main() {
var closing = make(chan struct{})
var closed = make(chan struct{})
go func() {
// 模拟业务处理
for {
select {
case <-closing:
return
default:
// ....... 业务计算
time.Sleep(100 * time.Millisecond)
}
}
}()
// 处理CTRL+C等中断信号
termChan := make(chan os.Signal)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan
close(closing)
// 执行退出之前的清理动作
go doCleanup(closed)
select {
case <-closed:
case <-time.After(time.Second):
fmt.Println("清理超时,不等了")
}
fmt.Println("优雅退出")
}
func doCleanup(closed chan struct{}) {
time.Sleep((time.Minute))
close(closed)
}
锁
一种方式是先初始化一个 capacity 等于 1 的 Channel,然后再放入一个元素。这个元素就代表锁,谁取得了这个元素,就相当于获取了这把锁。另一种方式是,先初始化一个 capacity 等于 1 的 Channel,它的“空槽”代表锁,谁能成功地把元素发送到这个 Channel,谁就获取了这把锁。
// 使用chan实现互斥锁
type Mutex struct {
ch chan struct{}
}
// 使用锁需要初始化
func NewMutex() *Mutex {
mu := &Mutex{make(chan struct{}, 1)}
mu.ch <- struct{}{}
return mu
}
// 请求锁,直到获取到
func (m *Mutex) Lock() {
<-m.ch
}
// 解锁
func (m *Mutex) Unlock() {
select {
case m.ch <- struct{}{}:
default:
panic("unlock of unlocked mutex")
}
}
// 尝试获取锁
func (m *Mutex) TryLock() bool {
select {
case <-m.ch:
return true
default:
}
return false
}
// 加入一个超时的设置
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
timer := time.NewTimer(timeout)
select {
case <-m.ch:
timer.Stop()
return true
case <-timer.C:
}
return false
}
// 锁是否已被持有
func (m *Mutex) IsLocked() bool {
return len(m.ch) == 0
}
func main() {
m := NewMutex()
ok := m.TryLock()
fmt.Printf("locked v %v\n", ok)
ok = m.TryLock()
fmt.Printf("locked %v\n", ok)
}
你可以用 buffer 等于 1 的 chan 实现互斥锁,在初始化这个锁的时候往 Channel 中先塞入一个元素,谁把这个元素取走,谁就获取了这把锁,把元素放回去,就是释放了锁。元素在放回到 chan 之前,不会有 goroutine 能从 chan 中取出元素的,这就保证了互斥性。
利用 select+chan 的方式,很容易实现 TryLock、Timeout 的功能。具体来说就是,在 select 语句中,我们可以使用 default 实现 TryLock,使用一个 Timer 来实现 Timeout 的功能。
任务编排(总共 5 种,分别是 Or-Done 模式、扇入模式、扇出模式、Stream 和 map-reduce)
Or-Done 模式(你发送同一个请求到多个微服务节点,只要任意一个微服务节点返回结果,就算成功)
func or(channels ...<-chan interface{}) <-chan interface{} {
// 特殊情况,只有零个或者1个chan
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
switch len(channels) {
case 2: // 2个也是一种特殊情况
select {
case <-channels[0]:
case <-channels[1]:
}
default: //超过两个,二分法递归处理
m := len(channels) / 2
select {
case <-or(channels[:m]...):
case <-or(channels[m:]...):
}
}
}()
return orDone
}
当 chan 的数量大于 2 时,使用递归的方式等待信号。
func or(channels ...<-chan interface{}) <-chan interface{} {
//特殊情况,只有0个或者1个
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
// 利用反射构建SelectCase
var cases []reflect.SelectCase
for _, c := range channels {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(c),
})
}
// 随机选择一个可用的case
reflect.Select(cases)
}()
return orDone
}
扇入模式(扇入借鉴了数字电路的概念,它定义了单个逻辑门能够接受的数字信号输入最大量的术语。一个逻辑门可以有多个输入,一个输出。)
(扇入模式也可以使用反射、递归,或者是用最笨的每个 goroutine 处理一个 Channel 的方式来实现。)
func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
defer close(out)
// 构造SelectCase slice
var cases []reflect.SelectCase
for _, c := range chans {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(c),
})
}
// 循环,从cases中选择一个可用的
for len(cases) > 0 {
i, v, ok := reflect.Select(cases)
if !ok { // 此channel已经close
cases = append(cases[:i], cases[i+1:]...)
continue
}
out <- v.Interface()
}
}()
return out
}
递归模式也是在 Channel 大于 2 时,采用二分法递归 merge。
func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
switch len(chans) {
case 0:
c := make(chan interface{})
close(c)
return c
case 1:
return chans[0]
case 2:
return mergeTwo(chans[0], chans[1])
default:
m := len(chans) / 2
return mergeTwo(
fanInRec(chans[:m]...),
fanInRec(chans[m:]...))
}
}
这里有一个 mergeTwo 的方法,是将两个 Channel 合并成一个 Channel,是扇入形式的一种特例(只处理两个 Channel)
func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
for a != nil || b != nil { //只要还有可读的chan
select {
case v, ok := <-a:
if !ok { // a 已关闭,设置为nil
a = nil
continue
}
c <- v
case v, ok := <-b:
if !ok { // b 已关闭,设置为nil
b = nil
continue
}
c <- v
}
}
}()
return c
}
扇出模式(有扇入模式,就有扇出模式,扇出模式是和扇入模式相反的)观察者模式
从源 Channel 取出一个数据后,依次发送给目标 Channel。在发送给目标 Channel 的时候,可以同步发送,也可以异步发送:
func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
go func() {
defer func() { //退出时关闭所有的输出chan
for i := 0; i < len(out); i++ {
close(out[i])
}
}()
for v := range ch { // 从输入chan中读取数据
v := v
for i := 0; i < len(out); i++ {
i := i
if async { //异步
go func() {
out[i] <- v // 放入到输出chan中,异步方式
}()
} else {
out[i] <- v // 放入到输出chan中,同步方式
}
}
}
}()
}
Stream
func asStream(done <-chan struct{}, values ...interface{}) <-chan interface{} {
s := make(chan interface{}) //创建一个unbuffered的channel
go func() { // 启动一个goroutine,往s中塞数据
defer close(s) // 退出时关闭chan
for _, v := range values { // 遍历数组
select {
case <-done:
return
case s <- v: // 将数组元素塞入到chan中
}
}
}()
return s
}
takeN:只取流中的前 n 个数据;
takeFn:筛选流中的数据,只保留满足条件的数据;
takeWhile:只取前面满足条件的数据,一旦不满足条件,就不再取;
skipN:跳过流中前几个数据;
skipFn:跳过满足条件的数据;
skipWhile:跳过前面满足条件的数据,一旦不满足条件,当前这个元素和以后的元素都会输出给 Channel 的 receiver。
takeN
func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
takeStream := make(chan interface{}) // 创建输出流
go func() {
defer close(takeStream)
for i := 0; i < num; i++ { // 只读取前num个元素
select {
case <-done:
return
case takeStream <- <-valueStream: //从输入流中读取元素
}
}
}()
return takeStream
}
map-reduce
map-reduce 分为两个步骤,第一步是映射(map),处理队列中的数据,第二步是规约(reduce),把列表中的每一个元素按照一定的处理方式处理成结果,放入到结果队列中。
就像做汉堡一样,map 就是单独处理每一种食材,reduce 就是从每一份食材中取一部分,做成一个汉堡。
map
func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
out := make(chan interface{}) //创建一个输出chan
if in == nil { // 异常检查
close(out)
return out
}
go func() { // 启动一个goroutine,实现map的主要逻辑
defer close(out)
for v := range in { // 从输入chan读取数据,执行业务操作,也就是map操作
out <- fn(v)
}
}()
return out
}
reduce
func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
if in == nil { // 异常检查
return nil
}
out := <-in // 先读取第一个元素
for v := range in { // 实现reduce的主要逻辑
out = fn(out, v)
}
return out
}
这个程序使用 map-reduce 模式处理一组整数,map 函数就是为每个整数乘以 10,reduce 函数就是把 map 处理的结果累加起来
// 生成一个数据流
func asStream(done <-chan struct{}) <-chan interface{} {
s := make(chan interface{})
values := []int{1, 2, 3, 4, 5}
go func() {
defer close(s)
for _, v := range values { // 从数组生成
select {
case <-done:
return
case s <- v:
}
}
}()
return s
}
func main() {
in := asStream(nil)
// map操作: 乘以10
mapFn := func(v interface{}) interface{} {
return v.(int) * 10
}
// reduce操作: 对map的结果进行累加
reduceFn := func(r, v interface{}) interface{} {
return r.(int) + v.(int)
}
sum := reduce(mapChan(in, mapFn), reduceFn) //返回累加结果
fmt.Println(sum)
}