- 目的:
- 维持provider和consumer之间的长连接
- 实现:
- dubbo心跳时间heartbeat默认是60s,超过heartbeat时间没有收到消息,就发送心跳消息(provider,consumer一样),如果连着3次(heartbeatTimeout为heartbeat*3)没有收到心跳响应,provider会关闭channel,而consumer会进行重连;不论是provider还是consumer的心跳检测都是通过启动定时任务的方式实现;
provider绑定和consumer连接的入口: ```java public class HeaderExchanger implements Exchanger {
public static final String NAME = “header”;
@Override public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
- provider启动心跳检测:```javapublic HeaderExchangeServer(Server server) {if (server == null) {throw new IllegalArgumentException("server == null");}this.server = server;this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);//心跳超时时间默认为心跳时间的3倍this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);//如果心跳超时时间小于心跳时间的两倍则抛异常if (heartbeatTimeout < heartbeat * 2) {throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");}startHeatbeatTimer();}
- startHeatbeatTimer的实现
先停止已有的定时任务,启动新的定时任务:
private void startHeatbeatTimer() {// 停止原有定时任务stopHeartbeatTimer();// 发起新的定时任务if (heartbeat > 0) {heatbeatTimer = scheduled.scheduleWithFixedDelay(new HeartBeatTask(new HeartBeatTask.ChannelProvider() {public Collection<Channel> getChannels() {return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels());}}, heartbeat, heartbeatTimeout),heartbeat, heartbeat, TimeUnit.MILLISECONDS);}}
HeartBeatTask的实现
遍历所有的channel,检测心跳间隔,如果超过心跳间隔没有读或写,则发送需要回复的心跳消息,最有判断是否心跳超时(heartbeatTimeout),如果超时,provider关闭channel,consumer进行重连
public void run() {try {long now = System.currentTimeMillis();for (Channel channel : channelProvider.getChannels()) {if (channel.isClosed()) {continue;}try {Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);// 读写的时间,任一超过心跳间隔,发送心跳if ((lastRead != null && now - lastRead > heartbeat)|| (lastWrite != null && now - lastWrite > heartbeat)) {Request req = new Request();req.setVersion("2.0.0");req.setTwoWay(true); // 需要响应的心跳事件req.setEvent(Request.HEARTBEAT_EVENT);channel.send(req);if (logger.isDebugEnabled()) {logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()+ ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");}}// 最后读的时间,超过心跳超时时间if (lastRead != null && now - lastRead > heartbeatTimeout) {logger.warn("Close channel " + channel+ ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");// 客户端侧,重新连接服务端if (channel instanceof Client) {try {((Client) channel).reconnect();} catch (Exception e) {//do nothing}// 服务端侧,关闭客户端连接} else {channel.close();}}} catch (Throwable t) {logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);}}} catch (Throwable t) {logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);}}
consumer端的实现
- 默认需要心跳检测
我们可以看到dubbo的心跳检测,服务端会发送发送心跳包,客户端也会发送心跳包,与一般只有客户端发送心跳包,服务端接受心跳是有所不同的。public HeaderExchangeClient(Client client, boolean needHeartbeat) {if (client == null) {throw new IllegalArgumentException("client == null");}this.client = client;// 创建 HeaderExchangeChannel 对象this.channel = new HeaderExchangeChannel(client);// 读取心跳相关配置String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);if (heartbeatTimeout < heartbeat * 2) { // 避免间隔太短throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");}// 发起心跳定时器if (needHeartbeat) {startHeatbeatTimer();}}
