




当使用org.apache.dubbo.container.Main这种容器方式来使用 Dubbo 时,也可以通过配置dubbo.shutdown.hook为true来开启优雅停机。



3)当使用容器方式运行 Dubbo 时,在容器准备退出前,可进行一系列的资源释放和清理工作。
例如使用 SpringContainer时,Dubbo 的ShutdownHook线程会执行ApplicationContext的stop和close方法,保证 Bean的生命周期完整。


dubbo之优雅停机原理 - 图1



  1. static {
  2. // The article places this static block in AbstractConfig, presumably the common parent of ServiceConfig and ReferenceConfig, so it should run at startup for both providers and consumers (TODO confirm).
  3. LEGACY_PROPERTIES.put("dubbo.protocol.name", "dubbo.service.protocol");
  4. LEGACY_PROPERTIES.put("dubbo.protocol.host", "dubbo.service.server.host");
  5. LEGACY_PROPERTIES.put("dubbo.protocol.port", "dubbo.service.server.port");
  6. LEGACY_PROPERTIES.put("dubbo.protocol.threads", "dubbo.service.max.thread.pool.size");
  7. LEGACY_PROPERTIES.put("dubbo.consumer.timeout", "dubbo.service.invoke.timeout");
  8. LEGACY_PROPERTIES.put("dubbo.consumer.retries", "dubbo.service.max.retry.providers");
  9. LEGACY_PROPERTIES.put("dubbo.consumer.check", "dubbo.service.allow.no.provider");
  10. LEGACY_PROPERTIES.put("dubbo.service.url", "dubbo.service.address");
  11. // Register the shutdown hook; graceful shutdown depends on this call.
  12. DubboShutdownHook.getDubboShutdownHook().register();
  13. }


  1. public class DubboShutdownHook extends Thread { // Thread whose run() tears down all registries and protocols
  2. public void run() {
  3. if (logger.isInfoEnabled()) {
  4. logger.info("Run shutdown hook now.");
  5. }
  6. // When the application shuts down, the hook's run() method is executed.
  7. doDestroy();
  8. }
  9. //Destroy all the resources, including registries and protocols.
  10. public void doDestroy() {
  11. if (!destroyed.compareAndSet(false, true)) { // only the first caller gets past this guard; later calls return immediately
  12. return;
  13. }
  14. // Per the article: (1) removes this application's services from the registry, (2) cancels the application's service subscriptions.
  15. // See sections 3 and 4 of the article for the source walkthrough.
  16. AbstractRegistryFactory.destroyAll();
  17. // Per the article: (1) sends a read-only event to every consumer, telling them the service is closing and that they may only read what is still unread on the channel,
  18. // (2) then closes.
  19. // See sections 5 and 7 of the article for the source walkthrough.
  20. destroyProtocols();
  21. }
  22. //Destroy all the protocols
  23. private void destroyProtocols() {
  24. ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
  25. // Author's observation (not verifiable from this excerpt): loader.getLoadedExtensions() yields three Protocol instances, DubboProtocol, InjvmProtocol and
  26. // RegistryProtocol, and each of them is wrapped.
  27. for (String protocolName : loader.getLoadedExtensions()) {
  28. try {
  29. Protocol protocol = loader.getLoadedExtension(protocolName);
  30. if (protocol != null) {
  31. protocol.destroy();
  32. }
  33. } catch (Throwable t) { // a failure in one protocol is logged and does not stop the remaining ones
  34. logger.warn(t.getMessage(), t);
  35. }
  36. }
  37. }
  38. }


  1. public static void destroyAll() { // Destroys every registry returned by getRegistries() and clears REGISTRIES, all under LOCK
  2. // Author's remark: good middleware/frameworks log at every key step; when an application misbehaves at startup, read the startup log first.
  3. if (LOGGER.isInfoEnabled()) {
  4. LOGGER.info("Close all registries " + getRegistries());
  5. }
  6. // Acquire the lock.
  7. LOCK.lock();
  8. try {
  9. // getRegistries() returns all registry instances. Per the article there is one instance per registry center the application registered with.
  10. // With ZooKeeper, the article states the instance is a ZookeeperRegistry.
  11. for (Registry registry : getRegistries()) {
  12. try {
  13. registry.destroy();
  14. } catch (Throwable e) { // log and continue so one failing registry does not block the others
  15. LOGGER.error(e.getMessage(), e);
  16. }
  17. }
  18. REGISTRIES.clear();
  19. } finally {
  20. // Release the lock
  21. LOCK.unlock();
  22. }
  23. }

dubbo之优雅停机原理 - 图2

  1. public class ZookeeperRegistry extends FailbackRegistry { // ZooKeeper-backed registry; excerpt shows only the shutdown-related methods
  2. public void destroy() {
  3. // super refers to FailbackRegistry.
  4. super.destroy();
  5. try {
  6. // Close the zk client.
  7. zkClient.close();
  8. } catch (Exception e) {
  9. logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
  10. }
  11. }
  12. public void doUnregister(URL url) {
  13. try {
  14. // Delete the service node registered on zk (the path is derived from the URL by toUrlPath).
  15. zkClient.delete(toUrlPath(url));
  16. } catch (Throwable e) { // any failure is wrapped in an RpcException that keeps the original cause
  17. throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
  18. }
  19. }
  20. public void doUnsubscribe(URL url, NotifyListener listener) {
  21. ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); // listeners recorded for this URL; may be null
  22. if (listeners != null) {
  23. ChildListener zkListener = listeners.get(listener);
  24. if (zkListener != null) {
  25. if (ANY_VALUE.equals(url.getServiceInterface())) { // service interface equals ANY_VALUE: the listener is removed from the root path
  26. String root = toRootPath();
  27. zkClient.removeChildListener(root, zkListener);
  28. } else { // otherwise the listener is removed from every category path of the URL
  29. for (String path : toCategoriesPath(url)) {
  30. zkClient.removeChildListener(path, zkListener);
  31. }
  32. }
  33. }
  34. }
  35. }
  36. }
  37. public abstract class FailbackRegistry extends AbstractRegistry { // Registry base class that records failed operations for later retry
  38. public void destroy() {
  39. super.destroy();
  40. retryTimer.stop(); // stop the retry timer after the parent has finished destroying
  41. }
  42. public void unregister(URL url) {
  43. super.unregister(url);
  44. removeFailedRegistered(url);
  45. removeFailedUnregistered(url);
  46. try {
  47. // Sending a cancellation request to the server side
  48. doUnregister(url);
  49. } catch (Exception e) {
  50. Throwable t = e;
  51. // check is true when CHECK_KEY resolves to true on both the registry URL and the target URL, and the target is not a consumer URL; in that case the failure is rethrown instead of retried.
  52. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true)
  53. && !CONSUMER_PROTOCOL.equals(url.getProtocol());
  54. boolean skipFailback = t instanceof SkipFailbackWrapperException;
  55. if (check || skipFailback) {
  56. if (skipFailback) {
  57. t = t.getCause(); // unwrap so the reported cause is the underlying exception
  58. }
  59. throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
  60. } else {
  61. logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t);
  62. }
  63. // Record the failed unregister request so that it can be retried later (reached only when nothing was thrown above).
  64. addFailedUnregistered(url);
  65. }
  66. }
  67. public void unsubscribe(URL url, NotifyListener listener) {
  68. super.unsubscribe(url, listener);
  69. removeFailedSubscribed(url, listener);
  70. try {
  71. doUnsubscribe(url, listener);
  72. } catch (Exception e) {
  73. Throwable t = e;
  74. // check is true when CHECK_KEY resolves to true on both the registry URL and the target URL; in that case the failure is rethrown instead of retried.
  75. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true);
  76. boolean skipFailback = t instanceof SkipFailbackWrapperException;
  77. if (check || skipFailback) {
  78. if (skipFailback) {
  79. t = t.getCause(); // unwrap so the reported cause is the underlying exception
  80. }
  81. throw new IllegalStateException("Failed to unsubscribe " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
  82. } else {
  83. logger.error("Failed to unsubscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
  84. }
  85. // Record the failed unsubscribe request so that it can be retried later (reached only when nothing was thrown above).
  86. addFailedUnsubscribed(url, listener);
  87. }
  88. }
  89. }
  90. public abstract class AbstractRegistry implements Registry { // Excerpt: only destroy() is shown, the rest of the class is elided
  91. ...
  92. public void destroy() {
  93. if (logger.isInfoEnabled()) {
  94. logger.info("Destroy registry:" + getUrl());
  95. }
  96. Set<URL> destroyRegistered = new HashSet<>(getRegistered()); // copy of the registered URLs
  97. if (!destroyRegistered.isEmpty()) {
  98. // Iterate over the registered URLs. NOTE(review): the loop takes a second fresh copy of getRegistered() instead of reusing destroyRegistered.
  99. for (URL url : new HashSet<>(getRegistered())) {
  100. if (url.getParameter(DYNAMIC_KEY, true)) { // only URLs whose DYNAMIC_KEY parameter resolves to true are unregistered
  101. try {
  102. // Unregister the service from the registry; per the article, with zk this deletes the service node.
  103. unregister(url);
  104. if (logger.isInfoEnabled()) {
  105. logger.info("Destroy unregister url " + url);
  106. }
  107. } catch (Throwable t) { // log and continue with the next URL
  108. logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
  109. }
  110. }
  111. }
  112. }
  113. Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed()); // copy of the subscription map
  114. if (!destroySubscribed.isEmpty()) {
  115. for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
  116. URL url = entry.getKey();
  117. for (NotifyListener listener : entry.getValue()) {
  118. try {
  119. // Cancel the subscription; per the article, this service then no longer watches node changes on zk.
  120. unsubscribe(url, listener);
  121. if (logger.isInfoEnabled()) {
  122. logger.info("Destroy unsubscribe url " + url);
  123. }
  124. } catch (Throwable t) { // log and continue with the next listener
  125. logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
  126. }
  127. }
  128. }
  129. }
  130. }
  131. }

dubbo之优雅停机原理 - 图3

  1. public class ProtocolListenerWrapper implements Protocol { // Protocol wrapper; destroy() only delegates to the wrapped protocol
  2. public void destroy() {
  3. // Per the article, protocol here is the QosProtocolWrapper.
  4. protocol.destroy();
  5. }
  6. }
  7. public class QosProtocolWrapper implements Protocol { // Protocol wrapper; destroy() delegates and then stops its own server
  8. public void destroy() {
  9. // protocol is the next wrapper in the chain, presumably ProtocolFilterWrapper (the original note named QosProtocolWrapper itself, which looks like a copy-paste slip) - TODO confirm wrapper order.
  10. protocol.destroy();
  11. stopServer(); // runs after the wrapped protocol has been destroyed
  12. }
  13. }
  14. public class ProtocolFilterWrapper implements Protocol { // Protocol wrapper; destroy() only delegates to the wrapped protocol
  15. public void destroy() {
  16. // Per the article, protocol here is the DubboProtocol.
  17. protocol.destroy();
  18. }
  19. }
  20. public class DubboProtocol extends AbstractProtocol { // Excerpt: destroy() closes servers, then reference clients, then delegates to the parent
  21. public void destroy() {
  22. for (String key : new ArrayList<>(serverMap.keySet())) { // iterate over a copy of the keys because entries are removed inside the loop
  23. // Per the article, server is a HeaderExchangeServer: the service started when DubboProtocol opened its listening port while exporting a remote service;
  24. // it ultimately delegates to NettyServer when netty is the transport.
  25. ExchangeServer server = serverMap.remove(key);
  26. if (server == null) {
  27. continue;
  28. }
  29. ...
  30. // Calls close(int) on the server (HeaderExchangeServer per the article) with the configured server shutdown timeout.
  31. server.close(ConfigurationUtils.getServerShutdownTimeout());
  32. ...
  33. }
  34. for (String key : new ArrayList<>(referenceClientMap.keySet())) { // same copy-then-remove pattern for the client side
  35. List<ReferenceCountExchangeClient> clients = referenceClientMap.remove(key);
  36. if (CollectionUtils.isEmpty(clients)) {
  37. continue;
  38. }
  39. for (ReferenceCountExchangeClient client : clients) {
  40. closeReferenceCountExchangeClient(client);
  41. }
  42. }
  43. stubServiceMethodsMap.clear();
  44. // super refers to AbstractProtocol.
  45. super.destroy();
  46. }
  47. }
  48. public abstract class AbstractProtocol implements Protocol { // Excerpt: destroy() clears the invoker and exporter caches
  49. public void destroy() {
  50. // 1. Remove every invoker instance from the invokers cache.
  51. // 2. Call destroy() on each invoker; the author notes it is effectively an empty method with no real logic at present (not verifiable from this excerpt).
  52. for (Invoker<?> invoker : invokers) {
  53. if (invoker != null) {
  54. invokers.remove(invoker); // NOTE(review): removes from the collection being iterated; presumably invokers is a concurrent collection that tolerates this - confirm
  55. try {
  56. if (logger.isInfoEnabled()) {
  57. logger.info("Destroy reference: " + invoker.getUrl());
  58. }
  59. invoker.destroy();
  60. } catch (Throwable t) {
  61. logger.warn(t.getMessage(), t);
  62. }
  63. }
  64. }
  65. // 1. Remove every exporter instance from AbstractProtocol.exporterMap.
  66. // 2. Per the article, unexport() also removes the exporter from DubboExporter.exporterMap.
  67. for (String key : new ArrayList<String>(exporterMap.keySet())) { // iterate over a copy of the keys because entries are removed inside the loop
  68. Exporter<?> exporter = exporterMap.remove(key);
  69. if (exporter != null) {
  70. try {
  71. if (logger.isInfoEnabled()) {
  72. logger.info("Unexport service: " + exporter.getInvoker().getUrl());
  73. }
  74. exporter.unexport();
  75. } catch (Throwable t) {
  76. logger.warn(t.getMessage(), t);
  77. }
  78. }
  79. }
  80. }
  81. }


  1. public class HeaderExchangeServer implements ExchangeServer { // Excerpt: only the timed close path is shown
  2. public void close(final int timeout) {
  3. // Per the article, this sets the closing flag to true.
  4. startClose();
  5. if (timeout > 0) {
  6. final long max = (long) timeout;
  7. final long start = System.currentTimeMillis();
  8. if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) {
  9. // Per the article: sends a read-only event to every connected channel, telling consumers the service is about to close and that they may only read what is still unread on the channel.
  10. // Author's guess (unverified): on receiving the event, a consumer finishes its in-flight work and then closes its channel to this provider.
  11. sendChannelReadOnlyEvent();
  12. }
  13. // While isRunning() is still true and the timeout has not elapsed, sleep 10 ms per iteration (per the article, isRunning() means some channel is still connected).
  14. while (HeaderExchangeServer.this.isRunning() && System.currentTimeMillis() - start < max) {
  15. try {
  16. Thread.sleep(10);
  17. } catch (InterruptedException e) { // NOTE(review): the interrupt is only logged; the thread's interrupt flag is not restored
  18. logger.warn(e.getMessage(), e);
  19. }
  20. }
  21. }
  22. // Author's note: this step is meant to deal with the closeTimerTask scheduled task, but that instance was observed to be null, so nothing runs; its purpose is unclear - TODO confirm.
  23. doClose();
  24. // Per the article, server here is the NettyServer.
  25. server.close(timeout);
  26. }
  27. }
  28. public class NettyServer extends AbstractServer implements Server { // Excerpt: doClose() releases the netty resources step by step, logging any failure
  29. protected void doClose() throws Throwable {
  30. try {
  31. if (channel != null) {
  32. // Per the article: closes the channel of the netty boss group that accepts connections; once it is closed, no new connections come in.
  33. channel.close();
  34. }
  35. } catch (Throwable e) {
  36. logger.warn(e.getMessage(), e);
  37. }
  38. try {
  39. // Close every channel returned by getChannels(); per the article these are the netty worker-group channels that handle read/write events.
  40. Collection<org.apache.dubbo.remoting.Channel> channels = getChannels();
  41. if (channels != null && channels.size() > 0) {
  42. for (org.apache.dubbo.remoting.Channel channel : channels) {
  43. try {
  44. channel.close();
  45. } catch (Throwable e) { // one channel failing to close does not stop the others
  46. logger.warn(e.getMessage(), e);
  47. }
  48. }
  49. }
  50. } catch (Throwable e) {
  51. logger.warn(e.getMessage(), e);
  52. }
  53. try {
  54. // Gracefully shut down the boss and worker thread groups.
  55. if (bootstrap != null) {
  56. bossGroup.shutdownGracefully();
  57. workerGroup.shutdownGracefully();
  58. }
  59. } catch (Throwable e) {
  60. logger.warn(e.getMessage(), e);
  61. }
  62. try {
  63. if (channels != null) { // not the local declared above (that one is scoped to its own try block); presumably a field of this class
  64. channels.clear();
  65. }
  66. } catch (Throwable e) {
  67. logger.warn(e.getMessage(), e);
  68. }
  69. }
  70. }
  71. public abstract class AbstractServer extends AbstractEndpoint implements Server { // Excerpt: the immediate and the timed close paths
  72. public void close() {
  73. // Author's note: when reached from close(int), this follows the graceful shutdown with an immediate shutdown of the thread pool, which is rather abrupt. The author expects
  74. // pending tasks to normally finish within the timeout (stated as 10 s; value not verifiable from this excerpt), so no data or runtime problems should result.
  75. ExecutorUtil.shutdownNow(executor, 100);
  76. try {
  77. super.close();
  78. } catch (Throwable e) {
  79. logger.warn(e.getMessage(), e);
  80. }
  81. try {
  82. doClose(); // subclass-specific cleanup; failures are logged, not propagated
  83. } catch (Throwable e) {
  84. logger.warn(e.getMessage(), e);
  85. }
  86. }
  87. public void close(int timeout) {
  88. // Gracefully shut down the thread pool; per the article, executor is the dubbo thread pool obtained from the DataStore.
  89. // Author's TODO: how the thread pool is shut down deserves a closer look.
  90. ExecutorUtil.gracefulShutdown(executor, timeout);
  91. close();
  92. }
  93. }

dubbo之优雅停机原理 - 图4

  1. InjvmProtocol的destroy方法继承自AbstractProtocol,具体逻辑参见DubboProtocol部分对
  2. AbstractProtocol源码的分析

dubbo之优雅停机原理 - 图5

  1. public class RegistryProtocol implements Protocol { // Excerpt: destroy() unexports every bound exporter and removes the provider configuration listener
  2. public void destroy() {
  3. List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(bounds.values()); // iterate over a copy of bounds.values()
  4. // Per the article, unexporting each exporter (1) removes it from bounds, (2) unregisters the service, (3) cancels the zk listener.
  5. for (Exporter<?> exporter : exporters) {
  6. exporter.unexport();
  7. }
  8. // Clear the bounds cache.
  9. bounds.clear();
  10. DynamicConfiguration.getDynamicConfiguration().removeListener(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX, providerConfigurationListener);
  11. }
  12. }