仅执行一次
场景
code
只执行一次,输出一个数字结果。
package mainimport ("fmt""math/rand""sync""time")func init() {rand.Seed(time.Now().UnixNano())}func main() {var once sync.Oncefor i := 0; i < 10; i++ {once.Do(func() {num := rand.Intn(10)fmt.Println(num)})}}
sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。
思考
假如加载配置是并发的,某个函数发现配置是空的nil,并发去加载配置,假如一个goroutine加载配置时出错,导致只加载了部分配置;其他goroutine发现配置不是空的,不去加载。最终结果是配置没加载完整。
单例模式
定义:单例对象的类必须保证只有一个实例存在,全局有唯一接口访问。
package singletonimport ("sync")type singleton struct {}var instance *singletonvar once sync.Oncefunc GetInstance() *singleton {once.Do(func() {instance = &singleton{}})return instance}
仅需任意任务完成
场景
这里所有任务都完成了,但是只用了最快的一个结果,所以是所有任务都完成了;
当有一个任务完成时,取消其他任务,因为任务都是有开销的。
code
package mainimport ("fmt""runtime""time")func runTask(id int) string {time.Sleep(10 * time.Millisecond)return fmt.Sprintf("The result is from %d", id)}func firstResponse() string {numOfRunner := 10// 使用带缓存的channel,让goroutines不会堵塞。ch := make(chan string, numOfRunner)for i := 0; i < numOfRunner; i++ {go func(i int) {ret := runTask(i)ch <- ret}(i)}// 任意一个返回,这个函数就返回了。return <-ch}func main() {fmt.Println("Before:", runtime.NumGoroutine())fmt.Println(firstResponse())time.Sleep(time.Second * 1)fmt.Println("After:", runtime.NumGoroutine())}
所有任务都完成
基于基于CSP实现
package mainimport ("fmt""sync")func main() {var mutex sync.Mutexmax := 10000ch := make(chan int, max)for i := 0; i < max; i++ {go func() {mutex.Lock()ch <- 1defer func() {mutex.Unlock()}()}()}counter := 0for i := 0; i < max; i++ {counter += <-ch}fmt.Println("counter:", counter)}
基于sync.WaitGroup
在代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有以下几个方法:
| 方法名 | 功能 |
|---|---|
| (wg *WaitGroup) Add(delta int) | 计数器+delta |
| (wg *WaitGroup) Done() | 计数器-1 |
| (wg *WaitGroup) Wait() | 阻塞直到计数器变为0 |
sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。
package mainimport ("fmt""sync")func main() {var mutex sync.Mutexvar wg sync.WaitGroupcounter := 0for i := 0; i < 10000; i++ {wg.Add(1) // 每启动一个协程都新增加一个等待go func() {mutex.Lock()defer func() {mutex.Unlock()wg.Done()}()counter++}(i)}wg.Wait()fmt.Println("counter:", counter)}
对象池
适合通过复用降低复杂对象的创建和GC的代价
协程安全,会有锁的开销
生命周期受GC影响,不适合做连接池等需要自己管理生命周期的资源的池化。
code
基于buffered channel实现对象池,取用完后放回channel。
package object_poolimport ("errors""fmt""testing""time")type ReusableObject struct {token int}type ObjectPool struct {bufChan chan *ReusableObject // 用于缓冲可重用对象}func NewObjectPool(numOfObject int) *ObjectPool {objectPool := ObjectPool{}objectPool.bufChan = make(chan *ReusableObject, numOfObject)for i := 0; i < numOfObject; i++ {objectPool.bufChan <- &ReusableObject{token: i,}}return &objectPool}func (pool *ObjectPool) GetObject(timeout time.Duration) (*ReusableObject, error) {select {case ret := <-pool.bufChan:return ret, nilcase <-time.After(timeout): //超时控制return nil, errors.New("time out")}}func (pool *ObjectPool) ReleaseObject(object *ReusableObject) error {select {case pool.bufChan <- object:return nildefault:return errors.New("overflow")}}func TestObjPool(t *testing.T) {pool := NewObjectPool(10)// 创建对象池后,对象池是满的if err := pool.ReleaseObject(&ReusableObject{}); err != nil { //尝试放置超出池大小的对象t.Error(err)}for i := 0; i < 11; i++ {if v, err := pool.GetObject(time.Second); err != nil {t.Error(err)} else {fmt.Printf("%T %d\n", v, v.token)// 使用后立即释放if err := pool.ReleaseObject(v); err != nil {t.Error(err)}}}fmt.Println("Done")}
sync.pool 对象生命周期
- gc会清除sync.pool缓存的对象
- 对象的有效期是下次gc前 —> gc 执行的时机是什么?
带来的思考
每次获取对象,可能会受锁的限制,所以是创建对象的开销大,还是锁带来的开销大需要根据实际情况权衡。
code
package mainimport ("fmt""runtime""sync")func SyncPool() {pool := &sync.Pool{// 当对象池为空时,调用get时会自动New创建一个新的对象,可以理解为默认对象New: func() interface{} {fmt.Println("Create a new object.")return 100},}v := pool.Get().(int)fmt.Println(v)pool.Put(3)runtime.GC() //GC 会清除sync.pool中缓存的对象v1, _ := pool.Get().(int)fmt.Println(v1)}func SyncPoolInMultiGoroutine() {pool := &sync.Pool{New: func() interface{} {fmt.Println("Create a new object.")return 10},}pool.Put(1)pool.Put(2)pool.Put(3)pool.Put(4)var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func(id int) {fmt.Println(pool.Get())wg.Done()}(i)}wg.Wait()}func main() {//SyncPool()SyncPoolInMultiGoroutine()}
多路选择和超时控制
select的使用类似于switch语句,它有一系列case分支和一个默认的分支。每个case会对应一个通道的通信(接收或发送)过程。select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句。
- 可处理一个或多个channel的发送/接收操作。
- 如果多个
case同时满足,select会随机选择一个。 - 对于没有
case的select{}会一直等待,可用于阻塞main函数。
// 多路选择器与超时package mainimport ("fmt""github.com/asmcos/requests""time")func main() {responses := make(chan string, 3)go func() {resp, _ := requests.Get("http://qq.com")responses <- resp.Text()}()go func() {resp, _ := requests.Get("http://sina.com")responses <- resp.Text()}()go func() {resp, _ := requests.Get("http://baidu.com")responses <- resp.Text()}()select {case res := <-responses:fmt.Println(res)case <-time.After(time.Millisecond * 5):fmt.Println("timeout 5ms")}}
任务取消
通过channel传递取消信号
package mainimport ("fmt""time")func isCancelled(cancelChan chan struct{}) bool {select {case <-cancelChan:return truedefault:return false}}//部分取消//向channel发送一个值,只有一个订阅者能取值func cancel1(cancelChan chan struct{}) {cancelChan <- struct{}{}}//全部取消//关闭channel,所有订阅者都能取到值(chan 的零值)func cancel2(cancelChan chan struct{}) {close(cancelChan)}func main() {cancelChan := make(chan struct{}, 0)for i := 0; i < 5; i++ {go func(i int, cancelCh chan struct{}) {for {if isCancelled(cancelCh) {break}time.Sleep(time.Millisecond * 5)}fmt.Println(i, "Cancelled")}(i, cancelChan)}cancel1(cancelChan)//cancel2(cancelChan)time.Sleep(time.Second * 1)}
关联任务的取消
根context通过context.Background()创建
子context通过context.WithCancel(parentcontext)创建,如:ctx, cancel := context.WithCancel(context.Background())
当前context被cancel()取消时,基于它的子context都会被取消。
接收取消通知<-ctx.Done()
context
context就是用于管理相关任务的上下文,包含了共享值的传递,超时,取消通知
type Context interface {Deadline() (deadline time.Time, ok bool)Done() <-chan struct{}Err() errorValue(key interface{}) interface{}}
Deadline会返回一个超时时间,Goroutine获得了超时时间后,例如可以对某些io操作设定超时时间。
Done方法返回一个信道(channel),当Context被撤销或过期时,该信道是关闭的,即它是一个表示Context是否已关闭的信号。
当Done信道关闭后,Err方法表明Context被撤的原因。
Value可以让Goroutine共享一些数据,当然获得数据是协程安全的。但使用这些数据的时候要注意同步,比如返回了一个map。
示例
package mainimport ("context""fmt""time")func isCancelled(ctx context.Context) bool {select {case <-ctx.Done():return truedefault:return false}}func main() {ctx, cancel := context.WithCancel(context.Background())for i := 0; i < 5; i++ {go func(i int, ctx context.Context) {for {if isCancelled(ctx) {break}time.Sleep(time.Millisecond * 5)}fmt.Println(i, "Cancelled")}(i, ctx)}cancel()time.Sleep(time.Second * 1)}
互斥锁
互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。 使用互斥锁来修复上面代码的问题:
var x int64var wg sync.WaitGroupvar lock sync.Mutexfunc add() {for i := 0; i < 5000; i++ {lock.Lock() // 加锁x = x + 1lock.Unlock() // 解锁}wg.Done()}func main() {wg.Add(2)go add()go add()wg.Wait()fmt.Println(x)}
使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁;当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。
读写互斥锁
互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。
读写锁分为两种:读锁和写锁。
当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;
当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。
读写锁示例:
package mainimport ("fmt""sync""time")var (x int64wg sync.WaitGrouplock sync.Mutexrwlock sync.RWMutex)func write() {// lock.Lock() // 加互斥锁rwlock.Lock() // 加写锁x = x + 1time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒rwlock.Unlock() // 解写锁// lock.Unlock() // 解互斥锁wg.Done()}func read() {// lock.Lock() // 加互斥锁rwlock.RLock() // 加读锁time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒rwlock.RUnlock() // 解读锁// lock.Unlock() // 解互斥锁wg.Done()}func main() {start := time.Now()for i := 0; i < 10; i++ {wg.Add(1)go write()}for i := 0; i < 1000; i++ {wg.Add(1)go read()}wg.Wait()end := time.Now()fmt.Println(end.Sub(start))}
需要注意的是读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来。
