客户端:每次收到 frame,就更新 t.lastRead。

keepalive 定时任务:每隔 kp.Time 检查一次;如果这段时间内没有读到任何数据,就发送一次 ping;如果发送 ping 之后在 kp.Timeout 内仍然没有读到任何数据,则调用 t.Close()。

服务端:

keepalive:定时任务,每隔 kp.Time 检查一次;如果这段时间内没有读到任何数据,就发送一次 ping;如果发送 ping 之后在 kp.Timeout 内仍然没有读到任何数据,则调用 t.Close()。

idle:t.idle 记录连接进入空闲状态的时间(零值表示连接当前非空闲);idleTimer 到期时,如果 MaxConnectionIdle - time.Since(idle) <= 0,则调用 t.drain() 优雅关闭连接(发送 GOAWAY),否则按剩余时间重置定时器。

ageTimer:
ageTimer.C:MaxConnectionAge 到期后,先调用 t.drain() 优雅关闭连接,再等待 MaxConnectionAgeGrace;宽限期结束后调用 t.Close() 强制关闭连接。

代码实现:

  1. // keepalive runs once per HTTP/2 connection and drives the idle, max-age and keepalive-ping timers below.
  2. func (t *http2Server) keepalive() {
  3. p := &ping{}
  4. // True iff a ping has been sent, and no data has been received since then.
  5. outstandingPing := false
  6. // Amount of time remaining before which we should receive an ACK for the
  7. // last sent ping.
  8. // (Time remaining to wait for the ping ACK.)
  9. kpTimeoutLeft := time.Duration(0)
  10. // Records the last value of t.lastRead before we go block on the timer.
  11. // This is required to check for read activity since then.
  12. // (Timestamp of the previous read, in nanoseconds.)
  13. prevNano := time.Now().UnixNano()
  14. // Initialize the different timers to their default values.
  15. // Idle timer.
  16. idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
  17. // Maximum connection age timer.
  18. ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
  19. kpTimer := time.NewTimer(t.kp.Time)
  20. defer func() {
  21. // We need to drain the underlying channel in these timers after a call
  22. // to Stop(), only if we are interested in resetting them. Clearly we
  23. // are not interested in resetting them here.
  24. idleTimer.Stop()
  25. ageTimer.Stop()
  26. kpTimer.Stop()
  27. }()
  28. for {
  29. select {
  30. case <-idleTimer.C:
  31. t.mu.Lock()
  32. idle := t.idle
  33. if idle.IsZero() { // The connection is non-idle.
  34. t.mu.Unlock()
  35. idleTimer.Reset(t.kp.MaxConnectionIdle)
  36. continue
  37. }
  38. // Remaining idle allowance: MaxConnectionIdle minus the time already spent idle.
  39. val := t.kp.MaxConnectionIdle - time.Since(idle)
  40. t.mu.Unlock()
  41. if val <= 0 {
  42. // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
  43. // Gracefully close the connection.
  44. // (Graceful close.)
  45. t.drain(http2.ErrCodeNo, []byte{})
  46. return
  47. }
  48. // Re-arm the timer for the idle allowance still remaining.
  49. idleTimer.Reset(val)
  50. case <-ageTimer.C:
  51. t.drain(http2.ErrCodeNo, []byte{})
  52. ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
  53. select {
  54. case <-ageTimer.C:
  55. // Close the connection after grace period.
  56. infof("transport: closing server transport due to maximum connection age.")
  57. t.Close()
  58. case <-t.done:
  59. }
  60. return
  61. case <-kpTimer.C:
  62. lastRead := atomic.LoadInt64(&t.lastRead)
  63. if lastRead > prevNano {
  64. // There has been read activity since the last time we were
  65. // here. Setup the timer to fire at kp.Time seconds from
  66. // lastRead time and continue.
  67. outstandingPing = false
  68. // Fire kp.Time after the last read.
  69. kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
  70. prevNano = lastRead
  71. continue
  72. }
  73. // outstandingPing starts out false; it is set to true once the server has sent a ping.
  74. // kpTimeoutLeft only drops to <= 0 once the ack timeout has elapsed.
  75. // A ping was sent and nothing was read before the timeout: call t.Close().
  76. if outstandingPing && kpTimeoutLeft <= 0 {
  77. infof("transport: closing server transport due to idleness.")
  78. // Close the server transport.
  79. t.Close()
  80. return
  81. }
  82. if !outstandingPing {
  83. if channelz.IsOn() {
  84. atomic.AddInt64(&t.czData.kpCount, 1)
  85. }
  86. // Queue the ping via controlBuf.
  87. t.controlBuf.put(p)
  88. kpTimeoutLeft = t.kp.Timeout
  89. outstandingPing = true
  90. }
  91. // The amount of time to sleep here is the minimum of kp.Time and
  92. // timeoutLeft. This will ensure that we wait only for kp.Time
  93. // before sending out the next ping (for cases where the ping is
  94. // acked).
  95. // (Sleep duration: the smaller of kp.Time and the remaining ack timeout.)
  96. sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
  97. kpTimeoutLeft -= sleepDuration
  98. kpTimer.Reset(sleepDuration)
  99. case <-t.done:
  100. return
  101. }
  102. }
  103. }

客户端:


// keepalive runs in a separate goroutine and makes sure the connection is
// alive by sending pings.
//
// Each time the timer fires it checks t.lastRead: if data was read since the
// previous check, the timer is re-armed for kp.Time after that read. Otherwise
// a ping is queued (unless one is already outstanding) and, if nothing is read
// within kp.Timeout of that ping, the transport is closed. The goroutine exits
// when the transport is closed, is closing, or t.ctx is done.
func (t *http2Client) keepalive() {
    p := &ping{data: [8]byte{}}
    // True iff a ping has been sent, and no data has been received since then.
    outstandingPing := false
    // Amount of time remaining before which we should receive an ACK for the
    // last sent ping.
    timeoutLeft := time.Duration(0)
    // Records the last value of t.lastRead before we go block on the timer.
    // This is required to check for read activity since then.
    prevNano := time.Now().UnixNano()
    timer := time.NewTimer(t.kp.Time)
    // Stop the timer on every exit path, not only on ctx cancellation. The
    // channel does not need draining because the timer is never reset again
    // once this function returns.
    defer timer.Stop()
    for {
        select {
        case <-timer.C:
            lastRead := atomic.LoadInt64(&t.lastRead)
            // In the normal case (the peer is sending data) this branch is
            // taken on the passes after the first.
            if lastRead > prevNano {
                // There has been read activity since the last time we were here.
                outstandingPing = false
                // Next timer should fire at kp.Time seconds from lastRead time.
                timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
                prevNano = lastRead
                continue
            }
            // A ping is outstanding and its timeout has fully elapsed with no
            // read activity: close the transport.
            if outstandingPing && timeoutLeft <= 0 {
                t.Close()
                return
            }
            t.mu.Lock()
            if t.state == closing {
                // If the transport is closing, we should exit from the
                // keepalive goroutine here. If not, we could have a race
                // between the call to Signal() from Close() and the call to
                // Wait() here, whereby the keepalive goroutine ends up
                // blocking on the condition variable which will never be
                // signalled again.
                t.mu.Unlock()
                return
            }
            // There are no active streams and pinging without streams is not
            // permitted.
            if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
                // If a ping was sent out previously (because there were active
                // streams at that point) which wasn't acked and its timeout
                // hadn't fired, but we got here and are about to go dormant,
                // we should make sure that we unconditionally send a ping once
                // we awaken.
                outstandingPing = false
                t.kpDormant = true
                // Go dormant until the condition variable is signalled.
                t.kpDormancyCond.Wait()
            }
            t.kpDormant = false
            t.mu.Unlock()

            // We get here either because we were dormant and a new stream was
            // created which unblocked the Wait() call, or because the
            // keepalive timer expired. In both cases, we need to send a ping.
            if !outstandingPing {
                if channelz.IsOn() {
                    atomic.AddInt64(&t.czData.kpCount, 1)
                }
                t.controlBuf.put(p)
                timeoutLeft = t.kp.Timeout
                outstandingPing = true
            }
            // The amount of time to sleep here is the minimum of kp.Time and
            // timeoutLeft. This will ensure that we wait only for kp.Time
            // before sending out the next ping (for cases where the ping is
            // acked).
            sleepDuration := minTime(t.kp.Time, timeoutLeft)
            timeoutLeft -= sleepDuration

            timer.Reset(sleepDuration)
        case <-t.ctx.Done():
            return
        }
    }
}

使用方式:

客户端:

// kacp holds the client-side keepalive parameters that are handed to
// grpc.WithKeepaliveParams when dialing.
var kacp = keepalive.ClientParameters{
    Time:                10 * time.Second, // send pings every 10 seconds if there is no activity
    Timeout:             time.Second,      // wait 1 second for ping ack before considering the connection dead
    PermitWithoutStream: true,             // send pings even without active streams
}

    // Dial *addr with kacp applied as the client keepalive parameters.
    conn, err := grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp))

服务端:

// kasp holds the server-side keepalive parameters that are handed to
// grpc.KeepaliveParams when creating the server.
var kasp = keepalive.ServerParameters{
    // Idle time limit.
    MaxConnectionIdle:     15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY
    MaxConnectionAge:      30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY

    MaxConnectionAgeGrace: 5 * time.Second,  // Allow 5 seconds for pending RPCs to complete before forcibly closing connections


    Time:                  5 * time.Second,  // Ping the client if it is idle for 5 seconds to ensure the connection is still active
    Timeout:               1 * time.Second,  // Wait 1 second for the ping ack before assuming the connection is dead
}

    // Create the server with the keepalive parameters in kasp.
    // NOTE(review): kaep is not defined anywhere in this snippet; presumably it
    // is a keepalive.EnforcementPolicy value and must be declared before use.
    s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))