并发与并行的区别

并行:多核cpu在同一时间片内并行处理多个任务。
并发:如单核cpu在多个任务间进行时间片切换,并非同一时间片执行多个任务,只是上下文切换时间很短,看似多个任务并行。

多线程和多线程是并行的基本前提条件,单线程也可用协程做到并发。
在golang中是通过goroutine来实现并发的,goroutine并不能简单的归纳为协程,其运行时会创建多个线程来实现并发任务,且任务单元可被调度到其他线程并行执行。所以goroutine更像是多线程和协程的综合体,能最大限度提升执行效率,发挥多核处理能力。

goroutine

关键字go并非执行并发操作,而是创建一个并发任务单元。新建任务被放置在系统队列中,等待调度器安排合适的系统线程去获取执行权。

当前流程不会阻塞,不会等待该任务启动,且运行时也不保证并发任务的执行顺序。

每个任务单元除保存函数指针、调用参数外,还会分配执行所需的栈内存空间。相比系统默认MB级别的线程栈,goroutine自定义栈仅需2KB,所以才能创建成千上万的并发任务。自定义栈采取按需分配策略,在需要时仅需扩容,最大能到GB规模。

与defer一样,goroutine也会因延迟执行而立即计算并复制执行参数。

  1. var c int
  2. func counter()int{
  3. c++
  4. return c
  5. }
  6. func main() {
  7. a:=100
  8. go func(x,y int) {
  9. time.Sleep(time.Second)
  10. fmt.Println("go:",x,y)
  11. }(a,counter())
  12. a+=100
  13. fmt.Println("main:",a,counter())
  14. time.Sleep(time.Second*3)
  15. }

输出:

  1. main: 200 2
  2. go: 100 1

wait

进程退出时不会等待并发任务结束,可用channel阻塞,然后发出退出信号。

  1. func main() {
  2. exit:=make(chan interface{}) //创建通道。因为仅是通知,此处channel可为任何类型。
  3. go func() {
  4. time.Sleep(time.Second)
  5. fmt.Println("goroutine done")
  6. close(exit) //关闭通道,发出信号。
  7. }()
  8. fmt.Println("main...")
  9. <-exit //通道关闭则立即解除。
  10. fmt.Println("main exit")
  11. }

输出:

  1. main...
  2. goroutine done
  3. main exit

除了关闭通道外,向通道内写入数据也可解除阻塞。channel的更多信息,后面再做详述。
如要等待多个任务结束,推荐使用sync.WaitGruop。通过设定计数器,让每个goroutine在退出前递减,直至归零时解除阻塞。

  1. func main() {
  2. var wg sync.WaitGroup
  3. for i:=0;i<10;i++{
  4. wg.Add(1) //累加计数
  5. go func(id int) {
  6. defer wg.Done() //递减计数
  7. time.Sleep(time.Second)
  8. fmt.Println("goroutine",id,"done")
  9. }(i)
  10. }
  11. fmt.Println("main...")
  12. wg.Wait() //阻塞,直到计数归零
  13. fmt.Println("main exit")
  14. }

尽管WaitGroup.Add实现了原子操作,但建议在goroutine外累加计数器,以免Add尚未执行,Wait以及推出。

  1. func main() {
  2. var wg sync.WaitGroup
  3. go func() {
  4. wg.Add(1) //可以运行试一下,不是每次都能设置上
  5. defer wg.Done() //递减计数
  6. fmt.Println("goroutine", "done")
  7. }()
  8. fmt.Println("main...")
  9. wg.Wait() //阻塞,直到计数归零
  10. fmt.Println("main exit")
  11. }

可在多处用Wait阻塞,他们都能接收到通知。上例就可在go func前加wg.Wait().

GOMAXPROCS

运行时可能会创建很多线程,但任何时候仅有限的几个线程参与并发任务执行。该数量默认与CPU核数相等,可用runtime.GOMAXPROCS函数(或环境变量)修改。

如参数小于1,GOMAXPROCS仅返回当前设置值,不做任何调整。

  1. import (
  2. "math"
  3. "fmt"
  4. "sync"
  5. "runtime"
  6. )
  7. //测试目标函数
  8. func count(){
  9. x:=0
  10. for i:=0;i<math.MaxUint32;i++{
  11. x+=i
  12. }
  13. fmt.Println(x)
  14. }
  15. //循环执行
  16. func test(n int){
  17. for i:=0;i<n;i++{
  18. count()
  19. }
  20. }
  21. //并发执行
  22. func test2(n int){
  23. var wg sync.WaitGroup
  24. wg.Add(n)
  25. for i:=0;i<n;i++{
  26. go func() {
  27. count()
  28. wg.Done()
  29. }()
  30. }
  31. wg.Wait()
  32. }
  33. func main() {
  34. n:=runtime.GOMAXPROCS(0)
  35. n1:=runtime.NumCPU()
  36. fmt.Println(n1)
  37. test(n)
  38. }
  1. n:=runtime.GOMAXPROCS(0)
  2. n1:=runtime.NumCPU()

上述两个都可用来获取当前系统的cpu核数。

Local Storage

与线程不同,goroutine任务无法设置优先级,无法获取编号,没有局部存储(TLS),甚至连返回值都会被抛弃。但除优先级外,其他功能都很容易实现。

  1. func main() {
  2. var wg sync.WaitGroup
  3. var gs [5]struct{ //用于实现类似TLS功能
  4. id int //编号
  5. result int //返回值
  6. }
  7. for i:=0;i<len(gs);i++{
  8. wg.Add(1)
  9. go func(id int) { //使用参数避免参数闭包延迟求值
  10. defer wg.Done()
  11. gs[id].id = id
  12. gs[id].result=(id + 1) * 100
  13. }(i)
  14. }
  15. wg.Wait()
  16. fmt.Printf("%+v\n",gs)
  17. }

输出:

  1. [{id:0 result:100} {id:1 result:200} {id:2 result:300} {id:3 result:400} {id:4 result:500}]

如使用map作为局部存储容器,建议做同步处理,因为运行时会对其做并发读写检查

Gosched

  1. 暂停,释放线程去执行其他任务。当前任务被放回队列,等待下次调度时恢复执行。
  1. func main() {
  2. runtime.GOMAXPROCS(1)
  3. exit:=make(chan struct{})
  4. go func() { //任务a
  5. defer close(exit)
  6. go func() { //任务b,放在此处是为了确保a先执行。
  7. fmt.Println("b")
  8. }()
  9. for i:=0;i<4;i++{
  10. fmt.Println("a:", i)
  11. if i==1{
  12. runtime.Gosched()
  13. }
  14. }
  15. }()
  16. <-exit
  17. }

输出:

  1. a: 0
  2. a: 1
  3. b
  4. a: 2
  5. a: 3

该函数很少被使用,因为运行时会主动向长时间运行(10ms)的任务发出抢占调度。

Goexit

Goexit立即终止当前任务,运行时确保所有已注册延迟调用被执行。该函数不会影响其他并发任务,不会引发panic,自然也就无法捕获。

  1. func main() {
  2. exit:=make(chan struct{})
  3. go func() {
  4. defer close(exit)
  5. defer println("a")
  6. func(){
  7. defer func() {
  8. println("b",recover() ==nil) //执行recover返回nil
  9. }()
  10. func(){ //在多层调用中执行Goexit
  11. println("c")
  12. runtime.Goexit() //立即终止整个调用堆栈
  13. println("c done.") //不会执行
  14. }()
  15. println("b done.") //不会被执行
  16. }()
  17. println("a done.") //不会执行
  18. }()
  19. <- exit
  20. println("main exit.")
  21. }

输出:

  1. c
  2. b true
  3. a
  4. main exit.

如果在main.main里调用Goexit,它会等待其他任务结束,然后让进程直接崩溃。
无论身处哪一层,Goexit都能立即终止整个调用堆栈,这与return仅退出当前函数不同。 标准库函数os.Exit可终止进程,但不会执行延迟调用。

通道

Go并未实现严格的并发安全。

允许全局变量、指针、引用类型这些非安全内存共享操作,就需要开发人员自行维护数据一致性和完整性。Go鼓励使用CSP通道,以通信代替内存共享,实现并发安全。

通过消息来避免竟态的模型除了CSP,还有Actor。但两者区别较大。

  1. 作为CSP核心,通道是显式的,要求操作双方必须知道数据类型和具体通道,并不关心另一端操作者身份和数量。可如果另一端未准备妥当,或消息未能及时处理时,会阻塞当前端。
  2. 相比起来,Actor是透明的,它不在乎数据类型及通道,只要知道接收者信箱即可。默认就是异步方式,发送方对消息是否被接收和处理并不关心。
  3. 从底层实现上来说,通道只是一个队列。同步模式下,发送和接收双方配对,然后直接赋值数据给对方。如配对失败,则置入等待队列,直到另一方出现后才被唤醒。异步模式抢夺的则是数据缓冲槽。发送方要求有空槽可供写入,而接收方则要求有缓冲数据可读。需求不符时,同样加入等待队列,直到有另一方写入数据或腾出空槽后被唤醒。

除传递消息外,通道还被用作时间通知。

  1. func main() {
  2. done:=make(chan struct{})
  3. c:=make(chan string)
  4. go func() {
  5. s:=<-c
  6. fmt.Println(s)
  7. close(done)
  8. }()
  9. c<-"hello!"
  10. <-done //阻塞,直到有数据或通道关闭
  11. }

同步模式必须有配对操作的goroutine出现,否则会一直阻塞。而异步模式在缓冲区未满或数据未读完前,不会阻塞。

  1. func main() {
  2. c:=make(chan int,3) //创建带三个缓冲槽的异步通道
  3. c<-1 //缓冲区未满,不会阻塞
  4. c<-2
  5. println(<-c)
  6. println(<-c)
  7. }
  8. 输出:
  9. 1
  10. 2

多数时候,异步通道有助于提升性能,减少排队阻塞。

缓冲区大小仅仅是内部属性,不属于类型组成部分。另外通道变量本身就是指针,可用相等操作符判断是否为同一对象或nil。

  1. func main(){
  2. var a,b chan int = make(chan int,3),make(chan int)
  3. var c chan bool
  4. fmt.Println(a==b)
  5. fmt.Println(c==nil)
  6. fmt.Printf("%p,%d\n",a,unsafe.Sizeof(a))
  7. }
  8. 输出:
  9. false
  10. true
  11. 0xc04207a000,8

虽然可传递指针来避免数据复制,但须额外注意数据并发安全。

内置函数cap和len返回缓冲区大小和当前已缓冲数量;而对于同步通道则都返回0,据此可判断通道是同步还是异步。

  1. func main(){
  2. a,b:=make(chan int),make(chan int,3)
  3. b<-1
  4. b<-2
  5. println("a",len(a),cap(a))
  6. println("b",len(b),cap(b))
  7. }

输出:

  1. a 0 0
  2. b 2 3

收发

除使用简单的发送和接收操作符外,还可用ok-idom或range模式处理数据。

  1. func main() {
  2. done :=make(chan struct{})
  3. c:=make(chan int)
  4. go func() {
  5. defer close(done)
  6. for{
  7. x,ok:=<-c
  8. if !ok{ //据此判断通道是否关闭
  9. return
  10. }
  11. fmt.Println(x)
  12. }
  13. }()
  14. c<-1
  15. c<-2
  16. c<-3
  17. close(c)
  18. <-done
  19. }

输出:1,2,3

对于循环接收数据,range模式更简洁一些。

  1. [...]
  2. go func() {
  3. defer close(done)
  4. for x:=range c{ //循环获取消息,直到通道被关闭。
  5. println(x)
  6. }
  7. }()
  8. [...]

及时用close函数关闭通道引发结束通知,否则可能会导致死锁。
通知可以是群体性的。也未必就是通知结束,可以是任何需要表达的事件。
一次性事件用close效率更好,没有多余开销。连续或多样性事件,可传递不同数据标志实现。还可使用sync.Cond实现单播或广播事件。
对于closed或nil通道,发送或接收操作都有相应规则:

  • 向已关闭通道发送数据,引发panic。
  • 从已关闭通道接收数据,返回已缓冲数据或零值。
  • 无论收发,nil通道都会阻塞。
  1. func main(){
  2. c:=make(chan int,3)
  3. c<-10
  4. c<-20
  5. close(c)
  6. for i:=0;i<cap(c)+1;i++{
  7. x,ok:=<-c
  8. println(i,":",ok,x)
  9. }
  10. }

输出:

  1. 0 : true 10
  2. 1 : true 20
  3. 2 : false 0
  4. 3 : false 0

重复关闭,或关闭nil通道都会引发panic错误。

单向

通道默认都是双向的,并不区分发送和接收端。但某些时候,我们可限制收发操作的方向来获得更严谨的操作逻辑。
尽管可用make创建单向通道,但那没有任何意义。通常使用类型转换来获取单向通道,并分别赋予操作双方。

  1. func main() {
  2. var wg sync.WaitGroup
  3. wg.Add(2)
  4. c:=make(chan int)
  5. var send chan<- int =c
  6. var recv <-chan int =c
  7. go func() {
  8. defer wg.Done()
  9. for x:=range recv{
  10. println(x)
  11. }
  12. }()
  13. go func() {
  14. defer wg.Done()
  15. defer close(c)
  16. for i:=0;i<3;i++{
  17. send<-i
  18. }
  19. }()
  20. wg.Wait()
  21. }

不能在单向通道上做逆向操作。close也不能用于接收端。也无法将单向通道重新转换回去。

选择

如要同时处理多个通道,可选用Select语句。它会随机选择一个可用通道做收发操作。

  1. func main() {
  2. var wg sync.WaitGroup
  3. wg.Add(2)
  4. a, b := make(chan int), make(chan int)
  5. go func() { //接收端
  6. defer wg.Done()
  7. for {
  8. var (
  9. name string
  10. x int
  11. ok bool
  12. )
  13. select {
  14. case x, ok = <-a: //随机选择可用channel接收数据
  15. name = "a"
  16. case x, ok = <-b:
  17. name = "b"
  18. }
  19. if !ok { //如果任一通道关闭则终止接收。
  20. return
  21. }
  22. println(name, x) //输出接收的数据信息
  23. }
  24. }()
  25. go func() { //发送端
  26. defer wg.Done()
  27. defer close(a)
  28. defer close(b)
  29. for i := 0; i < 10; i++ {
  30. select {
  31. case a <- i:
  32. case b <- i * 10:
  33. }
  34. }
  35. }()
  36. wg.Wait()
  37. }

输出:

  1. b 0
  2. a 1
  3. a 2
  4. b 30
  5. a 4
  6. b 50
  7. a 6
  8. a 7
  9. a 8
  10. b 90

如果要等全部的通道消息处理结束,可将已完成的通道设置为nil,这样她就会被阻塞,而不再被Select选中。

以下示例是两个独立的通道,逻辑是等两个通道都结束了收发才最终close,哪个先完成哪个阻塞住在那等待。

  1. func main() {
  2. var wg sync.WaitGroup
  3. wg.Add(3)
  4. a, b := make(chan int), make(chan int)
  5. go func() { //接收端
  6. defer wg.Done()
  7. for {
  8. select {
  9. case x, ok := <-a:
  10. if !ok {
  11. a = nil
  12. break
  13. }
  14. println("a",x)
  15. case x, ok := <-b:
  16. if !ok {
  17. b = nil
  18. break
  19. }
  20. println("b", x)
  21. }
  22. if a == nil && b == nil {
  23. return
  24. }
  25. }
  26. }()
  27. go func() {
  28. defer wg.Done()
  29. defer close(a)
  30. for i := 0; i < 10; i++ {
  31. a <- i
  32. }
  33. }()
  34. go func() {
  35. defer wg.Done()
  36. defer close(b)
  37. for i := 100; i < 105; i++ {
  38. b <- i
  39. }
  40. }()
  41. wg.Wait()
  42. }

输出:

  1. a 0
  2. a 1
  3. b 100
  4. a 2
  5. a 3
  6. a 4
  7. b 101
  8. b 102
  9. b 103
  10. a 5
  11. b 104
  12. a 6
  13. a 7
  14. a 8
  15. a 9

即使是同一通道,也会随机选择case执行。

  1. func main() {
  2. var wg sync.WaitGroup
  3. wg.Add(2)
  4. c := make(chan int)
  5. go func() { //接收端
  6. defer wg.Done()
  7. for {
  8. var x int
  9. var ok bool
  10. select {
  11. case x, ok = <-c:
  12. println("a1",x)
  13. case x, ok = <-c:
  14. println("a2", x)
  15. }
  16. if !ok {
  17. return
  18. }
  19. }
  20. }()
  21. go func() {
  22. defer wg.Done()
  23. defer close(c)
  24. for i := 0; i < 10; i++ {
  25. select {
  26. case c<-i:
  27. case c<-i*10:
  28. }
  29. }
  30. }()
  31. wg.Wait()
  32. }

输出:

  1. a1 0
  2. a1 1
  3. a2 2
  4. a2 3
  5. a1 4
  6. a1 50
  7. a1 60
  8. a1 7
  9. a2 80
  10. a2 90
  11. a2 0

当所有通道都不可用时,Select会执行default语句。如此可避开Select阻塞,但须注意处理外层循环,以免陷入空耗。

  1. func main() {
  2. c:=make(chan int)
  3. done:=make(chan bool)
  4. go func() {
  5. defer close(done)
  6. for {
  7. select {
  8. case x,ok:=<-c:
  9. if !ok{
  10. return
  11. }
  12. fmt.Println("data:",x)
  13. default: //避免Select阻塞
  14. }
  15. fmt.Println(time.Now())
  16. time.Sleep(time.Second)
  17. }
  18. }()
  19. time.Sleep(5*time.Second)
  20. c<-100
  21. close(c)
  22. <-done
  23. }

也可以用default处理一些默认逻辑。

  1. func main() {
  2. done := make(chan struct{})
  3. data := []chan int{ //数据缓冲区
  4. make(chan int, 3),
  5. }
  6. go func() { //生产数据
  7. defer close(done)
  8. for i := 0; i < 10; i++ {
  9. select {
  10. case data[len(data)-1] <- i: //生产数据
  11. default: //数据通道已满则新建chan
  12. data = append(data, make(chan int, 3))
  13. }
  14. }
  15. }()
  16. <-done
  17. for x := 0; x < len(data); x++ {
  18. c := data[x]
  19. close(c) //关闭通道后也能从中读取数据
  20. for i := range (c) {
  21. fmt.Println(i)
  22. }
  23. }
  24. }

输出:

  1. 0
  2. 1
  3. 2
  4. 4
  5. 5
  6. 6
  7. 8
  8. 9
  1. 可以看到,channel缓存满了后的第一个数据会被丢弃,直接走default创建新的通道了。

模式

通常使用工厂方法将goroutine和通道绑定。

  1. type receiver struct {
  2. wg sync.WaitGroup
  3. data chan int
  4. }
  5. func newReceiver() *receiver {
  6. r := &receiver{
  7. data: make(chan int),
  8. }
  9. r.wg.Add(1)
  10. go func() {
  11. defer r.wg.Done()
  12. for x := range r.data { //接收消息,直到通道关闭
  13. println("recv:", x)
  14. }
  15. }()
  16. return r
  17. }
  18. func main() {
  19. r := newReceiver()
  20. r.data <- 1
  21. r.data <- 2
  22. close(r.data) //关闭通道,发出结束通知
  23. r.wg.Wait() //等待接收者处理结束
  24. }

输出:

  1. recv: 1
  2. recv: 2

鉴于通道本身就是一个并发安全的队列,可用作ID generator、Pool等用途。

  1. type pool chan []byte
  2. func newPool(cap int)pool{
  3. return make(chan []byte,cap)
  4. }
  5. func (p pool)get() []byte{
  6. var v []byte
  7. select {
  8. case v=<-p: //返回
  9. default:
  10. v=make([]byte,10) //返回失败,新建
  11. }
  12. return v
  13. }
  14. func (p pool)put(b []byte){
  15. select {
  16. case p<-b: //放回
  17. default: //放回失败,新建
  18. }
  19. }

用通道实现信号量(semaphore)。

  1. func main() {
  2. runtime.GOMAXPROCS(4)
  3. var wg sync.WaitGroup
  4. sem:=make(chan struct{}, 2) //最多允许两个并发同时执行
  5. for i:=0;i<5;i++{
  6. wg.Add(1)
  7. go func(id int) {
  8. defer wg.Done()
  9. sem<- struct{}{} // acquire: 获取信号
  10. defer func() {<-sem}() //release: 释放信号
  11. time.Sleep(time.Second * 2)
  12. fmt.Println(id,time.Now())
  13. }(i)
  14. }
  15. wg.Wait()
  16. }

标准库time提供了timeout和tick channel实现。

  1. package main
  2. import (
  3. "time"
  4. "fmt"
  5. "os"
  6. )
  7. func main() {
  8. go func() {
  9. for{
  10. select {
  11. case <-time.After(time.Second*5):
  12. fmt.Println("timeout...")
  13. os.Exit(0)
  14. }
  15. }
  16. }()
  17. go func() {
  18. tick:=time.Tick(time.Second)
  19. //for _=range tick{
  20. // fmt.Println(time.Now(),"test")
  21. //}
  22. for {
  23. select {
  24. case <-tick:
  25. fmt.Println(time.Now())
  26. }
  27. }
  28. }()
  29. <-(chan struct {})(nil) //直接用nil channel阻塞进程
  30. }

捕获INT、TERM信号,顺便实现一个简易的atexit函数。
atexit函数是一个特殊的函数,它是在正常程序退出时调用的函数,我们把他叫为登记函数(函数原型:int atexit (void (*)(void))):
n个进程可以登记若n个函数,这些函数由exit⾃动调⽤,这些函数被称为终⽌处理函数, atexit函数可以登记这些函数。 exit调⽤终⽌处理函数的顺序和atexit登记的顺序相反(网上很多说造成顺序相反的原因是参数压栈造成的,参数的压栈是先进后出,和函数的栈帧相同),如果⼀个函数被多次登记,也会被多次调⽤。

python中有专门的atexit模块,简介如下:
从模块的名字也可以看出来,atexit模块主要的作用就是在程序即将结束之前执行的代码,atexit模块使用register函数用于注册程序退出时的回调函数,然后在回调函数中做一些资源清理的操作。

注意:

  1. 如果程序是非正常crash,或通过os._exit()退出,注册的回调函数将不会被调用。
  2. 也可以通过sys.exitfunc来注册回调,但通过它只能注册一个回调,而且还不支持参数。
  3. 建议使用atexit来注册回调函数。 ```go import ( “sync” “os” “os/signal” “syscall” “fmt” )

//type atexits struct { // sync.WaitGroup // signal chan os.Signal // funcs []func() //} var exits=&struct { sync.RWMutex signals chan os.Signal funcs []func() }{}

func atexit(f func()){ exits.Lock() defer exits.Unlock() exits.funcs=append(exits.funcs,f) }

func waitExit(){ if exits.signals==nil{ exits.signals = make(chan os.Signal) signal.Notify(exits.signals,syscall.SIGINT,syscall.SIGTERM) fmt.Println(“test”) } exits.RLock() for _,f:=range exits.funcs{ defer f() //延迟调用函数采用FILO顺序执行。即便某些函数panic,延迟调用也能确保后续函数执行。 } fmt.Println(“after range exits.funcs”) exits.RUnlock() fmt.Println(“after exits.Runlock”) <-exits.signals }

func main() { atexit(func() { println(“exit1…”) }) atexit(func() { println(“exit2…”) }) fmt.Println(“befor exit”)

  1. waitExit()

}

  1. <a name="c3318eaa"></a>
  2. ### 性能
  3. 将发往通道的数据打包,减少传输次数,可有效提升性能。从实现上来说,通道队列依旧使用锁同步机制,单次获取更多数据(批处理),可改善因频繁加锁造成的性能问题。
  4. ```go
  5. const (
  6. max = 500000 //数据统计上限
  7. block = 500 //数据块大小
  8. bufsize = 100 //缓冲区大小
  9. )
  10. func test() { //普通模式,每次传递一个整数
  11. done := make(chan struct{})
  12. c := make(chan int, bufsize)
  13. go func() {
  14. count := 0
  15. for x := range c {
  16. count += x
  17. }
  18. close(done)
  19. }()
  20. for i := 0; i < max; i++ {
  21. c <- i
  22. }
  23. close(c)
  24. <-done
  25. }
  26. func testBlock() { //块模式:每次将500个数字打包成块传输
  27. done := make(chan struct{})
  28. c:=make(chan [block]int,bufsize)
  29. go func() {
  30. count:=0
  31. for a:=range c{
  32. for _,x:=range a{ //a 是[block]int数组
  33. count +=x
  34. }
  35. }
  36. fmt.Println(count)
  37. close(done)
  38. }()
  39. for i:=0;i<max;i+=block{
  40. var b [block]int //使用数组对数据打包
  41. for n:=0; n<block;n++{
  42. b[n] = i+n
  43. if i+n == max -1{
  44. break
  45. }
  46. }
  47. c <- b
  48. }
  49. close(c)
  50. <-done
  51. }

BenchmarkTest

虽然单次消耗更多内存,但性能提升非常明显。如将数组改成切片会造成更多内存分配次数。

资源泄漏

通道可能会引发goroutine leak,确切地说,是指goroutine处于发送或接受阻塞状态,但一直未被唤醒。垃圾回收器并不手机此类资源,导致它们会在等待队列里长久休眠,形成资源泄漏。

同步

通道并不是用来取代锁的,它们有各自不同的应用场景。通道倾向于解决逻辑层次的并发处理架构,而锁则用来保护局部范围内的数据安全。
标准库sync提供了互斥和读写锁,另有原子操作等。mutex、rwmutex的使用并不复杂,只有几个地方需要注意。
将Mutex作为匿名字段时,相关方法必须实现为pointer-receiver,否则会因复制导致锁机制失效。

  1. type data struct {
  2. sync.Mutex
  3. }
  4. func (d data)test(s string){
  5. d.Lock()
  6. defer d.Unlock()
  7. for i:=0;i<5;i++{
  8. fmt.Println(s, i)
  9. time.Sleep(time.Second)
  10. }
  11. }
  12. func main() {
  13. var wg sync.WaitGroup
  14. var d data
  15. wg.Add(2)
  16. go func() {
  17. defer wg.Done()
  18. d.test("Read")
  19. }()
  20. go func() {
  21. defer wg.Done()
  22. d.test("write")
  23. }()
  24. wg.Wait()
  25. }

上述代码运行后会发现锁机制已失效,解决方案是将data 改为data.
也可用嵌入Mutex来避免复制问题,但那需要专门初始化。
应将Mutex锁粒度控制在最小范围内,及早释放。
Mutex不支持递归锁,即锁里面不允许有锁,否则即使在同一goroutine下也会导致死锁。
在设计并发安全类型时,千万注意此类问题。

  1. import "sync"
  2. type cache struct {
  3. sync.Mutex
  4. data []int
  5. }
  6. func (c *cache)count()int{
  7. c.Lock()
  8. n:=len(c.data)
  9. c.Unlock()
  10. return n
  11. }
  12. func (c *cache)get() int{
  13. c.Lock()
  14. defer c.Unlock()
  15. var d int
  16. if n:=c.count();n>0{ //锁中套锁
  17. d=c.data[0]
  18. c.data=c.data[1:]
  19. }
  20. return d
  21. }
  22. func main() {
  23. c:=cache{data:[]int{1,2,3,4}}
  24. c.get()
  25. }

相关建议:

  • 对性能要求较高时,应避免使用defer Unlock.
  • 读写并发时,用RWMutex性能会更好一些。
  • 对单个数据读写保护,可尝试用原子操作。
  • 执行严格测试,尽可能打开数据竞争检查。

image.jpeg