一、背景
对于任何一个线上应用,如何在服务更新部署过程中保证客户端无感知是开发者必须要解决的问题,即从应用停止到重启恢复服务这个阶段不能影响正常的业务请求。理想条件下,在没有请求的时候再进行更新是最安全可靠的。然而互联网应用必须要保证可用性,因此在技术层面上优化应用更新流程来保证服务在更新时无损是必要的。
传统的解决方式是通过将应用更新流程划分为手工摘流量、停应用、更新重启三个步骤,由人工操作实现客户端对更新无感知。这种方式简单而高效,但是限制很多:不仅需要使用借助网关的支持来摘流量,还需要在停应用前人工判断来保证在途请求已经处理完毕。这种需要人工介入的方式运维复杂度较高,只能适用规模较小的应用,无法在大规模系统上使用。
因此,如果能在容器/框架级别提供某种自动化机制,来自动进行摘流量并确保处理完已经到达的请求,不仅能保证业务不受更新影响,还可以极大的提升更新应用时的运维效率。
这个机制也就是优雅停机,目前tomcat、undertow/dubbo等容器/框架都有提供相关实现。下面给出一些正式的定义:优雅停机是指在停止应用时,执行的一系列保证应用正常关闭的操作。这些操作往往包括等待已有请求执行完成、关闭线程、关闭连接和释放资源等。优雅停机可以避免非正常关闭程序可能造成的数据异常或丢失、应用异常等问题。优雅停机本质上是JVM即将关闭前执行的一些额外的处理代码。
二、适用场景
1)JVM主动关闭(System.exit(int))
2)JVM由于资源问题退出(OOM)
3)应用程序接收到SIGTERM或SIGINT信号
三、配置方式
1)服务的优雅停机
在dubbo中优雅停机是默认开启的,停机等待时间是10ms。可以通过配置dubbo.service.shutdown.wait来修改等待时间
2)容器的优雅停机
当使用org.apache.dubbo.container.Main这种容器方式来使用 Dubbo 时,也可以通过配置dubbo.shutdown.hook为true来开启优雅停机。
四、流程
Provider在接收到停机指令后:
1)从注册中心上注销所有服务;
2)从配置中心取消监听动态配置;
3)向所有连接的客户端发送只读事件,停止接收新请求;
4)等待一段时间以处理已到达的请求,然后关闭请求处理线程池;
5)断开所有客户端连接。
Consumer在接收到停机指令后:
1)拒绝新到请求,直接返回调用异常;
2)等待当前已发送请求执行完毕,如果响应超时则强制关闭连接。
3)当使用容器方式运行 Dubbo 时,在容器准备退出前,可进行一系列的资源释放和清理工。
例如使用 SpringContainer时,Dubbo 的ShutdownHook线程会执行ApplicationContext的stop和close方法,保证 Bean的生命周期完整。
五、时序图
六、源码分析
dubbo的优雅停机是通过JVM的ShutdownHook钩子函数完成的。入口在AbstractConfig类中。
1、AbstractConfig
static {//AbstractConfig是ServiceConfig和ReferenceConfig,所以无论是提供者还是消费者,启动时都会执行以下代码LEGACY_PROPERTIES.put("dubbo.protocol.name", "dubbo.service.protocol");LEGACY_PROPERTIES.put("dubbo.protocol.host", "dubbo.service.server.host");LEGACY_PROPERTIES.put("dubbo.protocol.port", "dubbo.service.server.port");LEGACY_PROPERTIES.put("dubbo.protocol.threads", "dubbo.service.max.thread.pool.size");LEGACY_PROPERTIES.put("dubbo.consumer.timeout", "dubbo.service.invoke.timeout");LEGACY_PROPERTIES.put("dubbo.consumer.retries", "dubbo.service.max.retry.providers");LEGACY_PROPERTIES.put("dubbo.consumer.check", "dubbo.service.allow.no.provider");LEGACY_PROPERTIES.put("dubbo.service.url", "dubbo.service.address");//钩子,优雅关机必备DubboShutdownHook.getDubboShutdownHook().register();}
2、DubboShutdownHook
public class DubboShutdownHook extends Thread {public void run() {if (logger.isInfoEnabled()) {logger.info("Run shutdown hook now.");}//应用关闭时,会执行钩子的run方法doDestroy();}//Destroy all the resources, including registries and protocols.public void doDestroy() {if (!destroyed.compareAndSet(false, true)) {return;}//1)删除应用本身在注册中心的服务,2)取消应用订阅的服务信息//详情参见第3、4目录源码分析AbstractRegistryFactory.destroyAll();//1)向所有消费端发送只读事件,告诉它们服务要关闭了,它们只能够从通道中读取未读完的数据//2)关闭//详情参见第5、7目录源码分析destroyProtocols();}//Destroy all the protocolsprivate void destroyProtocols() {ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);//loader.getLoadedExtensions()将会获取三个Protocol实例,分别是DubbboProtocol、InjvmProtocol、//RegistryProtocol,但这些实例都被包装了。for (String protocolName : loader.getLoadedExtensions()) {try {Protocol protocol = loader.getLoadedExtension(protocolName);if (protocol != null) {protocol.destroy();}} catch (Throwable t) {logger.warn(t.getMessage(), t);}}}}
3、AbstractRegistryFactory
public static void destroyAll() {//优秀的中间件/框架在核心步骤都会打日志,启应用如果有问题,多看启动日志if (LOGGER.isInfoEnabled()) {LOGGER.info("Close all registries " + getRegistries());}//上锁LOCK.lock();try {//getRegistries()获取所有的注册实例。应用向多少个注册中心注册,getRegistries便会获取多少个注册实例//使用zookeeper的话,registry的实例就是ZookeeperRegistryfor (Registry registry : getRegistries()) {try {registry.destroy();} catch (Throwable e) {LOGGER.error(e.getMessage(), e);}}REGISTRIES.clear();} finally {// Release the lockLOCK.unlock();}}
4、ZookeeperRegistry
看一下类关系图:
public class ZookeeperRegistry extends FailbackRegistry {public void destroy() {//super指FailbackRegistrysuper.destroy();try {//zk客户端关闭zkClient.close();} catch (Exception e) {logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);}}public void doUnregister(URL url) {try {//删除zk上注册的服务节点zkClient.delete(toUrlPath(url));} catch (Throwable e) {throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);}}public void doUnsubscribe(URL url, NotifyListener listener) {ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);if (listeners != null) {ChildListener zkListener = listeners.get(listener);if (zkListener != null) {if (ANY_VALUE.equals(url.getServiceInterface())) {String root = toRootPath();zkClient.removeChildListener(root, zkListener);} else {for (String path : toCategoriesPath(url)) {zkClient.removeChildListener(path, zkListener);}}}}}}public abstract class FailbackRegistry extends AbstractRegistry {public void destroy() {super.destroy();retryTimer.stop();}public void unregister(URL url) {super.unregister(url);removeFailedRegistered(url);removeFailedUnregistered(url);try {// Sending a cancellation request to the server sidedoUnregister(url);} catch (Exception e) {Throwable t = e;// If the startup detection is opened, the Exception is thrown directly.boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true)&& !CONSUMER_PROTOCOL.equals(url.getProtocol());boolean skipFailback = t instanceof SkipFailbackWrapperException;if (check || skipFailback) {if (skipFailback) {t = t.getCause();}throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);} else {logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t);}// Record a failed registration request to a failed list, retry regularlyaddFailedUnregistered(url);}}public void unsubscribe(URL url, NotifyListener listener) {super.unsubscribe(url, listener);removeFailedSubscribed(url, listener);try {doUnsubscribe(url, listener);} catch (Exception e) {Throwable t = e;// If the startup detection is opened, the Exception is thrown directly.boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true);boolean skipFailback = t instanceof SkipFailbackWrapperException;if (check || skipFailback) {if (skipFailback) {t = t.getCause();}throw new IllegalStateException("Failed to unsubscribe " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);} else {logger.error("Failed to unsubscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);}// Record a failed registration request to a failed list, retry regularlyaddFailedUnsubscribed(url, listener);}}}public abstract class AbstractRegistry implements Registry {...public void destroy() {if (logger.isInfoEnabled()) {logger.info("Destroy registry:" + getUrl());}Set<URL> destroyRegistered = new HashSet<>(getRegistered());if (!destroyRegistered.isEmpty()) {//遍历注册的url信息for (URL url : new HashSet<>(getRegistered())) {if (url.getParameter(DYNAMIC_KEY, true)) {try {//取消在注册中心注册的服务,如zk,则是删除服务节点unregister(url);if (logger.isInfoEnabled()) {logger.info("Destroy unregister url " + url);}} catch (Throwable t) {logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);}}}}Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());if (!destroySubscribed.isEmpty()) {for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {URL url = entry.getKey();for (NotifyListener listener : entry.getValue()) {try {//取消监听,本服务不在监听zk上的节点的变化unsubscribe(url, listener);if (logger.isInfoEnabled()) {logger.info("Destroy unsubscribe url " + url);}} catch (Throwable t) {logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);}}}}}}
5、DubboProtocol实例:
调用链如下,根据”dubbo”获取到的实际上是包装类,层层包装,层层调用
public class ProtocolListenerWrapper implements Protocol {public void destroy() {//protocol指QosProtocolWrapperprotocol.destroy();}}public class QosProtocolWrapper implements Protocol {public void destroy() {//protocol指QosProtocolWrapperprotocol.destroy();stopServer();}}public class ProtocolFilterWrapper implements Protocol {public void destroy() {//protocol指DubboProtocolprotocol.destroy();}}public class DubboProtocol extends AbstractProtocol {public void destroy() {for (String key : new ArrayList<>(serverMap.keySet())) {//server就是HeaderExchangeServer,就是远程发布服务时,DubboProtocol开启端口监听时启动的//服务,最终是调用NettyServer(使用netty通信协议的话)ExchangeServer server = serverMap.remove(key);if (server == null) {continue;}...//调用HeaderExchangeServer的close方法server.close(ConfigurationUtils.getServerShutdownTimeout());...}for (String key : new ArrayList<>(referenceClientMap.keySet())) {List<ReferenceCountExchangeClient> clients = referenceClientMap.remove(key);if (CollectionUtils.isEmpty(clients)) {continue;}for (ReferenceCountExchangeClient client : clients) {closeReferenceCountExchangeClient(client);}}stubServiceMethodsMap.clear();//super指AbstractProtocolsuper.destroy();}}public abstract class AbstractProtocol implements Protocol {public void destroy() {//1、遍历移除invokers缓存中的invoker实例//2、遍历调用invoker实例的destroy方法,其实是空方法,目前没有实际处理逻辑for (Invoker<?> invoker : invokers) {if (invoker != null) {invokers.remove(invoker);try {if (logger.isInfoEnabled()) {logger.info("Destroy reference: " + invoker.getUrl());}invoker.destroy();} catch (Throwable t) {logger.warn(t.getMessage(), t);}}}//1、遍历移除AbstractProtocol.exporterMap缓存中的exporter实例//2、遍历移除DubboExporter.exporterMap缓存中的exporter实例for (String key : new ArrayList<String>(exporterMap.keySet())) {Exporter<?> exporter = exporterMap.remove(key);if (exporter != null) {try {if (logger.isInfoEnabled()) {logger.info("Unexport service: " + exporter.getInvoker().getUrl());}exporter.unexport();} catch (Throwable t) {logger.warn(t.getMessage(), t);}}}}}
6、HeaderExchangeServer:
public class HeaderExchangeServer implements ExchangeServer {public void close(final int timeout) {//将关闭的标识置为truestartClose();if (timeout > 0) {final long max = (long) timeout;final long start = System.currentTimeMillis();if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) {//向所有连接的通道发送只读事件,告诉消费者服务即将关闭,只能从通道读取为未读完的数据//??猜测:当消费者收到该事件时,将进行中的任务处理完后,将会关闭与该服务提供者的通道,断开连接sendChannelReadOnlyEvent();}//如果还有通道是连接状态,并且未超时,则sleep 10毫秒while (HeaderExchangeServer.this.isRunning() && System.currentTimeMillis() - start < max) {try {Thread.sleep(10);} catch (InterruptedException e) {logger.warn(e.getMessage(), e);}}}//这一步是调用closeTimerTask定时任务,但它的实例是null,根本不会执行。所以不明白这个功能。doClose();//server值NettyServerserver.close(timeout);}}public class NettyServer extends AbstractServer implements Server {protected void doClose() throws Throwable {try {if (channel != null) {//关闭netty的boss组负责连接的通道。这个通道关闭之后,不会再有新的连接进来。channel.close();}} catch (Throwable e) {logger.warn(e.getMessage(), e);}try {//遍历关闭netty worker线程组的通道,worker组的通道通常负责读、写事件的处理Collection<org.apache.dubbo.remoting.Channel> channels = getChannels();if (channels != null && channels.size() > 0) {for (org.apache.dubbo.remoting.Channel channel : channels) {try {channel.close();} catch (Throwable e) {logger.warn(e.getMessage(), e);}}}} catch (Throwable e) {logger.warn(e.getMessage(), e);}try {//优雅关闭boss线程组、worker线程组if (bootstrap != null) {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} catch (Throwable e) {logger.warn(e.getMessage(), e);}try {if (channels != null) {channels.clear();}} catch (Throwable e) {logger.warn(e.getMessage(), e);}}}public abstract class AbstractServer extends AbstractEndpoint implements Server {public void close() {//在经过优雅关闭线程池后,立马关闭线程池,这种关闭比较粗暴。但通常在超时时间内(10s),//线程池的任务都能够处理完,所以也不会造成数据问题、程序异常问题。ExecutorUtil.shutdownNow(executor, 100);try {super.close();} catch (Throwable e) {logger.warn(e.getMessage(), e);}try {doClose();} catch (Throwable e) {logger.warn(e.getMessage(), e);}}public void close(int timeout) {//优雅关闭线程池,executor就是指dubbo线程池,是从DataStore中取出来的。//线程池的关闭,需要研究一下ExecutorUtil.gracefulShutdown(executor, timeout);close();}}
7、InjvmProtocol实例:
调用链如下,根据”injvm”获取到的实际上是包装类,层层包装,层层调用
InjvmProtocol的destroy方法继承的是AbstractProtocol方法,参见DubboProtocol中对AbstractProtocol源码的分析
8、RegistryProtocol实例:
调用链如下,根据”injvm”获取到的实际上是包装类,层层包装,层层调用
public class RegistryProtocol implements Protocol {public void destroy() {List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(bounds.values());//1)遍历移除bounds中的exporter实例 2)取消服务注册 3)取消zk服务监听for (Exporter<?> exporter : exporters) {exporter.unexport();}//清除bounds缓存bounds.clear();DynamicConfiguration.getDynamicConfiguration()removeListener(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX, providerConfigurationListener);}}
