1、源码下载和编译

源码下载、编译和导入步骤如下:
(1)dubbo的项目在github中的地址为: https://github.com/apache/dubbo
(2)进入需要进行下载的地址,执行 git clone https://github.com/apache/dubbo.git
(3)为了防止master中代码不稳定,进入dubbo项目 cd dubbo 可以切入到最近的release分支 git checkout 2.7.6-release
(4)进行本地编译,进入dubbo项目 cd dubbo , 进行编译操作 mvn clean install -DskipTests
(5)使用IDE引入项目。

2、架构整体设计

2.1 Dubbo调用关系说明

image.png
在这里主要由四部分组成:

  • Provider: 暴露服务的服务提供方

Protocol 负责提供者和消费者之间协议交互数据
Service 真实的业务服务信息 可以理解成接口 和 实现
Container Dubbo的运行环境

  • Consumer: 调用远程服务的服务消费方

Protocol 负责提供者和消费者之间协议交互数据
Cluster 感知提供者端的列表信息
Proxy 可以理解成 提供者的服务调用代理类 由它接管 Consumer中的接口调用逻辑

  • Registry: 注册中心,用于作为服务发现和路由配置等工作,提供者和消费者都会在这里进行注册
  • Monitor: 用于提供者和消费者中的数据统计,比如调用频次,成功失败次数等信息。

启动和执行流程说明:
提供者端启动 容器负责把Service信息加载 并通过Protocol 注册到注册中心
消费者端启动 通过监听提供者列表来感知提供者信息 并在提供者发生改变时 通过注册中心及时
通知消费端
消费方发起 请求 通过Proxy模块
利用Cluster模块 来选择真实的要发送给的提供者信息
交由Consumer中的Protocol 把信息发送给提供者
提供者同样需要通过 Protocol 模块来处理消费者的信息
最后由真正的服务提供者 Service 来进行处理

2.2 整体的调用链路

**image.png

说明 淡绿色代表了 服务生产者的范围 淡蓝色 代表了服务消费者的范围 红色箭头代表了调用的方向

从外到内分为三层: 业务逻辑层 RPC层(远程过程调用) Remoting (远程数据传输层)

整体链路调用的流程:(面试中经常会问到)
1. 消费者通过Interface进行方法调用 统一交由消费者端的 Proxy 通过ProxyFactory 来进行代理对象的创建 使用到了jdk javassist技术
2.交给Filter 这个模块 做一个统一的过滤请求 在SPI案例中涉及过
3.接下来会进入最主要的Invoker调用逻辑
通过Directory 去配置中新读取信息 最终通过list方法获取所有的Invoker
通过Cluster模块 根据选择的具体路由规则 来选取Invoker列表
通过LoadBalance模块 根据负载均衡策略 选择一个具体的Invoker 来处理我们的请求
如果执行中出现错误 并且Consumer阶段配置了重试机制 则会重新尝试执行
4. 继续经过Filter 进行执行功能的前后封装 Invoker 选择具体的执行协议(此处Filter与第二步针对的对象不同,此处做一样计数,统计、上下文数据处理等)
5. 客户端 进行编码和序列化 然后发送数据
6. 到达Consumer中的 Server 在这里进行 反编码 和 反序列化的接收数据
7. 使用Exporter选择执行器
8. 交给Filter 进行一个提供者端的过滤 到达 Invoker 执行器
9. 通过Invoker 调用接口的具体实现 然后返回

2.3 Dubbo源码整体设计

image.png
图例说明:

  • 图中左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口。
  • 图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,其中,Service 和 Confifig 层为 API,其它各层均为 SPI。
  • 图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
  • 图中蓝色虚线为初始化过程,即启动时组装链,红色实线为方法调用过程,即运行时调时链,紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。

调用链路结合上图需要记住
Dubbo源码整体设计与调用链路十分相似。只不过这里可以看到接口的一些具体实现以及左侧也有更为详细的层次划分,我们在后面的源码解析时也会着重介绍其中比较重要的模块实现。

分层介绍:(需记住)

  • Business 业务逻辑层
  • service 业务层 包括我们的业务代码 比如 接口 实现类 直接面向开发者RPC层 远程过程调用层

  • config 配置层 对外提供配置 以ServiceConfifig ReferenceConfifig 为核心 可以直接初始化配置类 也可以解析配置文件生成

  • proxy 服务代理层 无论是生产者 还是消费者 框架都会产生一个代理类 整个过程对上层透明 就是业务层对远程调用无感
  • registry 注册中心层 封装服务地址的注册与发现 以服务的URL为中心
  • cluster 路由层 (集群容错层) 提供了多个提供者的路由和负载均衡 并且它桥接注册中心 以Invoker为核心
  • monitor 监控层 RPC调用相关的信息 如 调用次数 成功失败的情况 调用时间等 在这一层完成
  • protocol 远程调用层 封装RPC调用 无论是服务的暴露 还是 服务的引用 都是在Protocol中作为主功能入口 负责Invoker的整个生命周期 Dubbo中所有的模型都向Invoker靠拢
  • Remoting层 远程数据传输层

  • exchange 信息交换层 封装请求和响应的模式 如把请求由同步 转换成异步

  • transport 网络传输层 统一网络传输的接口 比如 netty 和 mina 统一为一个网络传输接口
  • serialize 数据序列化层 负责管理整个框架中的数据传输的序列化 和反序列化

    3、服务注册与消费源码剖析

    3.1 注册中心Zookeeper剖析

    注册中心是Dubbo的重要组成部分,主要用于服务的注册与发现,我们可以选择Redis、Nacos、
    Zookeeper作为Dubbo的注册中心,Dubbo推荐用户使用Zookeeper作为注册中心。
    注册中心Zookeeper目录结构
    我们使用一个最基本的服务的注册与消费的Demo来进行说明。
    例如:只有一个提供者和消费者。 com.lagou.service.HelloService 为我们所提供的服务。

    1. public interface HelloService { String sayHello(String name); }

    则Zookeeper的目录结构如下:
    image.png

  • 可以在这里看到所有的都是在dubbo层级下的

  • dubbo跟节点下面是当前所拥有的接口名称,如果有多个接口,则会以多个子节点的形式展开
  • 每个服务下面又分别有四个配置项
    • consumers: 当前服务下面所有的消费者列表(URL)
    • providers: 当前服务下面所有的提供者列表(URL)
    • configuration: 当前服务下面的配置信息信息,provider或者consumer会通过读取这里的配置信息来获取配置
    • routers: 当消费者在进行获取提供者的时,会通过这里配置好的路由来进行适配匹配规则。
  • 可以看到,dubbo基本上很多时候都是通过URL的形式来进行交互获取数据的,在URL中也会保存很多的信息。后面也会对URL的规则做详细介绍。

image.png
通过这张图我们可以了解到如下信息:

  • 提供者会在 providers 目录下进行自身的进行注册。
  • 消费者会在 consumers 目录下进行自身注册,并且监听 provider 目录,以此通过监听提供者增加或者减少,实现服务发现。
  • Monitor模块会对整个服务级别做监听,用来得知整体的服务情况。以此就能更多的对整体情况做监控。

3.2 服务的注册过程分析

服务注册(暴露)过程(面试重点) 下图流程必须能说清楚
(注册中心挂了之后,如果远程服务没有调用过,没有本地缓存, 可以通过直连方式进行远程调用)
**image.png
查看ServiceConfifig类,重点查看 ProxyFactory 和 Protocol 类型的属性 以及 ref
ServiceConfig类中定义了静态属性Protocol和ProxyFactory 都是通过SPI方式获取到实例
image.png
ref的定义位于其父类ServiceConfigBase
image.png

ServiceConfig 类拿到对外提供服务的实际类 ref(如:HelloServiceImpl),然后通过ProxyFactory 接口实现类中的 getInvoker 方法使用 ref 生成一个 AbstractProxyInvoker 实例,到这一步就完成具体服务到 Invoker 的转化。
image.png
接下来就是 Invoker 转换到 Exporter 的过程。
其中会涉及到 RegistryService接口 RegistryFactory 接口 和 注册provider到注册中心流程的过程
(1)RegistryService代码解读,这块儿的代码比较简单,主要是对指定的路径进行注册,解绑,监听和
取消监听,查询操作。也是注册中心中最为基础的类。

  1. package org.apache.dubbo.registry;
  2. import org.apache.dubbo.common.URL;
  3. import java.util.List;
  4. /**
  5. * RegistryService. (SPI, Prototype, ThreadSafe)
  6. *
  7. * @see org.apache.dubbo.registry.Registry
  8. * @see org.apache.dubbo.registry.RegistryFactory#getRegistry(URL)
  9. */
  10. public interface RegistryService {
  11. /*** 进行对URL的注册操作,比如provider,consumer,routers等 */
  12. void register(URL url);
  13. /*** 解除对指定URL的注册,比如provider,consumer,routers等 */
  14. void unregister(URL url);
  15. /*** 增加对指定URL的路径监听,当有变化的时候进行通知操作 */
  16. void subscribe(URL url, NotifyListener listener);
  17. /*** 解除对指定URL的路径监听,取消指定的listener */
  18. void unsubscribe(URL url, NotifyListener listener);
  19. /*** 查询指定URL下面的URL列表,比如查询指定服务下面的consumer列表 */
  20. List<URL> lookup(URL url);
  21. }

(2)我们再来看 RegistryFactory ,是通过他来生成真实的注册中心。通过这种方式,也可以保证一
个应用中可以使用多个注册中心。可以看到这里也是通过不同的protocol参数,来选择不同的协议。

  1. package org.apache.dubbo.registry;
  2. import org.apache.dubbo.common.URL;
  3. import org.apache.dubbo.common.extension.Adaptive;
  4. import org.apache.dubbo.common.extension.SPI;
  5. /**
  6. * RegistryFactory. (SPI, Singleton, ThreadSafe)
  7. *
  8. * @see org.apache.dubbo.registry.support.AbstractRegistryFactory
  9. */
  10. @SPI("dubbo")
  11. public interface RegistryFactory {
  12. /**
  13. * Connect to the registry
  14. * <p>
  15. * Connecting the registry needs to support the contract: <br>
  16. * 1. When the check=false is set, the connection is not checked, otherwise the exception is thrown when disconnection <br>
  17. * 2. Support username:password authority authentication on URL.<br>
  18. * 3. Support the backup=10.20.153.10 candidate registry cluster address.<br>
  19. * 4. Support file=registry.cache local disk file cache.<br>
  20. * 5. Support the timeout=1000 request timeout setting.<br>
  21. * 6. Support session=60000 session timeout or expiration settings.<br>
  22. *
  23. * @param url Registry address, is not allowed to be empty
  24. * @return Registry reference, never return empty value
  25. */
  26. /*** 获取注册中心地址 */
  27. @Adaptive({"protocol"})
  28. Registry getRegistry(URL url);
  29. }

(3)下面我们就来跟踪一下,一个服务是如何注册到注册中心上去的。其中比较关键的一个类是RegistryProtocol ,他负责管理整个注册中心相关协议。并且统一对外提供服务。这里我们主要以RegistryProtocol.export 方法作为入口,这个方法主要的作用就是将我们需要执行的信息注册并且导出。
根据上面调试可以看出,将具体服务ref封装成Invoker后是调用 RegistryProtocol.export方法来将Invoker转换成exporter的
image.png
进入RegistryProtocol.export方法
image.png
在这里获取一些相关参数 源码如下

  1. @Override
  2. public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
  3. // 获取注册中心的地址
  4. URL registryUrl = getRegistryUrl(originInvoker);
  5. // url to export locally // 获取当前提供者需要注册的地址
  6. URL providerUrl = getProviderUrl(originInvoker);
  7. // 获取进行注册override协议的访问地址
  8. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
  9. // 增加override的监听器
  10. final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
  11. overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
  12. // 根据现有的override协议,对注册地址进行改写操作
  13. providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
  14. //export invoker
  15. // 对当前的服务进行本地导出
  16. // 完成后即可在看到本地的20880端口号已经启动,并且暴露服务
  17. final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
  18. // url to registry // 获取真实的注册中心, 比如我们常用的ZookeeperRegistry
  19. final Registry registry = getRegistry(originInvoker);
  20. // 获取当前服务需要注册到注册中心的providerURL,主要用于去除一些没有必要的参数(比如在本地导出时所使用的qos参数等值)
  21. final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
  22. // decide if we need to delay publish
  23. // 获取当前url是否需要进行注册参数
  24. boolean register = providerUrl.getParameter(REGISTER_KEY, true);
  25. if (register) {
  26. // 将当前的提供者注册到注册中心上去(重点)
  27. register(registryUrl, registeredProviderUrl);
  28. }
  29. // Deprecated! Subscribe to override rules in 2.6.x or before.
  30. // 对override协议进行注册,用于在接收到override请求时做适配,这种方式用于适配2.6.x及之 前的版本(混用)
  31. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
  32. // 设置当前导出中的相关信息
  33. exporter.setRegisterUrl(registeredProviderUrl);
  34. exporter.setSubscribeUrl(overrideSubscribeUrl);
  35. //Ensure that a new exporter instance is returned every time export
  36. // 返回导出对象(对数据进行封装)
  37. return new DestroyableExporter<>(exporter);
  38. }

(4)下面我们再来看看 register 方法, 这里面做的比较简单,主要是从 RegistoryFactory 中获取注册中心,并且进行地址注册。
image.png

  1. public void register(URL registryUrl, URL registeredProviderUrl) {
  2. // 获取注册中心
  3. Registry registry = registryFactory.getRegistry(registryUrl);
  4. // 对当前的服务进行注册
  5. registry.register(registeredProviderUrl);
  6. // ProviderModel 表示服务提供者模型,此对象中存储了与服务提供者相关的信息。
  7. // 比如服务的配置信息,服务实例等。每个被导出的服务对应一个 ProviderModel。
  8. ProviderModel model = ApplicationModel.getProviderModel(registeredProviderUrl.getServiceKey());
  9. model.addStatedUrl(new ProviderModel.RegisterStatedURL( registeredProviderUrl, registryUrl, true ));
  10. }

(5)这里我们再跟里面的register方法之前,先来介绍一下Registry中的类目录结构
image.png
image.png
目录结构描述如下:

  • 在这里每个层级代表继承自父级
  • 这里面 RegistryService 就是我们之前所讲对外提供注册机制的接口。
  • 其下面 Registry 也同样是一个接口,是对 RegistryService 的集成,并且继承了 Node 接口,说明注册中心也是基于URL去做的。
  • AbstractRegistry 是对注册中心的封装,其主要会对本地注册地址的封装,主要功能在于远程注册中心不可用的时候,可以采用本地的注册中心来使用。
  • FailbackRegistry 从名字中可以看出来,失败自动恢复,后台记录失败请求,定时重发功能。
  • 最深的一层则更多是真实的第三方渠道实现。

继续跟踪register 发现进入了FailbackRegistry类
(6)下面我们来看一下在 FailbackRegistry 中的实现, 可以在这里看到他的主要作用是调用第三方的实现方式,并且在出现错误时增加重试机制。
image.png

  1. @Override
  2. public void register(URL url) {
  3. if (!acceptable(url)) {
  4. logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
  5. return;
  6. }
  7. // 上层调用 // 主要用于保存已经注册的地址列表
  8. super.register(url);
  9. // 将一些错误的信息移除(确保当前地址可以在出现一些错误的地址时可以被删除)
  10. removeFailedRegistered(url);
  11. removeFailedUnregistered(url);
  12. try {
  13. // Sending a registration request to the server side
  14. // 发送给第三方渠道进行注册操作
  15. doRegister(url);
  16. } catch (Exception e) {
  17. Throwable t = e;
  18. // 记录日志
  19. // If the startup detection is opened, the Exception is thrown directly.
  20. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
  21. && url.getParameter(Constants.CHECK_KEY, true)
  22. && !CONSUMER_PROTOCOL.equals(url.getProtocol());
  23. boolean skipFailback = t instanceof SkipFailbackWrapperException;
  24. if (check || skipFailback) {
  25. if (skipFailback) {
  26. t = t.getCause();
  27. }
  28. throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
  29. } else {
  30. logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
  31. }
  32. // Record a failed registration request to a failed list, retry regularly
  33. // 后台异步进行重试,也是Failback比较关键的代码
  34. addFailedRegistered(url);
  35. }
  36. }
  37. //重试代码
  38. private void addFailedRegistered(URL url) {
  39. FailedRegisteredTask oldOne = failedRegistered.get(url);
  40. if (oldOne != null) {
  41. return;
  42. }
  43. FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
  44. oldOne = failedRegistered.putIfAbsent(url, newTask);
  45. if (oldOne == null) {
  46. // never has a retry task. then start a new task for retry.
  47. retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
  48. }
  49. }

然后跟踪doRegistry方法 进入ZookeeperRegistry类
(7)下面我们再来看看Zookeeper中 doRegister 方法的实现, 可以看到这里的实现也比较简单,关键在于 toUrlPath 方法的实现。关于 dynamic 的值,我们也在上面有看到,他的URL也是true的。
image.png
(8)解读 toUrlPath 方法。可以看到这里的实现也是比较简单,也验证了我们之前的路径规则。

  1. private String toUrlPath(URL url) {
  2. // 分类地址 + url字符串
  3. return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());
  4. }
  5. private String toCategoryPath(URL url) {
  6. // 服务名称 + category(在当前的例子中是providers)
  7. return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
  8. }
  9. private String toServicePath(URL url) {
  10. // 接口地址
  11. String name = url.getServiceInterface();
  12. if (ANY_VALUE.equals(name)) {
  13. return toRootPath();
  14. }
  15. //根节点+ 接口地址
  16. return toRootDir() + URL.encode(name);
  17. }

至此注册过程完成

3.3 URL规则详解 和 服务本地缓存

1、URL规则详解
URL地址如下:

  1. protocol://host:port/path?key=value&key=value
  2. provider://192.168.20.1:20883/com.lagou.service.HelloService?anyhost=true&application=service-provider2
  3. &bind.ip=192.168.20.1&bind.port=20883&category=configurators
  4. &check=fals e&deprecated=false&dubbo=2.0.2&dynamic=true
  5. &generic=false&interface=com.lagou.service

URL主要有以下几部分组成:

  • protocol: 协议,一般像我们的 provider 或者 consumer 在这里都是人为具体的协议
  • host: 当前 provider 或者其他协议所具体针对的地址,比较特殊的像 override 协议所指定的
  • host就是 0.0.0.0 代表所有的机器都生效
  • port: 和上面相同,代表所处理的端口号
  • path: 服务路径,在 provider 或者 consumer 等其他中代表着我们真实的业务接口
  • key=value: 这些则代表具体的参数,这里我们可以理解为对这个地址的配置。比如我们 provider中需要具体机器的服务应用名,就可以是一个配置的方式设置上去。

注意:Dubbo中的URL与java中的URL是有一些区别的,如下:

  • 这里提供了针对于参数的 parameter 的增加和减少(支持动态更改)
  • 提供缓存功能,对一些基础的数据做缓存.

2、服务本地缓存
在上面我们有讲到dubbo有对路径进行本地缓存操作。这里我们就对本地缓存进行讲解。
dubbo调用者需要通过注册中心(例如:ZK)注册信息,获取提供者,但是如果频繁往从ZK获取信息,肯定会存在单点故障问题,所以dubbo提供了将提供者信息缓存在本地的方法。
Dubbo在订阅注册中心的回调处理逻辑当中会保存服务提供者信息到本地缓存文件当中(同步/异步两种方式),以URL纬度进行全量保存。
Dubbo在服务引用过程中会创建registry对象并加载本地缓存文件,会优先订阅注册中心,订阅注册中心失败后会访问本地缓存文件内容获取服务提供信息。

(1)首先从构造方法讲起, 这里方法比较简单,主要用于确定需要保存的文件信息。并且从系统中读取已有的配置信息。

  1. public AbstractRegistry(URL url) {
  2. setUrl(url);
  3. // Start file save timer
  4. syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false);
  5. //默认保存路径(home/.dubbo/dubbo-registry-appName-address-port.cache)
  6. String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache";
  7. String filename = url.getParameter(FILE_KEY, defaultFilename);
  8. // 创建文件
  9. File file = null;
  10. if (ConfigUtils.isNotEmpty(filename)) {
  11. file = new File(filename);
  12. if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
  13. if (!file.getParentFile().mkdirs()) {
  14. throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
  15. }
  16. }
  17. }
  18. this.file = file;
  19. // When starting the subscription center,
  20. // we need to read the local cache file for future Registry fault tolerance processing.
  21. //加载已有的配置文件
  22. loadProperties();
  23. notify(url.getBackupUrls());
  24. }

通过跟踪代码notify(url.getBackupUrls())=>notify(url, listener, filterEmpty(url, urls))=>saveProperties(url);
(2)我们可以看到这个类中最为关键的一个属性为 properties ,我们可以通过寻找,得知这个属性的设置值只有在一个地方: saveProperties ,我们来看一下这个方法。这里也有一个我们值得关注的点,就是基于版本号的的更改。下面看saveProperties(url)方法

  1. private void saveProperties(URL url) {
  2. if (file == null) {
  3. return;
  4. }
  5. try {
  6. StringBuilder buf = new StringBuilder();
  7. // 获取所有通知到的地址
  8. Map<String, List<URL>> categoryNotified = notified.get(url);
  9. if (categoryNotified != null) {
  10. for (List<URL> us : categoryNotified.values()) {
  11. for (URL u : us) {
  12. //多个地址拼接
  13. if (buf.length() > 0) {
  14. buf.append(URL_SEPARATOR);
  15. }
  16. buf.append(u.toFullString());
  17. }
  18. }
  19. }
  20. // 保存数据
  21. properties.setProperty(url.getServiceKey(), buf.toString());
  22. // 保存为一个新的版本号
  23. // 通过这种机制可以保证后面保存的记录,在重试的时候,不会重试之前的版本
  24. long version = lastCacheChanged.incrementAndGet();
  25. // 需要同步保存则进行保存
  26. if (syncSaveFile) {
  27. doSaveProperties(version);
  28. } else {
  29. // 否则则异步去进行处理
  30. registryCacheExecutor.execute(new SaveProperties(version));
  31. }
  32. } catch (Throwable t) {
  33. logger.warn(t.getMessage(), t);
  34. }
  35. }

(3)下面我们再来看看是如何进行保存文件的。这里的实现也比较简单,主要比较关键的代码在于利用文件级锁来保证同一时间只会有一个线程执行(跨VM的锁)

  1. public void doSaveProperties(long version) {
  2. if (version < lastCacheChanged.get()) {
  3. return;
  4. }
  5. if (file == null) {
  6. return;
  7. }
  8. // Save
  9. try {
  10. // 使用文件级别所,来保证同一段时间只会有一个线程进行读取操作
  11. File lockfile = new File(file.getAbsolutePath() + ".lock");
  12. if (!lockfile.exists()) {
  13. lockfile.createNewFile();
  14. }
  15. try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
  16. FileChannel channel = raf.getChannel()) {
  17. // 利用文件锁来保证并发的执行的情况下,只会有一个线程执行成功(原因在于可能是跨 VM的)
  18. FileLock lock = channel.tryLock();
  19. if (lock == null) {
  20. throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
  21. }
  22. // Save
  23. try {
  24. if (!file.exists()) {
  25. file.createNewFile();
  26. }
  27. // 将配置的文件信息保存到文件中
  28. try (FileOutputStream outputFile = new FileOutputStream(file)) {
  29. properties.store(outputFile, "Dubbo Registry Cache");
  30. }
  31. } finally {
  32. // 解开文件锁
  33. lock.release();
  34. }
  35. }
  36. } catch (Throwable e) {
  37. // 执行出现错误时,则交给专门的线程去进行重试
  38. savePropertiesRetryTimes.incrementAndGet();
  39. if (savePropertiesRetryTimes.get() >= MAX_RETRY_TIMES_SAVE_PROPERTIES) {
  40. logger.warn("Failed to save registry cache file after retrying " + MAX_RETRY_TIMES_SAVE_PROPERTIES + " times, cause: " + e.getMessage(), e);
  41. savePropertiesRetryTimes.set(0);
  42. return;
  43. }
  44. if (version < lastCacheChanged.get()) {
  45. savePropertiesRetryTimes.set(0);
  46. return;
  47. } else {
  48. registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
  49. }
  50. logger.warn("Failed to save registry cache file, will retry, cause: " + e.getMessage(), e);
  51. }
  52. }

3.4 Dubbo 消费过程分析

服务消费流程(面试下图流程需说清楚)
image.png
首先 ReferenceConfig 类的 init 方法调用 createProxy() ,期间 使用 Protocol 调用 refer 方法生成 Invoker 实例(如上图中的红色部分),这是服务消费的关键。接下来使用ProxyFactory把 Invoker转换为客户端需要的接口(如:HelloService)。
三个对象和ServiceConfig类中一样

image.png

跟踪init方法,内部调用createProxy创建接口代理对象
image.png
跟踪进入createProxy 在方法内调用refer()生成invoker对象,此处Protocol具体逻辑有子类实现 继续跟踪
image.png
=> org.apache.dubbo.registry.integration.RegistryProtocol#refer
image.png
跟踪进入doRefer方法 在此生成Invoker对象
image.png
看一下invoker对象里内容内部的directory包含了消费者请求的所有信息
image.png
在此生成invoker对象结束

继续往下跟踪createProxy方法 在末尾处调用了getProxy方法

image.png
根据invoker里包含的接口 生成对应的代理对象。此代理对象就是进行远程服务调用的对象,默认使用的javassist进行对象代理生成
image.png

image.png

4、Dubbo扩展SPI源码剖析

SPI在之前都有使用过,其中最重要的类就是 ExtensionLoader ,它是所有Dubbo中SPI的入口。
我们通过分析源码来学习 ExtensionLoader 是怎么加载的。这里会具体介绍
org.apache.dubbo.common.extension.ExtensionLoader.getExtensionLoader 和
org.apache.dubbo.common.extension.ExtensionLoader.getExtension 方法。
getExtensionLoader 获取扩展点加载器 并加载所对应的所有的扩展点实现
getExtension 根据name 获取扩展的指定实现

4.1 getExtensionLoader 加载过程

先回顾一下dubbo中spi调用示例:
先根据类型获取ExtensionLoader,然后在获取支持的扩展点实现

  1. public class App
  2. {
  3. public static void main( String[] args )
  4. {
  5. //获取扩展加载器
  6. ExtensionLoader<HelloService> extensionLoader = ExtensionLoader.getExtensionLoader(HelloService.class);
  7. //遍历所有支持的扩展点
  8. final Set<String> supportedExtensions = extensionLoader.getSupportedExtensions();
  9. for(String extension : supportedExtensions){
  10. final String s = extensionLoader.getExtension(extension).sayHello();
  11. System.out.println(s);
  12. }
  13. }
  14. }

(1)是如何进行实例化 ExtensionLoader 的 加载扩展点的路径image.png

  1. private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>(64);
  2. private static <T> boolean withExtensionAnnotation(Class<T> type) {
  3. // 包含`@SPI`注解在接口上
  4. return type.isAnnotationPresent(SPI.class);
  5. }
  6. @SuppressWarnings("unchecked")
  7. public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
  8. // 必须传入类型
  9. if (type == null) {
  10. throw new IllegalArgumentException("Extension type == null");
  11. }
  12. // 必须是接口类型
  13. if (!type.isInterface()) {
  14. throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
  15. }
  16. // 必须包含SPI的注解
  17. if (!withExtensionAnnotation(type)) {
  18. throw new IllegalArgumentException("Extension type (" + type +
  19. ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
  20. }
  21. // 尝试从已经加载过的数据中去读取(缓存功能) 此处缓存容器是ConcurrentHashMap
  22. ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
  23. if (loader == null) {
  24. // 如果没有的话,才会进行初始化,并且放入到缓存汇总 每一个接口类型都对应一个ExtensionLoader对象
  25. EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
  26. loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
  27. }
  28. return loader;
  29. }

(2)具体看一下 ExtensionLoader 的构造器函数, 这里他的实现比较简单,并没有做太多的操作。主要是对type进行赋值操作,然后获取 ExtensionFactory 对象。

  1. private ExtensionLoader(Class<?> type) {
  2. this.type = type;
  3. // 这里需要对对象的工厂做额外的创建,可以看到扩展的工厂也是一个扩展点
  4. objectFactory = (type == ExtensionFactory.class ? null :
  5. ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
  6. }

(3)具体再来关注一下 ExtensionFactory 是做什么用的, 从这里我们可以大概的看出来,他是通过传入扩展点类型和真正的名称来获取扩展的。这里就和我们SPI中的具体名称实现相挂钩。

  1. package org.apache.dubbo.common.extension;
  2. /**
  3. * ExtensionFactory
  4. */
  5. @SPI
  6. public interface ExtensionFactory {
  7. /**
  8. * Get extension.
  9. *根据传入的扩展点类型 和名字 来获取扩展 这里和我们SPI中的具体实现挂钩了
  10. * @param type object type.
  11. * @param name object name.
  12. * @return object instance.
  13. */
  14. <T> T getExtension(Class<T> type, String name);
  15. }

(4)可以在 dubbo-common/src/main/resources/META-
INF/dubbo/internal/org.apache.dubbo.common.extension.ExtensionFactory 中看到,他默认
有两个实现的提供

  1. adaptive=org.apache.dubbo.common.extension.factory.AdaptiveExtensionFactory
  2. spi=org.apache.dubbo.common.extension.factory.SpiExtensionFactory

(5)可以看到在 AdaptiveExtensionFactory 中是使用 @Adaptive 标记的。这里可以通过类名基本
看出来,他其实最主要的作用是进行代理其他的ExtensionFactory。其中比较重要的方法在于getSupportedExtensions 方法,获取所有支持的扩展信息实现。

  1. @Adaptive
  2. public class AdaptiveExtensionFactory implements ExtensionFactory {
  3. private final List<ExtensionFactory> factories;
  4. public AdaptiveExtensionFactory() {
  5. // 获取针对ExtensionFactory扩展加载器
  6. ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
  7. List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
  8. //获取支持的扩展
  9. for (String name : loader.getSupportedExtensions()) {
  10. //对扩展进行缓存
  11. list.add(loader.getExtension(name));
  12. }
  13. factories = Collections.unmodifiableList(list);
  14. }
  15. @Override
  16. public <T> T getExtension(Class<T> type, String name) {
  17. for (ExtensionFactory factory : factories) {
  18. // 交给每个真实的ExtensionFactory来处理
  19. T extension = factory.getExtension(type, name);
  20. if (extension != null) {
  21. return extension;
  22. }
  23. }
  24. return null;
  25. }
  26. }

(6)获取所有支持的扩展信息实现: ExtensionLoader.getSupportedExtensions ,这里可以看到,其实比较关键的方法在于 getExtensionClasses 方法

  1. public Set<String> getSupportedExtensions() {
  2. // 获取所有的扩展类信息
  3. Map<String, Class<?>> clazzes = getExtensionClasses();
  4. // 返回所有的扩展点名称
  5. return Collections.unmodifiableSet(new TreeSet<>(clazzes.keySet()));
  6. }

(7)观察 getExtensionClasses 的实现,可以看到这里其实主要做的就是一件事情,防止重复被加
载,所以真正的的实现还需要专门去查看 loadExtensionClasses 方法
在我们通过名称获取扩展类之前,首先需要根据配置文件解析出扩展类名称到扩展类的映射关系表
classes之后再根据扩展项名称 从映射关系表中获取取对应的扩展类即可。相关过程代码分析如下

  1. private Map<String, Class<?>> getExtensionClasses() {
  2. // 从缓存中获取已加载的扩展类
  3. Map<String, Class<?>> classes = cachedClasses.get();
  4. // 双重检查
  5. if (classes == null) {
  6. // 为空的话,则锁住,标识只会被执行一次
  7. synchronized (cachedClasses) {
  8. classes = cachedClasses.get();
  9. if (classes == null) {
  10. // 进行加载信息 加载扩展类
  11. classes = loadExtensionClasses();
  12. cachedClasses.set(classes);
  13. }
  14. }
  15. }
  16. return classes;
  17. }

(8)观察 loadExtensionClasses 方法实现。这里主要做了两件事情。
1: 加载当前SPI的默认实现。
2:加载这个类的所有扩展点实现,并且按照name和Class对象的形式存储,下面会专门针对于cacheDefaultExtensionName 和 loadDirectory 方法做说明

观察 cacheDefaultExtensionName 方法实现。这里面的是实现比较简单,主要用于读取注解中value
值来获取到默认的名称。

  1. private void cacheDefaultExtensionName() {
  2. // 获取当前类是否包含SPI注解,一般走到这里都是拥有这个注解的
  3. final SPI defaultAnnotation = type.getAnnotation(SPI.class);
  4. if (defaultAnnotation == null) {
  5. return;
  6. }
  7. // 来获取其的value值,这个值主要的作用是设置这个SPI中的默认扩展名
  8. // 比如LoadBalance的默认实现就是random。就是通过这里进行的设置
  9. String value = defaultAnnotation.value();
  10. if ((value = value.trim()).length() > 0) {
  11. String[] names = NAME_SEPARATOR.split(value);
  12. if (names.length > 1) {
  13. throw new IllegalStateException("More than 1 default extension name on extension " + type.getName()
  14. + ": " + Arrays.toString(names));
  15. }
  16. if (names.length == 1) {
  17. cachedDefaultName = names[0];
  18. }
  19. }
  20. }

image.png

观察 loadDirectory 方法实现。这里的主要功能是从这个文件夹中寻找真正的文件列表,并且对其中
的文件内容解析并且放入到 extensionClasses Map中,具体解析文件的内容实现,还要参考
loadResource 实现。

  1. private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type,
  2. boolean extensionLoaderClassLoaderFirst, String... excludedPackages) {
  3. // 文件名称规则: 路径/包名.接口名
  4. String fileName = dir + type;
  5. try {
  6. // 寻找classloader和url列表
  7. Enumeration<java.net.URL> urls = null;
  8. // try to load from ExtensionLoader's ClassLoader first
  9. // 如果需要的话, 需要先从当前类的ClassLoader中寻找
  10. ClassLoader classLoader = findClassLoader();
  11. // try to load from ExtensionLoader's ClassLoader first
  12. if (extensionLoaderClassLoaderFirst) {
  13. ClassLoader extensionLoaderClassLoader = ExtensionLoader.class.getClassLoader();
  14. if (ClassLoader.getSystemClassLoader() != extensionLoaderClassLoader) {
  15. urls = extensionLoaderClassLoader.getResources(fileName);
  16. }
  17. }
  18. // 如果找不到任何的URL列表,则继续尝试去其当前线程的ClassLoader中寻找
  19. if(urls == null || !urls.hasMoreElements()) {
  20. if (classLoader != null) {
  21. urls = classLoader.getResources(fileName);
  22. } else {
  23. urls = ClassLoader.getSystemResources(fileName);
  24. }
  25. }
  26. // 如果存在文件的话
  27. if (urls != null) {
  28. while (urls.hasMoreElements()) {
  29. // 遍历每一个资源文件,并且进行加载资源信息到extensionClasses, 主要功能 是读取文件内容
  30. java.net.URL resourceURL = urls.nextElement();
  31. loadResource(extensionClasses, classLoader, resourceURL, excludedPackages);
  32. }
  33. }
  34. } catch (Throwable t) {
  35. logger.error("Exception occurred when loading extension class (interface: " +
  36. type + ", description file: " + fileName + ").", t);
  37. }
  38. }

(9)进行观察 loadResource 实现,主要是用于读取文件操作,并且将方法交由 loadClass 来加载类信息。加载类信息也是最重要的方法所在。

  1. private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader,
  2. java.net.URL resourceURL, String... excludedPackages) {
  3. try {
  4. // 读取文件
  5. try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
  6. String line;
  7. while ((line = reader.readLine()) != null) {
  8. //#后是注释 截取文件#前面的内容
  9. final int ci = line.indexOf('#');
  10. if (ci >= 0) {
  11. line = line.substring(0, ci);
  12. }
  13. line = line.trim();
  14. // 如果有内容的话
  15. if (line.length() > 0) {
  16. try {
  17. // 则进行加载key=value的形式数据
  18. String name = null;
  19. int i = line.indexOf('=');
  20. if (i > 0) {
  21. name = line.substring(0, i).trim();
  22. line = line.substring(i + 1).trim();
  23. }
  24. if (line.length() > 0 && !isExcluded(line, excludedPackages)) {
  25. // 对类信息进行加载操作
  26. loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);
  27. }
  28. } catch (Throwable t) {
  29. IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
  30. exceptions.put(line, e);
  31. }
  32. }
  33. }
  34. }
  35. } catch (Throwable t) {
  36. logger.error("Exception occurred when loading extension class (interface: " +
  37. type + ", class file: " + resourceURL + ") in " + resourceURL, t);
  38. }
  39. }

(10)观察 loadClass 类的实现,可以看到这里是最终进行完成类映射的地方。关于Adaptive中的类
实现原理,我们放在这个章节中的偏后面进行细讲。

  1. private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
  2. // 当前扩展点的实现,必须是当前扩展接口的实现才可以
  3. if (!type.isAssignableFrom(clazz)) {
  4. throw new IllegalStateException("Error occurred when loading extension class (interface: " +
  5. type + ", class line: " + clazz.getName() + "), class "
  6. + clazz.getName() + " is not subtype of interface.");
  7. }
  8. // 如果是包含了Adaptive注解,则认为是需要对扩展点包装的方法,这里只做了存储操作,存储至 cachedAdaptiveClass中
  9. if (clazz.isAnnotationPresent(Adaptive.class)) {
  10. cacheAdaptiveClass(clazz);
  11. } else if (isWrapperClass(clazz)) {
  12. // 判断是否是wapper类型, 是否构造函数中有该接口类型的传入
  13. // wrapper类型的意思是,对当前的扩展点实现封装功能处理
  14. cacheWrapperClass(clazz);
  15. } else {
  16. clazz.getConstructor();
  17. // 寻找他是否已经定义过了名称, 这里就不继续往里面细看了,主要是获取当前类的
  18. // org.apache.dubbo.common.Extension注解,如果有的话就使用这个名称,否则的话就是用当前类的 简单名称
  19. if (StringUtils.isEmpty(name)) {
  20. name = findAnnotationName(clazz);
  21. if (name.length() == 0) {
  22. throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
  23. }
  24. }
  25. // 否则的话,就对这个名称和class做映射关系
  26. String[] names = NAME_SEPARATOR.split(name);
  27. if (ArrayUtils.isNotEmpty(names)) {
  28. // 如果当前类拥有Activate注解,则将其进行添加到cachedActivates对象中,意味 着需要执行
  29. cacheActivateClass(clazz, names[0]);
  30. // 进行名称映射保存
  31. for (String n : names) {
  32. cacheName(clazz, n);
  33. saveInExtensionClass(extensionClasses, clazz, n);
  34. }
  35. }
  36. }
  37. }

当执行完这几个方法之后,会对一下几个字段进行更新:

  • cachedAdaptiveClass: 当前Extension类型对应的AdaptiveExtension类型(只能一个)
  • cachedWrapperClasses: 当前Extension类型对应的所有Wrapper实现类型(无顺序)
  • cachedActivates: 当前Extension实现自动激活实现缓存(map,无序)
  • cachedNames: 扩展点实现类对应的名称(如配置多个名称则值为第一个)

4.2 根据name获取扩展点的方法 getExtension

(1) getExtension 方法实现。这里面同样主要作用是根据name对扩展点进行处理和进行加锁来创建真实的引用,其中都是有使用缓存来处理。

  1. public T getExtension(String name) {
  2. if (StringUtils.isEmpty(name)) {
  3. throw new IllegalArgumentException("Extension name == null");
  4. }
  5. // 获取当前SPi的默认扩展实现类
  6. if ("true".equals(name)) {
  7. return getDefaultExtension();
  8. }
  9. // 获取当前类的holder,实现原理和cachedClasses的方式相同,都是建立同一个引用后再进行 加锁
  10. final Holder<Object> holder = getOrCreateHolder(name);
  11. Object instance = holder.get();
  12. if (instance == null) {
  13. synchronized (holder) {
  14. instance = holder.get();
  15. if (instance == null) {
  16. //进行真正的实例创建
  17. instance = createExtension(name);
  18. holder.set(instance);
  19. }
  20. }
  21. }
  22. return (T) instance;
  23. }

holder结构:
image.png
(2)下面来看看 getOrCreateHolder 是如何保证缓存的。

  1. private Holder<Object> getOrCreateHolder(String name) {
  2. // 获取当前名称的和对象Holder的映射关系
  3. Holder<Object> holder = cachedInstances.get(name);
  4. if (holder == null) {
  5. // 如果不存在的话,则使用putIfAbsent的原子操作来设置值,
  6. // 这个值可以保证多线程的额情 况下有值的时候不处理,没有值进行保存
  7. cachedInstances.putIfAbsent(name, new Holder<>());
  8. // 获取真实的holder处理器
  9. holder = cachedInstances.get(name);
  10. }
  11. return holder;
  12. }

(3)然后我们再来看看 createExtension 的实现,他是具体根据扩展的class名称来进行创建实例的
类。这里也是创建扩展点类的主要实现。下面我们也对其他扩展点注册的方法做说明。

  1. private T createExtension(String name) {
  2. // 从配置文件中加载所有的扩展类 可以得到配置项名称 到配置类的映射关系
  3. Class<?> clazz = getExtensionClasses().get(name);
  4. if (clazz == null) {
  5. throw findException(name);
  6. }
  7. try {
  8. // 获取是否已经有实例了
  9. T instance = (T) EXTENSION_INSTANCES.get(clazz);
  10. if (instance == null) {
  11. // 没有的话,同样适用putIfAbsent的方式来保证只会创建一个对象并且保存
  12. EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
  13. instance = (T) EXTENSION_INSTANCES.get(clazz);
  14. }
  15. // 注入其他扩展点的实体,用于扩展点和其他的扩展点相互打通
  16. injectExtension(instance);
  17. // 进行遍历所有的包装类信息,分别对包装的类进行包装实例化,并且返回自身引用
  18. Set<Class<?>> wrapperClasses = cachedWrapperClasses;
  19. if (CollectionUtils.isNotEmpty(wrapperClasses)) {
  20. for (Class<?> wrapperClass : wrapperClasses) {
  21. instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
  22. }
  23. }
  24. initExtension(instance);
  25. return instance;
  26. } catch (Throwable t) {
  27. throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
  28. type + ") couldn't be instantiated: " + t.getMessage(), t);
  29. }
  30. }

(4) injectExtension 方法观察 此方法是将当前创建的扩展点实例中所需要注入的其他扩展点注入进来

  1. private T injectExtension(T instance) {
  2. if (objectFactory == null) {
  3. return instance;
  4. }
  5. try {
  6. // 遍历其中的所有方法
  7. for (Method method : instance.getClass().getMethods()) {
  8. // 是否是set方法
  9. // 1. 以"set"开头
  10. // 2. 参数长度为1
  11. // 3. 是公开的方法
  12. if (!isSetter(method)) {
  13. continue;
  14. }
  15. /**
  16. * Check {@link DisableInject} to see if we need auto injection for this property
  17. */
  18. // 如果设置了取消注册,则不进行处理
  19. if (method.getAnnotation(DisableInject.class) != null) {
  20. continue;
  21. }
  22. // 获取参数类型,并且非基础类型(String, Integer等类型)
  23. Class<?> pt = method.getParameterTypes()[0];
  24. if (ReflectUtils.isPrimitives(pt)) {
  25. continue;
  26. }
  27. try {
  28. // 获取需要set的扩展点名称
  29. String property = getSetterProperty(method);
  30. // 从ExtensionLoader中加载指定的扩展点
  31. // 比如有一个方法为setRandom(LoadBalance loadBalance),那么则以为着需 要加载负载均衡中名为random的扩展点
  32. Object object = objectFactory.getExtension(pt, property);
  33. if (object != null) {
  34. method.invoke(instance, object);
  35. }
  36. } catch (Exception e) {
  37. logger.error("Failed to inject via method " + method.getName()
  38. + " of interface " + type.getName() + ": " + e.getMessage(), e);
  39. }
  40. }
  41. } catch (Exception e) {
  42. logger.error(e.getMessage(), e);
  43. }
  44. return instance;
  45. }

4.3 Adaptive功能实现原理

Adaptive的主要功能是对所有的扩展点进行封装为一个类,通过URL传入参数的时动态选择需要使用的
扩展点。其底层的实现原理就是动态代理,这里我们会通过源码的形式告诉大家,他是如何通过动态代
理进行加载的。
先来看一下Adaptive示例:

  1. URL url = URL.valueOf("test://localhost/hello?hello.service=human");
  2. final HelloService adaptiveExtension = ExtensionLoader.getExtensionLoader(HelloService.class).getAdaptiveExtension();
  3. String s = adaptiveExtension.sayHello(url);
  4. System.out.println(s);

(1)这里我们 getAdaptiveExtension 方法讲起,这个里面就是真正获取该类。这里可以看到,
ExtentionLoader 中大量的使用了Holder和加锁的方式去进行唯一创建。

  1. public T getAdaptiveExtension() {
  2. // 和原先是用相同的方式,进行Holder和加锁的方式来保证只会被创建一次
  3. Object instance = cachedAdaptiveInstance.get();
  4. if (instance == null) {
  5. // 如果直接已经有创建并且错误的情况,则直接返回错误信息,防止重复没必要的创建
  6. if (createAdaptiveInstanceError != null) {
  7. throw new IllegalStateException("Failed to create adaptive instance: " +
  8. createAdaptiveInstanceError.toString(),
  9. createAdaptiveInstanceError);
  10. }
  11. synchronized (cachedAdaptiveInstance) {
  12. instance = cachedAdaptiveInstance.get();
  13. if (instance == null) {
  14. try {
  15. // 这里真实的进行创建操作
  16. instance = createAdaptiveExtension();
  17. cachedAdaptiveInstance.set(instance);
  18. } catch (Throwable t) {
  19. createAdaptiveInstanceError = t;
  20. throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
  21. }
  22. }
  23. }
  24. }
  25. return (T) instance;
  26. }

(2)这里我们继续从 createAdaptiveExtension 来去查看实现。这里主要是进行了一些方法封装。

  1. private T createAdaptiveExtension() {
  2. try {
  3. // 这里使用`getAdaptiveExtensionClass`方法进行构建类并且执行实例化
  4. // 然后和普通的其他class相同,依旧使用injectExtension进行扩展
  5. return injectExtension((T) getAdaptiveExtensionClass().newInstance());
  6. } catch (Exception e) {
  7. throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
  8. }
  9. }
  10. private Class<?> getAdaptiveExtensionClass() {
  11. // 确保已经加载了所有的扩展类信息
  12. getExtensionClasses();
  13. // 如果已经加载过了,则直接返回
  14. if (cachedAdaptiveClass != null) {
  15. return cachedAdaptiveClass;
  16. }
  17. // 否则进行构建操作
  18. return cachedAdaptiveClass = createAdaptiveExtensionClass();

(3)具体再来看 createAdaptiveExtensionClass 方法。这里主要是进行生成Adaptive的代码,并且
进行编译生成class。

  1. private T createAdaptiveExtension() {
  2. try {
  3. // 这里使用`getAdaptiveExtensionClass`方法进行构建类并且执行实例化
  4. // 然后和普通的其他class相同,依旧使用injectExtension进行扩展
  5. return injectExtension((T) getAdaptiveExtensionClass().newInstance());
  6. } catch (Exception e) {
  7. throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
  8. }
  9. }
  10. private Class<?> getAdaptiveExtensionClass() {
  11. // 确保已经加载了所有的扩展类信息
  12. getExtensionClasses();
  13. // 如果已经加载过了,则直接返回
  14. if (cachedAdaptiveClass != null) {
  15. return cachedAdaptiveClass;
  16. }
  17. // 否则进行构建操作
  18. return cachedAdaptiveClass = createAdaptiveExtensionClass();
  19. }

(3)具体再来看 createAdaptiveExtensionClass 方法。这里主要是进行生成Adaptive的代码,并且
进行编译生成class。

  1. private Class<?> createAdaptiveExtensionClass() {
  2. // 实例化一个新的Adaptive的代码生成器,并且进行代码生成
  3. String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
  4. // 获取类加载器
  5. ClassLoader classLoader = findClassLoader();
  6. // 通过扩展点,寻找编译器, 目前有Java自带的编译器和Javassist的编译器,这里不做细展开
  7. org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
  8. // 编译并且生成class
  9. return compiler.compile(code, classLoader);
  10. }

(4)具体通过 AdaptiveClassLoaderCodeGenerator.generate 方法来进行实现真正的代码生成。

  1. public String generate() {
  2. // no need to generate adaptive class since there's no adaptive method found.
  3. // 如果没有任何方法标记为Adaptive,则不做处理
  4. if (!hasAdaptiveMethod()) {
  5. throw new IllegalStateException("No adaptive method exist on extension " + type.getName() + ", refuse to create the adaptive class!");
  6. }
  7. // 进行编写代码
  8. StringBuilder code = new StringBuilder();
  9. // 生成包信息
  10. code.append(generatePackageInfo());
  11. // 生成引用信息
  12. code.append(generateImports());
  13. // 生成类声明
  14. code.append(generateClassDeclaration());
  15. // 生成每一个方法
  16. Method[] methods = type.getMethods();
  17. for (Method method : methods) {
  18. code.append(generateMethod(method));
  19. }
  20. // 输出最后的一个"}"来结束当前类
  21. code.append("}");
  22. if (logger.isDebugEnabled()) {
  23. logger.debug(code.toString());
  24. }
  25. return code.toString();
  26. }

(5)这里主要对其中的每一个方法来做处理。具体主要观看 generateMethod 方法。这里的很多方法
主要是依赖反射机制去进行方法封装,最终拼接为一个最终字符串。其中最关键的方法在于
generateMethodContent 方法来生成代理功能。

  1. private String generateMethod(Method method) {
  2. // 方法返回类型
  3. String methodReturnType = method.getReturnType().getCanonicalName();
  4. // 方法名称
  5. String methodName = method.getName();
  6. // 生成方法内容
  7. String methodContent = generateMethodContent(method);
  8. // 生成参数列表
  9. String methodArgs = generateMethodArguments(method);
  10. // 方法抛出的异常
  11. String methodThrows = generateMethodThrows(method);
  12. // 格式化为一个字符串
  13. // public %s %s(%s) %s {
  14. // %s
  15. //}
  16. return String.format(CODE_METHOD_DECLARATION, methodReturnType, methodName, methodArgs, methodThrows, methodContent);
  17. }

(6) generateMethodContent 方法解读。这块儿更推荐通过debug的形式走进来, 看代码也更直接了
当(就可以直接按照常用功能中的SPI章节来debug)。这部分也是整个Adaptive中最为核心的代码,包括
获取扩展点名称并且执行。

  1. private String generateMethodContent(Method method) {
  2. // 获取Adaptive注解,只支持含有Adaptive注解方法处理
  3. Adaptive adaptiveAnnotation = method.getAnnotation(Adaptive.class);
  4. StringBuilder code = new StringBuilder(512);
  5. // 没有该注解,直接抛出异常
  6. // throw new UnsupportedOperationException
  7. if (adaptiveAnnotation == null) {
  8. return generateUnsupported(method);
  9. } else {
  10. // 获取URL参数的所在位置
  11. int urlTypeIndex = getUrlTypeIndex(method);
  12. // found parameter in URL type
  13. if (urlTypeIndex != -1) {
  14. // Null Point check
  15. // 增加判断url不为空的代码
  16. code.append(generateUrlNullCheck(urlTypeIndex));
  17. } else {
  18. // did not find parameter in URL type
  19. // 获取这个方法中的所有参数列表
  20. // 寻找每个参数中是否有"get"开头的方法,并且返回值是URL的
  21. // 如果有则同样认定为找到,否则抛出异常
  22. code.append(generateUrlAssignmentIndirectly(method));
  23. }
  24. // 获取扩展点的适配名称
  25. String[] value = getMethodAdaptiveValue(adaptiveAnnotation);
  26. // 判断是否有参数是Invocation类
  27. // 这里判断的主要目的在于,拥有Invocation时,则获取扩展名称的方式发生改变
  28. // 存在Invocation时,通过getMethodParameter,否则通过getParameter来执行
  29. // getMethodParameter是dubboURL中特有的,用于将"test.a"转换为"testA"的形式
  30. boolean hasInvocation = hasInvocationArgument(method);
  31. // 增加有Invocation类时的不为空判断
  32. code.append(generateInvocationArgumentNullCheck(method));
  33. // 生成获取扩展点名称的方法
  34. code.append(generateExtNameAssignment(value, hasInvocation));
  35. // check extName == null?
  36. // 检查扩展点不能为空
  37. code.append(generateExtNameNullCheck(value));
  38. // 获取扩展点实现
  39. code.append(generateExtensionAssignment());
  40. // return statement
  41. // 返回扩展点中的真实调用
  42. code.append(generateReturnAndInvocation(method));
  43. }
  44. return code.toString();
  45. }

跟踪示例代码可以看出它是按照请求URL动态生成代理类代码 然后调用编译器进行编译获取字节码文件 从而获取代理对象,此代理对象内部生成了对应的按照名称获取不同扩展点实现的方法。这就是Adaptive功能的核心实现逻辑
image.png

5、集群容错源码剖析

在对集群相关代码进行分析之前,这里有必要先来介绍一下集群容错的所有组件。包含 Cluster、
Cluster Invoker、Directory、Router 和 LoadBalance 等。
(下图需记住)

image.png

集群工作过程可分为两个阶段,
第一个阶段是在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。
第二个阶段是在服务消费者进行远程调用
时。以 FailoverClusterInvoker 为例,该类型 Cluster Invoker 首先会调用 Directory 的 list 方法列举
Invoker 列表(可将 Invoker 简单理解为服务提供者)。Directory 的用途是保存 Invoker列表,可简单
类比为 List。其实现类 RegistryDirectory 是一个动态服务目录,可感知注册中心配置的变化,它所持
有的 Invoker 列表会随着注册中心内容的变化而变化。每次变化后,RegistryDirectory 会动态增删
Invoker,并调用 Router 的 route 方法进行路由,过滤掉不符合路由规则的 Invoker。当
FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表后,它会通过 LoadBalance 从 Invoker 列
表中选择一个 Invoker。最后 FailoverClusterInvoker 会将参数传给 LoadBalance 选择出的 Invoker
实例的 invoke 方法,进行真正的远程调用。

Dubbo 主要提供了这样几种容错方式:

  • Failover Cluster - 失败自动切换 失败时会重试其它服务器
  • Failfast Cluster - 快速失败 请求失败后快速返回异常结果 不重试
  • Failsafe Cluster - 失败安全 出现异常 直接忽略 会对请求做负载均衡
  • Failback Cluster - 失败自动恢复 请求失败后 会自动记录请求到失败队列中
  • Forking Cluster - 并行调用多个服务提供者 其中有一个返回 则立即返回结果

5.1 信息缓存接口Directory

Directory是Dubbo中的一个接口,主要用于缓存当前可以被调用的提供者列表信息。我们在消费者进行调用时都会通过这个接口来获取所有的提供者列表,再进行后续处理。
(1)我们先来看看 Directory 接口,这里比较简单,我们可以通过 Directory 来找到指定服务中的提供者信息列表。

  1. public interface Directory<T> extends Node {
  2. /**
  3. * get service type.
  4. * 获取服务的类型,也就是我们demo中所使用的HelloService
  5. * @return service type.
  6. */
  7. Class<T> getInterface();
  8. /**
  9. * list invokers.
  10. *根据本次调用的信息来获取所有可以被执行的提供者信息
  11. * @return invokers
  12. */
  13. List<Invoker<T>> list(Invocation invocation) throws RpcException;
  14. //获取所有的提供者信息
  15. List<Invoker<T>> getAllInvokers();
  16. URL getConsumerUrl();
  17. }

(2) Directory 中有一个基础的实现类,主要是对一些通用的方法封装,主要还是依靠真正的实现。
其中可以看看 AbstractDirectory中的list 方法。通过这个方式我们能知道,真正实现还是依靠于真
正子类汇总的 doList 方法。

  1. public abstract class AbstractDirectory<T> implements Directory<T> {
  2. @Override
  3. public List<Invoker<T>> list(Invocation invocation) throws RpcException {
  4. if (destroyed) {
  5. throw new RpcException("Directory already destroyed .url: " + getUrl());
  6. }
  7. // 交给子类进行处理
  8. return doList(invocation);
  9. }
  10. //抽象类
  11. protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
  12. }

(3)我们可以继续往下看,他的实现子类是 RegistryDirectory#doList 方法。我们可以看到这里的
实现也相对比较简单,主要依靠routerChain去决定真实返回的提供者列表。

  1. @Override
  2. public List<Invoker<T>> doList(Invocation invocation) {
  3. if (forbidden) {
  4. // 当没有提供者的时候会直接抛出异常
  5. // 1. No service provider 2. Service providers are disabled
  6. throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
  7. getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
  8. NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
  9. ", please check status of providers(disabled, not registered or in blacklist).");
  10. }
  11. if (multiGroup) {
  12. return this.invokers == null ? Collections.emptyList() : this.invokers;
  13. }
  14. List<Invoker<T>> invokers = null;
  15. try {
  16. // Get invokers from cache, only runtime routers will be executed.
  17. // 交给路由chain去处理并且获取所有的invokers
  18. invokers = routerChain.route(getConsumerUrl(), invocation);
  19. } catch (Throwable t) {
  20. logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
  21. }
  22. return invokers == null ? Collections.emptyList() : invokers;
  23. }

(4)路由是如何获取Invoker 列表的呢? 我们观察这个方法: RegistryProtocol.refer ,这里面也是
Invoker 生成的部分关键代码。 这部分代码在上节服务消费过程中已有体现

  1. @Override
  2. @SuppressWarnings("unchecked")
  3. public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
  4. // 获取注册中心的地址URL(主要用于转换协议),比如我们是使用的zookeeper,那么他就会转换 为zookeeper://
  5. url = getRegistryUrl(url);
  6. // 获取注册中心配置信息
  7. Registry registry = registryFactory.getRegistry(url);
  8. if (RegistryService.class.equals(type)) {
  9. return proxyFactory.getInvoker((T) registry, type, url);
  10. }
  11. // 适用于多个分组时使用
  12. // group="a,b" or group="*"
  13. Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
  14. String group = qs.get(GROUP_KEY);
  15. if (group != null && group.length() > 0) {
  16. if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
  17. return doRefer(getMergeableCluster(), registry, type, url);
  18. }
  19. }
  20. // 真正进行构建invoker和我们上面的Directory
  21. return doRefer(cluster, registry, type, url);
  22. }

(5)下面我们再来仔细跟踪 doRefer 方法, 这里面就是最主要产生Directory并且注册和监听的主要代码逻辑。我们所需要的 routerChain 也是在这里产生的。

  1. private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
  2. // 实例化Directory
  3. RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
  4. // 设置注册中心和所使用的协议
  5. directory.setRegistry(registry);
  6. directory.setProtocol(protocol);
  7. // all attributes of REFER_KEY
  8. // all attributes of REFER_KEY
  9. // 生成监听路径URL
  10. Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
  11. URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
  12. if (directory.isShouldRegister()) {
  13. // 在Directory中设置监听的consumerurl地址
  14. directory.setRegisteredConsumerUrl(subscribeUrl);
  15. // 在注册中心中注册消费者URL
  16. // 也就是我们之前的Zookeeper的node中看到的consumer://
  17. registry.register(directory.getRegisteredConsumerUrl());
  18. }
  19. // 构建路由链
  20. directory.buildRouterChain(subscribeUrl);
  21. // 进行监听所有的的provider
  22. directory.subscribe(toSubscribeUrl(subscribeUrl));
  23. // 加入到集群中
  24. Invoker<T> invoker = cluster.join(directory);
  25. List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
  26. if (CollectionUtils.isEmpty(listeners)) {
  27. return invoker;
  28. }
  29. RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
  30. for (RegistryProtocolListener listener : listeners) {
  31. listener.onRefer(this, registryInvokerWrapper);
  32. }
  33. return registryInvokerWrapper;
  34. }

跟踪buildRouterChain
image.png
=>跟踪进入buildChain方法 此处根据URL获取到RouterFactory列表,然后依次用RouterFactory创建不同的Router的实现,组成一个Router调用链
image.png
通过routeFactory获取router 此处以我们自定义实现的Router为例
image.png
此处示例中自定义了Router实现,跟踪上图方法会进入自定义实现Router的类构造方法中
image.png
此自定义Router实现中我们重写了默认获取List的方法
image.png

(6)回到 RouterChain#route 方法。这里所做的就是依次遍历所有的路由,然后分别执行并返回。这
也就是整体的路由规则的实现。

  1. /**
  2. *
  3. * @param url
  4. * @param invocation
  5. * @return
  6. */
  7. public List<Invoker<T>> route(URL url, Invocation invocation) {
  8. // 所有的invoker列表
  9. List<Invoker<T>> finalInvokers = invokers;
  10. for (Router router : routers) {
  11. // 依次交给所有的路由规则进行选取路由列表
  12. finalInvokers = router.route(finalInvokers, url, invocation);
  13. }
  14. return finalInvokers;
  15. }

5.2 路由规则实现原理

这里我们具体来讲解一下 RouterChain 中的 Router 是如何实现的。这里我们主要对ConditionRouter 的实现来做说明。
(1)可以看到这个类中有两个属性比较关键,这两个属性也是判断的关键。

  1. // 是否满足判断条件
  2. protected Map<String, MatchPair> whenCondition;
  3. // 当满足判断条件时如何选择invokers
  4. protected Map<String, MatchPair> thenCondition;

(2)我们可以看到每一个 MatchPair 都有这两个属性,分别表示满足的条件和不满足的具体条件。

  1. final Set<String> matches = new HashSet<String>();
  2. final Set<String> mismatches = new HashSet<String>();

(3)下面我们先跳过生成规则的代码,先从如何选择Invoker入手。可以看到整体的流程也比较简单,
主要在于判断( matchWhen )和选择( matchThen )的逻辑。

  1. @Override
  2. public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
  3. throws RpcException {
  4. // 不启用的时,则直接返回提供者的列表
  5. if (!enabled) {
  6. return invokers;
  7. }
  8. // 如果不存在任何invoker则直接返回
  9. if (CollectionUtils.isEmpty(invokers)) {
  10. return invokers;
  11. }
  12. try {
  13. // 判断是否满足判断条件,不满足直接返回列表
  14. if (!matchWhen(url, invocation)) {
  15. return invokers;
  16. }
  17. List<Invoker<T>> result = new ArrayList<Invoker<T>>();
  18. if (thenCondition == null) {
  19. logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
  20. return result;
  21. }
  22. // 依次判断每一个invoker的url是否满足条件
  23. for (Invoker<T> invoker : invokers) {
  24. if (matchThen(invoker.getUrl(), url)) {
  25. result.add(invoker);
  26. }
  27. }
  28. // 如果不为空则直接返回
  29. if (!result.isEmpty()) {
  30. return result;
  31. } else if (force) {
  32. logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(RULE_KEY));
  33. return result;
  34. }
  35. } catch (Throwable t) {
  36. logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
  37. }
  38. return invokers;
  39. }

(4)可以看到这里判断条件是尽量的简单,甚至可以为空,主要在于判定when 以及是否匹配then规
则。两者最终底层都是调用的 matchCondition 方法,我们在看他实现逻辑之前,先来确定一下
condition 中都存储了什么样的信息。

  1. boolean matchWhen(URL url, Invocation invocation) {
  2. // 1. 如果判断条件为空则直接认定为匹配
  3. // 2. 如果条件匹配则认定为匹配
  4. return CollectionUtils.isEmptyMap(whenCondition) || matchCondition(whenCondition, url, null, invocation);
  5. }
  6. private boolean matchThen(URL url, URL param) {
  7. // 判断条件不能为空并且匹配条件规则时才返回
  8. return CollectionUtils.isNotEmptyMap(thenCondition) && matchCondition(thenCondition, url, param, null);
  9. }

(5)最后我们再来看看他是如何生成整个路由规则的。我们跟进 ConditionRouter#init 方法,其中
比较关键的方法为 parseRule , when 和 then 的规则都是相同的。

  1. public void init(String rule) {
  2. try {
  3. // 必须包含规则配置 url参数中必须有rule=xxxx
  4. if (rule == null || rule.trim().length() == 0) {
  5. throw new IllegalArgumentException("Illegal route rule!");
  6. }
  7. rule = rule.replace("consumer.", "").replace("provider.", "");
  8. // 根据"=>"来判断when或者then条件
  9. int i = rule.indexOf("=>");
  10. String whenRule = i < 0 ? null : rule.substring(0, i).trim();
  11. String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim();
  12. // 分别根据"=>"来生成前后的规则
  13. Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule)
  14. ? new HashMap<String, MatchPair>() : parseRule(whenRule);
  15. Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule)
  16. ? null : parseRule(thenRule);
  17. // NOTE: It should be determined on the business level whether the `When condition` can be empty or not.
  18. this.whenCondition = when;
  19. this.thenCondition = then;
  20. } catch (ParseException e) {
  21. throw new IllegalStateException(e.getMessage(), e);
  22. }
  23. }

(6) parseRule 方法实现。(了解)

image.png
image.png
image.png
image.png
image.png

5.3 Cluster组件

下面我们再来看看再Dubbo中也是很关键的组件: Cluster 。它主要用于代理真正的Invoker执行时做
处理,提供了多种容错方案。
(1)我们首先来看一下他的接口定义。这里我们在之前也有见到过( doRefer ),那里也是真正调用它来
生成的位置。

  1. // 默认使用failover作为实现
  2. @SPI(FailoverCluster.NAME)
  3. public interface Cluster {
  4. // 生成一个新的invoker
  5. @Adaptive
  6. <T> Invoker<T> join(Directory<T> directory) throws RpcException;
  7. }

(2)下面我们再来看一下他提供的几种实现,Cluster和 Registry 采用了相同的类方式,都提供了
Abstract 类来进行统一的封装。

  1. @Override
  2. public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
  3. // 使用子类doJoin来真正生成Invoker
  4. // 并且使用拦截器的方式进行一层封装
  5. return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY));
  6. }
  7. protected abstract <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException;
  8. // 对invoker进行封装
  9. private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
  10. AbstractClusterInvoker<T> last = clusterInvoker;
  11. // 获取所有的拦截器
  12. List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class)
  13. .getActivateExtension(clusterInvoker.getUrl(), key);
  14. if (!interceptors.isEmpty()) {
  15. for (int i = interceptors.size() - 1; i >= 0; i--) {
  16. // 对拦截器进行一层封装
  17. final ClusterInterceptor interceptor = interceptors.get(i);
  18. final AbstractClusterInvoker<T> next = last;
  19. last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
  20. }
  21. }
  22. return last;
  23. }

(3)下面我们看看 failover 里面都做了些什么。这里面比较简单,只是进行new了一个新的
Invoker。

  1. public class FailoverCluster extends AbstractCluster {
  2. public final static String NAME = "failover";
  3. @Override
  4. public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
  5. return new FailoverClusterInvoker<>(directory);
  6. }
  7. }

4)我们通过观察Invoker接口得知,其中最关键的方式是 invoke 方法。我们也可以看到,他也是通
过 Abstract 进行了一层封装。其中我们来看看他的 invoke 方法实现。
( AbstractClusterInvoker.invoke )

  1. @Override
  2. public Result invoke(final Invocation invocation) throws RpcException {
  3. // 检查是否已经关闭了
  4. checkWhetherDestroyed();
  5. // binding attachments into invocation.
  6. // 拷贝当前RPCContext中的附加信息到当前的invocation中
  7. Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
  8. if (contextAttachments != null && contextAttachments.size() != 0) {
  9. ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
  10. }
  11. // 找寻出所有支持的invoker,已经路由过的
  12. List<Invoker<T>> invokers = list(invocation);
  13. // 初始化负载均衡器
  14. LoadBalance loadbalance = initLoadBalance(invokers, invocation);
  15. // 用于适配异步请求使用
  16. RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
  17. // 交给子类进行真正处理请求
  18. return doInvoke(invocation, invokers, loadbalance);
  19. }

(5)我们再来细关注一下 FailoverClusterInvoker 中的 doInvoke 方法是怎么做的。这里的方法也
是很简单,主要是通过for循环的形式来达到重试次数的目的,并且每次重试否会重新走一遍路由等规
则。

  1. public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
  2. // 如果没有任何的invoker则抛出异常
  3. List<Invoker<T>> copyInvokers = invokers;
  4. checkInvokers(copyInvokers, invocation);
  5. // 获取这个方法最大的重试次数
  6. String methodName = RpcUtils.getMethodName(invocation);
  7. int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
  8. if (len <= 0) {
  9. len = 1;
  10. }
  11. // retry loop.
  12. // 通过for循环的形式表示可以调用的次数
  13. RpcException le = null; // last exception.
  14. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
  15. Set<String> providers = new HashSet<String>(len);
  16. for (int i = 0; i < len; i++) {
  17. //Reselect before retry to avoid a change of candidate `invokers`.
  18. //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
  19. if (i > 0) {
  20. // 每次都执行一次是否关闭当前consumer的判断
  21. checkWhetherDestroyed();
  22. // 重新获取一遍invoker列表
  23. copyInvokers = list(invocation);
  24. // check again
  25. // 再次进行一次存在invoker的检查
  26. checkInvokers(copyInvokers, invocation);
  27. }
  28. // 选择具体的invoker(交给负载均衡)
  29. Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
  30. // 增加到已经执行过得invoker列表中
  31. invoked.add(invoker);
  32. RpcContext.getContext().setInvokers((List) invoked);
  33. try {
  34. // 让其真正的去进行执行操作
  35. Result result = invoker.invoke(invocation);
  36. if (le != null && logger.isWarnEnabled()) {
  37. logger.warn("Although retry the method " + methodName
  38. + " in the service " + getInterface().getName()
  39. + " was successful by the provider " + invoker.getUrl().getAddress()
  40. + ", but there have been failed providers " + providers
  41. + " (" + providers.size() + "/" + copyInvokers.size()
  42. + ") from the registry " + directory.getUrl().getAddress()
  43. + " on the consumer " + NetUtils.getLocalHost()
  44. + " using the dubbo version " + Version.getVersion() + ". Last error is: "
  45. + le.getMessage(), le);
  46. }
  47. return result;
  48. } catch (RpcException e) {
  49. if (e.isBiz()) { // biz exception.
  50. throw e;
  51. }
  52. le = e;
  53. } catch (Throwable e) {
  54. le = new RpcException(e.getMessage(), e);
  55. } finally {
  56. providers.add(invoker.getUrl().getAddress());
  57. }
  58. }
  59. throw new RpcException(le.getCode(), "Failed to invoke the method "
  60. + methodName + " in the service " + getInterface().getName()
  61. + ". Tried " + len + " times of the providers " + providers
  62. + " (" + providers.size() + "/" + copyInvokers.size()
  63. + ") from the registry " + directory.getUrl().getAddress()
  64. + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
  65. + Version.getVersion() + ". Last error is: "
  66. + le.getMessage(), le.getCause() != null ? le.getCause() : le);
  67. }

5.4 负载均衡实现原理

通过上面一小节我们也有看到在 Cluster 中经过负载选择真正 Invoker 的代码,这里我们再来细追踪
是如何负载均衡的。
(1)再次来看看 LoadBalance 接口定义。这里默认选择了随机算法。

  1. /**
  2. * LoadBalance. (SPI, Singleton, ThreadSafe)
  3. * <p>
  4. * <a href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load-Balancing</a>
  5. *
  6. * @see org.apache.dubbo.rpc.cluster.Cluster#join(Directory)
  7. */
  8. @SPI(RandomLoadBalance.NAME)
  9. public interface LoadBalance {
  10. /**
  11. * select one invoker in list.
  12. *
  13. * @param invokers invokers.
  14. * @param url refer url
  15. * @param invocation invocation.
  16. * @return selected invoker.
  17. */
  18. @Adaptive("loadbalance")
  19. <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
  20. }

(2) LoadBalance 依旧选择了 AbstractLoadBalance 作为基础的实现类。我们来关注一下 select
方法。这里的方法也比较简单,主要就是处理只有一个invoker的情况。

  1. @Override
  2. public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  3. // 如果不存在任何的invoker则直接返回
  4. if (CollectionUtils.isEmpty(invokers)) {
  5. return null;
  6. }
  7. // 如果还有一个invoker则直接返回,不需要执行负载均衡
  8. if (invokers.size() == 1) {
  9. return invokers.get(0);
  10. }
  11. // 交给子类进行实现
  12. return doSelect(invokers, url, invocation);
  13. }
  14. protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

(3)我们来看看默认的随机算法是如何实现的。这里主要比较关键在于权重的概念。通过权重选取了
不同的机器。

  1. @Override
  2. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  3. // 总计的invoker列表数量
  4. int length = invokers.size();
  5. // Every invoker has the same weight?
  6. // 默认每个invoker的权重都是相同的
  7. boolean sameWeight = true;
  8. // the weight of every invokers
  9. // 所有的权重列表
  10. int[] weights = new int[length];
  11. // the first invoker's weight
  12. // 首个invoker的权重信息
  13. int firstWeight = getWeight(invokers.get(0), invocation);
  14. weights[0] = firstWeight;
  15. // The sum of weights
  16. // 计算总共的权重,并且吧每一个invoker的权重进行设置到列表中
  17. int totalWeight = firstWeight;
  18. for (int i = 1; i < length; i++) {
  19. int weight = getWeight(invokers.get(i), invocation);
  20. // save for later use
  21. weights[i] = weight;
  22. // Sum
  23. totalWeight += weight;
  24. if (sameWeight && weight != firstWeight) {
  25. sameWeight = false;
  26. }
  27. }
  28. // 如果权重不相同
  29. if (totalWeight > 0 && !sameWeight) {
  30. // If (not every invoker has the same weight & at least one invoker's weight>0),
  31. // select randomly based on totalWeight.
  32. // 通过总共的权重来随机分配
  33. int offset = ThreadLocalRandom.current().nextInt(totalWeight);
  34. // Return a invoker based on the random value.
  35. // 看看最终落到哪一个机器上去
  36. for (int i = 0; i < length; i++) {
  37. offset -= weights[i];
  38. if (offset < 0) {
  39. return invokers.get(i);
  40. }
  41. }
  42. }
  43. // If all invokers have the same weight value or totalWeight=0, return evenly.
  44. // 如果权重都是相同的话,则随机选取一个即可
  45. return invokers.get(ThreadLocalRandom.current().nextInt(length));
  46. }

5.5 Invoker执行逻辑

Invoker就是我们真实执行请求的组件。这里也会衍生出我们真正的 Dubbo 或者 Grpc 等其他协议的请
求。
(1)我们依旧先来看一下接口定义:

  1. public interface Invoker<T> extends Node {
  2. // 当前执行器的服务接口是哪一个
  3. Class<T> getInterface();
  4. // 执行请求操作
  5. Result invoke(Invocation invocation) throws RpcException;
  6. }

(2) Invoker 同样具有 AbstractInvoker ,其中我们重点关注一下 invoke 方法。这里同样主要做的
是基础信息封装,并且将请求真正的子类。这里面的子类主要是 DubboInvoker

  1. @Override
  2. public Result invoke(Invocation inv) throws RpcException {
  3. // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
  4. // 判断系统是否已经关闭
  5. if (destroyed.get()) {
  6. logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
  7. + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
  8. }
  9. RpcInvocation invocation = (RpcInvocation) inv;
  10. invocation.setInvoker(this);
  11. // 设置所有的RPCContext中的附加信息
  12. if (CollectionUtils.isNotEmptyMap(attachment)) {
  13. invocation.addObjectAttachmentsIfAbsent(attachment);
  14. }
  15. Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
  16. if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
  17. invocation.addObjectAttachments(contextAttachments);
  18. }
  19. // 获取执行的模式
  20. invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
  21. // 设置执行id,主要用于适配异步模式使用
  22. RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
  23. // 交给子类进行真正的执行
  24. AsyncRpcResult asyncResult;
  25. try {
  26. asyncResult = (AsyncRpcResult) doInvoke(invocation);
  27. } catch (InvocationTargetException e) { // biz exception
  28. // 业务异常
  29. Throwable te = e.getTargetException();
  30. if (te == null) {
  31. asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
  32. } else {
  33. if (te instanceof RpcException) {
  34. ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
  35. }
  36. asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
  37. }
  38. } catch (RpcException e) {
  39. // RPC阶段出现了异常
  40. if (e.isBiz()) {
  41. asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
  42. } else {
  43. throw e;
  44. }
  45. } catch (Throwable e) {
  46. asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
  47. }
  48. // 设置执行的结果信息
  49. RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
  50. // 返回结果
  51. return asyncResult;
  52. }

(3)我们再来看看 DubboInvoker 中的 doInvoke 方法。这里看到,他其实底层更多的是依赖底层真
正的客户端实现。

  1. @Override
  2. protected Result doInvoke(final Invocation invocation) throws Throwable {
  3. //RPC相关参数
  4. RpcInvocation inv = (RpcInvocation) invocation;
  5. final String methodName = RpcUtils.getMethodName(invocation);
  6. inv.setAttachment(PATH_KEY, getUrl().getPath());
  7. inv.setAttachment(VERSION_KEY, version);
  8. // 传输的客户端
  9. ExchangeClient currentClient;
  10. if (clients.length == 1) {
  11. currentClient = clients[0];
  12. } else {
  13. currentClient = clients[index.getAndIncrement() % clients.length];
  14. }
  15. try {
  16. // 是否返回值,也就是相当于发送了一个指令,不在乎服务端的返回
  17. // 通常适用于异步请求
  18. boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
  19. // 获取超时的配置
  20. int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
  21. if (isOneway) {
  22. // 如果不需要返回值信息(异步)
  23. boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
  24. // 发送命令
  25. currentClient.send(inv, isSent);
  26. // 告知为异步的结果
  27. return AsyncRpcResult.newDefaultAsyncResult(invocation);
  28. } else {
  29. // 获取真正执行的线程池(ThreadPool中的SPI)
  30. ExecutorService executor = getCallbackExecutor(getUrl(), inv);
  31. // 发送请求并且等待结果
  32. CompletableFuture<AppResponse> appResponseFuture =
  33. currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
  34. // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
  35. // 在2.6.x中使用,设置完成的额结果信息
  36. FutureContext.getContext().setCompatibleFuture(appResponseFuture);
  37. // 创建新的结果信息并且返回
  38. AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
  39. result.setExecutor(executor);
  40. return result;
  41. }
  42. } catch (TimeoutException e) {
  43. throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
  44. } catch (RemotingException e) {
  45. throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
  46. }
  47. }

(4)我们再来详细追踪一下 ExchangeClient 接口,发现他有一个最关键的方法。位于
ExchangeChannel 接口中。

  1. // 真实的发送请求信息
  2. // request: RPCInvocation
  3. // timeout: 超时
  4. // executor: 业务线程池
  5. CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException;

(5)他底层真实的实现方式是 HeaderExchangeClient 来进行处理的。可以看到他只是交给了真实的
渠道 channel 进行数据处理。

  1. @Override
  2. public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
  3. return channel.request(request, timeout, executor);
  4. }

(6)这里的 channel 会交 HeaderExchangeChannel 来进行封装。我们来看看他的实现。这里我们需
要细看一下Request对象的组成和DefaultFuture里面了做了些什么。这里的 Channle 对象是通过
Transporter 这个SPI进行创建的。这里我们先不细跟了。我们所熟知的Netty协议就是在这里创建
的。

  1. @Override
  2. public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
  3. if (closed) {
  4. throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
  5. }
  6. // create request.
  7. // 创建一个新的request对象
  8. Request req = new Request();
  9. req.setVersion(Version.getProtocolVersion());
  10. req.setTwoWay(true);
  11. req.setData(request);
  12. // 创建一个执行结果的回调信息处理
  13. DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
  14. try {
  15. // 交给真正的业务渠道进行处理
  16. // 这里的渠道是交给Transporter这个SPI进行创建的,其中我们的NettyChannel就是在这 里产生的
  17. channel.send(req);
  18. } catch (RemotingException e) {
  19. // 请求出现异常则取消当前的请求封装
  20. future.cancel();
  21. throw e;
  22. }
  23. return future;
  24. }

image.png