• 目的:
    • 维持provider和consumer之间的长连接
    • 实现:
    • dubbo心跳时间heartbeat默认是60s,超过heartbeat时间没有收到消息,就发送心跳消息(provider,consumer一样),如果连着3次(heartbeatTimeout为heartbeat*3)没有收到心跳响应,provider会关闭channel,而consumer会进行重连;不论是provider还是consumer的心跳检测都是通过启动定时任务的方式实现;
    • provider绑定和consumer连接的入口: ```java public class HeaderExchanger implements Exchanger {

      public static final String NAME = “header”;

      @Override public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {

      1. return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);

      }

      @Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {

      1. return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));

      }

    }

    1. - provider启动心跳检测:
    2. ```java
    3. public HeaderExchangeServer(Server server) {
    4. if (server == null) {
    5. throw new IllegalArgumentException("server == null");
    6. }
    7. this.server = server;
    8. this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
    9. //心跳超时时间默认为心跳时间的3倍
    10. this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
    11. //如果心跳超时时间小于心跳时间的两倍则抛异常
    12. if (heartbeatTimeout < heartbeat * 2) {
    13. throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
    14. }
    15. startHeatbeatTimer();
    16. }
    • startHeatbeatTimer的实现
    • 先停止已有的定时任务,启动新的定时任务:

      1. private void startHeatbeatTimer() {
      2. // 停止原有定时任务
      3. stopHeartbeatTimer();
      4. // 发起新的定时任务
      5. if (heartbeat > 0) {
      6. heatbeatTimer = scheduled.scheduleWithFixedDelay(
      7. new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
      8. public Collection<Channel> getChannels() {
      9. return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels());
      10. }
      11. }, heartbeat, heartbeatTimeout),
      12. heartbeat, heartbeat, TimeUnit.MILLISECONDS);
      13. }
      14. }
    • HeartBeatTask的实现

    • 遍历所有的channel,检测心跳间隔,如果超过心跳间隔没有读或写,则发送需要回复的心跳消息,最有判断是否心跳超时(heartbeatTimeout),如果超时,provider关闭channel,consumer进行重连

      1. public void run() {
      2. try {
      3. long now = System.currentTimeMillis();
      4. for (Channel channel : channelProvider.getChannels()) {
      5. if (channel.isClosed()) {
      6. continue;
      7. }
      8. try {
      9. Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
      10. Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
      11. // 读写的时间,任一超过心跳间隔,发送心跳
      12. if ((lastRead != null && now - lastRead > heartbeat)
      13. || (lastWrite != null && now - lastWrite > heartbeat)) {
      14. Request req = new Request();
      15. req.setVersion("2.0.0");
      16. req.setTwoWay(true); // 需要响应的心跳事件
      17. req.setEvent(Request.HEARTBEAT_EVENT);
      18. channel.send(req);
      19. if (logger.isDebugEnabled()) {
      20. logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
      21. + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
      22. }
      23. }
      24. // 最后读的时间,超过心跳超时时间
      25. if (lastRead != null && now - lastRead > heartbeatTimeout) {
      26. logger.warn("Close channel " + channel
      27. + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
      28. // 客户端侧,重新连接服务端
      29. if (channel instanceof Client) {
      30. try {
      31. ((Client) channel).reconnect();
      32. } catch (Exception e) {
      33. //do nothing
      34. }
      35. // 服务端侧,关闭客户端连接
      36. } else {
      37. channel.close();
      38. }
      39. }
      40. } catch (Throwable t) {
      41. logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
      42. }
      43. }
      44. } catch (Throwable t) {
      45. logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
      46. }
      47. }
    • consumer端的实现

    • 默认需要心跳检测
      1. public HeaderExchangeClient(Client client, boolean needHeartbeat) {
      2. if (client == null) {
      3. throw new IllegalArgumentException("client == null");
      4. }
      5. this.client = client;
      6. // 创建 HeaderExchangeChannel 对象
      7. this.channel = new HeaderExchangeChannel(client);
      8. // 读取心跳相关配置
      9. String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
      10. this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
      11. this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
      12. if (heartbeatTimeout < heartbeat * 2) { // 避免间隔太短
      13. throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
      14. }
      15. // 发起心跳定时器
      16. if (needHeartbeat) {
      17. startHeatbeatTimer();
      18. }
      19. }
      我们可以看到dubbo的心跳检测,服务端会发送发送心跳包,客户端也会发送心跳包,与一般只有客户端发送心跳包,服务端接受心跳是有所不同的。