所谓心跳,即在 TCP 长连接中,客户端和服务器之间定期发送的一种特殊的数据包,通知对方自己还在线,以确保 TCP 连接的有效性,如果服务端一段时间内没有收到客户端信息则视客户端断开。

IdleStateHandler

在 Netty 中,为我们提供了一个 IdleStateHandler,该 Handler 是实现心跳机制的关键,我们来看一看其构造函数(主要做的是为属性赋值)

  1. public IdleStateHandler
  2. (int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
  3. this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
  4. }
  5. --------------------------------------
  6. public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
  7. this.writeListener = new ChannelFutureListener() {
  8. public void operationComplete(ChannelFuture future) throws Exception {
  9. IdleStateHandler.this.lastWriteTime = IdleStateHandler.this.ticksInNanos();
  10. IdleStateHandler.this.firstWriterIdleEvent = IdleStateHandler.this.firstAllIdleEvent = true;
  11. }
  12. };
  13. this.firstReaderIdleEvent = true;
  14. this.firstWriterIdleEvent = true;
  15. this.firstAllIdleEvent = true;
  16. if (unit == null) {
  17. throw new NullPointerException("unit");
  18. } else {
  19. this.observeOutput = observeOutput;
  20. if (readerIdleTime <= 0L) {
  21. this.readerIdleTimeNanos = 0L;
  22. } else {
  23. //为readerIdleTimeNanos属性赋值为传进来的参数readerIdleTime
  24. this.readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
  25. }
  26. if (writerIdleTime <= 0L) {
  27. this.writerIdleTimeNanos = 0L;
  28. } else {
  29. //为writerIdleTimeNanos属性赋值为传进来的参数writerIdleTime
  30. this.writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
  31. }
  32. if (allIdleTime <= 0L) {
  33. this.allIdleTimeNanos = 0L;
  34. } else {
  35. //为allIdleTimeNanos属性赋值为传进来的参数allIdleTime
  36. this.allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
  37. }
  38. }
  39. }
  • readerIdleTime:读超时时间,即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件(主要用于检查服务端在该时间内有无收到数据)
  • writerIdleTime:写超时,即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件(主要用于检查客户端在该时间内有无向服务端发送数据)
  • allIdleTime:读/写超时,即当在指定的时间间隔内没有读或写操作时,会触发一个 ALL_IDLE 的 IdleStateEvent 事件

下面我们以一个简单的例子实现心跳检测机制

客户端

客户端定时向服务端发送数据

  1. public class HeartBeatClient {
  2. public static void main(String[] args) throws Exception {
  3. EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
  4. try {
  5. Bootstrap bootstrap = new Bootstrap();
  6. bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
  7. .handler(new ChannelInitializer<SocketChannel>() {
  8. @Override
  9. protected void initChannel(SocketChannel ch) throws Exception {
  10. ChannelPipeline pipeline = ch.pipeline();
  11. pipeline.addLast("decoder", new StringDecoder());
  12. pipeline.addLast("encoder", new StringEncoder());
  13. pipeline.addLast(new HeartBeatClientHandler());
  14. }
  15. });
  16. System.out.println("netty client start。。");
  17. Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel();
  18. String text = "Heartbeat Packet";
  19. Random random = new Random();
  20. //定时向服务端发送数据
  21. while (channel.isActive()) {
  22. int num = random.nextInt(8);
  23. Thread.sleep(num * 1000);
  24. channel.writeAndFlush(text);
  25. }
  26. } catch (Exception e) {
  27. e.printStackTrace();
  28. } finally {
  29. eventLoopGroup.shutdownGracefully();
  30. }
  31. }
  32. }
  33. -------------------------------------
  34. public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {
  35. @Override
  36. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  37. System.out.println(" client received :" + msg);
  38. if (msg != null && msg.equals("idle close")) {
  39. System.out.println(" 服务端关闭连接,客户端也关闭");
  40. ctx.channel().closeFuture();
  41. }
  42. }
  43. }

服务端

添加两个Handler,一个是 dleStateHandler,设置对应的超时时间;一个是我们自己定义的Handler,处理超时逻辑

  1. public class HeartBeatServer {
  2. public static void main(String[] args) throws Exception {
  3. EventLoopGroup boss = new NioEventLoopGroup();
  4. EventLoopGroup worker = new NioEventLoopGroup();
  5. try {
  6. ServerBootstrap bootstrap = new ServerBootstrap();
  7. bootstrap.group(boss, worker)
  8. .channel(NioServerSocketChannel.class)
  9. .childHandler(new ChannelInitializer<SocketChannel>() {
  10. @Override
  11. protected void initChannel(SocketChannel ch) throws Exception {
  12. ChannelPipeline pipeline = ch.pipeline();
  13. pipeline.addLast("decoder", new StringDecoder());
  14. pipeline.addLast("encoder", new StringEncoder());
  15. //IdleStateHandler的readerIdleTime参数指定超过3秒还没收到客户端的连接,
  16. //会触发IdleStateEvent事件并且交给下一个handler处理,下一个handler必须
  17. //实现userEventTriggered方法处理对应事件
  18. pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
  19. pipeline.addLast(new HeartBeatServerHandler());
  20. }
  21. });
  22. System.out.println("netty server start。。");
  23. ChannelFuture future = bootstrap.bind(9000).sync();
  24. future.channel().closeFuture().sync();
  25. } catch (Exception e) {
  26. e.printStackTrace();
  27. } finally {
  28. worker.shutdownGracefully();
  29. boss.shutdownGracefully();
  30. }
  31. }
  32. }

HeartBeatServerHandler() ,重写 userEventTriggered() 实现超时逻辑

  1. public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {
  2. int readIdleTimes = 0;
  3. @Override
  4. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  5. //获取超时事件
  6. IdleStateEvent event = (IdleStateEvent) evt;
  7. String eventType = null;
  8. switch (event.state()) {
  9. //如果是读超时事件
  10. case READER_IDLE:
  11. eventType = "读空闲";
  12. readIdleTimes++; // 读空闲的计数加1
  13. break;
  14. //如果是写超时事件
  15. case WRITER_IDLE:
  16. eventType = "写空闲";
  17. // 不处理
  18. break;
  19. case ALL_IDLE:
  20. eventType = "读写空闲";
  21. // 不处理
  22. break;
  23. }
  24. System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);
  25. if (readIdleTimes > 3) {
  26. System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");
  27. ctx.channel().writeAndFlush("idle close");
  28. ctx.channel().close();
  29. }
  30. }
  31. @Override
  32. protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
  33. System.out.println(" ====== > [server] message received : " + s);
  34. if ("Heartbeat Packet".equals(s)) {
  35. ctx.channel().writeAndFlush("ok");
  36. } else {
  37. System.out.println(" 其他信息处理 ... ");
  38. }
  39. }
  40. @Override
  41. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  42. System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
  43. }
  44. }

管道激活

当客户端连接服务端后,服务端会触发 Handler 中的 channelActive() 方法

  1. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  2. this.initialize(ctx);
  3. super.channelActive(ctx);
  4. }

我们来看一看这个initialize() 方法干了什么

  1. private void initialize(ChannelHandlerContext ctx) {
  2. switch(this.state) {
  3. case 1:
  4. case 2:
  5. return;
  6. default:
  7. this.state = 1;
  8. this.initOutputChanged(ctx);
  9. this.lastReadTime = this.lastWriteTime = this.ticksInNanos();
  10. //创建定时任务检查管道中是否有数据读取
  11. if (this.readerIdleTimeNanos > 0L) {
  12. this.readerIdleTimeout = this.schedule
  13. (ctx, new IdleStateHandler.ReaderIdleTimeoutTask(ctx), this.readerIdleTimeNanos, TimeUnit.NANOSECONDS);
  14. }
  15. //创建定时任务检查是否往管道中写入数据
  16. if (this.writerIdleTimeNanos > 0L) {
  17. this.writerIdleTimeout = this.schedule
  18. (ctx, new IdleStateHandler.WriterIdleTimeoutTask(ctx), this.writerIdleTimeNanos, TimeUnit.NANOSECONDS);
  19. }
  20. if (this.allIdleTimeNanos > 0L) {
  21. this.allIdleTimeout = this.schedule
  22. (ctx, new IdleStateHandler.AllIdleTimeoutTask(ctx), this.allIdleTimeNanos, TimeUnit.NANOSECONDS);
  23. }
  24. }
  25. }

可以看到,该方法就是创建一些定时任务放入到 scheduledTaskQueue 中,其作用无非就是定时检查客户端有没有定时往服务端发送心跳数据

  1. <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
  2. //如果当前线程是NioEventLoop所绑定的线程
  3. if (this.inEventLoop()) {
  4. //直接将定时任务放入到scheduledTaskQueue
  5. this.scheduledTaskQueue().add(task);
  6. //如果当前线程不是NioEventLoop所绑定的线程
  7. } else {
  8. this.execute(new Runnable() {
  9. //调用runAllTask()方法时会将该定时任务放入到scheduledTaskQueue()中
  10. public void run() {
  11. AbstractScheduledEventExecutor.this.scheduledTaskQueue().add(task);
  12. }
  13. });
  14. }
  15. return task;
  16. }

心跳检测

心跳检测主要就是靠上面创建的定时任务来实现的,我们来看一下该任务做了什么

  1. private final class ReaderIdleTimeoutTask extends IdleStateHandler.AbstractIdleTask {
  2. protected void run(ChannelHandlerContext ctx) {
  3. //将nextDelay赋值为我们设置的定时时间
  4. long nextDelay = readerIdleTimeNanos;
  5. if (!reading) {
  6. //nextDelay = nextDelay - (当前时间 - 上次从channel读取数据的时间)
  7. //即当前距离上次从channel中获取数据的时间
  8. nextDelay -= ticksInNanos() - lastReadTime;
  9. }
  10. //如果 nextDelay<0,即在指定时间类没有收到数据
  11. if (nextDelay <= 0L) {
  12. //重新设置定时任务,进行下一次检查
  13. IdleStateHandler.this.readerIdleTimeout = IdleStateHandler.this.schedule(ctx, this, IdleStateHandler.this.readerIdleTimeNanos, TimeUnit.NANOSECONDS);
  14. boolean first = firstReaderIdleEvent;
  15. IdleStateHandler.this.firstReaderIdleEvent = false;
  16. try {
  17. //构造读超时事件
  18. IdleStateEvent event = IdleStateHandler.this.newIdleStateEvent(IdleState.READER_IDLE, first);
  19. //针对超时时间执行对应的逻辑
  20. IdleStateHandler.this.channelIdle(ctx, event);
  21. } catch (Throwable var6) {
  22. ctx.fireExceptionCaught(var6);
  23. }
  24. //在指定时间类收到数据
  25. } else {
  26. //重新设置一个定时任务,等待下一次检查
  27. IdleStateHandler.this.readerIdleTimeout = IdleStateHandler.this.schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
  28. }
  29. }
  30. }
  • 如果在指定时间内收到了数据,重新设置一个定时任务进行下一次检查
  • 如果在指定时间内没有收到数据,还是会设置同样的定时任务,同时会触发下一个 handler 的 userEventTriggered() 方法,即我们自己实现的超时逻辑
    1. protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
    2. ctx.fireUserEventTriggered(evt);
    3. }

到这里应该知道 IdleStateHandler 的作用是什么了吧。不过有一点要注意的是,定时任务的实现是通过任务的嵌套调用实现的。而且在指定时间内收到消息后,下一次执行任务的时间并不是我们指定的时间,而是 指定时间 - 当前距离上次读取数据的时间,如果我们指定超时时间为 3s ,而 2s 前从channel中读取了数据,那么下次定时任务在1s 后执行

  1. IdleStateHandler.this.readerIdleTimeout = IdleStateHandler.this.schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);

为什么要这么设计呢?我们看下面的图就知道了
微信截图_20210816234810.png
当执行定时任务时发现上次读取数据是 2s 前,如果下次定时任务继续是 3s 后执行,但是这期间再没有发送数据,那么此次检验距离上次发送数据是 5s 后,显然是超时了。所以定时任务是以读取到数据的时刻往后 3s 执行