上一篇文章跟踪了ListenAccept接口的实现流程,本文将继续分析 epoll 在 runtime 层的运作,文中内容会集中在 runtime 层,若有不当之处请指出。

poll

runtime/netpoll.go是 poll 的抽象,它规范 poll 层和 runtime 层之间的交互接口。

poll_runtime_pollServerInit

  1. func poll_runtime_pollServerInit() {
  2. netpollinit()
  3. atomic.Store(&netpollInited, 1)
  4. }

poll 初始化,初始化网络轮询器。

poll_runtime_isPollServerDescriptor

判断给定的 fd 是否是当前 epoll 中使用的 fd。

poll_runtime_pollOpen

  1. func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
  2. pd := pollcache.alloc()
  3. // ...
  4. var errno int32
  5. errno = netpollopen(fd, pd)
  6. return pd, int(errno)
  7. }

开启网络轮询器。pollcache.alloc()pollcache创建pollDescpollDescpollcache中以链表的方式存储。将pollDesc和 fd 绑定起来,netpollopen将在下面解释。

poll_runtime_pollClose

关闭某个连接,需当前连接无读写行为。

poll_runtime_pollReset

重置某个连接,即重置pollDesc

poll_runtime_pollWait

就地等待读信号或者写信号,该函数在前一篇文章详解过。

poll_runtime_pollSetDeadline

设置到期时间。网络请求过程中存在很高的不确定性,大部分情况我们需要有到期时间来标记某个操作已截止。

  1. func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
  2. // 并发访问加锁
  3. lock(&pd.lock)
  4. if pd.closing {
  5. unlock(&pd.lock)
  6. return
  7. }
  8. rd0, wd0 := pd.rd, pd.wd
  9. combo0 := rd0 > 0 && rd0 == wd0
  10. // 计算过期时间点
  11. if d > 0 {
  12. d += nanotime()
  13. if d <= 0 {
  14. d = 1<<63 - 1
  15. }
  16. }
  17. // 将过期时间根据mode存到rd和wd上
  18. if mode == 'r' || mode == 'r'+'w' {
  19. pd.rd = d
  20. }
  21. if mode == 'w' || mode == 'r'+'w' {
  22. pd.wd = d
  23. }
  24. combo := pd.rd > 0 && pd.rd == pd.wd
  25. // timer回调函数
  26. rtf := netpollReadDeadline
  27. if combo {
  28. rtf = netpollDeadline
  29. }
  30. // 读timer
  31. if pd.rt.f == nil {
  32. if pd.rd > 0 {
  33. pd.rt.f = rtf
  34. pd.rt.when = pd.rd
  35. // seq的作用就是在timer到期的时候,和原pollDesc.rseq比较,
  36. // 如果不同,则重用描述符或重置计时器
  37. pd.rt.arg = pd
  38. pd.rt.seq = pd.rseq
  39. addtimer(&pd.rt)
  40. }
  41. } else if pd.rd != rd0 || combo != combo0 {
  42. // 重置当前正在进行中的计时器
  43. pd.rseq++
  44. if pd.rd > 0 { // 修改计时器
  45. modtimer(&pd.rt, pd.rd, 0, rtf, pd, pd.rseq)
  46. } else { // 删除计时器
  47. deltimer(&pd.rt)
  48. pd.rt.f = nil
  49. }
  50. }
  51. // 写计时器
  52. // ...
  53. // 获取正在进行IO操作的读goroutine地址或写goroutine地址
  54. var rg, wg *g
  55. if pd.rd < 0 || pd.wd < 0 {
  56. // 内存操作
  57. atomic.StorepNoWB(noescape(unsafe.Pointer(&wg)), nil)
  58. // 获取已被阻塞的goroutine地址
  59. if pd.rd < 0 {
  60. rg = netpollunblock(pd, 'r', false)
  61. }
  62. if pd.wd < 0 {
  63. wg = netpollunblock(pd, 'w', false)
  64. }
  65. }
  66. unlock(&pd.lock)
  67. // 唤醒对应的goroutine
  68. if rg != nil {
  69. netpollgoready(rg, 3)
  70. }
  71. if wg != nil {
  72. netpollgoready(wg, 3)
  73. }
  74. }

还有另外一个类似实现接口netpolldeadlineimpl,实际上大多数情况下都是调用netpollDeadlinenetpollReadDeadlinenetpollWriteDeadline完成。

netpollready

  1. func netpollready(toRun *gList, pd *pollDesc, mode int32) {
  2. var rg, wg *g
  3. if mode == 'r' || mode == 'r'+'w' {
  4. rg = netpollunblock(pd, 'r', true)
  5. }
  6. if mode == 'w' || mode == 'r'+'w' {
  7. wg = netpollunblock(pd, 'w', true)
  8. }
  9. if rg != nil {
  10. toRun.push(rg)
  11. }
  12. if wg != nil {
  13. toRun.push(wg)
  14. }
  15. }

netpollready是 epoll 上报事件的接口,通过 mode 取到当前读写 goroutine 地址将之推送到即将执行队列。

netpollunblock

  1. // ioready为false表示此次调用并非底层epoll事件上报
  2. func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
  3. gpp := &pd.rg
  4. if mode == 'w' {
  5. gpp = &pd.wg
  6. }
  7. for {
  8. // (1)
  9. old := *gpp
  10. if old == pdReady {
  11. return nil
  12. }
  13. // (2)
  14. if old == 0 && !ioready {
  15. // Only set READY for ioready. runtime_pollWait
  16. // will check for timeout/cancel before waiting.
  17. return nil
  18. }
  19. var new uintptr
  20. if ioready {
  21. new = pdReady
  22. }
  23. // (3)
  24. if atomic.Casuintptr(gpp, old, new) {
  25. if old == pdReady || old == pdWait {
  26. old = 0
  27. }
  28. return (*g)(unsafe.Pointer(old))
  29. }
  30. }
  31. }

netpollunblock尝试获取在netpollblock中被gopark的 goroutine,通过抽象数据结构g返回。
(1) old == pdReady即已唤醒,可以直接使用遂直接返回nil
(2) 初始化状态时候,当前既没 Ready 的 goroutine 也没有 Wait 的 goroutine 也直接返回nil
(3) 通过原子操作重置并拿到当前正在被gopark的 goroutine 地址,抽象数据结构g返回。

runtime-epoll

epoll 在 runtime 中的部分在runtime/netpoll_epoll.go文件中实现。上文中涉及到两个函数:netpollinitnetpollopen,实际上是调用到了 epoll 中。

netpollinit

  1. func netpollinit() {
  2. epfd = epollcreate1(_EPOLL_CLOEXEC)
  3. if epfd >= 0 {
  4. return
  5. }
  6. epfd = epollcreate(1024)
  7. if epfd >= 0 {
  8. closeonexec(epfd)
  9. return
  10. }
  11. println("runtime: epollcreate failed with", -epfd)
  12. throw("runtime: netpollinit failed")
  13. }

首先调用 epoll_create1 创建 epoll handle,若epoll_create1失败再调用epoll_create

netpollopen

  1. func netpollopen(fd uintptr, pd *pollDesc) int32 {
  2. var ev epollevent
  3. ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
  4. *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
  5. return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
  6. }

epoll 事件注册,注册这个 epoll 里关心的事件,并将 user data 设置为runtime.pollDesc,这也就是为什么 netpoll 系列函数均以pollDesc为参数。

netpollclose

  1. func netpollclose(fd uintptr) int32 {
  2. var ev epollevent
  3. return -epollctl(epfd, _EPOLL_CTL_DEL, int32(fd), &ev)
  4. }

从 epoll 中剔除某个不再关心的 fd,应用于主动关闭或超时关闭。

netpoll

netpoll中调用了 epoll 中第三个 API:epoll_wait

  1. func netpoll(block bool) gList {
  2. if epfd == -1 {
  3. return gList{}
  4. }
  5. waitms := int32(-1)
  6. if !block {
  7. waitms = 0
  8. }
  9. var events [128]epollevent
  10. retry:
  11. // (1)
  12. n := epollwait(epfd, &events[0], int32(len(events)), waitms)
  13. if n < 0 {
  14. if n != -_EINTR {
  15. println("runtime: epollwait on fd", epfd, "failed with", -n)
  16. throw("runtime: netpoll failed")
  17. }
  18. goto retry
  19. }
  20. // (2)
  21. var toRun gList
  22. for i := int32(0); i < n; i++ {
  23. ev := &events[i]
  24. if ev.events == 0 {
  25. continue
  26. }
  27. var mode int32
  28. // 通过netpollopen注册的epoll关心事件确定是否读写事件
  29. if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
  30. mode += 'r'
  31. }
  32. if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
  33. mode += 'w'
  34. }
  35. if mode != 0 {
  36. // 由netpollopen可知,此处的&ev.data是pollDesc
  37. pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
  38. pd.everr = false
  39. if ev.events == _EPOLLERR {
  40. pd.everr = true
  41. }
  42. // 唤醒goroutine
  43. netpollready(&toRun, pd, mode)
  44. }
  45. }
  46. if block && toRun.empty() {
  47. goto retry
  48. }
  49. // 返回可以执行事件的goroutine地址集合
  50. return toRun
  51. }

(1) 调用epoll_wait获取事件,当次最多获取 128 个 epoll 事件。
(2) 根据事件类型唤醒读写 goroutine。

从整个流程上来看,分别调用了 epoll 中的三个 API:epoll_createepoll_ctl以及epoll_wait,通过层级化的封装使用 epoll 完成 IO 多路复用。这里很多人可能会好奇,netpoll是在哪里调用的?

实际上netpoll是在runtime.proc.go被底层多处调用,以 Go1.13 为例,runtime.proc.go中有四处调用netpoll,分别是:
func startTheWorldWithSema(emitTraceEvent bool) int64
func findrunnable() (gp *g, inheritTime bool)
func pollWork() bool
func sysmon()
以上均涉及到底层轮询器和调度器。

从源代码角度看epoll在Go中的使用(二) - 知乎 - 图1

netpoll 调用

小结

通过前面的内容,我们清楚了 epoll 在 Go 中是如何封装的,对用户接口层简化了ListenAcceptReadWrite等接口,简单友好的接口给用户层的逻辑代码提供相当大的便利。

而从 net 包的整体实现来看,对于用户而言:net 的实现是基于 epoll 的 nonblock 模式的一些列 fd 操作。网络操作未 Ready 时切换 goroutine,Ready 后等待调度的 goroutine 加入运行队列,实现了网络操作既不阻塞又是同步执行,这也就是前一篇文章所说的 epoll+goroutine。
https://zhuanlan.zhihu.com/p/109559267