一、并发编程基本概念
并发编程开发将一个过程按照并行算法拆分为多个可以独立执行的代码块,从而充分利用多核和多处理器提高系统吞吐率
顺序、并发与并行
a)顺序是指发起执行的程序只能有一个
b)并发是指同时发起执行(同时处理)的程序可以有很多个(单车道并排只能有一辆车,可同时驶入路段多辆车)
c)并行是指同时执行(同时做)的程序可以有很多个(多车道并排可以有多个车)
操作系统:进程和线程
进程:资源分配的基本单位
线程:CPU调度的基本单位
二、例程
Go语言中每个并发执行的单元叫Goroutine,使用go关键字后接函数调用 来创建Goroutine
package main
import (
"fmt"
"time"
)
func testA() {
for i := 0; i <= 10; i++ {
fmt.Printf("%d", i)
}
fmt.Println()
}
func testB() {
for i := 'A'; i <= 'Z'; i++ {
fmt.Printf("%c", i)
}
fmt.Println()
}
func main() {
fmt.Println("Start")
go testA() // 启动一个例程
go testB() // 启动一个例程
time.Sleep(time.Second * 3)
fmt.Println("End")
// main例程 =》 主例程
// go 工作例程
// 主例程 不等待工作例程执行结束
}
输出:
main函数也是由一个例程来启动执行,这个例程称为主例程,其他例程叫工作例程,主例程结束后工作例程也会随之销毁,使用sync.WaitGroup(技术信号量)来维护执行例程执行状态 可以通过runtime包中的GoSched让例程主动让出CPU,也可以通过time.Sleep让例程休眠从而让出CPU
2.1 技术信号量
package main
import (
"fmt"
"sync"
)
func testA(wg *sync.WaitGroup) {
for i := 0; i <= 10; i++ {
fmt.Printf("%d", i)
}
fmt.Println()
// 等待信号量-1
defer wg.Done()
}
func testB(wg *sync.WaitGroup) {
for i := 'A'; i <= 'Z'; i++ {
fmt.Printf("%c", i)
}
fmt.Println()
// 等待信号量-1
defer wg.Done()
}
func main() {
fmt.Println("Start")
// 计算信号量
// 启动例程的之前,+1
// 当例程执行结束时,-1
//var wg sync.WaitGroup
wg := new(sync.WaitGroup)
wg.Add(2)
go testA(wg) // 启动一个例程
go testB(wg) // 启动一个例程
//time.Sleep(time.Second * 3)
wg.Wait()
fmt.Println("End")
}
2.2 闭包陷阱
> 闭包:在一个函数的内部引用了函数外部的变量,使函数的外部变量的生命周期延长
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
// 使用for循环启动3个例程
for i := 0; i < 3; i++ {
wg.Add(1)
go func(i int) {
fmt.Println(i)
wg.Done()
}(i)
}
wg.Wait()
}
输出:
因为闭包使用函数外变量,当例程执行是,外部变量已经发生变化,导致打印内容不正确,可使用在创建例程时通过函数传递参数(值拷贝)方式避免
三、 并发程序的通信方式
3.1 共享数据(同步)
多个并发程序需要对同一个资源进行访问,则需要先申请资源的访问权限,同时再使用完成后释放资源的访问权。当资源被其他程序已申请访问权后,程序应该等待访问权被释放并被申请到时进行访问操作。同一时间资源只能被一个程序访问和操作
package main
import (
"fmt"
"runtime"
"sync"
)
// 10 个例程分别给salary + 10 1000
// 10 个例程分别给salary - 10 1000
func main() {
var salary int = 0
var wg sync.WaitGroup
// 定义锁
var locker sync.Mutex
fmt.Println("Start")
for i := 0; i < 10; i++ {
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
locker.Lock() // 加锁
salary += 10
locker.Unlock() // 释放锁
runtime.Gosched()
}
}()
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
locker.Lock()
salary -= 10
locker.Unlock()
runtime.Gosched()
}
}()
}
wg.Wait()
fmt.Println(salary)
fmt.Println("End")
}
输出:
Start
0
End
3.2 管道(异步)
a. 不带缓冲区的管道
死锁
:无缓冲区的管道,如果一直往channel里面写数据,而不读取则会产生死锁
数据处理者处理完数据后将数据放入缓冲区中,数据接收者从缓冲区中获取数据,处理者不用等待接收者是否准备好处理数据
package main
import "fmt"
type Element struct {
}
func main() {
// 管道中放什么类型需要提前执行
// 声明int类型的管道channel
// var channel chan int =make(chan int)
channel := make(chan int) // 无缓冲区管道
// 初始化&赋值
// make()
// channel = make(chan int)
// 操作
// 读,写
go func() {
fmt.Println("go Start")
channel <- 1 // 将1写入管道
fmt.Println("go end")
}()
fmt.Println("channel begin")
num := <-channel // 如果未读取到数据会进行阻塞
fmt.Println("channel after") // go start 之后
fmt.Println(num)
}
输出:
channel begin
go Start
go end
channel after
1
如果管道关闭后则管道不能继续写close(channel)
,但是可以读取channel
,但是读取到的channel
值为0
判断管道是否关闭,通过v,ok := <-channel
,如果ok
为true
则表示管道开启,否则为关闭
遍历管道
package main
import "fmt"
func main() {
channel := make(chan int)
go func() {
channel <- 1
channel <- 2
channel <- 3
close(channel)
}()
for num := range channel {
fmt.Println(num)
}
}
输出:
1
2
3
b.带缓冲区的管道
死锁
:有缓冲区的管道,如果一直往channel里面写数据超过最大长度后,不进行读取则会产生死锁c.只读只写管道
chan<-
:只写管道<-chan
:只读管道package main
import (
"fmt"
)
// 只写
func in(channel chan<- int) {
channel <- 1
channel <- 2
}
// 只读
func out(channel <-chan int) {
fmt.Println(<-channel)
fmt.Println(<-channel)
}
func main() {
channel := make(chan int, 3)
in(channel)
out(channel)
}
输出:
1
2
3.3多路复用
package main
import (
"fmt"
"time"
)
func main() {
// 在多个管道中只要有一个操作成功就执行相应逻辑
channelA := make(chan int)
channelB := make(chan int)
go func() {
time.Sleep(3 * time.Second)
//channelA <- 1
close(channelA)
}()
for {
select {
case v, ok := <-channelA:
fmt.Println("a", v, ok)
case v, ok := <-channelB:
fmt.Println("b", v, ok)
default:
fmt.Println("default")
}
}
}
程序执行超时时间:
package main
import (
"fmt"
"math/rand"
"time"
)
func task(result chan<- int64) {
interval := rand.Intn(10)
fmt.Println("sleep:", interval)
time.Sleep(time.Duration(interval) * time.Second) // 随机休眠n秒
result <- time.Now().Unix()
}
func main() {
rand.Seed(time.Now().Unix()) // 设置随机数种子
//var result chan int64 =make(chan int64)
//timeout := make(chan int)
result := make(chan int64)
go task(result)
//go func() {
// time.Sleep(3 * time.Second)
// close(timeout)
//}()
select {
case r := <-result:
fmt.Println("success:", r)
//case <-timeout:
// fmt.Println("timeout")
case <-time.After(3 * time.Second):
fmt.Println("timeout")
}
fmt.Println(time.Now())
}
输出:
sleep: 7
timeout
2021-08-19 18:27:19.9093326 +0800 CST m=+3.003692901
四、sync包
sync.WaitGroup
:计数信号量sync.Mutex
:锁 互斥锁
- 共享数据 读
- 共享数据 写
sync.RWMutex
:读写锁sync.Cond
:条件锁
- 多个例程,某个执行检查是否满足条件,不满足等待 Wait
- 其他例程,当可能产生等待例程条件重新满足,通知等待例程 Signal/Boardcase
sync.Pool
:线程池,连接池,…
- 对象池,从池中获取对象,当池中无可用对象,创建并返回
- 当使用完成会放入池中
- 示例:
package main
mport (
"fmt"
"sync"
unc main() {
intPool := &sync.Pool{
New: func() interface{} {
fmt.Println("new")
return 1
},
}
v := intPool.Get() // 需要创建 -> New
// v断言某类型,执行
fmt.Println(v)
// 使用完成放入池中
intPool.Put(v) // 1个对象
v1 := intPool.Get()
v2 := intPool.Get() // 需要创建 -> New
fmt.Println(v1, v2)
出:
ew
ew
1
五、runtime包
runtime.GOMAXPROCS(1)
:设置使用cpu核数,开发终端对cpu要求有限制 4核 1runtime.GOROOT
:获取GOROOT的安装目录runtime.NumCPU
:获取CPU的核数runtime.NumGoroutine
:获取例程的数量,默认为1个