go并发编程
①、协程Goroutine
:::success Goroutine:每个协程的堆栈,go语言默认一开始分配2k但是很智能,可以自适应增长;Go语言中的goroutine就是这样一种机制,goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。
语法:**<font style="color:#DF2A3F;">go</font>**** func**
子协程异常退出:子协程的异常退出会将异常传播到主协程,直接会导致主协程也跟着挂掉,然后整个程序就崩溃了;使用panic和recover解决。
:::
:::color2 主协程(**main**)退出了,其他该协程的子协程任务全部结束执行;
父协程(**非main**)退出了:只要main没结束,其他该协程的子协程任务继续执行;
:::
package main
import (
"fmt"
"time"
)
func main() {
// 合起来写
go func() {
i := 0
for {
i++
fmt.Printf("new goroutine: i = %d\n", i)
time.Sleep(time.Second)
}
}()
i := 0
for {
i++
fmt.Printf("main goroutine: i = %d\n", i)
time.Sleep(time.Second)
if i == 2 {
break
}
}
}
/* 输出打印
main goroutine: i = 1
new goroutine: i = 1
new goroutine: i = 2
main goroutine: i = 2
*/
package main
import (
"fmt"
"time"
)
func main() {
// 合起来写
go func() {
i := 0
go func() {
i := 0
for {
i++
fmt.Printf("zi zi goroutine: i = %d\n", i)
time.Sleep(time.Second)
}
}()
for {
i++
fmt.Printf("new goroutine: i = %d\n", i)
time.Sleep(time.Second)
if i == 2 {
break
}
}
}()
i := 0
for {
i++
fmt.Printf("main goroutine: i = %d\n", i)
time.Sleep(time.Second)
if i == 10 {
break
}
}
}
/* 输出
main goroutine: i = 1
new goroutine: i = 1
zi zi goroutine: i = 1
new goroutine: i = 2
zi zi goroutine: i = 2
main goroutine: i = 2
zi zi goroutine: i = 3
main goroutine: i = 3
zi zi goroutine: i = 4
main goroutine: i = 4
zi zi goroutine: i = 5
main goroutine: i = 5
zi zi goroutine: i = 6
main goroutine: i = 6
zi zi goroutine: i = 7
main goroutine: i = 7
zi zi goroutine: i = 8
main goroutine: i = 8
zi zi goroutine: i = 9
main goroutine: i = 9
zi zi goroutine: i = 10
main goroutine: i = 10
zi zi goroutine: i = 11
*/
:::color2
goroutine与OS线程对比:
可增长的栈:OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,他可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到这个大。所以在Go语言中一次创建十万左右的goroutine也是可以的。
goroutine调度:GPM是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统(调度是在用户态下完成的)。不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。
:::
②、通道channel
:::warning 协程的同步问题解决方案:1、channel 2、 锁
go语言中,控制协程间的并发主要是用channel;如果说goroutine是Go语言程序的并发体的话,那么channel是它们之间的通信机制(锁用的很少,协程间channel是并发安全的**)**。
channel:
- 作为协程的输出,通道是一个容器,它可以容纳数据。
- 作为协程的输入,通道是一个生产者,它可以向协程提供数据。
- 通道作为容器是有限定大小的,满了就写不进去,空了就读不出来。
- 通道有它自己的类型,它可以限定进入通道的数据的类型。
:::
:::color2
channel是一种类型,一种引用类型,声明通道类型:var 变量 chan 元素类型
如:
var ch1 chan int
// 声明一个传递整型的双向通道 ch1 --> <nil>
var ch2 chan bool
// 声明一个传递布尔型的双向通道 ch2 --> <nil>
var ch3 chan []int
// 声明一个传递int切片的双向通道 ch3 --> <nil>
声明的通道后需要**使用make函数初始化之后才能使用**:创建channel的格式:
make(chan 元素类型, [缓冲大小])
如:
ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)
「缓冲型通道」:上述通道就是缓冲型通道;
「非缓冲型通道」:省略[缓冲大小]为非缓冲型通道,只能写入一个,等对方读取了才能再写,对方没读,你还写你就会阻塞。
比较:两个相同类型的channel可以使用==运算符比较。如果两个channel引用的是相通的对象,那么比较的结果为真。一个channel也可以和nil进行比较
:::
:::color2 channel操作:
发送(send):ch := make(chan int) //创建一个通道
;ch <- 10 // 把10发送到ch中
接收(receive):x := <- ch // 从ch中接收值并赋值给变量x
、<-ch // 从ch中接收值,**<font style="color:#DF2A3F;">忽略结果</font>**
、data, ok := <-ch // 从ch中接收值,并通过ok判断是否通道关闭
关闭(close):**<font style="color:#DF2A3F;">内置函数</font>**close(ch)
:关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。
关闭后的通道有以下特点:
- 对一个关闭的通道再发送值就会导致panic。
- 对一个关闭的通道进行接收会一直获取值直到通道为空。
- 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
- 关闭一个已经关闭的通道会导致panic。
:::
:::success 无缓冲的通道:无缓冲的通道又称为阻塞的通道,无缓冲通道也被称为同步通道;
无缓冲的通道主线程不能阻塞,deadlock只检查主线程,主线程发送时,必须有人接收值的时候才能发送值,主线程接收时,必须要有人发送值才能接收。
使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。 :::go
// ① 主发送,无接收
func main() {
ch := make(chan int)
ch <- 10
fmt.Println("发送成功")
}
/* 会报deadlock错,因为我们使用ch := make(chan int)创建的是无缓冲的通道,无缓冲的通道只有在有人接收值的时候才能发送值
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
.../src/github.com/pprof/studygo/day06/channel02/main.go:8 +0x54
*/
// ② 协程发送,无人接收
func recv(c chan int) {
fmt.Println("发送成功")
c <- 10
time.Sleep(time.Second*2)
}
func main() {
ch := make(chan int)
go recv(ch) // 启用goroutine从通道接收值
//
// ret := <-ch
time.Sleep(time.Second)
//fmt.Println("接收成功", ret)
}
// 编译通过
// ③ 主线程接收,无人发送
func recv(c chan int) {
// c <- 10
fmt.Println("发送成功")
time.Sleep(time.Second*2)
}
func main() {
ch := make(chan int)
go recv(ch) // 启用goroutine从通道接收值
//
ret := <-ch
time.Sleep(time.Second)
fmt.Println("接收成功", ret)
}
/*
发送成功
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
E:/桌面整理/00零声学院/0voice_go/2.资料/01-GO语言基础精讲和test方法/1-src/1-src/src/3/printa-z.go:17 +0x85
exit status 2
*/
// ④ 协程接收,无人发送
func recv(c chan int) {
ret := <-c
time.Sleep(time.Second)
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch) // 启用goroutine从通道接收值
time.Sleep(time.Second*2)
}
/* 无输出结果 */
:::success
有缓冲区的通道:使用make函数初始化通道的时候为其指定通道的容量,如make(chan int, 1)
;
只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量;
我们可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量。
主线程不能永远阻塞**,若逻辑出现问题就会报deadlock的错误;**
:::
func main() {
ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
ch <- 10
fmt.Println("发送成功")
}
// 不会报错
func main() {
ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
ch <- 10
ch <- 10
fmt.Println("发送成功")
}
// 又会报死锁错,主要思想就是主线程不能阻塞
:::success 正确的通道读方式:当通过通道发送有限的数据时,我们可以通过close函数关闭通道来告知从该通道接收值的goroutine停止等待。当通道被关闭时,往该通道发送值会引发panic,从该通道里接收的值一直都是类型零值。如果通道里的元素是整型的,读操作是不能通过返回值来确定通道是否关闭的。
有两种方式在接收值的时候判断通道是否被关闭,我们通常使用的是for range的方式。:::
package main
import "fmt"
func main() {
c := make(chan int)
go func() {
for i := 0; i < 5; i++ {
c <- i
}
close(c)
}()
for {
if data, ok := <-c; ok {
fmt.Println(data)
} else {
break
}
}
fmt.Println("main结束")
}
/*
PS E:\2.资料\01-GO语言基础精讲和test方法\1-src\1-src\src> go run .\3\printa-z.go
0
1
2
3
4
main结束
PS E:\2.资料\01-GO语言基础精讲和test方法\1-src\1-src\src>
*/
package main
import "fmt"
// channel 练习
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
// 开启goroutine将0~100的数发送到ch1中
go func() {
for i := 0; i < 100; i++ {
ch1 <- i
}
close(ch1)
}()
// 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
go func() {
for {
i, ok := <-ch1 // 通道关闭后再取值ok=false
if !ok {
break
}
ch2 <- i * i
}
close(ch2)
}()
// 在主goroutine中从ch2中接收值打印
for i := range ch2 { // 通道关闭后会退出for range循环
fmt.Println(i)
}
}
/*
PS E:\01-GO语言基础精讲和test方法\1-src\1-src\src> go run .\3\printa-z.go
0
1
4
9
16
25
36
49
64
81
100
121
144
169
196
225
256
289
324
361
400
441
484
529
576
625
676
729
784
841
900
961
1024
1089
1156
1225
1296
1369
1444
1521
1600
1681
1764
1849
1936
2025
2116
2209
2304
2401
2500
2601
2704
2809
2916
3025
3136
3249
3364
3481
3600
3721
3844
3969
4096
4225
4356
4489
4624
4761
4900
5041
5184
5329
5476
5625
5776
5929
6084
6241
6400
6561
6724
6889
7056
7225
7396
7569
7744
7921
8100
8281
8464
8649
8836
9025
9216
9409
9604
9801
PS E:\01-GO语言基础精讲和test方法\1-src\1-src\src>
*/
:::success 通道写安全:
单写多读确保通道写安全的最好方式是由负责写通道的协程自己来关闭通道,读通道的协程不要去关闭通道。
多写确保通道写安全方式是使用到内置 sync 包提供的 WaitGroup 对象,它使用计数来等待指定事件完成。
:::
:::success 单向通道:有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收。
**chan<- int**
是一个只能发送的通道,可以发送但是不能接收;
**<-chan int**
是一个只能接收的通道,可以接收但是不能发送。
函数传参及任何赋值操作中将双向通道转换为单向通道是可以的,但反过来是不可以的。
:::
func counter(out chan<- int) {
for i := 0; i < 100; i++ {
out <- i
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for i := range in {
out <- i * i
}
close(out)
}
func printer(in <-chan int) {
for i := range in {
fmt.Println(i)
}
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go counter(ch1)
go squarer(ch2, ch1)
printer(ch2)
}
// 2-6 WaitGroup 在写端关闭channel对单写的程序有效,但是多写的时候呢?
package main
import (
"fmt"
"sync"
"time"
)
func send(ch chan int, wg *sync.WaitGroup) {
defer wg.Done() // 计数值减一
i := 0
for i < 4 {
i++
ch <- i
}
}
func recv(ch chan int) {
for v := range ch {
fmt.Println(v)
}
}
// 只要一个值能做界定符 比如nil, 比如0xfffe
func main() {
var ch = make(chan int, 4)
var wg = new(sync.WaitGroup)
wg.Add(2) // 增加计数值
go send(ch, wg) // 写
go send(ch, wg) // 写
go recv(ch)
// Wait() 阻塞等待所有的写通道协程结束
// 待计数值变成零,Wait() 才会返回
wg.Wait()
// 关闭通道
close(ch)
time.Sleep(time.Second)
}
/*
PS E:\3-src\3-src\src> go run .\2\2-6-channel-waitgroup.go
1
2
3
4
1
2
3
4
PS E
*/
:::success
:::
③、WaitGroup
:::success
**<font style="color:rgb(51, 51, 51);">sync.WaitGroup</font>**
来实现并发任务的同步;引包<font style="color:rgb(51, 51, 51);">import "sync"</font>
;
:::
方法名 | 功能 |
---|---|
(wg WaitGroup) *Add(delta int) | 计数器+delta |
(wg WaitGroup) *Done() | 计数器-1 |
(wg WaitGroup) *Wait() | 阻塞直到计数器变为0 |
var wg sync.WaitGroup
func hello() {
defer wg.Done()
fmt.Println("Hello Goroutine!")
}
func main() {
wg.Add(1)
go hello() // 启动另外一个goroutine去执行hello函数
fmt.Println("main goroutine done!")
wg.Wait()
}
:::success 需要注意sync.WaitGroup是一个结构体,传递参数的时候要传递指针。
:::
④、多路复用select
在某些场景下我们需要同时从多个通道接收数据。通道在接收数据时,如果没有数据可以接收将会发生阻塞。Go内置了**<font style="color:rgb(51, 51, 51);">select</font>**
关键字,可以同时响应多个通道的操作。
<font style="color:rgb(51, 51, 51);">select</font>
的使用类似于<font style="color:rgb(51, 51, 51);">switch</font>
语句,它有一系列<font style="color:rgb(51, 51, 51);">case</font>
分支和一个<font style="color:rgb(51, 51, 51);">默认</font>
的分支。每个case会对应一个通道的通信(接收或发送)过程。<font style="color:rgb(51, 51, 51);">select</font>
会一直等待,直到某个<font style="color:rgb(51, 51, 51);">case</font>
的通信操作完成时,就会执行case分支对应的语句。具体格式如下:
select {
case <-chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
:::success select功能:
- select可以同时监听一个或多个channel,直到其中一个channel ready;
- 如果多个channel同时ready,则随机选择一个执行
- 可以用于判断管道是否存满
:::
package main
import (
"fmt"
"time"
)
func test1(ch chan string) {
time.Sleep(time.Second * 5)
ch <- "test1"
}
func test2(ch chan string) {
time.Sleep(time.Second * 2)
ch <- "test2"
}
func main() {
// 2个管道
output1 := make(chan string)
output2 := make(chan string)
// 跑2个子协程,写数据
go test1(output1)
go test2(output2)
// 用select监控
select {
case s1 := <-output1:
fmt.Println("s1=", s1)
case s2 := <-output2:
fmt.Println("s2=", s2)
}
}
/* s2= test2 */
package main
import (
"fmt"
)
func main() {
// 创建2个管道
int_chan := make(chan int, 1)
string_chan := make(chan string, 1)
go func() {
//time.Sleep(2 * time.Second)
int_chan <- 1
}()
go func() {
string_chan <- "hello"
}()
select {
case value := <-int_chan:
fmt.Println("int:", value)
case value := <-string_chan:
fmt.Println("string:", value)
}
fmt.Println("main结束")
}
/*
string: hello
main结束
*/
package main
import (
"fmt"
"time"
)
// 判断管道有没有存满
func main() {
// 创建管道
output1 := make(chan string, 10)
// 子协程写数据
go write(output1)
// 取数据
for s := range output1 {
fmt.Println("res:", s)
time.Sleep(time.Second)
}
}
func write(ch chan string) {
for {
select {
// 写数据
case ch <- "hello":
fmt.Println("write hello")
default:
fmt.Println("channel full")
}
time.Sleep(time.Millisecond * 500)
}
}
/*
res: hello
write hello
write hello
write hello
res: hello
write hello
res: hello
write hello
write hello
res: hello
write hello
write hello
res: hello
write hello
write hello
res: hello
write hello
write hello
res: hello
write hello
write hello
res: hello
write hello
write hello
res: hello
write hello
write hello
res: hello
write hello
write hello
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
write hello
channel full
res: hello
*/
:::success 非阻塞读写:当通道空时,读操作不会阻塞,当通道满时,写操作也不会阻塞。非阻塞读写需要依靠 select 语句的 default 分支。当 select 语句所有通道都不可读写时,如果定义了 default 分支,那就会执行 default 分支逻辑,这样就起到了不阻塞的效果。
:::
// 非阻塞读写
package main
import (
"fmt"
"time"
)
func send(ch1 chan int, ch2 chan int) {
i := 0
for {
i++
select {
case ch1 <- i:
fmt.Printf("send ch1 %d\n", i)
case ch2 <- i:
fmt.Printf("send ch2 %d\n", i)
default:
fmt.Printf("ch block\n")
time.Sleep(2 * time.Second) // 这里只是为了演示
}
}
}
func recv(ch chan int, gap time.Duration, name string) {
for v := range ch {
fmt.Printf("receive %s %d\n", name, v)
time.Sleep(gap)
}
}
func main() {
// 无缓冲通道
var ch1 = make(chan int)
var ch2 = make(chan int)
// 两个消费者的休眠时间不一样,名称不一样
go recv(ch1, time.Second, "ch1")
go recv(ch2, 2*time.Second, "ch2")
send(ch1, ch2)
}
/* 截取开始的一部分
ch block
send ch2 2
send ch1 3
ch block
receive ch1 3
receive ch2 2
send ch1 5
send ch2 6
ch block
receive ch2 6
receive ch1 5
send ch1 8
send ch2 9
ch block
receive ch2 9
receive ch1 8
send ch1 11
send ch2 12
ch block
receive ch2 12
receive ch1 11
send ch2 14
send ch1 15
ch block
*/
⑤、生产者消费者模型
// 生产者、消费者模型
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
// 生产者
func Producer(factor int, out chan<- int) {
for i := 0; ; i++ {
out <- i * factor
time.Sleep(3 * time.Second)
}
}
// 消费者
func Consumer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
ch := make(chan int, 64)
go Producer(3, ch) // 生成3的倍数序列
go Producer(5, ch) // 生成5的倍数序列
go Consumer(ch)
//Ctrl +C 退出
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
fmt.Printf("wait Ctrl +C\n")
fmt.Printf("quit (%v)\n", <-sig)
}
⑥、并发安全和锁
:::success 数据竞态:可能会存在多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题;
竞态检查工具**:是基于运行时代码检查,而不是通过代码静态分析来完成的。这意味着那些没有机会运行到的代码逻辑中如果存在安全隐患,它是检查不出来的。需要加上-race 执行**。go run <font style="color:#DF2A3F;">-race</font> 代码
:::
:::success 互斥锁(**sync.Mutex**):互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。
:::
// 不加锁版本 输出6379
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
for i := 0; i < 5000; i++ {
lock.Lock() // 加锁
x = x + 1
lock.Unlock() // 解锁
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
// 加锁版本 输出10000
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
for i := 0; i < 5000; i++ {
lock.Lock() // 加锁
x = x + 1
lock.Unlock() // 解锁
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
:::success
读写互斥锁:互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的<font style="color:rgb(51, 51, 51);">RWMutex</font>
类型。
读写锁优势:读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来。
:::
package main
import (
"fmt"
"time"
"sync"
)
var (
x int64
wg sync.WaitGroup
lock sync.Mutex
rwlock sync.RWMutex
)
func write() {
lock.Lock() // 加互斥锁
// rwlock.Lock() // 加写锁
x = x + 1
time.Sleep(10 * time.Millisecond) // 假设写操作耗时10毫秒
// rwlock.Unlock() // 解写锁
lock.Unlock() // 解互斥锁
wg.Done()
}
func read() {
lock.Lock() // 加互斥锁
// rwlock.RLock() // 加读锁
time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
// rwlock.RUnlock() // 解读锁
lock.Unlock() // 解互斥锁
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}
wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}
// 读写锁 146.8492ms
// 互斥锁 15.0116383s
:::success
避免锁复制:sync.Mutex
是一个结构体对象,这个对象在使用的过程中要避免被复制 —— 浅拷贝。复制会导致锁被「分裂」了,也就起不到保护的作用。所以在平时的使用中要尽量使用它的指针类型。
锁复制存在于结构体变量的赋值、函数参数传递、方法参数传递中,都需要注意。
:::
package main
import (
"fmt"
"sync"
)
type SafeDict struct {
data map[string]int
mutex *sync.Mutex
}
func NewSafeDict(data map[string]int) *SafeDict {
return &SafeDict{
data: data,
mutex: &sync.Mutex{},
}
}
// defer 语句总是要推迟到函数尾部运行,所以如果函数逻辑运行时间比较长,
// 这会导致锁持有的时间较长,这时使用 defer 语句来释放锁未必是一个好注意。
func (d *SafeDict) Len() int {
d.mutex.Lock()
defer d.mutex.Unlock()
return len(d.data)
}
// func (d *SafeDict) Test() int {
// d.mutex.Lock()
// length := len(d.data)
// d.mutex.Unlock() // 手动解锁 减少粒度 // 这种情况就不要用 defer d.mutex.Unlock()
// fmt.Println("length: ", length)
// // 这里还有耗时处理 耗时1000ms
// }
func (d *SafeDict) Put(key string, value int) (int, bool) {
d.mutex.Lock()
defer d.mutex.Unlock()
old_value, ok := d.data[key]
d.data[key] = value
return old_value, ok
}
func (d *SafeDict) Get(key string) (int, bool) {
d.mutex.Lock()
defer d.mutex.Unlock()
old_value, ok := d.data[key]
return old_value, ok
}
func (d *SafeDict) Delete(key string) (int, bool) {
d.mutex.Lock()
defer d.mutex.Unlock()
old_value, ok := d.data[key]
if ok {
delete(d.data, key)
}
return old_value, ok
}
func write(d *SafeDict) {
d.Put("banana", 5)
}
func read(d *SafeDict) {
fmt.Println(d.Get("banana"))
}
// go run -race 3-2-lock.go
func main() {
d := NewSafeDict(map[string]int{
"apple": 2,
"pear": 3,
})
go read(d)
write(d)
}
:::info 匿名锁字段:在结构体章节,我们知道外部结构体可以自动继承匿名内部结构体的所有方法。如果将上面的SafeDict 结构体进行改造,将锁字段匿名,就可以稍微简化一下代码。
:::
package main
import (
"fmt"
"sync"
)
type SafeDict struct {
data map[string]int
*sync.Mutex
}
func NewSafeDict(data map[string]int) *SafeDict {
return &SafeDict{
data,
&sync.Mutex{}, // 一样是要初始化的
}
}
func (d *SafeDict) Len() int {
d.Lock()
defer d.Unlock()
return len(d.data)
}
func (d *SafeDict) Put(key string, value int) (int, bool) {
d.Lock()
defer d.Unlock()
old_value, ok := d.data[key]
d.data[key] = value
return old_value, ok
}
func (d *SafeDict) Get(key string) (int, bool) {
d.Lock()
defer d.Unlock()
old_value, ok := d.data[key]
return old_value, ok
}
func (d *SafeDict) Delete(key string) (int, bool) {
d.Lock()
defer d.Unlock()
old_value, ok := d.data[key]
if ok {
delete(d.data, key)
}
return old_value, ok
}
func write(d *SafeDict) {
d.Put("banana", 5)
}
func read(d *SafeDict) {
fmt.Println(d.Get("banana"))
}
func main() {
d := NewSafeDict(map[string]int{
"apple": 2,
"pear": 3,
})
go read(d)
write(d)
}
⑦、发布订阅练习
⑧、Context上下文
:::success Context:每一个处理都应该有个超时限制,需要在调用中传递这个超时,Context是协程安全的。代码中可以将单个Context传递给任意数量的goroutine,并在取消该Context时可以将信号传递给所有的goroutine。
对服务器传入的请求应该创建上下文,而对服务器的传出调用应该接受上下文。它们之间的函数调用链必须传递上下文,或者可以使用WithCancel、WithDeadline、WithTimeout或WithValue创建的派生上下文。当一个上下文被取消时,它派生的所有上下文也被取消。:::
:::success Context接口:
◼ **Deadline**
方法是获取设置的截止时间的意思,第一个返回式是截止时间,到了这个时间点,Context会自动发起取消请求;第二个返回值ok==false时表示没有设置截止时间,如果需要取消的话,需要调用取消函数进行取消;
◼ **<font style="color:#DF2A3F;">Done</font>**
方法返回一个只读的chan,类型为struct{},我们在goroutine中,如果该方法返回的chan可以读取,则意味着parent context已经发起了取消请求,我们通过Done方法收到这个信号后,就应该做清理操作,然后退出goroutine,释放资源;
◼ **Err**
方法返回取消的错误原因,因为什么Context被取消;
- 如果当前Context被取消就会返回
<font style="color:rgb(51, 51, 51);">Canceled</font>
错误; - 如果当前Context超时就会返回
<font style="color:rgb(51, 51, 51);">DeadlineExceeded</font>
错误;
◼ **Value**
方法获取该Context上绑定的值,是一个键值对,所以要通过一个Key才可以获取对应的值,这个值一般是线程安全的。
:::
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
:::success Background()和TODO():
Go内置两个函数:**<font style="color:#DF2A3F;">Background()和TODO()</font>**
,这两个函数分别返回一个实现了Context接口的background和todo。我们代码中最开始都是以这两个内置的上下文对象作为最顶层的partent context,衍生出更多的子上下文对象。
Background()主要用于main函数、初始化以及测试代码中,作为Context这个树结构的最顶层的Context,也就是根Context。
TODO(),它目前还不知道具体的使用场景,如果我们不知道该使用什么Context的时候,可以使用这个。
background和todo本质上都是emptyCtx结构体类型,是一个不可取消,没有设置截止时间,没有携带任何值的Context。
:::
:::success With系列函数:四个With函数,接收的都有一个partent参数,就是父Context,我们要基于这个父Context创建出子Context的意思;
◼ **WithCancel**
函数,传递一个父Context作为参数,返回子Context,以及一个取消函数用来取消Context
◼ **WithDeadline**
函数,和WithCancel差不多,它会多传递一个截止时间参数,意味着到了这个时间点,会自动取消Context,当然我们也可以不等到这个时候,可以提前通过取消函数进行取消
◼ **WithTimeout**
和WithDeadline基本上一样,这个表示是超时自动取消,是多少时间后自动取消Context的意思,只是传参数不一样。
◼ **WithValue**
函数和取消Context无关,它是为了生成一个绑定了一个键值对数据的Context,这个绑定的数据可以通过Context.Value方法访问到,WithValue返回父节点的副本,其中与key关联的值为val。仅对API和进程间传递请求域的数据使用上下文值,而不是使用它来传递可选参数给函数。
:::
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key interface{}, val interface{}) Context
func gen(ctx context.Context) <-chan int {
dst := make(chan int)
n := 1
go func() {
for {
select {
case <-ctx.Done():
return // return结束该goroutine,防止泄露
case dst <- n:
n++
}
}
}()
return dst
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // 当我们取完需要的整数后调用cancel
for n := range gen(ctx) {
fmt.Println(n)
if n == 5 {
break
}
}
}
/*上面的示例代码中,gen函数在单独的goroutine中生成整数并将它们发送到返回的通道。
gen的调用者在使用生成的整数之后需要取消上下文,以免gen启动的内部goroutine发生泄漏*/
/* 输出
1
2
3
4
5
*/
package main
import (
"fmt"
"time"
"context"
)
func main() {
d := time.Now().Add(50 * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), d)
// 尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践。
// 如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。
defer cancel()
select {
case <-time.After(1 * time.Second):
fmt.Println("overslept")
case <-ctx.Done():
fmt.Println(ctx.Err())
}
}
// 输出 context deadline exceeded
package main
import (
"context"
"fmt"
"sync"
"time"
)
// context.WithTimeout
var wg sync.WaitGroup
func worker(ctx context.Context) {
LOOP:
for {
fmt.Println("db connecting ...")
time.Sleep(time.Millisecond * 10) // 假设正常连接数据库耗时10毫秒
select {
case <-ctx.Done(): // 50毫秒后自动调用
break LOOP
default:
}
}
fmt.Println("worker done!")
wg.Done()
}
func main() {
// 设置一个50毫秒的超时
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
wg.Add(1)
go worker(ctx)
time.Sleep(time.Second * 5)
cancel() // 通知子goroutine结束
wg.Wait()
fmt.Println("over")
}
/*
db connecting ...
db connecting ...
db connecting ...
worker done!
over
*/
package main
import (
"context"
"fmt"
"sync"
"time"
)
// context.WithValue
type TraceCode string
var wg sync.WaitGroup
func worker(ctx context.Context) {
key := TraceCode("TRACE_CODE")
traceCode, ok := ctx.Value(key).(string) // 在子goroutine中获取trace code
if !ok {
fmt.Println("invalid trace code")
}
LOOP:
for {
fmt.Printf("worker, trace code:%s\n", traceCode)
time.Sleep(time.Millisecond * 10) // 假设正常连接数据库耗时10毫秒
select {
case <-ctx.Done(): // 50毫秒后自动调用
break LOOP
default:
}
}
fmt.Println("worker done!")
wg.Done()
}
func main() {
// 设置一个50毫秒的超时
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
// 在系统的入口中设置trace code传递给后续启动的goroutine实现日志数据聚合
ctx = context.WithValue(ctx, TraceCode("TRACE_CODE"), "12512312234")
wg.Add(1)
go worker(ctx)
time.Sleep(time.Second * 5)
cancel() // 通知子goroutine结束
wg.Wait()
fmt.Println("over")
}
/*
worker, trace code:12512312234
worker, trace code:12512312234
worker, trace code:12512312234
worker, trace code:12512312234
worker done!
over
*/
:::success Context使用原则:
◼ 不要把Context放在结构体中,要以参数的方式进行传递;
◼ 以 Context 作为参数的函数方法,应该把 Context 作为第一个参数;
◼ 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO;
◼ Context 的 Value 相关方法应该传递请求域的必要数据,不应该用于传递可选参数;
◼ Context 是线程安全的,可以放心的在多个 Goroutine 中传递。
:::
:::success contexts派生上下文:Context包提供了从现有Context值派生新Context值的函数。这些值形成一个树:当一个Context被取消时,从它派生的所有Context也被取消。
:::
⑨、调用服务端API时如何在客户端实现超时控制?
// context_timeout/server/main.go
package main
import (
"fmt"
"math/rand"
"net/http"
"time"
)
// server端,随机出现慢响应
func indexHandler(w http.ResponseWriter, r *http.Request) {
number := rand.Intn(2)
if number == 0 {
time.Sleep(time.Second * 10) // 耗时10秒的慢响应
fmt.Fprintf(w, "slow response")
return
}
fmt.Fprint(w, "quick response")
}
func main() {
http.HandleFunc("/", indexHandler)
err := http.ListenAndServe(":8000", nil)
if err != nil {
panic(err)
}
}
// context_timeout/client/main.go
package main
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
)
// 客户端
type respData struct {
resp *http.Response
err error
}
func doCall(ctx context.Context) {
transport := http.Transport{
// 请求频繁可定义全局的client对象并启用长链接
// 请求不频繁使用短链接
DisableKeepAlives: true, }
client := http.Client{
Transport: &transport,
}
respChan := make(chan *respData, 1)
req, err := http.NewRequest("GET", "http://127.0.0.1:8000/", nil)
if err != nil {
fmt.Printf("new requestg failed, err:%v\n", err)
return
}
req = req.WithContext(ctx) // 使用带超时的ctx创建一个新的client request
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
go func() {
resp, err := client.Do(req)
fmt.Printf("client.do resp:%v, err:%v\n", resp, err)
rd := &respData{
resp: resp,
err: err,
}
respChan <- rd
wg.Done()
}()
select {
case <-ctx.Done():
//transport.CancelRequest(req)
fmt.Println("call api timeout")
case result := <-respChan:
fmt.Println("call server api success")
if result.err != nil {
fmt.Printf("call server api failed, err:%v\n", result.err)
return
}
defer result.resp.Body.Close()
data, _ := ioutil.ReadAll(result.resp.Body)
fmt.Printf("resp:%v\n", string(data))
}
}
func main() {
// 定义一个100毫秒的超时
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel() // 调用cancel释放子goroutine资源
doCall(ctx)
}
/* 请求失败打印
call api timeout
client.do resp:<nil>, err:Get "http://127.0.0.1:8000/": context deadline exceeded
*/
/* 请求成功打印
client.do resp:&{200 OK 200 HTTP/1.1 1 1 map[Content-Length:[14] Content-Type:[text/plain; charset=utf-8] Date:[Sat, 08 Apr 2023 13:55:48 GMT]] 0xc00019e040 14 [] true
false map[] 0xc00012e100 <nil>}, err:<nil>
call server api success
resp:quick response
*/