许世伟的架构课
:::tips 对于下面的讨论, 一定要记住是在 讨论 进程内!
:::
锁机制
锁真的慢吗
经常看到这样的忠告
不要通过共享内存(锁)来通信, 要通过通信(channel)来共享内存
对于我这样的菜鸟来说, 以前也确实是这样想的, 这就体现了 知其然不知其所以然 实际上由于对 锁 机制和 channel的误解比较深
锁真正的危害
对于进程内通讯的原语来说, 锁并不慢, 在这个条件下, 只有原子操作会比锁更快
channel本身是共享变量, channel的操作也是必然有锁的
- 锁真正的危害在于不容易控制, 锁之后
忘记解锁或意外导致未解锁才是锁的危害 - 锁中不要执行费时的操作
执行体
执行体的互斥
如果一组数据的并发访问,符合大部分情况下是读, 少量情况有写这种读写操作那么应该用读写锁
读锁
mutex.RLock()defer mutex.RUnlock()doReadOnlyThings
写锁
mutex.Lock()defer mutex.Unlock()doWriteThings
写操作和普通锁一样, 整体来说是这样的
读锁阻止写操作
写锁阻止读和写操作
执行体的同步
信号量
一个常用的场景就是将一个任务分为多个小人务, 分配给n个执行体并行做
大概如下
func (wg *WaitGroup) Add(n int)func (wg *WaitGroup) Done()func (wg *WaitGroup) Wait()
用法大概如下
var wg WaitGroup...wg.Add(n)for 循环n次 {go func() {defer wg.Done()doTaski // 执行第i个任务}()}wg.Wait()
条件变量(Condtion Variable)
:::tips 变量: 一组要在多个执行体之间协同的数据
条件: 做任务前的前置条件, 和做任务时需要唤醒其他人的 唤醒条件
:::
一个更通用的同步原语, 我们用它来模拟一下 channel的通讯机制
func NewCond(l Locker) *Condfunc (c *Cond) Broadcast()func (c *Cond) Signal()func (c *Cond) Wait()
初始化, 初始化时需要传入一个互斥体, 可以是普通锁, 也可以是读写锁
var mutex sync.Mutex // 或者 sync.RWMutexvar cond = sync.NewCond(&mutex)...
传入锁的原因: cond.Wait()中需要
把自己加入到挂起队列mutex.Unlock()等待被唤醒 // 挂起的执行体会被后续的 cond.Broadcast 或 cond.Signal() 唤醒mutex.Lock()
条件变量的使用方法大致如下
mutex.Lock()defer mutex.Unlock()for conditionNotMetToDo {cond.Wait()}doSomethingif conditionNeedNotify {cond.Broadcast()// 有时可以优化为 cond.Signal()}
- 加锁
- 用一个循环判断是否能
do, 如果不行就调用cond.Wait()进行等待- 这里使用
for是因为cond.Wait()获得权限后不一定就一定是可以继续do了, 所以要再次判断
- 这里使用
doSomething- 如果挂起队列中的部分执行体满足了重新执行的条件, 就用
cond.Broadcast或cond.Signal唤醒他们cond.Broadcast会唤醒所有在这个条件变量挂起的执行体cond.Signal只会唤醒一个- 挂起在这个条件变量上的执行体,他们的条件是一致的
- 本次
doSometing完成后, 所释放的资源只能够一个执行体来做事情
实现简易Channel的机制(不是真正的channel实现机制)
package mainimport ("fmt""sync")type Queue struct {element []interface{}}//创建一个新队列func NewQueue()*Queue{return &Queue{}}//判断队列是否为空func (s *Queue)IsEmpty()bool{if len(s.element) == 0 {return true}else {return false}}//求队列的长度func (s *Queue)Len()int{return len(s.element)}//进队操作func (s *Queue)Push(value interface{}) {s.element = append(s.element, value)}//出队操作func (s *Queue)Pop()bool{if s.IsEmpty(){return false}else{s.element = s.element[1:]}return true}//打印队列func (s *Queue)Print(){for i := 0;i <= s.Len()-1;i++{fmt.Printf("%d ", s.element[i])}fmt.Printf("\n")}type Channel struct {mutex sync.Mutexcond *sync.Condqueue *Queuen int}func NewChannel(n int) *Channel{if n < 1 {panic("todo: support unbuffered channel")}c := new(Channel)c.cond = sync.NewCond(&c.mutex)c.queue = NewQueue()c.n = nreturn c}func (c *Channel) Push(v interface{}) {c.mutex.Lock()defer c.mutex.Unlock()for c.queue.Len() == c.n { // 等待队列不满c.cond.Wait()}if c.queue.Len() == 0 {// 队列为空, 可能有人在等待数据, 通知它们c.cond.Broadcast()}c.queue.Push(v)}func (c *Channel) Pop() (v interface{}) {c.mutex.Lock()defer c.mutex.Unlock()for c.queue.Len() == 0 { // 等待对列不为空c.cond.Wait()}if c.queue.Len() == c.n{// 队列满, 可能有人在写数据, 通知它们c.cond.Broadcast()}return c.queue.Pop()}func (c *Channel) TryPop() (v interface{}, ok bool) {c.mutex.Lock()defer c.mutex.Unlock()for c.queue.Len() == 0 { // 等待对列不为空return} // 队列空直接返回if c.queue.Len() == c.n{// 队列满, 可能有人在写数据, 通知它们c.cond.Broadcast()}return c.queue.Pop(), true}func (c *Channel) TryPush(v interface{})(ok bool) {c.mutex.Lock()defer c.mutex.Unlock()for c.queue.Len() == c.n{ // 等待对列不为空return}// 对列满 直接返回if c.queue.Len() == 0 {// 队列空, 可能有人在等待数据, 通知它们c.cond.Broadcast()}c.queue.Push(v)return true}func main() {}
这个是有缓冲的channel 不支持 0的情况
执行体的通讯
管道
大致如下
func Pipe() (pr *PipeReader , pw PipeWriter)
先调用 pr, pw:= io.Pipe()得到管道写入端和读出端, 分别给两个并行的goroutine, 一个负责读 , 一个负责写
例:读写转换
现在我有一个算法
func Foo(w io.Writer) error
但是这个数据流的输入是 io.Reader
func Bar(r io.Reader)
使用pipe串联它们
func FooReader() io.ReadCloser{pr , pw := io.Pipe()go func(){err := Foo(pw)pw.CloseWithError(err)}()return pr}
