许世伟的架构课
:::tips 对于下面的讨论, 一定要记住是在 讨论 进程内!
:::
锁机制
锁真的慢吗
经常看到这样的忠告
不要通过共享内存(锁)来通信, 要通过通信(channel)来共享内存
对于我这样的菜鸟来说, 以前也确实是这样想的, 这就体现了 知其然不知其所以然
实际上由于对 锁
机制和 channel
的误解比较深
锁真正的危害
对于进程内通讯的原语来说, 锁并不慢, 在这个条件下, 只有原子操作会比锁更快
channel
本身是共享变量, channel的操作也是必然有锁的
- 锁真正的危害在于不容易控制, 锁之后
忘记解锁或意外导致未解锁
才是锁的危害 - 锁中不要执行费时的操作
执行体
执行体的互斥
如果一组数据的并发访问,符合大部分情况下是读, 少量情况有写
这种读写操作那么应该用读写锁
读锁
mutex.RLock()
defer mutex.RUnlock()
doReadOnlyThings
写锁
mutex.Lock()
defer mutex.Unlock()
doWriteThings
写操作和普通锁一样, 整体来说是这样的
读锁阻止写操作
写锁阻止读和写操作
执行体的同步
信号量
一个常用的场景就是将一个任务分为多个小人务, 分配给n个执行体并行做
大概如下
func (wg *WaitGroup) Add(n int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
用法大概如下
var wg WaitGroup
...
wg.Add(n)
for 循环n次 {
go func() {
defer wg.Done()
doTaski // 执行第i个任务
}()
}
wg.Wait()
条件变量(Condtion Variable)
:::tips 变量: 一组要在多个执行体之间协同的数据
条件: 做任务前的前置条件, 和做任务时需要唤醒其他人的 唤醒条件
:::
一个更通用的同步原语, 我们用它来模拟一下 channel
的通讯机制
func NewCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()
初始化, 初始化时需要传入一个互斥体, 可以是普通锁, 也可以是读写锁
var mutex sync.Mutex // 或者 sync.RWMutex
var cond = sync.NewCond(&mutex)
...
传入锁的原因: cond.Wait()
中需要
把自己加入到挂起队列
mutex.Unlock()
等待被唤醒 // 挂起的执行体会被后续的 cond.Broadcast 或 cond.Signal() 唤醒
mutex.Lock()
条件变量的使用方法大致如下
mutex.Lock()
defer mutex.Unlock()
for conditionNotMetToDo {
cond.Wait()
}
doSomething
if conditionNeedNotify {
cond.Broadcast()
// 有时可以优化为 cond.Signal()
}
- 加锁
- 用一个循环判断是否能
do
, 如果不行就调用cond.Wait()
进行等待- 这里使用
for
是因为cond.Wait()
获得权限后不一定就一定是可以继续do
了, 所以要再次判断
- 这里使用
doSomething
- 如果挂起队列中的部分执行体满足了重新执行的条件, 就用
cond.Broadcast
或cond.Signal
唤醒他们cond.Broadcast
会唤醒所有在这个条件变量挂起的执行体cond.Signal
只会唤醒一个- 挂起在这个条件变量上的执行体,他们的条件是一致的
- 本次
doSometing
完成后, 所释放的资源只能够一个执行体来做事情
实现简易Channel的机制(不是真正的channel实现机制)
package main
import (
"fmt"
"sync"
)
type Queue struct {
element []interface{}
}
//创建一个新队列
func NewQueue()*Queue{
return &Queue{}
}
//判断队列是否为空
func (s *Queue)IsEmpty()bool{
if len(s.element) == 0 {
return true
}else {
return false
}
}
//求队列的长度
func (s *Queue)Len()int{
return len(s.element)
}
//进队操作
func (s *Queue)Push(value interface{}) {
s.element = append(s.element, value)
}
//出队操作
func (s *Queue)Pop()bool{
if s.IsEmpty(){
return false
}else{
s.element = s.element[1:]
}
return true
}
//打印队列
func (s *Queue)Print(){
for i := 0;i <= s.Len()-1;i++{
fmt.Printf("%d ", s.element[i])
}
fmt.Printf("\n")
}
type Channel struct {
mutex sync.Mutex
cond *sync.Cond
queue *Queue
n int
}
func NewChannel(n int) *Channel{
if n < 1 {
panic("todo: support unbuffered channel")
}
c := new(Channel)
c.cond = sync.NewCond(&c.mutex)
c.queue = NewQueue()
c.n = n
return c
}
func (c *Channel) Push(v interface{}) {
c.mutex.Lock()
defer c.mutex.Unlock()
for c.queue.Len() == c.n { // 等待队列不满
c.cond.Wait()
}
if c.queue.Len() == 0 {// 队列为空, 可能有人在等待数据, 通知它们
c.cond.Broadcast()
}
c.queue.Push(v)
}
func (c *Channel) Pop() (v interface{}) {
c.mutex.Lock()
defer c.mutex.Unlock()
for c.queue.Len() == 0 { // 等待对列不为空
c.cond.Wait()
}
if c.queue.Len() == c.n{// 队列满, 可能有人在写数据, 通知它们
c.cond.Broadcast()
}
return c.queue.Pop()
}
func (c *Channel) TryPop() (v interface{}, ok bool) {
c.mutex.Lock()
defer c.mutex.Unlock()
for c.queue.Len() == 0 { // 等待对列不为空
return
} // 队列空直接返回
if c.queue.Len() == c.n{// 队列满, 可能有人在写数据, 通知它们
c.cond.Broadcast()
}
return c.queue.Pop(), true
}
func (c *Channel) TryPush(v interface{})(ok bool) {
c.mutex.Lock()
defer c.mutex.Unlock()
for c.queue.Len() == c.n{ // 等待对列不为空
return
}// 对列满 直接返回
if c.queue.Len() == 0 {// 队列空, 可能有人在等待数据, 通知它们
c.cond.Broadcast()
}
c.queue.Push(v)
return true
}
func main() {
}
这个是有缓冲的channel
不支持 0
的情况
执行体的通讯
管道
大致如下
func Pipe() (pr *PipeReader , pw PipeWriter)
先调用 pr, pw:= io.Pipe()
得到管道写入端和读出端, 分别给两个并行的goroutine, 一个负责读 , 一个负责写
例:读写转换
现在我有一个算法
func Foo(w io.Writer) error
但是这个数据流的输入是 io.Reader
func Bar(r io.Reader)
使用pipe串联它们
func FooReader() io.ReadCloser{
pr , pw := io.Pipe()
go func(){
err := Foo(pw)
pw.CloseWithError(err)
}()
return pr
}