注册中心在 Dubbo 中的作用

服务治理框架可以大致分为 服务通信 和 服务管理 两部分,服务管理可以分为服务注册、服务订阅以及服务发现,服务提供者 Provider 会往注册中心注册服务,而消费者 Consumer 会从注册中心中订阅自己关注的服务,并在关注的服务发生变更时 得到注册中心的通知。Provider、Consumer 以及 Registry 之间的依赖关系 如下图所示。

avatar

dubbo-registry 模块 结构分析

dubbo 的注册中心有多种实现方案,如:zookeeper、redis、multicast 等,本章先看一下 dubbo-registry 模块的核心部分 dubbo-registry-api,具体实现部分放到下章来讲。dubbo-registry 模块 的结构如下图所示。

avatar

Registry 核心组件类图

典型的 接口 -> 抽象类 -> 实现类 的结构设计,如下图所示。

avatar

既然有 Registry 组件,那么按照很多框架的套路,肯定也有一个用于获取 Registry 实例的 RegistryFactory,其中用到了工厂方法模式,不同的工厂类用于获取不同类型的实例。其类图结构如下。

avatar

源码详解

根据上面的类图,我们开始从上往下 详解 dubbo 中对于注册中心的设计以及实现。

RegistryService 接口

RegistryService 是注册中心模块的服务接口,定义了注册、取消注册、订阅、取消订阅以及查询符合条件的已注册数据 等方法。这里统一说明一下 URL,dubbo 是以总线模式来时刻传递和保存配置信息的,配置信息都被放在 URL 上进行传递,随时可以取得相关配置信息,而这里提到了 URL 有别的作用,就是作为类似于节点的作用,首先服务提供者(Provider)启动时需要提供服务,就会向注册中心写下自己的 URL 地址。然后消费者启动时需要去订阅该服务,则会订阅 Provider 注册的地址,并且消费者也会写下自己的 URL。

  1. /**
  2. * RegistryService. (SPI, Prototype, ThreadSafe)
  3. *
  4. * 注册中心服务接口
  5. */
  6. public interface RegistryService {
  7. /**
  8. * 注册数据,比如:提供者地址,消费者地址,路由规则,覆盖规则 等数据。
  9. * <p>
  10. * 注册需处理契约:<br>
  11. * 1. 当URL设置了check=false时,注册失败后不报错,在后台定时重试,否则抛出异常。<br>
  12. * 2. 当URL设置了dynamic=false参数,则需持久存储,否则,当注册者出现断电等情况异常退出时,需自动删除。<br>
  13. * 3. 当URL设置了category=routers时,表示分类存储,缺省类别为providers,可按分类部分通知数据。<br>
  14. * 4. 当注册中心重启,网络抖动,不能丢失数据,包括断线自动删除数据。<br>
  15. * 5. 允许URI相同但参数不同的URL并存,不能覆盖。<br>
  16. *
  17. * @param url 注册信息,不允许为空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
  18. */
  19. void register(URL url);
  20. /**
  21. * 取消注册.
  22. * <p>
  23. * 取消注册需处理契约:<br>
  24. * 1. 如果是dynamic=false的持久存储数据,找不到注册数据,则抛IllegalStateException,否则忽略。<br>
  25. * 2. 按全URL匹配取消注册。<br>
  26. *
  27. * @param url 注册信息,不允许为空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
  28. */
  29. void unregister(URL url);
  30. /**
  31. * 订阅符合条件的已注册数据,当有注册数据变更时自动推送.
  32. * <p>
  33. * 订阅需处理契约:<br>
  34. * 1. 当URL设置了check=false时,订阅失败后不报错,在后台定时重试。<br>
  35. * 2. 当URL设置了category=routers,只通知指定分类的数据,多个分类用逗号分隔,并允许星号通配,表示订阅所有分类数据。<br>
  36. * 3. 允许以interface,group,version,classifier作为条件查询,如:interface=com.alibaba.foo.BarService&version=1.0.0<br>
  37. * 4. 并且查询条件允许星号通配,订阅所有接口的所有分组的所有版本,或:interface=*&group=*&version=*&classifier=*<br>
  38. * 5. 当注册中心重启,网络抖动,需自动恢复订阅请求。<br>
  39. * 6. 允许URI相同但参数不同的URL并存,不能覆盖。<br>
  40. * 7. 必须阻塞订阅过程,等第一次通知完后再返回。<br>
  41. *
  42. * @param url 订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
  43. * @param listener 变更事件监听器,不允许为空
  44. */
  45. void subscribe(URL url, NotifyListener listener);
  46. /**
  47. * 取消订阅.
  48. * <p>
  49. * 取消订阅需处理契约:<br>
  50. * 1. 如果没有订阅,直接忽略。<br>
  51. * 2. 按全URL匹配取消订阅。<br>
  52. *
  53. * @param url 订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
  54. * @param listener 变更事件监听器,不允许为空
  55. */
  56. void unsubscribe(URL url, NotifyListener listener);
  57. /**
  58. * 查询符合条件的已注册数据,与订阅的推模式相对应,这里为拉模式,只返回一次结果。
  59. *
  60. * @param url 查询条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
  61. * @return 已注册信息列表,可能为空,含义同{@link com.alibaba.dubbo.registry.NotifyListener#notify(List<URL>)}的参数。
  62. * @see com.alibaba.dubbo.registry.NotifyListener#notify(List)
  63. */
  64. List<URL> lookup(URL url);
  65. }

Registry 接口

注册中心接口,把节点 Node 以及注册中心服务 RegistryService 的方法整合在了这个接口里面。该接口并没有自己的方法,就是继承了 Node 和 RegistryService 接口。这里的 Node 是节点的接口,里面协定了关于节点的一些操作方法,源码如下。

  1. /**
  2. * 注册中心接口
  3. */
  4. public interface Registry extends Node, RegistryService {
  5. }
  6. public interface Node {
  7. //获得节点地址
  8. URL getUrl();
  9. //判断节点是否可用
  10. boolean isAvailable();
  11. //销毁节点
  12. void destroy();
  13. }

AbstractRegistry 抽象类

实现了 Registry 接口的抽象类。为了减轻注册中心的压力,该抽象类把本地 URL 缓存到了 property 文件中,并且实现了注册中心的注册、订阅等方法。

  1. /**
  2. * 实现了Registry接口的抽象类,实现了如下方法:
  3. *
  4. * 1、通用的注册、订阅、查询、通知等方法
  5. * 2、读取和持久化注册数据到文件,以 properties 格式存储
  6. */
  7. public abstract class AbstractRegistry implements Registry {
  8. // URL地址分隔符,用于文件缓存中,服务提供者URL分隔
  9. private static final char URL_SEPARATOR = ' ';
  10. // URL地址分隔正则表达式,用于解析文件缓存中服务提供者URL列表
  11. private static final String URL_SPLIT = "\\s+";
  12. // Log output
  13. protected final Logger logger = LoggerFactory.getLogger(getClass());
  14. /**
  15. * 本地磁盘缓存。
  16. * 1. 其中特殊的 key 值 .registies 记录注册中心列表 TODO 8019 芋艿,特殊的 key 是
  17. * 2. 其它均为 {@link #notified} 服务提供者列表
  18. */
  19. private final Properties properties = new Properties();
  20. /**
  21. * 注册中心缓存写入执行器。
  22. * 线程数=1
  23. */
  24. // File cache timing writing
  25. private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
  26. /**
  27. * 是否同步保存文件
  28. */
  29. private final boolean syncSaveFile;
  30. /**
  31. * 数据版本号
  32. */
  33. private final AtomicLong lastCacheChanged = new AtomicLong();
  34. /**
  35. * 已注册 URL 集合。
  36. * 注册的 URL 可以是服务提供者的,也可以是服务消费者的
  37. */
  38. private final Set<URL> registered = new ConcurrentHashSet<URL>();
  39. /**
  40. * 订阅 URL 的监听器集合
  41. * key:订阅者的 URL ,例如消费者的 URL
  42. */
  43. private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
  44. /**
  45. * 被通知的 URL 集合
  46. * key1:消费者的 URL ,例如消费者的 URL ,和 {@link #subscribed} 的键一致
  47. * key2:分类,例如:providers、consumers、routes、configurators。【实际无 consumers ,因为消费者不会去订阅另外的消费者的列表】
  48. * 在 {@link Constants} 中,以 "_CATEGORY" 结尾
  49. */
  50. private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
  51. /**
  52. * 注册中心 URL
  53. */
  54. private URL registryUrl;
  55. /**
  56. * 本地磁盘缓存文件,缓存注册中心的数据
  57. */
  58. private File file;
  59. /**
  60. * 是否销毁
  61. */
  62. private AtomicBoolean destroyed = new AtomicBoolean(false);
  63. public AbstractRegistry(URL url) {
  64. setUrl(url);
  65. // Start file save timer
  66. syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
  67. // 获得 `file`
  68. String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
  69. File file = null;
  70. if (ConfigUtils.isNotEmpty(filename)) {
  71. file = new File(filename);
  72. if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
  73. if (!file.getParentFile().mkdirs()) {
  74. throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
  75. }
  76. }
  77. }
  78. this.file = file;
  79. // 加载本地磁盘缓存文件到内存缓存
  80. loadProperties();
  81. // 通知监听器,URL 变化结果
  82. notify(url.getBackupUrls()); // 【TODO 8020】为什么构造方法,要通知,连监听器都没注册
  83. }
  84. protected static List<URL> filterEmpty(URL url, List<URL> urls) {
  85. if (urls == null || urls.isEmpty()) {
  86. List<URL> result = new ArrayList<URL>(1);
  87. result.add(url.setProtocol(Constants.EMPTY_PROTOCOL));
  88. return result;
  89. }
  90. return urls;
  91. }
  92. @Override
  93. public URL getUrl() {
  94. return registryUrl;
  95. }
  96. protected void setUrl(URL url) {
  97. if (url == null) {
  98. throw new IllegalArgumentException("registry url == null");
  99. }
  100. this.registryUrl = url;
  101. }
  102. public Set<URL> getRegistered() {
  103. return registered;
  104. }
  105. public Map<URL, Set<NotifyListener>> getSubscribed() {
  106. return subscribed;
  107. }
  108. public Map<URL, Map<String, List<URL>>> getNotified() {
  109. return notified;
  110. }
  111. public File getCacheFile() {
  112. return file;
  113. }
  114. public Properties getCacheProperties() {
  115. return properties;
  116. }
  117. public AtomicLong getLastCacheChanged() {
  118. return lastCacheChanged;
  119. }
  120. /**
  121. * 保存内存缓存到本地磁盘缓存文件,即 {@link #properties} => {@link #file}
  122. *
  123. * @param version 数据版本号
  124. */
  125. public void doSaveProperties(long version) {
  126. if (version < lastCacheChanged.get()) {
  127. return;
  128. }
  129. if (file == null) {
  130. return;
  131. }
  132. // Save
  133. try {
  134. // 创建 .lock 文件
  135. File lockfile = new File(file.getAbsolutePath() + ".lock");
  136. if (!lockfile.exists()) {
  137. lockfile.createNewFile();
  138. }
  139. // 随机读写文件操作
  140. RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
  141. try {
  142. FileChannel channel = raf.getChannel();
  143. try {
  144. // 获得文件锁
  145. FileLock lock = channel.tryLock();
  146. // 获取失败
  147. if (lock == null) {
  148. throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
  149. }
  150. // 获取成功,进行保存
  151. // Save
  152. try {
  153. if (!file.exists()) {
  154. file.createNewFile();
  155. }
  156. FileOutputStream outputFile = new FileOutputStream(file);
  157. try {
  158. properties.store(outputFile, "Dubbo Registry Cache");
  159. } finally {
  160. outputFile.close();
  161. }
  162. // 释放文件锁
  163. } finally {
  164. lock.release();
  165. }
  166. // 释放文件 Channel
  167. } finally {
  168. channel.close();
  169. }
  170. // 释放随机读写文件操作
  171. } finally {
  172. raf.close();
  173. }
  174. } catch (Throwable e) {
  175. // 版本号过小,不保存
  176. if (version < lastCacheChanged.get()) {
  177. return;
  178. // 重新异步保存,一般情况下为上面的获取锁失败抛出的异常。通过这样的方式,达到保存成功。
  179. } else {
  180. registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
  181. }
  182. logger.warn("Failed to save registry store file, cause: " + e.getMessage(), e);
  183. }
  184. }
  185. /**
  186. * 加载本地磁盘缓存文件到内存缓存,即 {@link #file} => {@link #properties}
  187. */
  188. private void loadProperties() {
  189. if (file != null && file.exists()) {
  190. InputStream in = null;
  191. try {
  192. // 文件流
  193. in = new FileInputStream(file);
  194. // 读取文件流
  195. properties.load(in);
  196. if (logger.isInfoEnabled()) {
  197. logger.info("Load registry store file " + file + ", data: " + properties);
  198. }
  199. } catch (Throwable e) {
  200. logger.warn("Failed to load registry store file " + file, e);
  201. } finally {
  202. if (in != null) {
  203. try {
  204. in.close();
  205. } catch (IOException e) {
  206. logger.warn(e.getMessage(), e);
  207. }
  208. }
  209. }
  210. }
  211. }
  212. /**
  213. * 从 `properties` 中获得缓存的 URL 集合
  214. *
  215. * @param url URL
  216. * @return URL 集合
  217. */
  218. public List<URL> getCacheUrls(URL url) {
  219. for (Map.Entry<Object, Object> entry : properties.entrySet()) {
  220. String key = (String) entry.getKey();
  221. String value = (String) entry.getValue();
  222. if (key != null && key.length() > 0 // 非空
  223. && key.equals(url.getServiceKey()) // 服务键匹配
  224. && (Character.isLetter(key.charAt(0)) || key.charAt(0) == '_') // TODO 芋艿,_ 是什么
  225. && value != null && value.length() > 0) { // 值非空
  226. String[] arr = value.trim().split(URL_SPLIT);
  227. List<URL> urls = new ArrayList<URL>();
  228. for (String u : arr) {
  229. urls.add(URL.valueOf(u));
  230. }
  231. return urls;
  232. }
  233. }
  234. return null;
  235. }
  236. @Override
  237. public List<URL> lookup(URL url) {
  238. List<URL> result = new ArrayList<URL>();
  239. Map<String, List<URL>> notifiedUrls = getNotified().get(url);
  240. // 有数据,遍历数据获得
  241. if (notifiedUrls != null && notifiedUrls.size() > 0) {
  242. // 遍历
  243. for (List<URL> urls : notifiedUrls.values()) {
  244. for (URL u : urls) {
  245. if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
  246. result.add(u);
  247. }
  248. }
  249. }
  250. // 无数据,通过发起订阅的方式得到数据后,遍历数据获得
  251. } else {
  252. // 创建 NotifyListener 对象
  253. final AtomicReference<List<URL>> reference = new AtomicReference<List<URL>>();
  254. NotifyListener listener = new NotifyListener() {
  255. public void notify(List<URL> urls) {
  256. reference.set(urls);
  257. }
  258. };
  259. // 订阅获得数据
  260. subscribe(url, listener); // Subscribe logic guarantees the first notify to return
  261. List<URL> urls = reference.get();
  262. // 遍历
  263. if (urls != null && !urls.isEmpty()) {
  264. for (URL u : urls) {
  265. if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
  266. result.add(u);
  267. }
  268. }
  269. }
  270. }
  271. return result;
  272. }
  273. @Override
  274. public void register(URL url) {
  275. if (url == null) {
  276. throw new IllegalArgumentException("register url == null");
  277. }
  278. if (logger.isInfoEnabled()) {
  279. logger.info("Register: " + url);
  280. }
  281. // 添加到 registered 集合
  282. registered.add(url);
  283. }
  284. @Override
  285. public void unregister(URL url) {
  286. if (url == null) {
  287. throw new IllegalArgumentException("unregister url == null");
  288. }
  289. if (logger.isInfoEnabled()) {
  290. logger.info("Unregister: " + url);
  291. }
  292. // 移除出 registered 集合
  293. registered.remove(url);
  294. }
  295. @Override
  296. public void subscribe(URL url, NotifyListener listener) {
  297. if (url == null) {
  298. throw new IllegalArgumentException("subscribe url == null");
  299. }
  300. if (listener == null) {
  301. throw new IllegalArgumentException("subscribe listener == null");
  302. }
  303. if (logger.isInfoEnabled()) {
  304. logger.info("Subscribe: " + url);
  305. }
  306. // 添加到 subscribed 集合
  307. Set<NotifyListener> listeners = subscribed.get(url);
  308. if (listeners == null) {
  309. subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
  310. listeners = subscribed.get(url);
  311. }
  312. listeners.add(listener);
  313. }
  314. @Override
  315. public void unsubscribe(URL url, NotifyListener listener) {
  316. if (url == null) {
  317. throw new IllegalArgumentException("unsubscribe url == null");
  318. }
  319. if (listener == null) {
  320. throw new IllegalArgumentException("unsubscribe listener == null");
  321. }
  322. if (logger.isInfoEnabled()) {
  323. logger.info("Unsubscribe: " + url);
  324. }
  325. // 移除出 subscribed 集合
  326. Set<NotifyListener> listeners = subscribed.get(url);
  327. if (listeners != null) {
  328. listeners.remove(listener);
  329. }
  330. }
  331. /**
  332. * 恢复注册和订阅
  333. *
  334. * @throws Exception 发生异常
  335. */
  336. protected void recover() throws Exception {
  337. // register 恢复注册
  338. Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
  339. if (!recoverRegistered.isEmpty()) {
  340. if (logger.isInfoEnabled()) {
  341. logger.info("Recover register url " + recoverRegistered);
  342. }
  343. for (URL url : recoverRegistered) {
  344. register(url);
  345. }
  346. }
  347. // subscribe 恢复订阅
  348. Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
  349. if (!recoverSubscribed.isEmpty()) {
  350. if (logger.isInfoEnabled()) {
  351. logger.info("Recover subscribe url " + recoverSubscribed.keySet());
  352. }
  353. for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
  354. URL url = entry.getKey();
  355. for (NotifyListener listener : entry.getValue()) {
  356. subscribe(url, listener);
  357. }
  358. }
  359. }
  360. }
  361. /**
  362. * 通知监听器,URL 变化结果。
  363. *
  364. * @param urls 通知的 URL 变化结果(全量数据)
  365. */
  366. protected void notify(List<URL> urls) {
  367. if (urls == null || urls.isEmpty()) return;
  368. // 循环 `subscribed` ,通知监听器们
  369. for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
  370. URL url = entry.getKey();
  371. // 匹配
  372. if (!UrlUtils.isMatch(url, urls.get(0))) {
  373. continue;
  374. }
  375. // 通知监听器
  376. Set<NotifyListener> listeners = entry.getValue();
  377. if (listeners != null) {
  378. for (NotifyListener listener : listeners) {
  379. try {
  380. notify(url, listener, filterEmpty(url, urls));
  381. } catch (Throwable t) {
  382. logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
  383. }
  384. }
  385. }
  386. }
  387. }
  388. /**
  389. * 通知监听器,URL 变化结果。
  390. *
  391. * 数据流向 `urls` => {@link #notified} => {@link #properties} => {@link #file}
  392. *
  393. * @param url 消费者 URL
  394. * @param listener 监听器
  395. * @param urls 通知的 URL 变化结果(全量数据)
  396. */
  397. protected void notify(URL url, NotifyListener listener, List<URL> urls) {
  398. if (url == null) {
  399. throw new IllegalArgumentException("notify url == null");
  400. }
  401. if (listener == null) {
  402. throw new IllegalArgumentException("notify listener == null");
  403. }
  404. if ((urls == null || urls.isEmpty())
  405. && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
  406. logger.warn("Ignore empty notify urls for subscribe url " + url);
  407. return;
  408. }
  409. if (logger.isInfoEnabled()) {
  410. logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
  411. }
  412. // 将 `urls` 按照 `url.parameter.category` 分类,添加到集合
  413. // 注意,特殊情况,使用 curator 连接 Zookeeper 时,若是服务消费者,连接断开,会出现 category=providers,configurations,routes
  414. Map<String, List<URL>> result = new HashMap<String, List<URL>>();
  415. for (URL u : urls) {
  416. if (UrlUtils.isMatch(url, u)) {
  417. String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
  418. List<URL> categoryList = result.get(category);
  419. if (categoryList == null) {
  420. categoryList = new ArrayList<URL>();
  421. result.put(category, categoryList);
  422. }
  423. categoryList.add(u);
  424. }
  425. }
  426. if (result.size() == 0) {
  427. return;
  428. }
  429. // 获得消费者 URL 对应的在 `notified` 中,通知的 URL 变化结果(全量数据)
  430. Map<String, List<URL>> categoryNotified = notified.get(url);
  431. if (categoryNotified == null) {
  432. notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
  433. categoryNotified = notified.get(url);
  434. }
  435. // 【按照分类循环】处理通知的 URL 变化结果(全量数据)
  436. for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
  437. String category = entry.getKey();
  438. List<URL> categoryList = entry.getValue();
  439. // 覆盖到 `notified`
  440. // 当某个分类的数据为空时,会依然有 urls 。其中 `urls[0].protocol = empty` ,通过这样的方式,处理所有服务提供者为空的情况。
  441. categoryNotified.put(category, categoryList);
  442. // 保存到文件
  443. saveProperties(url);
  444. // 通知监听器
  445. listener.notify(categoryList);
  446. }
  447. }
  448. /**
  449. * 保存单个消费者 URL 对应,在 `notified` 的数据,到文件。
  450. *
  451. * @param url 消费者 URL
  452. */
  453. private void saveProperties(URL url) {
  454. if (file == null) {
  455. return;
  456. }
  457. try {
  458. // 拼接 URL
  459. StringBuilder buf = new StringBuilder();
  460. Map<String, List<URL>> categoryNotified = notified.get(url);
  461. if (categoryNotified != null) {
  462. for (List<URL> us : categoryNotified.values()) {
  463. for (URL u : us) {
  464. if (buf.length() > 0) {
  465. buf.append(URL_SEPARATOR);
  466. }
  467. buf.append(u.toFullString());
  468. }
  469. }
  470. }
  471. // 设置到 properties 中
  472. properties.setProperty(url.getServiceKey(), buf.toString());
  473. // 增加数据版本号
  474. long version = lastCacheChanged.incrementAndGet();
  475. // 保存到文件
  476. if (syncSaveFile) {
  477. doSaveProperties(version);
  478. } else {
  479. registryCacheExecutor.execute(new SaveProperties(version));
  480. }
  481. } catch (Throwable t) {
  482. logger.warn(t.getMessage(), t);
  483. }
  484. }
  485. /**
  486. * 取消注册和订阅
  487. */
  488. @Override
  489. public void destroy() {
  490. // 已销毁,跳过
  491. if (!destroyed.compareAndSet(false, true)) {
  492. return;
  493. }
  494. if (logger.isInfoEnabled()) {
  495. logger.info("Destroy registry:" + getUrl());
  496. }
  497. // 取消注册
  498. Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
  499. if (!destroyRegistered.isEmpty()) {
  500. for (URL url : new HashSet<URL>(getRegistered())) {
  501. if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
  502. try {
  503. unregister(url); // 取消注册
  504. if (logger.isInfoEnabled()) {
  505. logger.info("Destroy unregister url " + url);
  506. }
  507. } catch (Throwable t) {
  508. logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
  509. }
  510. }
  511. }
  512. }
  513. // 取消订阅
  514. Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
  515. if (!destroySubscribed.isEmpty()) {
  516. for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
  517. URL url = entry.getKey();
  518. for (NotifyListener listener : entry.getValue()) {
  519. try {
  520. unsubscribe(url, listener); // 取消订阅
  521. if (logger.isInfoEnabled()) {
  522. logger.info("Destroy unsubscribe url " + url);
  523. }
  524. } catch (Throwable t) {
  525. logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
  526. }
  527. }
  528. }
  529. }
  530. }
  531. public String toString() {
  532. return getUrl().toString();
  533. }
  534. /**
  535. * 保存配置的 Runnable任务
  536. */
  537. private class SaveProperties implements Runnable {
  538. /**
  539. * 数据版本号
  540. */
  541. private long version;
  542. private SaveProperties(long version) {
  543. this.version = version;
  544. }
  545. public void run() {
  546. doSaveProperties(version);
  547. }
  548. }
  549. }

FailbackRegistry 抽象类

FailbackRegistry 抽象类 继承了上面的 AbstractRegistry,AbstractRegistry 中的注册、订阅等方法,实际上就是一些内存缓存的变化,而真正的注册订阅的实现逻辑在 FailbackRegistry 实现,并且 FailbackRegistry 提供了失败重试的机制。

  1. /**
  2. * 支持失败重试的 FailbackRegistry抽象类
  3. */
  4. public abstract class FailbackRegistry extends AbstractRegistry {
  5. /**
  6. * 定时任务执行器
  7. */
  8. private final ScheduledExecutorService retryExecutor = Executors.
  9. newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));
  10. /**
  11. * 失败重试定时器,定时检查是否有请求失败,如有,无限次重试
  12. */
  13. private final ScheduledFuture<?> retryFuture;
  14. /**
  15. * 注册失败的 URL 集合
  16. */
  17. private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
  18. /**
  19. * 取消注册失败的 URL 集合
  20. */
  21. private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
  22. /**
  23. * 订阅失败的监听器集合
  24. */
  25. private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
  26. /**
  27. * 取消订阅失败的监听器集合
  28. */
  29. private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
  30. /**
  31. * 通知失败的 URL 集合
  32. */
  33. private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
  34. /**
  35. * 是否销毁
  36. */
  37. private AtomicBoolean destroyed = new AtomicBoolean(false);
  38. public FailbackRegistry(URL url) {
  39. super(url);
  40. // 重试频率,单位:毫秒
  41. int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
  42. // 创建失败重试定时器
  43. this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
  44. public void run() {
  45. // Check and connect to the registry
  46. try {
  47. retry();
  48. } catch (Throwable t) { // Defensive fault tolerance
  49. logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
  50. }
  51. }
  52. }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
  53. }
  54. public Future<?> getRetryFuture() {
  55. return retryFuture;
  56. }
  57. public Set<URL> getFailedRegistered() {
  58. return failedRegistered;
  59. }
  60. public Set<URL> getFailedUnregistered() {
  61. return failedUnregistered;
  62. }
  63. public Map<URL, Set<NotifyListener>> getFailedSubscribed() {
  64. return failedSubscribed;
  65. }
  66. public Map<URL, Set<NotifyListener>> getFailedUnsubscribed() {
  67. return failedUnsubscribed;
  68. }
  69. public Map<URL, Map<NotifyListener, List<URL>>> getFailedNotified() {
  70. return failedNotified;
  71. }
  72. /**
  73. * 添加到 `failedSubscribed`
  74. */
  75. private void addFailedSubscribed(URL url, NotifyListener listener) {
  76. Set<NotifyListener> listeners = failedSubscribed.get(url);
  77. if (listeners == null) {
  78. failedSubscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
  79. listeners = failedSubscribed.get(url);
  80. }
  81. listeners.add(listener);
  82. }
  83. /**
  84. * 移除出 `failedSubscribed` `failedUnsubscribed` `failedNotified`
  85. */
  86. private void removeFailedSubscribed(URL url, NotifyListener listener) {
  87. // 移除出 `failedSubscribed`
  88. Set<NotifyListener> listeners = failedSubscribed.get(url);
  89. if (listeners != null) {
  90. listeners.remove(listener);
  91. }
  92. // 移除出 `failedUnsubscribed`
  93. listeners = failedUnsubscribed.get(url);
  94. if (listeners != null) {
  95. listeners.remove(listener);
  96. }
  97. // 移除出 `failedNotified`
  98. Map<NotifyListener, List<URL>> notified = failedNotified.get(url);
  99. if (notified != null) {
  100. notified.remove(listener);
  101. }
  102. }
  103. @Override
  104. public void register(URL url) {
  105. // 已销毁,跳过
  106. if (destroyed.get()){
  107. return;
  108. }
  109. // 添加到 `registered` 变量
  110. super.register(url);
  111. // 移除出 `failedRegistered` `failedUnregistered` 变量
  112. failedRegistered.remove(url);
  113. failedUnregistered.remove(url);
  114. // 向注册中心发送注册请求
  115. try {
  116. doRegister(url);
  117. } catch (Exception e) {
  118. Throwable t = e;
  119. // 如果开启了启动时检测,则直接抛出异常
  120. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
  121. && url.getParameter(Constants.CHECK_KEY, true)
  122. && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); // 非消费者。消费者会在 `ReferenceConfig#createProxy(...)` 方法中,调用 `Invoker#avalible()` 方法,进行检查。
  123. boolean skipFailback = t instanceof SkipFailbackWrapperException;
  124. if (check || skipFailback) {
  125. if (skipFailback) {
  126. t = t.getCause();
  127. }
  128. throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
  129. } else {
  130. logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
  131. }
  132. // 将失败的注册请求记录到 `failedRegistered`,定时重试
  133. failedRegistered.add(url);
  134. }
  135. }
  136. @Override
  137. public void unregister(URL url) {
  138. // 已销毁,跳过
  139. if (destroyed.get()){
  140. return;
  141. }
  142. // 移除出 `registered` 变量
  143. super.unregister(url);
  144. // 移除出 `failedRegistered` `failedUnregistered` 变量
  145. failedRegistered.remove(url);
  146. failedUnregistered.remove(url);
  147. // 向注册中心发送取消注册请求
  148. try {
  149. doUnregister(url);
  150. } catch (Exception e) {
  151. Throwable t = e;
  152. // 如果开启了启动时检测,则直接抛出异常
  153. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
  154. && url.getParameter(Constants.CHECK_KEY, true)
  155. && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
  156. boolean skipFailback = t instanceof SkipFailbackWrapperException;
  157. if (check || skipFailback) {
  158. if (skipFailback) {
  159. t = t.getCause();
  160. }
  161. throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
  162. } else {
  163. logger.error("Failed to uregister " + url + ", waiting for retry, cause: " + t.getMessage(), t);
  164. }
  165. // 将失败的取消注册请求记录到 `failedUnregistered`,定时重试
  166. failedUnregistered.add(url);
  167. }
  168. }
  169. @Override
  170. public void subscribe(URL url, NotifyListener listener) {
  171. // 已销毁,跳过
  172. if (destroyed.get()){
  173. return;
  174. }
  175. // 移除出 `subscribed` 变量
  176. super.subscribe(url, listener);
  177. // 移除出 `failedSubscribed` `failedUnsubscribed` `failedNotified`
  178. removeFailedSubscribed(url, listener);
  179. // 向注册中心发送订阅请求
  180. try {
  181. doSubscribe(url, listener);
  182. } catch (Exception e) {
  183. Throwable t = e;
  184. // 如果有缓存的 URL 集合,进行通知。后续订阅成功后,会使用最新的 URL 集合,进行通知。
  185. List<URL> urls = getCacheUrls(url);
  186. if (urls != null && !urls.isEmpty()) {
  187. notify(url, listener, urls);
  188. logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
  189. } else {
  190. // 如果开启了启动时检测,则直接抛出异常
  191. // If the startup detection is opened, the Exception is thrown directly.
  192. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
  193. && url.getParameter(Constants.CHECK_KEY, true);
  194. boolean skipFailback = t instanceof SkipFailbackWrapperException;
  195. if (check || skipFailback) {
  196. if (skipFailback) {
  197. t = t.getCause();
  198. }
  199. throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
  200. } else {
  201. logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
  202. }
  203. }
  204. // 将失败的订阅请求记录到 `failedSubscribed`,定时重试
  205. // Record a failed registration request to a failed list, retry regularly
  206. addFailedSubscribed(url, listener);
  207. }
  208. }
  209. @Override
  210. public void unsubscribe(URL url, NotifyListener listener) {
  211. // 已销毁,跳过
  212. if (destroyed.get()){
  213. return;
  214. }
  215. // 移除出 `unsubscribed` 变量
  216. super.unsubscribe(url, listener);
  217. // 移除出 `failedSubscribed` `failedUnsubscribed` `failedNotified`
  218. removeFailedSubscribed(url, listener);
  219. // 向注册中心发送取消订阅请求
  220. try {
  221. // Sending a canceling subscription request to the server side
  222. doUnsubscribe(url, listener);
  223. } catch (Exception e) {
  224. Throwable t = e;
  225. // 如果开启了启动时检测,则直接抛出异常
  226. // If the startup detection is opened, the Exception is thrown directly.
  227. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
  228. && url.getParameter(Constants.CHECK_KEY, true);
  229. boolean skipFailback = t instanceof SkipFailbackWrapperException;
  230. if (check || skipFailback) {
  231. if (skipFailback) {
  232. t = t.getCause();
  233. }
  234. throw new IllegalStateException("Failed to unsubscribe " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
  235. } else {
  236. logger.error("Failed to unsubscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
  237. }
  238. // 将失败的订阅请求记录到 `failedUnsubscribed`,定时重试
  239. // Record a failed registration request to a failed list, retry regularly
  240. Set<NotifyListener> listeners = failedUnsubscribed.get(url);
  241. if (listeners == null) {
  242. failedUnsubscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
  243. listeners = failedUnsubscribed.get(url);
  244. }
  245. listeners.add(listener);
  246. }
  247. }
  248. @Override
  249. protected void notify(URL url, NotifyListener listener, List<URL> urls) {
  250. if (url == null) {
  251. throw new IllegalArgumentException("notify url == null");
  252. }
  253. if (listener == null) {
  254. throw new IllegalArgumentException("notify listener == null");
  255. }
  256. // 通知监听器
  257. try {
  258. doNotify(url, listener, urls);
  259. } catch (Exception t) {
  260. // 将失败的通知记录到 `failedNotified`,定时重试
  261. Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
  262. if (listeners == null) {
  263. failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
  264. listeners = failedNotified.get(url);
  265. }
  266. listeners.put(listener, urls);
  267. logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
  268. }
  269. }
  270. protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
  271. super.notify(url, listener, urls);
  272. }
  273. @Override
  274. protected void recover() throws Exception {
  275. // register 恢复注册,添加到 `failedRegistered` ,定时重试
  276. Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
  277. if (!recoverRegistered.isEmpty()) {
  278. if (logger.isInfoEnabled()) {
  279. logger.info("Recover register url " + recoverRegistered);
  280. }
  281. for (URL url : recoverRegistered) {
  282. failedRegistered.add(url);
  283. }
  284. }
  285. // subscribe 恢复订阅,添加到 `failedSubscribed` ,定时重试
  286. Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
  287. if (!recoverSubscribed.isEmpty()) {
  288. if (logger.isInfoEnabled()) {
  289. logger.info("Recover subscribe url " + recoverSubscribed.keySet());
  290. }
  291. for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
  292. URL url = entry.getKey();
  293. for (NotifyListener listener : entry.getValue()) {
  294. addFailedSubscribed(url, listener);
  295. }
  296. }
  297. }
  298. }
  299. /**
  300. * 重试
  301. */
  302. protected void retry() {
  303. // 重试执行注册
  304. if (!failedRegistered.isEmpty()) {
  305. Set<URL> failed = new HashSet<URL>(failedRegistered); // 避免并发冲突
  306. if (failed.size() > 0) {
  307. if (logger.isInfoEnabled()) {
  308. logger.info("Retry register " + failed);
  309. }
  310. try {
  311. for (URL url : failed) {
  312. try {
  313. // 执行注册
  314. doRegister(url);
  315. // 移除出 `failedRegistered`
  316. failedRegistered.remove(url);
  317. } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
  318. logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
  319. }
  320. }
  321. } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
  322. logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
  323. }
  324. }
  325. }
  326. // 重试执行取消注册
  327. if (!failedUnregistered.isEmpty()) {
  328. Set<URL> failed = new HashSet<URL>(failedUnregistered); // 避免并发冲突
  329. if (!failed.isEmpty()) {
  330. if (logger.isInfoEnabled()) {
  331. logger.info("Retry unregister " + failed);
  332. }
  333. try {
  334. for (URL url : failed) {
  335. try {
  336. // 执行取消注册
  337. doUnregister(url);
  338. // 移除出 `failedUnregistered`
  339. failedUnregistered.remove(url);
  340. } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
  341. logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t);
  342. }
  343. }
  344. } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
  345. logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t);
  346. }
  347. }
  348. }
  349. // 重试执行注册
  350. if (!failedSubscribed.isEmpty()) {
  351. Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed); // 避免并发冲突
  352. for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
  353. if (entry.getValue() == null || entry.getValue().size() == 0) {
  354. failed.remove(entry.getKey());
  355. }
  356. }
  357. if (failed.size() > 0) {
  358. if (logger.isInfoEnabled()) {
  359. logger.info("Retry subscribe " + failed);
  360. }
  361. try {
  362. for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
  363. URL url = entry.getKey();
  364. Set<NotifyListener> listeners = entry.getValue();
  365. for (NotifyListener listener : listeners) {
  366. try {
  367. // 执行注册
  368. doSubscribe(url, listener);
  369. // 移除出监听器
  370. listeners.remove(listener);
  371. } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
  372. logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
  373. }
  374. }
  375. }
  376. } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
  377. logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
  378. }
  379. }
  380. }
  381. // 重试执行取消注册
  382. if (!failedUnsubscribed.isEmpty()) {
  383. Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed);
  384. for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
  385. if (entry.getValue() == null || entry.getValue().isEmpty()) {
  386. failed.remove(entry.getKey());
  387. }
  388. }
  389. if (failed.size() > 0) {
  390. if (logger.isInfoEnabled()) {
  391. logger.info("Retry unsubscribe " + failed);
  392. }
  393. try {
  394. for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
  395. URL url = entry.getKey();
  396. Set<NotifyListener> listeners = entry.getValue();
  397. for (NotifyListener listener : listeners) {
  398. try {
  399. // 执行取消注册
  400. doUnsubscribe(url, listener);
  401. // 移除出监听器
  402. listeners.remove(listener);
  403. } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
  404. logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
  405. }
  406. }
  407. }
  408. } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
  409. logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
  410. }
  411. }
  412. }
  413. // 重试执行通知监听器
  414. if (!failedNotified.isEmpty()) {
  415. Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified);
  416. for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, List<URL>>>(failed).entrySet()) {
  417. if (entry.getValue() == null || entry.getValue().size() == 0) {
  418. failed.remove(entry.getKey());
  419. }
  420. }
  421. if (failed.size() > 0) {
  422. if (logger.isInfoEnabled()) {
  423. logger.info("Retry notify " + failed);
  424. }
  425. try {
  426. for (Map<NotifyListener, List<URL>> values : failed.values()) {
  427. for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {
  428. try {
  429. NotifyListener listener = entry.getKey();
  430. List<URL> urls = entry.getValue();
  431. // 通知监听器
  432. listener.notify(urls);
  433. // 移除出监听器
  434. values.remove(listener);
  435. } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
  436. logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
  437. }
  438. }
  439. }
  440. } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
  441. logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
  442. }
  443. }
  444. }
  445. }
  446. @Override
  447. public void destroy() {
  448. // 忽略,若已经销毁
  449. if (!canDestroy()) {
  450. return;
  451. }
  452. // 调用父方法,取消注册和订阅
  453. super.destroy();
  454. // 销毁重试任务
  455. try {
  456. retryFuture.cancel(true);
  457. } catch (Throwable t) {
  458. logger.warn(t.getMessage(), t);
  459. }
  460. }
  461. // TODO: 2017/8/30 to abstract this method
  462. protected boolean canDestroy(){
  463. return destroyed.compareAndSet(false, true);
  464. }
  465. // ==== Template method ====
  466. protected abstract void doRegister(URL url);
  467. protected abstract void doUnregister(URL url);
  468. protected abstract void doSubscribe(URL url, NotifyListener listener);
  469. protected abstract void doUnsubscribe(URL url, NotifyListener listener);
  470. }

RegistryFactory 和 AbstractRegistryFactory

RegistryFactory 接口 是 Registry 的工厂接口,用来返回 Registry 实例。该接口是一个可扩展接口,可以看到该接口上有个@SPI 注解,并且默认值为 dubbo,也就是默认扩展的是 DubboRegistryFactory。AbstractRegistryFactory 则是实现了 RegistryFactory 接口 的抽象类。其源码如下。

  1. /**
  2. * 注册中心工厂
  3. */
  4. @SPI("dubbo")
  5. public interface RegistryFactory {
  6. /**
  7. * 根据注册中心连接地址,获取注册中心实例
  8. * <p>
  9. * 连接注册中心需处理契约:<br>
  10. * 1. 当设置check=false时表示不检查连接,否则在连接不上时抛出异常。<br>
  11. * 2. 支持URL上的username:password权限认证。<br>
  12. * 3. 支持backup=10.20.153.10备选注册中心集群地址。<br>
  13. * 4. 支持file=registry.cache本地磁盘文件缓存。<br>
  14. * 5. 支持timeout=1000请求超时设置。<br>
  15. * 6. 支持session=60000会话超时或过期设置。<br>
  16. *
  17. * @param url 注册中心地址,不允许为空
  18. * @return 注册中心引用,总不返回空
  19. */
  20. @Adaptive({"protocol"})
  21. Registry getRegistry(URL url);
  22. }
  23. /**
  24. * 注册中心抽象类
  25. */
  26. public abstract class AbstractRegistryFactory implements RegistryFactory {
  27. // Log output
  28. private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRegistryFactory.class);
  29. // The lock for the acquisition process of the registry
  30. private static final ReentrantLock LOCK = new ReentrantLock();
  31. /**
  32. * Registry 集合
  33. */
  34. private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();
  35. /**
  36. * Get all registries
  37. */
  38. public static Collection<Registry> getRegistries() {
  39. return Collections.unmodifiableCollection(REGISTRIES.values());
  40. }
  41. /**
  42. * 销毁所有 Registry
  43. */
  44. // TODO: 2017/8/30 to move somewhere else better
  45. public static void destroyAll() {
  46. if (LOGGER.isInfoEnabled()) {
  47. LOGGER.info("Close all registries " + getRegistries());
  48. }
  49. // 获得锁
  50. LOCK.lock();
  51. try {
  52. // 销毁
  53. for (Registry registry : getRegistries()) {
  54. try {
  55. registry.destroy();
  56. } catch (Throwable e) {
  57. LOGGER.error(e.getMessage(), e);
  58. }
  59. }
  60. // 清空缓存
  61. REGISTRIES.clear();
  62. } finally {
  63. // 释放锁
  64. LOCK.unlock();
  65. }
  66. }
  67. /**
  68. * 获得注册中心 Registry 对象
  69. *
  70. * @param url 注册中心地址,不允许为空
  71. * @return Registry 对象
  72. */
  73. @Override
  74. public Registry getRegistry(URL url) {
  75. // 修改 URL
  76. url = url.setPath(RegistryService.class.getName()) // + `path`
  77. .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) // + `parameters.interface`
  78. .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); // - `export`
  79. // 计算 key
  80. String key = url.toServiceString();
  81. // 获得锁
  82. // Lock the registry access process to ensure a single instance of the registry
  83. LOCK.lock();
  84. try {
  85. // 从缓存中获得 Registry 对象
  86. Registry registry = REGISTRIES.get(key);
  87. if (registry != null) {
  88. return registry;
  89. }
  90. // 缓存不存在,进行创建 Registry 对象
  91. registry = createRegistry(url);
  92. if (registry == null) {
  93. throw new IllegalStateException("Can not create registry " + url);
  94. }
  95. // 添加到缓存
  96. REGISTRIES.put(key, registry);
  97. return registry;
  98. } finally {
  99. // 释放锁
  100. // Release the lock
  101. LOCK.unlock();
  102. }
  103. }
  104. /**
  105. * 创建 Registry 对象
  106. *
  107. * @param url 注册中心地址
  108. * @return Registry 对象
  109. */
  110. protected abstract Registry createRegistry(URL url);
  111. }

NotifyListener 和 RegistryDirectory

最后我们来看一下 dubbo-registry-api 模块下的另一个比较重要的组件,NotifyListener 接口 和 RegistryDirectory 抽象类。NotifyListener 接口 只有一个 notify 方法,通知监听器。当收到服务变更通知时触发。RegistryDirectory 是注册中心服务,维护着所有可用的远程 Invoker 或者本地的 Invoker,它的 Invoker 集合是从注册中心获取的,另外,它实现了 NotifyListener 接口。比如消费方要调用某远程服务,会向注册中心订阅这个服务的所有 服务提供方,在订阅 及 服务提供方数据有变动时,回调消费方的 NotifyListener 服务的 notify 方法,回调接口传入所有服务提供方的 url 地址然后将 urls 转化为 invokers,也就是 refer 应用远程服务。源码如下。

  1. /**
  2. * 通知监听器
  3. */
  4. public interface NotifyListener {
  5. /**
  6. * 当收到服务变更通知时触发。
  7. * <p>
  8. * 通知需处理契约:<br>
  9. * 1. 总是以服务接口和数据类型为维度全量通知,即不会通知一个服务的同类型的部分数据,用户不需要对比上一次通知结果。<br>
  10. * 2. 订阅时的第一次通知,必须是一个服务的所有类型数据的全量通知。<br>
  11. * 3. 中途变更时,允许不同类型的数据分开通知,比如:providers, consumers, routers, overrides,允许只通知其中一种类型,但该类型的数据必须是全量的,不是增量的。<br>
  12. * 4. 如果一种类型的数据为空,需通知一个empty协议并带category参数的标识性URL数据。<br>
  13. * 5. 通知者(即注册中心实现)需保证通知的顺序,比如:单线程推送,队列串行化,带版本对比。<br>
  14. *
  15. * @param urls 已注册信息列表,总不为空,含义同{@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}的返回值。
  16. */
  17. void notify(List<URL> urls);
  18. }
  19. /**
  20. * 基于注册中心的 Directory 实现类
  21. */
  22. public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
  23. private static final Logger logger = LoggerFactory.getLogger(RegistryDirectory.class);
  24. // ========== Dubbo SPI Adaptive 对象 BEGIN ==========
  25. /**
  26. * Cluster$Adaptive 对象
  27. */
  28. private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
  29. /**
  30. * RouterFactory$Adaptive 对象
  31. */
  32. private static final RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();
  33. /**
  34. * ConfiguratorFactory$Adaptive 对象
  35. */
  36. private static final ConfiguratorFactory configuratorFactory = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getAdaptiveExtension();
  37. // ========== 服务消费者相关 BEGIN ==========
  38. /**
  39. * 服务类型,例如:com.alibaba.dubbo.demo.DemoService
  40. */
  41. private final Class<T> serviceType; // Initialization at construction time, assertion not null
  42. /**
  43. * Consumer URL 的配置项 Map
  44. */
  45. private final Map<String, String> queryMap; // Initialization at construction time, assertion not null
  46. /**
  47. * 服务方法数组
  48. */
  49. private final String[] serviceMethods;
  50. /**
  51. * 是否引用多分组
  52. *
  53. * 服务分组:https://dubbo.gitbooks.io/dubbo-user-book/demos/service-group.html
  54. */
  55. private final boolean multiGroup;
  56. // ========== 注册中心相关 BEGIN ==========
  57. /**
  58. * 注册中心的 Protocol 对象
  59. */
  60. private Protocol protocol; // Initialization at the time of injection, the assertion is not null
  61. /**
  62. * 注册中心
  63. */
  64. private Registry registry; // Initialization at the time of injection, the assertion is not null
  65. /**
  66. * 注册中心的服务类,目前是 com.alibaba.dubbo.registry.RegistryService
  67. *
  68. * 通过 {@link #url} 的 {@link URL#getServiceKey()} 获得
  69. */
  70. private final String serviceKey; // Initialization at construction time, assertion not null
  71. /**
  72. * 是否禁止访问。
  73. *
  74. * 有两种情况会导致:
  75. *
  76. * 1. 没有服务提供者
  77. * 2. 服务提供者被禁用
  78. */
  79. private volatile boolean forbidden = false;
  80. // ========== 配置规则相关 BEGIN ==========
  81. /**
  82. * 原始的目录 URL
  83. *
  84. * 例如:zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&callbacks=1000&check=false&client=netty4&cluster=failback&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello,callbackParam,save,update,say03,delete,say04,demo,say01,bye,say02,saves&payload=1000&pid=63400&qos.port=33333&register.ip=192.168.16.23&sayHello.async=true&side=consumer&timeout=10000&timestamp=1527056491064
  85. */
  86. private final URL directoryUrl; // Initialization at construction time, assertion not null, and always assign non null value
  87. /**
  88. * 覆写的目录 URL ,结合配置规则
  89. */
  90. private volatile URL overrideDirectoryUrl; // Initialization at construction time, assertion not null, and always assign non null value
  91. /**
  92. * 配置规则数组
  93. *
  94. * override rules
  95. * Priority: override>-D>consumer>provider
  96. * Rule one: for a certain provider <ip:port,timeout=100>
  97. * Rule two: for all providers <* ,timeout=5000>
  98. */
  99. private volatile List<Configurator> configurators; // The initial value is null and the midway may be assigned to null, please use the local variable reference
  100. // ========== 服务提供者相关 BEGIN ==========
  101. /**
  102. * [url]与[服务提供者 Invoker 集合]的映射缓存
  103. */
  104. // Map<url, Invoker> cache service url to invoker mapping.
  105. private volatile Map<String, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
  106. /**
  107. * [方法名]与[服务提供者 Invoker 集合]的映射缓存
  108. */
  109. // Map<methodName, Invoker> cache service method to invokers mapping.
  110. private volatile Map<String, List<Invoker<T>>> methodInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
  111. /**
  112. * [服务提供者 Invoker 集合]缓存
  113. */
  114. // Set<invokerUrls> cache invokeUrls to invokers mapping.
  115. private volatile Set<URL> cachedInvokerUrls; // The initial value is null and the midway may be assigned to null, please use the local variable reference
  116. public RegistryDirectory(Class<T> serviceType, URL url) {
  117. super(url);
  118. if (serviceType == null) {
  119. throw new IllegalArgumentException("service type is null.");
  120. }
  121. if (url.getServiceKey() == null || url.getServiceKey().length() == 0) {
  122. throw new IllegalArgumentException("registry serviceKey is null.");
  123. }
  124. this.serviceType = serviceType;
  125. this.serviceKey = url.getServiceKey();
  126. // 获得 queryMap
  127. this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
  128. // 获得 overrideDirectoryUrl 和 directoryUrl
  129. this.overrideDirectoryUrl = this.directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);
  130. // 初始化 multiGroup
  131. String group = directoryUrl.getParameter(Constants.GROUP_KEY, "");
  132. this.multiGroup = group != null && ("*".equals(group) || group.contains(","));
  133. // 初始化 serviceMethods
  134. String methods = queryMap.get(Constants.METHODS_KEY);
  135. this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods);
  136. }
  137. /**
  138. * 将overrideURL 转换为 map,供重新 refer 时使用.
  139. * 每次下发全部规则,全部重新组装计算
  140. *
  141. * @param urls 契约:
  142. * </br>1.override://0.0.0.0/...(或override://ip:port...?anyhost=true)&para1=value1...表示全局规则(对所有的提供者全部生效)
  143. * </br>2.override://ip:port...?anyhost=false 特例规则(只针对某个提供者生效)
  144. * </br>3.不支持override://规则... 需要注册中心自行计算.
  145. * </br>4.不带参数的override://0.0.0.0/ 表示清除override
  146. *
  147. * @return Configurator 集合
  148. */
  149. public static List<Configurator> toConfigurators(List<URL> urls) {
  150. // 忽略,若配置规则 URL 集合为空
  151. if (urls == null || urls.isEmpty()) {
  152. return Collections.emptyList();
  153. }
  154. // 创建 Configurator 集合
  155. List<Configurator> configurators = new ArrayList<Configurator>(urls.size());
  156. for (URL url : urls) {
  157. // 若协议为 `empty://` ,意味着清空所有配置规则,因此返回空 Configurator 集合
  158. if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) {
  159. configurators.clear();
  160. break;
  161. }
  162. // 对应第 4 条契约,不带参数的 override://0.0.0.0/ 表示清除 override
  163. Map<String, String> override = new HashMap<String, String>(url.getParameters());
  164. // The anyhost parameter of override may be added automatically, it can't change the judgement of changing url
  165. // override 上的 anyhost 可能是自动添加的,不能影响改变url判断
  166. override.remove(Constants.ANYHOST_KEY);
  167. if (override.size() == 0) {
  168. configurators.clear();
  169. continue;
  170. }
  171. // 获得 Configurator 对象,并添加到 `configurators` 中
  172. configurators.add(configuratorFactory.getConfigurator(url));
  173. }
  174. // 排序
  175. Collections.sort(configurators);
  176. return configurators;
  177. }
  178. public void setProtocol(Protocol protocol) {
  179. this.protocol = protocol;
  180. }
  181. public void setRegistry(Registry registry) {
  182. this.registry = registry;
  183. }
  184. /**
  185. * 发起订阅
  186. *
  187. * @param url 消费者 URL
  188. */
  189. public void subscribe(URL url) {
  190. // 设置消费者 URL
  191. setConsumerUrl(url);
  192. // 向注册中心,发起订阅
  193. registry.subscribe(url, this);
  194. }
  195. @Override
  196. public void destroy() {
  197. if (isDestroyed()) {
  198. return;
  199. }
  200. // 取消订阅
  201. // unsubscribe.
  202. try {
  203. if (getConsumerUrl() != null && registry != null && registry.isAvailable()) {
  204. registry.unsubscribe(getConsumerUrl(), this);
  205. }
  206. } catch (Throwable t) {
  207. logger.warn("unexpeced error when unsubscribe service " + serviceKey + "from registry" + registry.getUrl(), t);
  208. }
  209. // 标记已经销毁
  210. super.destroy(); // must be executed after unsubscribing
  211. // 销毁所有 Invoker
  212. try {
  213. destroyAllInvokers();
  214. } catch (Throwable t) {
  215. logger.warn("Failed to destroy service " + serviceKey, t);
  216. }
  217. }
  218. @Override
  219. public synchronized void notify(List<URL> urls) {
  220. // 根据 URL 的分类或协议,分组成三个集合 。
  221. List<URL> invokerUrls = new ArrayList<URL>(); // 服务提供者 URL 集合
  222. List<URL> routerUrls = new ArrayList<URL>();
  223. List<URL> configuratorUrls = new ArrayList<URL>();
  224. for (URL url : urls) {
  225. String protocol = url.getProtocol();
  226. String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
  227. if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) {
  228. routerUrls.add(url);
  229. } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
  230. configuratorUrls.add(url);
  231. } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
  232. invokerUrls.add(url);
  233. } else {
  234. logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
  235. }
  236. }
  237. // 处理配置规则 URL 集合
  238. // configurators
  239. if (!configuratorUrls.isEmpty()) {
  240. this.configurators = toConfigurators(configuratorUrls);
  241. }
  242. // 处理路由规则 URL 集合
  243. // routers
  244. if (!routerUrls.isEmpty()) {
  245. List<Router> routers = toRouters(routerUrls);
  246. if (routers != null) { // null - do nothing
  247. setRouters(routers);
  248. }
  249. }
  250. // 合并配置规则,到 `directoryUrl` 中,形成 `overrideDirectoryUrl` 变量。
  251. List<Configurator> localConfigurators = this.configurators; // local reference
  252. // merge override parameters
  253. this.overrideDirectoryUrl = directoryUrl;
  254. if (localConfigurators != null && !localConfigurators.isEmpty()) {
  255. for (Configurator configurator : localConfigurators) {
  256. this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
  257. }
  258. }
  259. // 处理服务提供者 URL 集合
  260. // providers
  261. refreshInvoker(invokerUrls);
  262. }
  263. /**
  264. * 根据invokerURL列表转换为invoker列表。转换规则如下:
  265. *
  266. * 1.如果url已经被转换为invoker,则不在重新引用,直接从缓存中获取,注意如果url中任何一个参数变更也会重新引用
  267. * 2.如果传入的invoker列表不为空,则表示最新的invoker列表
  268. * 3.如果传入的invokerUrl列表是空,则表示只是下发的override规则或route规则,需要重新交叉对比,决定是否需要重新引用。
  269. *
  270. * @param invokerUrls 传入的参数不能为null
  271. */
  272. // TODO: 2017/8/31 FIXME The thread pool should be used to refresh the address, otherwise the task may be accumulated.
  273. private void refreshInvoker(List<URL> invokerUrls) {
  274. if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
  275. && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
  276. // 设置禁止访问
  277. this.forbidden = true; // Forbid to access
  278. // methodInvokerMap 置空
  279. this.methodInvokerMap = null; // Set the method invoker map to null
  280. // 销毁所有 Invoker 集合
  281. destroyAllInvokers(); // Close all invokers
  282. } else {
  283. // 设置允许访问
  284. this.forbidden = false; // Allow to access
  285. // 引用老的 urlInvokerMap
  286. Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
  287. // 传入的 invokerUrls 为空,说明是路由规则或配置规则发生改变,此时 invokerUrls 是空的,直接使用 cachedInvokerUrls 。
  288. if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
  289. invokerUrls.addAll(this.cachedInvokerUrls);
  290. // 传入的 invokerUrls 非空,更新 cachedInvokerUrls 。
  291. } else {
  292. this.cachedInvokerUrls = new HashSet<URL>();
  293. this.cachedInvokerUrls.addAll(invokerUrls); //Cached invoker urls, convenient for comparison //缓存invokerUrls列表,便于交叉对比
  294. }
  295. // 忽略,若无 invokerUrls
  296. if (invokerUrls.isEmpty()) {
  297. return;
  298. }
  299. // 将传入的 invokerUrls ,转成新的 urlInvokerMap
  300. Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
  301. // 转换出新的 methodInvokerMap
  302. Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
  303. // state change
  304. // If the calculation is wrong, it is not processed. 如果计算错误,则不进行处理.
  305. if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
  306. logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
  307. return;
  308. }
  309. // 若服务引用多 group ,则按照 method + group 聚合 Invoker 集合
  310. this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
  311. this.urlInvokerMap = newUrlInvokerMap;
  312. // 销毁不再使用的 Invoker 集合
  313. try {
  314. destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
  315. } catch (Exception e) {
  316. logger.warn("destroyUnusedInvokers error. ", e);
  317. }
  318. }
  319. }
  320. /**
  321. * 若服务引用多 group ,则按照 method + group 聚合 Invoker 集合
  322. */
  323. private Map<String, List<Invoker<T>>> toMergeMethodInvokerMap(Map<String, List<Invoker<T>>> methodMap) {
  324. Map<String, List<Invoker<T>>> result = new HashMap<String, List<Invoker<T>>>();
  325. // 循环方法,按照 method + group 聚合 Invoker 集合
  326. for (Map.Entry<String, List<Invoker<T>>> entry : methodMap.entrySet()) {
  327. String method = entry.getKey();
  328. List<Invoker<T>> invokers = entry.getValue();
  329. // 按照 Group 聚合 Invoker 集合的结果。其中,KEY:group VALUE:Invoker 集合。
  330. Map<String, List<Invoker<T>>> groupMap = new HashMap<String, List<Invoker<T>>>();
  331. // 循环 Invoker 集合,按照 group 聚合 Invoker 集合
  332. for (Invoker<T> invoker : invokers) {
  333. String group = invoker.getUrl().getParameter(Constants.GROUP_KEY, "");
  334. List<Invoker<T>> groupInvokers = groupMap.get(group);
  335. if (groupInvokers == null) {
  336. groupInvokers = new ArrayList<Invoker<T>>();
  337. groupMap.put(group, groupInvokers);
  338. }
  339. groupInvokers.add(invoker);
  340. }
  341. // 大小为 1,使用第一个
  342. if (groupMap.size() == 1) {
  343. result.put(method, groupMap.values().iterator().next());
  344. // 大于 1,将每个 Group 的 Invoker 集合,创建成 Cluster Invoker 对象。
  345. } else if (groupMap.size() > 1) {
  346. List<Invoker<T>> groupInvokers = new ArrayList<Invoker<T>>();
  347. for (List<Invoker<T>> groupList : groupMap.values()) {
  348. groupInvokers.add(cluster.join(new StaticDirectory<T>(groupList)));
  349. }
  350. result.put(method, groupInvokers);
  351. // 大小为 0 ,使用原有值
  352. } else {
  353. result.put(method, invokers);
  354. }
  355. }
  356. return result;
  357. }
  358. /**
  359. * @param urls
  360. * @return null : no routers ,do nothing
  361. * else :routers list
  362. */
  363. private List<Router> toRouters(List<URL> urls) {
  364. List<Router> routers = new ArrayList<Router>();
  365. if (urls == null || urls.isEmpty()) {
  366. return routers;
  367. }
  368. for (URL url : urls) {
  369. // 忽略,若是 "empty://" 。一般情况下,所有路由规则被删除时,有且仅有一条协议为 "empty://" 的路由规则 URL
  370. if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) {
  371. continue;
  372. }
  373. // 获得 "router"
  374. String routerType = url.getParameter(Constants.ROUTER_KEY);
  375. if (routerType != null && routerType.length() > 0) {
  376. url = url.setProtocol(routerType);
  377. }
  378. try {
  379. // 创建 Router 对象
  380. Router router = routerFactory.getRouter(url);
  381. // 添加到返回结果
  382. if (!routers.contains(router)) {
  383. routers.add(router);
  384. }
  385. } catch (Throwable t) {
  386. logger.error("convert router url to router error, url: " + url, t);
  387. }
  388. }
  389. return routers;
  390. }
  391. /**
  392. * 将服务提供者 URL 集合,转成 Invoker 集合。若该服务提供者 URL 已经转换,则直接复用,不重新引用。
  393. *
  394. * @param urls URL 集合
  395. * @return invokers
  396. */
  397. private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
  398. // 新的 `newUrlInvokerMap`
  399. Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
  400. // 若为空,直接返回
  401. if (urls == null || urls.isEmpty()) {
  402. return newUrlInvokerMap;
  403. }
  404. // 已初始化的服务器提供 URL 集合
  405. Set<String> keys = new HashSet<String>();
  406. // 获得引用服务的协议
  407. String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
  408. // 循环服务提供者 URL 集合,转成 Invoker 集合
  409. for (URL providerUrl : urls) {
  410. // If protocol is configured at the reference side, only the matching protocol is selected
  411. // 如果 reference 端配置了 protocol ,则只选择匹配的 protocol
  412. if (queryProtocols != null && queryProtocols.length() > 0) {
  413. boolean accept = false;
  414. String[] acceptProtocols = queryProtocols.split(","); // 可配置多个协议
  415. for (String acceptProtocol : acceptProtocols) {
  416. if (providerUrl.getProtocol().equals(acceptProtocol)) {
  417. accept = true;
  418. break;
  419. }
  420. }
  421. if (!accept) {
  422. continue;
  423. }
  424. }
  425. // 忽略,若为 `empty://` 协议
  426. if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
  427. continue;
  428. }
  429. // 忽略,若应用程序不支持该协议
  430. if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
  431. logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
  432. + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
  433. continue;
  434. }
  435. // 合并 URL 参数
  436. URL url = mergeUrl(providerUrl);
  437. // 忽略,若已经初始化
  438. String key = url.toFullString(); // The parameter urls are sorted
  439. if (keys.contains(key)) { // Repeated url
  440. continue;
  441. }
  442. // 添加到 `keys` 中
  443. keys.add(key);
  444. // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
  445. // 如果服务端 URL 发生变化,则重新 refer 引用
  446. Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
  447. Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
  448. if (invoker == null) { // Not in the cache, refer again 未在缓存中,重新引用
  449. try {
  450. // 判断是否开启
  451. boolean enabled;
  452. if (url.hasParameter(Constants.DISABLED_KEY)) {
  453. enabled = !url.getParameter(Constants.DISABLED_KEY, false);
  454. } else {
  455. enabled = url.getParameter(Constants.ENABLED_KEY, true);
  456. }
  457. // 若开启,创建 Invoker 对象
  458. if (enabled) {
  459. // 注意,引用服务
  460. invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
  461. }
  462. } catch (Throwable t) {
  463. logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
  464. }
  465. // 添加到 newUrlInvokerMap 中
  466. if (invoker != null) { // Put new invoker in cache
  467. newUrlInvokerMap.put(key, invoker);
  468. }
  469. } else { // 在缓存中,直接使用缓存的 Invoker 对象,添加到 newUrlInvokerMap 中
  470. newUrlInvokerMap.put(key, invoker);
  471. }
  472. }
  473. // 清空 keys
  474. keys.clear();
  475. return newUrlInvokerMap;
  476. }
  477. /**
  478. * Merge url parameters. the order is: override > -D >Consumer > Provider
  479. *
  480. * 合并 URL 参数,优先级为配置规则 > 服务消费者配置 > 服务提供者配置
  481. *
  482. * @param providerUrl 服务提供者 URL
  483. * @return 合并后的 URL
  484. */
  485. private URL mergeUrl(URL providerUrl) {
  486. // 合并消费端参数
  487. providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // Merge the consumer side parameters
  488. // 合并配置规则
  489. List<Configurator> localConfigurators = this.configurators; // local reference
  490. if (localConfigurators != null && !localConfigurators.isEmpty()) {
  491. for (Configurator configurator : localConfigurators) {
  492. providerUrl = configurator.configure(providerUrl);
  493. }
  494. }
  495. // 不检查连接是否成功,总是创建 Invoker !因为,启动检查,只有启动阶段需要。此时在检查,已经没必要了。
  496. providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker!
  497. // The combination of directoryUrl and override is at the end of notify, which can't be handled here
  498. // 仅合并提供者参数,因为 directoryUrl 与 override 合并是在 notify 的最后,这里不能够处理
  499. this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // Merge the provider side parameters
  500. // 【忽略】因为是对 1.0 版本的兼容
  501. if ((providerUrl.getPath() == null || providerUrl.getPath().length() == 0)
  502. && "dubbo".equals(providerUrl.getProtocol())) { // Compatible version 1.0
  503. //fix by tony.chenl DUBBO-44
  504. String path = directoryUrl.getParameter(Constants.INTERFACE_KEY);
  505. if (path != null) {
  506. int i = path.indexOf('/');
  507. if (i >= 0) {
  508. path = path.substring(i + 1);
  509. }
  510. i = path.lastIndexOf(':');
  511. if (i >= 0) {
  512. path = path.substring(0, i);
  513. }
  514. providerUrl = providerUrl.setPath(path);
  515. }
  516. }
  517. // 返回服务提供者 URL
  518. return providerUrl;
  519. }
  520. private List<Invoker<T>> route(List<Invoker<T>> invokers, String method) {
  521. // 创建 Invocation 对象
  522. Invocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
  523. // 获得 Router 数组
  524. List<Router> routers = getRouters();
  525. // 根据路由规则,筛选 Invoker 集合
  526. if (routers != null) {
  527. for (Router router : routers) {
  528. if (router.getUrl() != null) {
  529. invokers = router.route(invokers, getConsumerUrl(), invocation);
  530. }
  531. }
  532. }
  533. return invokers;
  534. }
  535. /**
  536. * 将invokers列表转成与方法的映射关系
  537. *
  538. * @param invokersMap Invoker列表
  539. * @return Invoker与方法的映射关系
  540. */
  541. private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
  542. // 创建新的 `methodInvokerMap`
  543. Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>();
  544. // 创建 Invoker 集合
  545. List<Invoker<T>> invokersList = new ArrayList<Invoker<T>>();
  546. // According to the methods classification declared by the provider URL, the methods is compatible with the registry to execute the filtered methods
  547. // 按服务提供者 URL 所声明的 methods 分类,兼容注册中心执行路由过滤掉的 methods
  548. if (invokersMap != null && invokersMap.size() > 0) {
  549. // 循环每个服务提供者 Invoker
  550. for (Invoker<T> invoker : invokersMap.values()) {
  551. String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY); // methods
  552. if (parameter != null && parameter.length() > 0) {
  553. String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter);
  554. if (methods != null && methods.length > 0) {
  555. // 循环每个方法,按照方法名为维度,聚合到 `methodInvokerMap` 中
  556. for (String method : methods) {
  557. if (method != null && method.length() > 0 && !Constants.ANY_VALUE.equals(method)) { // 当服务提供者的方法为 "*" ,代表泛化调用
  558. List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
  559. if (methodInvokers == null) {
  560. methodInvokers = new ArrayList<Invoker<T>>();
  561. newMethodInvokerMap.put(method, methodInvokers);
  562. }
  563. methodInvokers.add(invoker);
  564. }
  565. }
  566. }
  567. }
  568. // 添加到 `invokersList` 中
  569. invokersList.add(invoker);
  570. }
  571. }
  572. // 路由全 `invokersList` ,匹配合适的 Invoker 集合
  573. List<Invoker<T>> newInvokersList = route(invokersList, null);
  574. // 添加 `newInvokersList` 到 `newMethodInvokerMap` 中,表示该服务提供者的全量 Invoker 集合
  575. newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);
  576. // 循环,基于每个方法路由,匹配合适的 Invoker 集合
  577. if (serviceMethods != null && serviceMethods.length > 0) {
  578. for (String method : serviceMethods) {
  579. List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
  580. if (methodInvokers == null || methodInvokers.isEmpty()) {
  581. methodInvokers = newInvokersList;
  582. }
  583. newMethodInvokerMap.put(method, route(methodInvokers, method));
  584. }
  585. }
  586. // 循环排序每个方法的 Invoker 集合,并设置为不可变
  587. // sort and unmodifiable
  588. for (String method : new HashSet<String>(newMethodInvokerMap.keySet())) {
  589. List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
  590. Collections.sort(methodInvokers, InvokerComparator.getComparator());
  591. newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));
  592. }
  593. return Collections.unmodifiableMap(newMethodInvokerMap);
  594. }
  595. /**
  596. * Close all invokers
  597. */
  598. private void destroyAllInvokers() {
  599. Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference 本地引用,避免并发问题
  600. if (localUrlInvokerMap != null) {
  601. // 循环 urlInvokerMap ,销毁所有服务提供者 Invoker
  602. for (Invoker<T> invoker : new ArrayList<Invoker<T>>(localUrlInvokerMap.values())) {
  603. try {
  604. invoker.destroy();
  605. } catch (Throwable t) {
  606. logger.warn("Failed to destroy service " + serviceKey + " to provider " + invoker.getUrl(), t);
  607. }
  608. }
  609. // urlInvokerMap 清空
  610. localUrlInvokerMap.clear();
  611. }
  612. // methodInvokerMap 置空
  613. methodInvokerMap = null;
  614. }
  615. /**
  616. * Check whether the invoker in the cache needs to be destroyed
  617. * If set attribute of url: refer.autodestroy=false, the invokers will only increase without decreasing,there may be a refer leak
  618. *
  619. * @param oldUrlInvokerMap
  620. * @param newUrlInvokerMap
  621. */
  622. private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
  623. // 防御性编程,目前不存在这个情况
  624. if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
  625. // 销毁所有服务提供者 Invoker
  626. destroyAllInvokers();
  627. return;
  628. }
  629. // check deleted invoker
  630. // 对比新老集合,计算需要销毁的 Invoker 集合
  631. List<String> deleted = null;
  632. if (oldUrlInvokerMap != null) {
  633. Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
  634. for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
  635. // 若不存在,添加到 `deleted` 中
  636. if (!newInvokers.contains(entry.getValue())) {
  637. if (deleted == null) {
  638. deleted = new ArrayList<String>();
  639. }
  640. deleted.add(entry.getKey());
  641. }
  642. }
  643. }
  644. // 若有需要销毁的 Invoker ,则进行销毁
  645. if (deleted != null) {
  646. for (String url : deleted) {
  647. if (url != null) {
  648. // 移除出 `urlInvokerMap`
  649. Invoker<T> invoker = oldUrlInvokerMap.remove(url);
  650. if (invoker != null) {
  651. try {
  652. // 销毁 Invoker
  653. invoker.destroy();
  654. if (logger.isDebugEnabled()) {
  655. logger.debug("destroy invoker[" + invoker.getUrl() + "] success. ");
  656. }
  657. } catch (Exception e) {
  658. logger.warn("destroy invoker[" + invoker.getUrl() + "] failed. " + e.getMessage(), e);
  659. }
  660. }
  661. }
  662. }
  663. }
  664. }
  665. @Override
  666. public List<Invoker<T>> doList(Invocation invocation) {
  667. if (forbidden) {
  668. // 1. No service provider 2. Service providers are disabled
  669. throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
  670. "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()
  671. + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
  672. }
  673. List<Invoker<T>> invokers = null;
  674. Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
  675. // 获得 Invoker 集合
  676. if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
  677. // 获得方法名、方法参数
  678. String methodName = RpcUtils.getMethodName(invocation);
  679. Object[] args = RpcUtils.getArguments(invocation);
  680. // 【第一】可根据第一个参数枚举路由
  681. if (args != null && args.length > 0 && args[0] != null
  682. && (args[0] instanceof String || args[0].getClass().isEnum())) {
  683. // invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
  684. invokers = localMethodInvokerMap.get(methodName + args[0]); // The routing can be enumerated according to the first parameter
  685. }
  686. // 【第二】根据方法名获得 Invoker 集合
  687. if (invokers == null) {
  688. invokers = localMethodInvokerMap.get(methodName);
  689. }
  690. // 【第三】使用全量 Invoker 集合。例如,`#$echo(name)` ,回声方法
  691. if (invokers == null) {
  692. invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
  693. }
  694. // 【第四】使用 `methodInvokerMap` 第一个 Invoker 集合。防御性编程。
  695. if (invokers == null) {
  696. Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
  697. if (iterator.hasNext()) {
  698. invokers = iterator.next();
  699. }
  700. }
  701. }
  702. return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
  703. }
  704. @Override
  705. public Class<T> getInterface() {
  706. return serviceType;
  707. }
  708. @Override
  709. public URL getUrl() {
  710. return this.overrideDirectoryUrl;
  711. }
  712. @Override
  713. public boolean isAvailable() {
  714. // 若已销毁,返回不可用
  715. if (isDestroyed()) {
  716. return false;
  717. }
  718. // 任意一个 Invoker 可用,则返回可用
  719. Map<String, Invoker<T>> localUrlInvokerMap = urlInvokerMap;
  720. if (localUrlInvokerMap != null && localUrlInvokerMap.size() > 0) {
  721. for (Invoker<T> invoker : new ArrayList<Invoker<T>>(localUrlInvokerMap.values())) {
  722. if (invoker.isAvailable()) {
  723. return true;
  724. }
  725. }
  726. }
  727. return false;
  728. }
  729. /**
  730. * Haomin: added for test purpose
  731. */
  732. public Map<String, Invoker<T>> getUrlInvokerMap() {
  733. return urlInvokerMap;
  734. }
  735. /**
  736. * Haomin: added for test purpose
  737. */
  738. public Map<String, List<Invoker<T>>> getMethodInvokerMap() {
  739. return methodInvokerMap;
  740. }
  741. /**
  742. * Invoker 排序器,根据 URL 升序
  743. */
  744. private static class InvokerComparator implements Comparator<Invoker<?>> {
  745. /**
  746. * 单例
  747. */
  748. private static final InvokerComparator comparator = new InvokerComparator();
  749. private InvokerComparator() {
  750. }
  751. public static InvokerComparator getComparator() {
  752. return comparator;
  753. }
  754. @Override
  755. public int compare(Invoker<?> o1, Invoker<?> o2) {
  756. return o1.getUrl().toString().compareTo(o2.getUrl().toString());
  757. }
  758. }
  759. /**
  760. *
  761. * Invoker 代理类,主要用于存储注册中心下发的 url 地址,用于重新重新 refer 时能够根据 providerURL queryMap overrideMap 重新组装
  762. *
  763. * @param <T>
  764. */
  765. private static class InvokerDelegate<T> extends InvokerWrapper<T> {
  766. /**
  767. * 服务提供者 URL
  768. *
  769. * 未经过配置合并
  770. */
  771. private URL providerUrl;
  772. public InvokerDelegate(Invoker<T> invoker, URL url, URL providerUrl) {
  773. super(invoker, url);
  774. this.providerUrl = providerUrl;
  775. }
  776. public URL getProviderUrl() {
  777. return providerUrl;
  778. }
  779. }
  780. }