计时器

概述

准确的时间对于任何程序的运行都十分重要,但是在分布系统内很难保障时间一致,即使通过 NTP 协议,也只能把各个节点上的误差控制在毫秒级,所以准确的时间在分布式系统中显得极为重要。

go 的计时器发展:开始所有的计时器由唯一的四叉堆维护,后来全局使用 64 个四叉堆维护全部计时器,每个处理器(p)创建的计时器会由对应的四叉堆维护。最后至今,每个处理器单独管理计时器并通过网络轮训器触发。

设计原理

全局四叉堆

go 1.10 之前的计时器都是使用最小四叉堆实现,所有计时器都会存储在如下结构体中:

  1. var timers struct {
  2. lock mutex
  3. gp *g
  4. created bool
  5. sleeping bool
  6. rescheduling bool
  7. sleepUntil int64
  8. waitnote note
  9. t *[]timer
  10. }

其中的 t 就是最小四叉堆,运行时创建的所有计时器都会加入到四叉堆中,如图:image.png
运行时发生以下事件时唤醒计时器:

● 四叉堆中的计时器到期。

● 四叉堆中加入了触发时间更早的新计时器。

然而全局四叉堆公用的互斥锁对计时器的影响非常大,计时器的各种操作都需要获取全局唯一的互斥锁,这会严重影响计时器性能。

分片四叉堆

go 1.10 将全局的四叉堆分割为 64 个小的四叉堆,理想状况下,四叉堆的数量应该等于处理器的数量,但是这需要实现动态分配的过程,权衡之后使用 64 个小四叉堆,以牺牲内存为代价提升性能。

const timersLen = 64

var timers [timersLen]struct { 
    timersBucket
}

type timersBucket struct {
    lock            mutex
    gp                *g
    created            bool
    sleeping        bool
    rescheduling    bool
    sleepUntil        int64
    waitnote        note
    t                *[]timer
}

如果当前机器上的处理器 p 超过 64 个,多个处理器上的计时器就可能存储在一个桶中每个计时器桶都有一个 goroutine 处理。image.png
将全局计时器分片的方式,虽然能够降低锁的粒度,提高计时器的性能,但其造成的处理器和线程之间的频繁切换成了影响计时器的首要因素。

网络轮询器

最新版本的计时器中移除了桶,所有计时器都以最小四叉堆的形式存储在处理器中:
image.png
处理器 runtime.p 中与计时器有关的字段:

type P struct {
    ...
    timersLock        mutex
    timers            []*timer
    numTimers        uint32
    adjustTimers    uint32
    deletedTimers    uint32
    ...
}
● timersLock 用于保护计时器的互斥锁。

● timers 存储计时器的最小四叉堆。

● numTimer 处理器中计时器的数量。

● adjustTimers 处理器中计时器的数量。

● deletedTimers 处理器处于 timerDeleted 状态的计时器数量。

目前计时器都交由处理器的网络轮询器和调度器触发,这种方式能够充分利用本地性,减少上下文切换的开销,也是目前性能最佳的方式。

数据结构

runtime.timer 是计时器的内部表示,每一个计时器都存储在对应的处理器的最小四叉堆中,结构体:

type timer struct {
    pp puintptr
    when   int64
    period int64
    f      func(interface{}, uintptr)
    arg    interface{}
    seq    uintptr
    nextwhen int64
    status uint32
}

● pp 处理器指针。

● when 当前计时器被唤醒的时间。

● period 计时器两次被唤醒的时间间隔。

● f 每当计时器被唤醒时都会调用的函数。

● arg 计时器被唤醒时,函数 f 传入的参数。

● seq 计时器被唤醒时,函数 f 传入的参数,与 netpool 相关。

● nextwhen 计时器处于 timerModifiedXX 状态时,用于设置 when 字段。

● status 计时器的状态。

这里的 time 只是运行时的私有字段,对外暴露的是 time.Timer 结构体:

struct Timer struct {
    C         <-chan Time
    r         runtimeTimer
}

type Time struct {
    wall uint64
    ext  int64
    loc *Location
}

type runtimeTimer struct {
    pp       uintptr
    when     int64
    period   int64
    f        func(interface{}, uintptr) // NOTE: must not be closure
    arg      interface{}
    seq      uintptr
    nextwhen int64
    status   uint32
}

time.Timer 必须通过 time.NewTimertime.AfterFunc 或者 time.After 函数创建,当计时器失效时,订阅计时器 channel 的 goroutine 会受到失效时间。

状态机

运行时使用状态机的方式处理全部计时器,其中包括十种状态和几种操作,由于计时器需要同时支持增加,删除,修改,重置等操作,所以状态非常复杂。

触发计时器

概述

go 会在两种情况下触发计时器,运行计时器中保存的函数。

● 调度器调度时会检查处理器中的计时器是否准备就绪。

● 系统监控会检查是否有未执行的到期计时器。

channel

设计原理

不要通过共享内存的方式通信,而是应该通过通信的方式共享内存。go 语言提供了一种不同的并发模型——通信顺序进程(CSP),goroutine 和 channel 分别对应 CSP 中的实体和传递信息的媒介,goroutine 之间会通过 channel 传递信息。image.png

先进先出

channel 收发操作支持先进先出(FIFO)的设计,规则如下:

● 先从 channel 中读取数据的 goroutine 会先获取到数据。

● 先向 channel 中发送数据的 goroutine 有权先发送数据。

无锁 channel

无锁队列更准确的描述是使用乐观锁并发控制的队列,乐观锁并发控制本质上是基于验证的协议,使用原子指令 CAS 在多线程之间同步数据,无锁队列的实现也是依赖这一原子指令。

channel 在运行时内部表示是 runtime.hchan,该结构体中包含了用于保护成员变量的互斥锁,某种程度上来说,channel 是一个用于通信和同步的有锁队列,使用互斥锁解决程序中可能存在的线程竞争问题很常见,可以相对容易的实现有锁队列。

然而锁的休眠和唤醒会带来额外的上下文切换,如果临界区过小,加解锁导致的额外开销就会成为性能瓶颈。

数据结构

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    lock mutex
}

type waitq struct {
    first *sudog
    last  *sudog
}

qcount channel 中元素的数量。

datasize channel 中循环队列的长度。

buf channel 缓存区数据指针。

sendx channel 发送操作处理到的位置。

recvxchannel 接收操作处理到的位置。

elemsizeelemtype 分别表示当前 channel 能够收发的元素类型和大小。

recvqsendq 存储了当前 channel 由于缓存区不足而阻塞的 goroutine 的列表,这些等待队列使用双向列表表示,列表中所有元素都是 runtime.sudog 结构,该结构表示一个在等待列表中的 goroutine,该结构存储了两个分别指向前后 runtime.sudog 的指针来构成链表。

创建 channel


go 中所有的 channel 创建都是使用 make 关键字,编译器将代码中的 make 语句转化为 OMAKE 节点,类型检查阶段将 OMAKE 类型的节点转化为 OMAKECHAN 类型。

OMAKECHAN 类型节点最终会在 SSA 中间代码生成阶段之前被转换成调用 runtime.makechan 或者 runtime.makechan64。

创建过程遵循以下几点:

● 如果当前 channel 不存在缓存区,只会为 runtime.hchan 分配一块内存空间。

● 如果当前 channel 中存储的类型不是指针类型,会为当前 channel 和底层数组分配一块内存空间。

● 默认情况下会单独为 runtime.hchan 和缓存区分配内存。

发送数据

概述

发送数据的执行大致分为三个阶段:

● 当存在等待的接收者时,直接通过 runtime.send 将数据发送个等待的接收者。

● 当缓存区中存在空余空间时,将发送的数据写入 channel 的缓存区。

● 当不存在缓存区或者缓存区已满时,等待其他的 goroutine 从 channel 中获取数据。

直接发送

如果目标 channel 没有被关闭,并且已经有处于等待的 goroutine,那么 runtime.chansend 会从接收队列 recvq 中取出最先陷入等待的 goroutine 并向其发送数据。

    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if raceenabled {
        if c.dataqsiz == 0 {
            racesync(c, sg)
        } else {
            // Pretend we go through the buffer, even though
            // we copy directly. Note that we need to increment
            // the head/tail locations only when raceenabled.
            racenotify(c, c.recvx, nil)
            racenotify(c, c.recvx, sg)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
    }
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

该函数运行分为两个部分:

● 调用 runtime.sendDirect 直接将发送的数据复制到 x = <-c 表达式中变量 x 所在的地址中。

● 运行 runtime.goready 将等待接收数据的 goroutine 标记为可运行状态 grunnable,并把该 goroutine 放到发送方所在的处理器的 runnext 上等待执行,

需要注意,发送数据的过程只是将接收方的 goroutine 放到了处理器的 runnext 中,程序没有立即执行该 goroutine。

缓冲区

如果创建的 channel 包含缓存区并且缓存区未满,首先会计算出下一个可以存储数据的位置,然后将发送的数据存储到缓存区,并增加 sendx 和 qcount 的索引。

如果当前 channel 的缓存区未满,向 channel 发送的数据会被存储到 channel 的 sendx 索引的位置,并将 sendx 加一,因为这里的 buf 是一个循环数组,所以当 sendx 等于 dateqsiz 时会重新回到数组开始的位置。

阻塞发送

if !block {
        unlock(&c.lock)
        return false
    }
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    KeepAlive(ep)

    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    if closed {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    return true
}

代码核心逻辑如下:

● 首先调用 gp := getg() 获取发送数据使用的 goroutine。

● 执行 acquireSudog 获取 runtime.sudog 结构体,设置此次阻塞发送的相关信息,例如发送的 channel,是否在 select 中,以及发送数据的内存地址等。

● 将刚刚创建并初始化的 sudog 加入到等待发送队列,设置到当前 goroutine 的 waiting 上,表示 goroutine 正在等待该 sudog 唤醒。

● 调用 runtime.goparkkunlock 令当前的 goroutine 陷入睡眠等待唤醒。

● 被调度器唤醒之后执行一些收尾工作,将一些属性设置为零,并释放 sudog 结构体。

● 函数最后返回 true 表示这次已经成功向 channel 中发送了数据。

接收数据

i := <-ch
i, ok := <-ch

接收分为以下几种情况:

● 当从一个空的 channel 中接收数据时,会调用 runtime.gopark 让出处理器的使用权。

● 如果当前 channel 关闭并且缓存区中不存在任何数据,清除 ep 指针中的数据并立刻返回。

● 存在等待的发送者时,通过 ruuntime.recv 从阻塞的发送者或是缓冲区中接受数据。

● 当缓冲区中存在数据时,从 channel 的缓冲区中接收数据。

● 当缓冲区不存在数据时,等待其他 goroutine 向 channel 发送数据。

小结

● channel 为空 调用 gopark 挂起当前 goroutine。

● channel 已经关闭并且缓存区中没有任何数据,直接返回。

● 如果 channel 的 sendq 队列中存在挂起的 goroutine 会将 recvx 索引所在的数据复制到接收变量所在的所在的内存空间中,并将 sendq 中的 channel 的数据复制到缓存区。

● 默认情况下回挂起当前 goroutine 将 sudog 结构加入 recvq 队列并陷入休眠等待调度器唤醒。

关闭 channel

首先判断当前的 channel 是否为空以及是否已经被关闭,如果存在以上情况,返回响应错误。如果没有问题,继续执行以下处理:

将 recvq 和 sendq 两个队列中的数据加入到 goroutine 列表 glist 中,函数 closechan 会清楚 runtime.sudog 上所有未被处理的元素。最后为所有被阻塞的 goroutine 调用 runtime.goready 触发调度。