客户端,每次收到frame,就更新lastRead;
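定时任务判断:每过 x 秒(kp.Time)检查一次,如果这段时间内没有读到任何数据,就发送一次 ping;如果发送 ping 后在 kp.Timeout 时间内仍然没有读到任何数据,则调用 t.Close()。没有活跃流且 PermitWithoutStream 为 false 时,不发送 ping,进入休眠等待。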
服务端:
keepalive : 定时任务判断:每过 x 秒(kp.Time)检查一次,如果这段时间内没有读到任何数据,就发送一次 ping;如果发送 ping 后在 kp.Timeout 时间内仍然没有读到任何数据,则调用 t.Close()
idle : t.idle 记录连接进入空闲状态(没有活跃流)的时刻,连接非空闲时 t.idle 为零值;如果 MaxConnectionIdle - time.Since(idle) <= 0,则优雅关闭(调用 t.drain 发送 GOAWAY)!
ageTimer:
ageTimer.C:到时(连接存活超过 MaxConnectionAge),先调用 t.drain 优雅关闭(发送 GOAWAY),再等待 MaxConnectionAgeGrace,到期后调用 t.Close() 关闭连接
代码实现:
// 针对一个http2连接func (t *http2Server) keepalive() {p := &ping{}// True iff a ping has been sent, and no data has been received since then.outstandingPing := false// Amount of time remaining before which we should receive an ACK for the// last sent ping.// 收到ping的剩余时间kpTimeoutLeft := time.Duration(0)// Records the last value of t.lastRead before we go block on the timer.// This is required to check for read activity since then.// 记录上一次read时间prevNano := time.Now().UnixNano()// Initialize the different timers to their default values.// 空闲定时器idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)// 最长连接时间定时器ageTimer := time.NewTimer(t.kp.MaxConnectionAge)kpTimer := time.NewTimer(t.kp.Time)defer func() {// We need to drain the underlying channel in these timers after a call// to Stop(), only if we are interested in resetting them. Clearly we// are not interested in resetting them here.idleTimer.Stop()ageTimer.Stop()kpTimer.Stop()}()for {select {case <-idleTimer.C:t.mu.Lock()idle := t.idleif idle.IsZero() { // The connection is non-idle.t.mu.Unlock()idleTimer.Reset(t.kp.MaxConnectionIdle)continue}// 最大空闲时间val := t.kp.MaxConnectionIdle - time.Since(idle)t.mu.Unlock()if val <= 0 {// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.// Gracefully close the connection.// 优雅关闭t.drain(http2.ErrCodeNo, []byte{})return}// 定时多久idleTimer.Reset(val)case <-ageTimer.C:t.drain(http2.ErrCodeNo, []byte{})ageTimer.Reset(t.kp.MaxConnectionAgeGrace)select {case <-ageTimer.C:// Close the connection after grace period.infof("transport: closing server transport due to maximum connection age.")t.Close()case <-t.done:}returncase <-kpTimer.C:lastRead := atomic.LoadInt64(&t.lastRead)if lastRead > prevNano {// There has been read activity since the last time we were// here. 
Setup the timer to fire at kp.Time seconds from// lastRead time and continue.outstandingPing = false// 上一次读的 kp.time之后执行kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))prevNano = lastReadcontinue}// 第一次 false,然后服务端发送ping,然后置为true// kpTimeoutLeft 超时之后,才会<=0// 发完ping,然后超时未收到响应,调用t.close()if outstandingPing && kpTimeoutLeft <= 0 {infof("transport: closing server transport due to idleness.")// 服务端tcp关闭t.Close()return}if !outstandingPing {if channelz.IsOn() {atomic.AddInt64(&t.czData.kpCount, 1)}// pingt.controlBuf.put(p)kpTimeoutLeft = t.kp.TimeoutoutstandingPing = true}// The amount of time to sleep here is the minimum of kp.Time and// timeoutLeft. This will ensure that we wait only for kp.Time// before sending out the next ping (for cases where the ping is// acked).// 休眠时间,定时|剩余的检查时间sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)kpTimeoutLeft -= sleepDurationkpTimer.Reset(sleepDuration)case <-t.done:return}}}
客户端:
// keepalive, running in a separate goroutine, makes sure the connection is
// alive by sending pings.
//
// Each time the timer fires it checks t.lastRead. If data was read since the
// previous check, the timer is re-armed relative to that read. Otherwise a ping
// is queued on t.controlBuf (unless one is already outstanding), and the
// transport is closed via t.Close() if a ping is outstanding and its timeout
// budget (t.kp.Timeout) has been used up. When there are no active streams and
// t.kp.PermitWithoutStream is false, the goroutine goes dormant on
// t.kpDormancyCond instead of pinging. The function returns when the transport
// is closing or when t.ctx is done.
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
// True iff a ping has been sent, and no data has been received since then.
outstandingPing := false
// Amount of time remaining before which we should receive an ACK for the
// last sent ping.
timeoutLeft := time.Duration(0)
// Records the last value of t.lastRead before we go block on the timer.
// This is required to check for read activity since then.
// It starts at the current time and is later set to the last read time seen.
prevNano := time.Now().UnixNano()
timer := time.NewTimer(t.kp.Time)
for {
select {
case <-timer.C:
lastRead := atomic.LoadInt64(&t.lastRead)
// In the normal case (data keeps arriving), timer firings after the
// first one take this branch.
if lastRead > prevNano {
// There has been read activity since the last time we were here.
outstandingPing = false
// Next timer should fire at kp.Time seconds from lastRead time.
timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
prevNano = lastRead
continue
}
// A ping is outstanding and its timeout has fully elapsed without any
// read activity: close the transport.
if outstandingPing && timeoutLeft <= 0 {
t.Close()
return
}
t.mu.Lock()
if t.state == closing {
// If the transport is closing, we should exit from the
// keepalive goroutine here. If not, we could have a race
// between the call to Signal() from Close() and the call to
// Wait() here, whereby the keepalive goroutine ends up
// blocking on the condition variable which will never be
// signalled again.
t.mu.Unlock()
return
}
// There are no active streams and sending pings without an active stream
// is not permitted.
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// If a ping was sent out previously (because there were active
// streams at that point) which wasn't acked and its timeout
// hadn't fired, but we got here and are about to go dormant,
// we should make sure that we unconditionally send a ping once
// we awaken.
outstandingPing = false
t.kpDormant = true
// Block until the condition variable is signalled.
// NOTE(review): the original note says this waits for a headers frame
// to be sent (i.e. a new stream) — confirm against the stream-creation path.
t.kpDormancyCond.Wait()
}
t.kpDormant = false
t.mu.Unlock()
// We get here either because we were dormant and a new stream was
// created which unblocked the Wait() call, or because the
// keepalive timer expired. In both cases, we need to send a ping.
if !outstandingPing {
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
t.controlBuf.put(p)
timeoutLeft = t.kp.Timeout
outstandingPing = true
}
// The amount of time to sleep here is the minimum of kp.Time and
// timeoutLeft. This will ensure that we wait only for kp.Time
// before sending out the next ping (for cases where the ping is
// acked).
// This is how long we wait before checking again whether the ack
// (or any other data) has arrived.
sleepDuration := minTime(t.kp.Time, timeoutLeft)
timeoutLeft -= sleepDuration
timer.Reset(sleepDuration)
case <-t.ctx.Done():
// Stop the timer; if it had already fired, drain its channel before
// returning.
if !timer.Stop() {
<-timer.C
}
return
}
}
}
使用方式:
客户端:
// kacp holds the client-side keepalive parameters.
var kacp = keepalive.ClientParameters{
Time: 10 * time.Second, // send pings every 10 seconds if there is no activity
Timeout: time.Second, // wait 1 second for ping ack before considering the connection dead
PermitWithoutStream: true, // send pings even without active streams
}
// Dial the server with the kacp keepalive parameters applied to the connection.
conn, err := grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp))
服务端:
// kasp holds the server-side keepalive parameters.
var kasp = keepalive.ServerParameters{
// Idle timeout.
MaxConnectionIdle: 15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY
MaxConnectionAge: 30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY
MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active
Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead
}
// Create the server with the kasp keepalive parameters and an enforcement policy.
// NOTE(review): kaep (the value passed to grpc.KeepaliveEnforcementPolicy) is
// not defined in this snippet — it must be declared elsewhere; confirm.
s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
