概念
[并发]
:一个cpu同时执行多项任务,通过时间分片轮询 (或多核心调度)算法,选择执行任务,任务同时执行,就是并发
[进程]
:cpu切换程序时,需要保存上一个程序的状态(context--上下文切换),就会丢失程序运行状态,引入进程概念
:进程用以划分程序运行时所需要的资源,因此进程就是一个程序运行时候的所需要的基本资源单位 (运行实体)
[并行]
:多个cpu(多核心),每个cpu都有进程在运行(同一时刻,多个任务同时执行),这就是并行
[用户态与内核态]
:系统分为两种运行状态,用户态以及内核态
[线程]
:进程切换需要消耗大量资源,线程本身几乎不占有资源,他们共享进程里的资源,内核调度切换耗费资源
[协程]
:线程还是需要内核去进行调度,切换起来也是需要把用户态的数据写入到内核态,也是需要耗费一定的计算机资源
:协程就是把自己的调度算法交给程序(用户态)去进行管理,能以更小的资源去进行并发。
多线程(multi-gopher)、协程
[goroutine]
:Goroutine是建立在线程之上的轻量级的抽象,其以非常低的代价在同一个地址空间中并行地执行多个函数方法
:相比于线程,其创建和销毁代价小很多,并且其调度是独立于线程的
:Goroutine并不会比线程更快,只是增加了更多的并发性。
:当一个goroutine被阻塞(比如等待IO),golang的scheduler会调度其它可以执行的goroutine运行。
[优点]
:内存消耗更少,Goroutine所需要的内存通常只有2kb,而线程则需要1Mb
:创建与销毁的开销更小,线程创建时需要向操作系统申请资源,并且在销毁时将资源归还
:切换开销更小,线程的调度方式是抢占式的;goroutine的调度是协同式的, 不会直接地与操作系统内核打交道
[调度]
:Processor(P)
:OSThread(M
:Goroutines(G)
=> 可用的线程数是通过GOMAXPROCS来设置,默认值是可用的CPU核数
=> OSThread 调度在processor上,goroutines调度在OSThreads上
[使用]
:go func()
[阻塞主线程]
:sync.WaitGroup 等待所有运行的goroutine运行结束后,再退出main函数 # 维护一个goroutine数量的计数器
// 是Go语言标准库的一部分;用于等待一组goroutine结束运行
:var n sync.WaitGroup
:n.Add(1) # 加一
:defer n.Done # 减一
:n.Wait() # 判断是否等于 0 (阻塞)
[死锁]
:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
// 维护一个goroutine数量的计数器
var n sync.WaitGroup
for i := 0; i < 20; i++ {
n.Add(1)
go func(i int, n *sync.WaitGroup) {
defer n.Done()
time.Sleep(1 * time.Second)
fmt.Printf("goroutine %d is running\n", i)
}(i, &n)
}
// wait方法会一直阻塞,知道计数器为0
n.Wait()
}
Golang的调度器可以利用多processor资源,在任意时刻,M个goroutine需要被调度到N个OS threads上;
同时这些threads运行在至多GOMAXPROCS个processor上(N <= GOMAXPROCS)。
Go scheduler将可运行的goroutines分配到多个运行在一个或多个processor上的OS threads上。
每个processor有一个本地goroutine队列。同时有一个全局的goroutine队列。每个OSThread都会被分配给一个processor。最多只能有GOMAXPROCS个processor,每个processor同时只能执行一个OSThread。Scheculer可以根据需要创建OSThread。
在每一轮调度中,scheduler找到一个可以运行的goroutine并执行直到其被阻塞;操作系统的一个线程下可以并发执行上千个goroutine,每个goroutine所占用的资源和切换开销都很小,因此,goroutine是golang适合高并发场景的重要原因。
并发安全
[概念]
:函数在线程程序,和并发情况下可以正确允许,那么这个函数是并发安全的。
:一个特定类型的一些方法和操作函数,如果该类型是并发安全的,那么所有它的访问方法和操作都是并发安全的
[问题]
:死锁 DeadLock
:活锁 LiveLock
:饿死 ResourceStarvation
[竞争条件]
:指程序在多个 goroutine 交叉执行时,运行结果无法保持正确
:数据竞争
数据竞争
[概念]
:多个goroutine 并发访问同一个变量,且至少一个是写操作时会发生数据竞争
[消除数据竞争]
:01 不去写变量,创建goroutine 之前初始化阶段,就初始化完成,其后都只是读取
:02 避免多个goroutine访问变量,(变量的监控 monitor goroutine)
=> "不要使用共享内存来通信,使用通信来共享内存"
:03 允许多个gorountine访问变量,但同一时刻只有一个gorountine能访问该变量,即互斥
=> 互斥锁 - sync.Mutex (Lock() 获取锁、Unlock() 释放锁) ,该锁不能呢个重复获取
=> 读写锁 - sync.RWMutex (RLlock() 获取锁、RUnlock() 释放锁,写操作互斥,读操作并行)
=> 一次性初始化 - sync.Once
// "不要使用共享内存来通信,使用通信来共享内存"
var deposits = make(chan int) // send amount to deposit
var balances = make(chan int) // receive balance
func Deposit(amount int) { deposits <- amount }
func Balance() int { return <-balances }
func teller() {
// balance is confined to teller goroutine
var balance int
for {
select {
case amount := <-deposits:
balance += amount
case balances <- balance:
}
}
}
func init() {
// start the monitor goroutine
go teller()
}
通信机制 - 通信并发
channel 信道
[channel - 信道]
:Channels允许线程间通信,goroutines 可以往里面发消息,也可以从中接收其它go routines 的消息
:channel := make(chan int)
:bufferedChan := make(chan string, 3) # 创建有缓冲信道
[使用]
:var channel chan int = make(chan int)
[信道和取消息]
:信道默认的存消息和取消息都是阻塞的 (无缓冲的信道)
:无缓冲的信道在取消息和存消息的时候都会挂起当前的goroutine,除非另一端已经准备好
=> 从无缓冲信道取数据,必须要有数据流进来才可以,否则当前线阻塞
=> 数据流入无缓冲信道, 如果没有其他goroutine来拿走这个数据,那么当前线阻塞
[创建信道]
:var channel chan int = make(chan int)
[读信道]
:x := <-channel
:fmt.Println(x)
[写信道]
:y := 1
:channel -< y
匿名Goroutines
// Anonymous go routine
go func() {
fmt.Println("I'm running in my own go routine")
}()
非阻塞读
// select case 语句可以实现对 channel 的非阻塞读
// select 是语言级内置, 非阻塞
myChan := make(chan string)
go func(){
myChan <- "Message!"
}()
select {
case msg := <- myChan:
fmt.Println(msg)
default:
fmt.Println("No Msg")
}
<-time.After(time.Second * 1)
select {
case msg := <- myChan:
fmt.Println(msg)
default:
fmt.Println("No Msg")
}
非阻塞写
// select case 语句可以实现对 channel 的非阻塞写
// 用于从一组可能的通讯中选择一个进一步处理
select {
case myChan <- "message":
fmt.Println("sent the message")
default:
fmt.Println("no message sent")
}
同步 - 共享内存并发
互斥锁 sync.Mutex
[同步锁]
// 所有对共享数据的访问,不管读写,仅当goroutine持有锁才能操作
:n := sync.Mutex // 互斥锁
: n.Lock()
:n.Unlock()
[原子操作子包 - 基础数据类型的原子操作函数]
:sync.atomic
:func CompareAndSwapUint64(val *uint64, old, new uint64) (swapped bool)
定义了一个函数使用并发来推迟触发一个事件
// 函数Publish在给定时间过期后打印text字符串到标准输出
// 该函数并不会阻塞而是立即返回
func Publish(text string, delay time.Duration) {
go func() {
time.Sleep(delay)
fmt.Println("BREAKING NEWS:", text)
}() // 注意这里的括号。必须调用匿名函数
}
func main() {
Publish("A goroutine starts a new thread of execution.", 5*time.Second)
fmt.Println("Let’s hope the news will published before I leave.")
// 等待发布新闻
time.Sleep(10 * time.Second)
fmt.Println("Ten seconds later: I’m leaving now.")
}
不要通过共享内存来通讯,而是通过通讯来共享内存
func sharingIsCaring() {
// 定义管道
ch := make(chan int)
go func() {
// 仅为一个goroutine可见的局部变量
n := 0
n++
// 数据从一个goroutine离开...
ch <- n
}()
// ...然后安全到达另一个goroutine
n := <-ch
n++
fmt.Println(n) // 输出: 2
}
并发数据结构,AtomicInt,用于存储一个整型值
// AtomicInt是一个并发数据结构, 持有一个整数值, 该数据结构的零值为0
type AtomicInt struct {
// 锁,一次仅能被一个goroutine持有
mu sync.Mutex
n int
}
// Add方法作为一个原子操作将n加到AtomicInt
func (a *AtomicInt) Add(n int) {
a.mu.Lock() // 等待锁释放,然后持有它
a.n += n
a.mu.Unlock() // 释放锁
}
// Value方法返回a的值
func (a *AtomicInt) Value() int {
a.mu.Lock()
n := a.n
a.mu.Unlock() // 整个结构被解锁了
return n
}
func lockItUp() {
wait := make(chan struct{})
var n AtomicInt
go func() {
n.Add(1) // 一个访问
close(wait)
}()
n.Add(1) // 另一个并发访问
<-wait
fmt.Println(n.Value()) // 输出: 2
}
读写锁 sync.RWMutex
[多读锁]
:n := sync.RWMutex
:n.RLock()
:n.RUnlock()
sync.Once 初始化
[全局唯一操作 - 保证在全局范围内仅仅调用指定的函数一次]
:sync.Once.Do()
:var once sync.Once
:once.Do(func())
数据竞争自动检测
$ go run -race raceClosure.go
Race:
==================
WARNING: DATA RACE
Read by goroutine 2:
main.func·001()
../raceClosure.go:22 +0x65
Previous write by goroutine 0:
main.race()
../raceClosure.go:20 +0x19b
main.main()
../raceClosure.go:10 +0x29
runtime.main()
../go/src/pkg/runtime/proc.c:248 +0x91
Goroutine 2 (running) created at:
main.race()
../raceClosure.go:24 +0x18b
main.main()
../raceClosure.go:10 +0x29
runtime.main()
../go/src/pkg/runtime/proc.c:248 +0x91
==================
55555
Correct:
01234
Also correct:
01324
Found 1 data race(s)
exit status 66
// 该工具发现一处数据竞争,包含:一个goroutine在第20行对一个变量进行写操作
// 跟着另一个goroutine在第22行对同一个变量进行了未同步的读操作。
并行计算
func init() {
numCpu := runtime.NumCPU()
// 尝试使用所有可用的CPU
runtime.GOMAXPROCS(numCpu)
}
type Vector []float64
// 函数mul 返回 Σ u[i]*v[j], i + j = k.
func mul(u, v Vector, k int) (res float64) {
n := min(k+1, len(u))
j := min(k, len(v)-1)
for i := k - j; i < n; i, j = i+1, j-1 {
res += u[i] * v[j]
}
return
}
// 思路很简单:确定合适大小的工作单元,然后在不同的goroutine中执行每个工作单元
// 函数Convolve 计算 w = u * v,其中 w[k] = Σ u[i]*v[j], i + j = k
// 先决条件:len(u) > 0, len(v) > 0
func Convolve(u, v Vector) (w Vector) {
n := len(u) + len(v) - 1
w = make(Vector, n)
for k := 0; k < n; k++ {
w[k] = mul(u, v, k)
}
return
}
// 并发版本
func ConvolveGo(u, v Vector) (w Vector) {
n := len(u) + len(v) - 1
w = make(Vector, n)
// 将 w 切分成花费 ~100μs-1ms 用于计算的工作单元
size := max(1, 1<<20/n)
wg := new(sync.WaitGroup)
wg.Add(1 + (n-1)/size)
for i := 0; i < n && i >= 0; i += size { // 整型溢出后 i < 0
j := i + size
if j > n || j < 0 { // 整型溢出后 j < 0
j = n
}
// 这些goroutine共享内存,但是只读
go func(i, j int) {
for k := i; k < j; k++ {
w[k] = mul(u, v, k)
}
wg.Done()
}(i, j)
}
wg.Wait()
return
}
// 业务计算
func max(a, b int) int {
return 1;
}
// 业务计算
func min(a, b int) int {
return b;
}