仅执行一次
场景
code
只执行一次,输出一个数字结果。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
var once sync.Once
for i := 0; i < 10; i++ {
once.Do(func() {
num := rand.Intn(10)
fmt.Println(num)
})
}
}
sync.Once
其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。
思考
假如加载配置是并发的,某个函数发现配置是空的nil
,并发去加载配置,假如一个goroutine加载配置时出错,导致只加载了部分配置;其他goroutine发现配置不是空的,不去加载。最终结果是配置没加载完整。
单例模式
定义:单例对象的类必须保证只有一个实例存在,全局有唯一接口访问。
package singleton
import (
"sync"
)
type singleton struct {}
var instance *singleton
var once sync.Once
func GetInstance() *singleton {
once.Do(func() {
instance = &singleton{}
})
return instance
}
仅需任意任务完成
场景
这里所有任务都完成了,但是只用了最快的一个结果,所以是所有任务都完成了;
当有一个任务完成时,取消其他任务,因为任务都是有开销的。
code
package main
import (
"fmt"
"runtime"
"time"
)
func runTask(id int) string {
time.Sleep(10 * time.Millisecond)
return fmt.Sprintf("The result is from %d", id)
}
func firstResponse() string {
numOfRunner := 10
// 使用带缓存的channel,让goroutines不会堵塞。
ch := make(chan string, numOfRunner)
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := runTask(i)
ch <- ret
}(i)
}
// 任意一个返回,这个函数就返回了。
return <-ch
}
func main() {
fmt.Println("Before:", runtime.NumGoroutine())
fmt.Println(firstResponse())
time.Sleep(time.Second * 1)
fmt.Println("After:", runtime.NumGoroutine())
}
所有任务都完成
基于基于CSP实现
package main
import (
"fmt"
"sync"
)
func main() {
var mutex sync.Mutex
max := 10000
ch := make(chan int, max)
for i := 0; i < max; i++ {
go func() {
mutex.Lock()
ch <- 1
defer func() {
mutex.Unlock()
}()
}()
}
counter := 0
for i := 0; i < max; i++ {
counter += <-ch
}
fmt.Println("counter:", counter)
}
基于sync.WaitGroup
在代码中生硬的使用time.Sleep
肯定是不合适的,Go语言中可以使用sync.WaitGroup
来实现并发任务的同步。 sync.WaitGroup
有以下几个方法:
方法名 | 功能 |
---|---|
(wg *WaitGroup) Add(delta int) | 计数器+delta |
(wg *WaitGroup) Done() | 计数器-1 |
(wg *WaitGroup) Wait() | 阻塞直到计数器变为0 |
sync.WaitGroup
内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。
package main
import (
"fmt"
"sync"
)
func main() {
var mutex sync.Mutex
var wg sync.WaitGroup
counter := 0
for i := 0; i < 10000; i++ {
wg.Add(1) // 每启动一个协程都新增加一个等待
go func() {
mutex.Lock()
defer func() {
mutex.Unlock()
wg.Done()
}()
counter++
}(i)
}
wg.Wait()
fmt.Println("counter:", counter)
}
对象池
适合通过复用降低复杂对象的创建和GC的代价
协程安全,会有锁的开销
生命周期受GC影响,不适合做连接池等需要自己管理生命周期的资源的池化。
code
基于buffered channel实现对象池,取用完后放回channel。
package object_pool
import (
"errors"
"fmt"
"testing"
"time"
)
type ReusableObject struct {
token int
}
type ObjectPool struct {
bufChan chan *ReusableObject // 用于缓冲可重用对象
}
func NewObjectPool(numOfObject int) *ObjectPool {
objectPool := ObjectPool{}
objectPool.bufChan = make(chan *ReusableObject, numOfObject)
for i := 0; i < numOfObject; i++ {
objectPool.bufChan <- &ReusableObject{
token: i,
}
}
return &objectPool
}
func (pool *ObjectPool) GetObject(timeout time.Duration) (*ReusableObject, error) {
select {
case ret := <-pool.bufChan:
return ret, nil
case <-time.After(timeout): //超时控制
return nil, errors.New("time out")
}
}
func (pool *ObjectPool) ReleaseObject(object *ReusableObject) error {
select {
case pool.bufChan <- object:
return nil
default:
return errors.New("overflow")
}
}
func TestObjPool(t *testing.T) {
pool := NewObjectPool(10)
// 创建对象池后,对象池是满的
if err := pool.ReleaseObject(&ReusableObject{}); err != nil { //尝试放置超出池大小的对象
t.Error(err)
}
for i := 0; i < 11; i++ {
if v, err := pool.GetObject(time.Second); err != nil {
t.Error(err)
} else {
fmt.Printf("%T %d\n", v, v.token)
// 使用后立即释放
if err := pool.ReleaseObject(v); err != nil {
t.Error(err)
}
}
}
fmt.Println("Done")
}
sync.pool 对象生命周期
- gc会清除sync.pool缓存的对象
- 对象的有效期是下次gc前 —> gc 执行的时机是什么?
带来的思考
每次获取对象,可能会受锁的限制,所以是创建对象的开销大,还是锁带来的开销大需要根据实际情况权衡。
code
package main
import (
"fmt"
"runtime"
"sync"
)
func SyncPool() {
pool := &sync.Pool{
// 当对象池为空时,调用get时会自动New创建一个新的对象,可以理解为默认对象
New: func() interface{} {
fmt.Println("Create a new object.")
return 100
},
}
v := pool.Get().(int)
fmt.Println(v)
pool.Put(3)
runtime.GC() //GC 会清除sync.pool中缓存的对象
v1, _ := pool.Get().(int)
fmt.Println(v1)
}
func SyncPoolInMultiGoroutine() {
pool := &sync.Pool{
New: func() interface{} {
fmt.Println("Create a new object.")
return 10
},
}
pool.Put(1)
pool.Put(2)
pool.Put(3)
pool.Put(4)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
fmt.Println(pool.Get())
wg.Done()
}(i)
}
wg.Wait()
}
func main() {
//SyncPool()
SyncPoolInMultiGoroutine()
}
多路选择和超时控制
select
的使用类似于switch语句,它有一系列case分支和一个默认的分支。每个case会对应一个通道的通信(接收或发送)过程。select
会一直等待,直到某个case
的通信操作完成时,就会执行case
分支对应的语句。
- 可处理一个或多个channel的发送/接收操作。
- 如果多个
case
同时满足,select
会随机选择一个。 - 对于没有
case
的select{}
会一直等待,可用于阻塞main函数。
// 多路选择器与超时
package main
import (
"fmt"
"github.com/asmcos/requests"
"time"
)
func main() {
responses := make(chan string, 3)
go func() {
resp, _ := requests.Get("http://qq.com")
responses <- resp.Text()
}()
go func() {
resp, _ := requests.Get("http://sina.com")
responses <- resp.Text()
}()
go func() {
resp, _ := requests.Get("http://baidu.com")
responses <- resp.Text()
}()
select {
case res := <-responses:
fmt.Println(res)
case <-time.After(time.Millisecond * 5):
fmt.Println("timeout 5ms")
}
}
任务取消
通过channel传递取消信号
package main
import (
"fmt"
"time"
)
func isCancelled(cancelChan chan struct{}) bool {
select {
case <-cancelChan:
return true
default:
return false
}
}
//部分取消
//向channel发送一个值,只有一个订阅者能取值
func cancel1(cancelChan chan struct{}) {
cancelChan <- struct{}{}
}
//全部取消
//关闭channel,所有订阅者都能取到值(chan 的零值)
func cancel2(cancelChan chan struct{}) {
close(cancelChan)
}
func main() {
cancelChan := make(chan struct{}, 0)
for i := 0; i < 5; i++ {
go func(i int, cancelCh chan struct{}) {
for {
if isCancelled(cancelCh) {
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, "Cancelled")
}(i, cancelChan)
}
cancel1(cancelChan)
//cancel2(cancelChan)
time.Sleep(time.Second * 1)
}
关联任务的取消
根context通过context.Background()
创建
子context通过context.WithCancel(parentcontext)
创建,如:ctx, cancel := context.WithCancel(context.Background())
当前context被cancel()
取消时,基于它的子context都会被取消。
接收取消通知<-ctx.Done()
context
context就是用于管理相关任务的上下文,包含了共享值的传递,超时,取消通知
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
Deadline会返回一个超时时间,Goroutine获得了超时时间后,例如可以对某些io操作设定超时时间。
Done方法返回一个信道(channel),当Context被撤销或过期时,该信道是关闭的,即它是一个表示Context是否已关闭的信号。
当Done信道关闭后,Err方法表明Context被撤的原因。
Value可以让Goroutine共享一些数据,当然获得数据是协程安全的。但使用这些数据的时候要注意同步,比如返回了一个map。
示例
package main
import (
"context"
"fmt"
"time"
)
func isCancelled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 5; i++ {
go func(i int, ctx context.Context) {
for {
if isCancelled(ctx) {
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, "Cancelled")
}(i, ctx)
}
cancel()
time.Sleep(time.Second * 1)
}
互斥锁
互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine
可以访问共享资源。Go语言中使用sync
包的Mutex
类型来实现互斥锁。 使用互斥锁来修复上面代码的问题:
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
for i := 0; i < 5000; i++ {
lock.Lock() // 加锁
x = x + 1
lock.Unlock() // 解锁
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
使用互斥锁能够保证同一时间有且只有一个goroutine
进入临界区,其他的goroutine
则在等待锁;当互斥锁释放后,等待的goroutine
才可以获取锁进入临界区,多个goroutine
同时等待一个锁时,唤醒的策略是随机的。
读写互斥锁
互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync
包中的RWMutex
类型。
读写锁分为两种:读锁和写锁。
当一个goroutine
获取读锁之后,其他的goroutine
如果是获取读锁会继续获得锁,如果是获取写锁就会等待;
当一个goroutine
获取写锁之后,其他的goroutine
无论是获取读锁还是写锁都会等待。
读写锁示例:
package main
import (
"fmt"
"sync"
"time"
)
var (
x int64
wg sync.WaitGroup
lock sync.Mutex
rwlock sync.RWMutex
)
func write() {
// lock.Lock() // 加互斥锁
rwlock.Lock() // 加写锁
x = x + 1
time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
rwlock.Unlock() // 解写锁
// lock.Unlock() // 解互斥锁
wg.Done()
}
func read() {
// lock.Lock() // 加互斥锁
rwlock.RLock() // 加读锁
time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
rwlock.RUnlock() // 解读锁
// lock.Unlock() // 解互斥锁
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}
wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}
需要注意的是读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来。