概述

由前面【Dubbo】集群容错机制 文章可知,整个容错过程中,会先调用Directory#list方法来获取所有可用的Invoker列表。

【Dubbo】服务字典 - 图1

首先,来看一下Directory的类图,Directory是顶层的接口。AbstractDirectory封装了通用的实现逻辑。抽象类包含RegistryDirectoryStaticDirectory两个子类。

  • AbstractDirectory:封装了通用逻辑,
  • RegistryDirectoryDirectory的动态列表实现,会自动从注册中心更新Invoker列表、配置信息、路由列表。
  • StaticDirectoryDirectory的静态列表实现,即将传入的Invoker列表封装成静态的Directory对象,里面的列表不会改变。

源码解析

AbstractDirectory#list方法

AbstractDirectory#list方法主要包含以下逻辑:

  1. 调用子类的doList方法获取所有Invoker列表
  2. 遍历所有路由列表,过滤Invoker,返回过滤之后的Invoker

代码如下所示:

  1. public List<Invoker<T>> list(Invocation invocation) throws RpcException {
  2. if (destroyed) {
  3. throw new RpcException("Directory already destroyed .url: " + getUrl());
  4. }
  5. //调用子类实现的doList方法获取可用的Invoker列表
  6. List<Invoker<T>> invokers = doList(invocation);
  7. //获取路由列表
  8. List<Router> localRouters = this.routers; // local reference
  9. //如果路由列表不为空,则遍历路由列表,进行路由
  10. if (localRouters != null && !localRouters.isEmpty()) {
  11. for (Router router : localRouters) {
  12. try {
  13. if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
  14. invokers = router.route(invokers, getConsumerUrl(), invocation);
  15. }
  16. } catch (Throwable t) {
  17. logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
  18. }
  19. }
  20. }
  21. return invokers;
  22. }
  23. //模板方法
  24. protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;

StaticDirectory

StaticDirectory#doList方法逻辑比较简单,因为其Invoker列表是不变的,所以该方法会直接返回传入的Invoker列表。代码如下所示:

  1. protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
  2. return invokers;
  3. }

RegistryDirectory

RegistryDirectory是一种动态服务目录,实现了NotifyListener接口。当注册中心服务配置发生变化后,RegistryDirectory可收到与当前服务相关的变化。收到变更通知后,RegistryDirectory可根据配置变更信息刷新Invoker列表。

RegistryDirectory中有两条比较重要的逻辑线,第一条,子类实现父类的doList方法;第二条,框架与注册中心的订阅,并动态更新本地Invoker列表、路由列表、配置信息的逻辑。

doList方法

首先我们来看一下doList方法,该方法主要用来从本地缓存里获取Invokers列表,代码如下所示:

  1. public List<Invoker<T>> doList(Invocation invocation) {
  2. //当没有invoker时,该值会被置为true
  3. if (forbidden) {
  4. //省略noProvider异常抛出
  5. }
  6. List<Invoker<T>> invokers = null;
  7. //获取本地缓存,方法对应的Invoker列表
  8. Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
  9. if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
  10. //方法名
  11. String methodName = RpcUtils.getMethodName(invocation);
  12. //参数
  13. Object[] args = RpcUtils.getArguments(invocation);
  14. //第一个参数是否为String或者enum类型,通过方法名加第一个参数查询Invokers列表
  15. if (args != null && args.length > 0 && args[0] != null
  16. && (args[0] instanceof String || args[0].getClass().isEnum())) {
  17. invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
  18. }
  19. //通过方法名查询Invokers列表
  20. if (invokers == null) {
  21. invokers = localMethodInvokerMap.get(methodName);
  22. }
  23. //通过*查询Invokers列表
  24. if (invokers == null) {
  25. invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
  26. }
  27. //遍历map,取第一个
  28. if (invokers == null) {
  29. Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
  30. if (iterator.hasNext()) {
  31. invokers = iterator.next();
  32. }
  33. }
  34. }
  35. return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
  36. }

可以看到,doList方法主要是从本地缓存methodInvokerMap里获取Invoker列表,那么本地缓存是何时更新的呢?

notify方法

RegistryDirectory实现了NotifyListener接口,通过notify这个方法获取注册中心变更通知。该方法主要包含以下逻辑:

  1. 根据urlcategoryprotocolurl进行分类
  2. 将对应的列表转成RouterConfigurator列表
  3. 刷新Invoker列表

代码如下所示:

  1. public synchronized void notify(List<URL> urls) {
  2. //provider url, 路由url, 配置器url
  3. List<URL> invokerUrls = new ArrayList<URL>();
  4. List<URL> routerUrls = new ArrayList<URL>();
  5. List<URL> configuratorUrls = new ArrayList<URL>();
  6. //遍历urls列表,通过protocol和category将url放入不同的列表
  7. for (URL url : urls) {
  8. String protocol = url.getProtocol();
  9. String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
  10. if (Constants.ROUTERS_CATEGORY.equals(category)
  11. || Constants.ROUTE_PROTOCOL.equals(protocol)) {
  12. routerUrls.add(url);
  13. } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
  14. || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
  15. configuratorUrls.add(url);
  16. } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
  17. invokerUrls.add(url);
  18. } //省略日志输出代码
  19. }
  20. //将configuratorUrls里的url转成configurators
  21. if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
  22. this.configurators = toConfigurators(configuratorUrls);
  23. }
  24. //将routerUrls里的url转成routers
  25. if (routerUrls != null && !routerUrls.isEmpty()) {
  26. List<Router> routers = toRouters(routerUrls);
  27. if (routers != null) { // null - do nothing
  28. setRouters(routers);
  29. }
  30. }
  31. List<Configurator> localConfigurators = this.configurators; // local reference
  32. // merge override parameters
  33. this.overrideDirectoryUrl = directoryUrl;
  34. if (localConfigurators != null && !localConfigurators.isEmpty()) {
  35. for (Configurator configurator : localConfigurators) {
  36. this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
  37. }
  38. }
  39. //刷新Invoker列表
  40. refreshInvoker(invokerUrls);
  41. }

refreshInvoker方法

接下来看一下refreshInvoker方法,该方法主要包含以下逻辑:

  1. 判断是否禁用所有服务
  2. url转成Invoker,再转成方法名对应的Invoker列表
  3. 缓存第2步结果到methodInvokerMap
  4. 销毁无用的Invoker

代码如下所示:

  1. private void refreshInvoker(List<URL> invokerUrls) {
  2. //如果invokerUrls列表只有一条记录且协议头=empty,则表示禁用所有服务
  3. if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
  4. && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
  5. //这个标志在doList方法的开头就用到了,如果为true,doList直接抛NoProvider异常
  6. this.forbidden = true; // Forbid to access
  7. //将缓存设置为空,并销毁所有Invokers
  8. this.methodInvokerMap = null; // Set the method invoker map to null
  9. destroyAllInvokers(); // Close all invokers
  10. }
  11. //正常的情况
  12. else {
  13. this.forbidden = false; // Allow to access
  14. Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
  15. //缓存invokerUrls
  16. if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
  17. invokerUrls.addAll(this.cachedInvokerUrls);
  18. } else {
  19. this.cachedInvokerUrls = new HashSet<URL>();
  20. this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
  21. }
  22. if (invokerUrls.isEmpty()) {
  23. return;
  24. }
  25. //将url转成Invoker
  26. Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
  27. //转成方法名->Invoker列表
  28. Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
  29. //转换出错了
  30. if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
  31. //省略日志代码
  32. return;
  33. }
  34. //合并多个组的Invoker
  35. this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
  36. this.urlInvokerMap = newUrlInvokerMap;
  37. try {
  38. //销毁无用的Invoker
  39. destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
  40. } //省略异常抛出代码
  41. }
  42. }

关于refreshInvoker方法中所调用的方法此处不做分析。

参考

Dubbo官网

《深入理解Apache Dubbo与实战》