:::warning 针对多实例特性,需要进行二次更新….. :::

Provider:服务暴露流程

先抛开 Dubbo 不谈,从直观上去看,服务暴露应当至少包含 3 个部分:

  1. 服务初始化和本地注册
  2. 启动本地 Socker Server,监听端口并分发请求
  3. 告知注册中心相关服务信息

我们以 Dubbo 源码 里的 org.apache.dubbo.demo.provider.ApiProvider 示例为出发点,去探寻 Dubbo 是如何实现上述 3 点的。

服务初始化

整体流程

该示例没有涉及到对 XML 和注解的解析,这块后面会有专题讲解:

  1. public class ApiProvider {
  2. public static void main(String[] args) throws InterruptedException {
  3. // ServiceConfig 是后续流程流转的中心节点
  4. // 此处直接新建了服务配置实例
  5. ServiceConfig<GreeterService> serviceConfig = new ServiceConfig<>();
  6. serviceConfig.setInterface(GreeterService.class);
  7. serviceConfig.setRef(new GreeterServiceImpl());
  8. DubboBootstrap bootstrap = DubboBootstrap.getInstance();
  9. // 应用信息配置
  10. bootstrap.application(new ApplicationConfig("dubbo-demo-triple-api-provider"))
  11. // 此处以经典的 zookeeper 为例
  12. .registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
  13. // 以 Dubbo 3.0 主推的 TRIPLE 协议为例
  14. .protocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051))
  15. // service 方法可以调用多次,注册多个服务
  16. // 注意:同样的服务不能重复注册!(判定条件为:接口类全限定名相同)
  17. .service(serviceConfig)
  18. // 所有配置就绪后,开启启动流程
  19. .start()
  20. // Block 住线程,在 Dubbo 服务关闭之前,
  21. // 这行代码之后的逻辑是没有执行机会的,使用需要慎重。
  22. .await();
  23. }
  24. }

进入 start 方法后,和服务暴露相关的流程如下: 「Dubbo」3.0 服务暴露和引用流程 - 图1 重点理解 doExportUrls 方法:

// => org.apache.dubbo.config.ServiceConfig#doExportUrls
private void doExportUrls() {
    // 通过 Dubbo SPI 获取服务仓库实例,注意该实例是单例,多次调用返回的是同一个对象
    // repository 可存储服务接口、服务消费者、服务提供者等信息
    ServiceRepository repository = ApplicationModel.getServiceRepository();
    ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
    // 注册服务提供者,入参将会被组织成 ProviderModel 实例
    repository.registerProvider(
        // 通过 interfaceName + group + version 三元组来确定唯一值
        getUniqueServiceName(),
        // setRef 设置的值,为服务实现类实例
        ref,
        // 与 serviceMetadata 存在较多重复属性,官方有合并之意
        serviceDescriptor,
        this,
        serviceMetadata
    );

    // 针对 Provider,ConfigValidationUtils#genCompatibleRegistries 方法会使协议产生裂变
    List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

    for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig)
                                      .map(p -> p + "/" + path)
                                      .orElse(path), group, version);
        // In case user specified path, register service one more time to map it to path.
        repository.registerService(pathKey, interfaceClass);
        // 继续跟进
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}


// => org.apache.dubbo.config.utils.ConfigValidationUtils#genCompatibleRegistries
// 输入 
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&pid=5258&registry=zookeeper&timestamp=1628648579153
// 由于未指定 register-mode 获得默认模式为 all
// 会基于输入,新增注册地址:
service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&pid=5258&registry=zookeeper&timestamp=1628648579153

进入 doExportUrlsFor1Protocol

// => org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    // 
    Map<String, String> map = buildAttributes(protocolConfig);

    //init serviceMetadata attachments
    serviceMetadata.getAttachments().putAll(map);

    // 构建 tri 协议 URL,示例如下
    // tri://172.18.74.250:50051/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&bind.ip=172.18.74.250&bind.port=50051&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&metadata-type=remote&methods=sayHello&pid=5258&release=&side=provider&timestamp=1628650332655
    URL url = buildUrl(protocolConfig, registryURLs, map);

    // 依据 SCOPE_KEY 判断是否需要暴露到本地(injvm 协议)和远程
    exportUrl(url, registryURLs);
}

exportUrl 会有两个分支:exportLocalexportRemote 来确定是本地协议还是远程协议,但最终都会回到 doExportUrl 方法,该方法功能几乎都是通过 Dubbo SPI 实现的。

// => org.apache.dubbo.config.ServiceConfig#doExportUrl
private void doExportUrl(URL url, boolean withMetaData) {
    // 代理工厂:默认通过 JavassistProxyFactory 生成代理对象
    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
    if (withMetaData) {
        invoker = new DelegateProviderMetaDataInvoker(invoker, this);
    }
    // 依据协议设计进行暴露
    Exporter<?> exporter = PROTOCOL.export(invoker);
    exporters.add(exporter);
}

动态生成代理 Invoker

Dubbo 3.0 支持通过 JDKProxy 和 Javassist 动态生成代理 Invoker,默认是 Javassist。
假如没有代理 Invoker 我们会碰到什么问题?这里可暂且抛开动态代理的细节,后文会有专题介绍,我们先只需要记住下图即可。
image.png

协议暴露

URL 统一模型

URL 是 Dubbo 中一个重要的领域模型(具象说法:拓展点可以理解的语言描述),了解它可以更加轻松的理解 Dubbo 的设计理念。
Dubbo URL 有如下意义:

  • Dubbo 中统一模型和公共契约,降低沟通和理解成本
  • Dubbo URL 用于在各个拓展点之间传播数据,同时也方便网络传输和序列化
  • 可扩展性强,URL 相当于参数的集合(相当于一个 Map),他所表达的含义比单个参数更丰富,当我们在扩展代码时,可以将新的参数追加到 URL 之中,而不需要改变入参,返参的结构

Dubbo URL 的定义和大家熟知的 RFC1738 并无大区别,一个标准的 URL 格式至多可以包含如下的几个部分:image.png
其中 search 部分也称作 Query,在 Dubbo 中也称作 parameters。

在 Dubbo 中,经常进行协议之间的派生和转换,例如 service-discovery-registry 派生出 zookeeper,确实方便灵活,但也要求开发者深入了解 Dubbo SPI 机制:

// input
// service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=19477&registry=zookeeper

// 操作,替换协议为 zookeeper,并移除 registry 参数
url = url.setProtocol("zookeeper").removeParameter("registry");


// output
// zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=19477

此外,与 URL 关系最深的是 Dubbo SPI 机制,它依赖 URL 协议进行动态加载和路由,实现各式功能。

例如当 @Adaptive 标注到方法之上的时候:

// => org.apache.dubbo.rpc.ProxyFactory#getProxy(org.apache.dubbo.rpc.Invoker<T>)
@SPI("javassist")
public interface ProxyFactory {
    // 其中 PROXY_KEY = "proxy"
    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;
}

在 Dubbo SPI 动态生成的 ProxyFactory$Adaptive 里,”proxy” 将作为从 URL parameters 部分获取值的 key:

public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
    public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        // ......
        String extName = url.getParameter("proxy", "javassist");
        // ......
    }
}

如果没有指定字段,则系统会默认获取协议部分:

String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());

更多示例,将在后续源码涉及时作出讲解。

重点协议概览

Protocol

类型 协议 说明 格式样例:
本地协议 injvm

实现类:
org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
用于本地调用

export 方法是简单的将 invoker
�封装进 InjvmExporter 对象,并存储到 exporterMap 上,方便后期查询

�该协议也不涉及和远程注册中心的交互
injvm://127.0.0.1/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&bind.ip=172.18.74.250&bind.port=50051&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&metadata-type=remote&methods=sayHello&pid=5491&release=&side=provider&timestamp=1628651196115
远程协议 service-discovery-registry

实现类:
org.apache.dubbo.registry.integration.RegistryProtocol
作用总结如下:
- 用于服务暴露和引用的本地注册
- 用于 Provider 协议的本地端口监听启动以及路由信息注册,例如 启动默认的 triple 协议


注意:该协议也不涉及和远程注册中心的交互
service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&pid=8637&registry=zookeeper&timestamp=1628666426705
registry

实现类:
org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol 继承自 RegistryProtocol
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&pid=1900&registry=zookeeper&timestamp=1628858542432

tri

实现类:org.apache.dubbo.rpc.protocol.tri.TripleProtocol
Dubbo 3.0 新增,默认&主推协议


后续会有单独文章分析 Triple 协议
tri://192.168.31.95:50051/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&bind.ip=192.168.31.95&bind.port=50051&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&metadata-type=remote&methods=sayHello&pid=1900&release=&service-name-mapping=true&side=provider&timestamp=1628858542445
? ? provider://192.168.31.95:50051/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&bind.ip=192.168.31.95&bind.port=50051&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&metadata-type=remote&methods=sayHello&pid=1928&release=&service-name-mapping=true&side=provider&timestamp=1628858898318

RegistryFactory

类型 协议 说明 格式样例:

service-discovery-registry

实现类:
org.apache.dubbo.registry.client.ServiceDiscoveryRegistryFactory

service-discovery-registry 暴露流程

// => org.apache.dubbo.registry.integration.RegistryProtocol#export
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    URL providerUrl = getProviderUrl(originInvoker);

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    //  subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);


    // 重点:这里引出 TripleProtocol,这里的 Local 跟 injvm 协议不是一回事
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // ......
}

「Dubbo」3.0 服务暴露和引用流程 - 图4

Consumer: 服务引用流程

整体流程

先看下粗粒度的流程,相对服务暴露还是简明很多的: 「Dubbo」3.0 服务暴露和引用流程 - 图5我们重点关注 ReferenceConfig.get 方法(于 ServiceConfig.export 呼应): 「Dubbo」3.0 服务暴露和引用流程 - 图6其中 MigrationRuleHandler 是为了 Dubbo 3.0 的应用级服务发现保持向下兼容,而设计的迁移类,毕竟 Dubob 3.0 从立项之初,就定下了兼容 2.5~2.7 版本的设计目标,后面会有专文介绍应用级服务发现。

获取远端可访问服务

我们以 APPLICATION_FIRST 分支为例,观察下远端引用的一些细节,先看 migrateToApplicationFirstInvoker 方法:

// => org.apache.dubbo.registry.client.migration.MigrationInvoker#migrateToApplicationFirstInvoker
public void migrateToApplicationFirstInvoker(MigrationRule newRule) {
    CountDownLatch latch = new CountDownLatch(0);
    // 向远端注册中心订注册 consumer 信息,获取可用的服务提供者信息
    refreshInterfaceInvoker(latch);

    // ......
}

从 refreshInterfaceInvoker 跟进到 doCreateInvoker,此步之后,建立了到远端服务提供者的连接。

// => org.apache.dubbo.registry.integration.RegistryProtocol#doCreateInvoker
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
    // Directory 顾名思义是目录的意思,ZooKeeper 的数据组织形式是不是目录树的形式
    // 所以,我们可以把本地 Directory 实例映射到远端 ZooKeeper 目录下,进行读写操作。
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
    URL urlToRegistry = new ServiceConfigURL(
        parameters.get(PROTOCOL_KEY) == null ? DUBBO : parameters.get(PROTOCOL_KEY),
        parameters.remove(REGISTER_IP_KEY), 0, getPath(parameters, type), parameters);
    if (directory.isShouldRegister()) {
        directory.setRegisteredConsumerUrl(urlToRegistry);
        // 向远端注册订阅者信息,与服务提供者相呼应
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(urlToRegistry);

    // 订阅远端服务提供者信息,可以知晓访问地址、可供访问接口等信息,通过 directory.list 接口可以获得可访问的提供者列表
    directory.subscribe(toSubscribeUrl(urlToRegistry));

    // 使用 join 方法向 Invoker 注入 directory,相当于给 invoker 添加了一幅如何访问服务提供者的地图。
    return (ClusterInvoker<T>) cluster.join(directory);
}

再来看看 directory.subscribe 是如何订阅远端目录的,以 ZooKeeper 为例:

// => org.apache.dubbo.registry.integration.RegistryDirectory#subscribe
public void subscribe(URL url) {
    setSubscribeUrl(url);
    CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
    referenceConfigurationListener = new ReferenceConfigurationListener(this, url);
    // 调用远端注册中心接口进行订阅
    registry.subscribe(url, this);
}


// => org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
public void doSubscribe(final URL url, final NotifyListener listener) {
    // ......
    if (ANY_VALUE.equals(url.getServiceInterface())) {
        // ......
    } else {
        CountDownLatch latch = new CountDownLatch(1);

        List<URL> urls = new ArrayList<>();
        // toCategoriesPath 最后转换出三个订阅目录
        // 0 = "/dubbo/org.apache.dubbo.demo.GreeterService/providers"
        // 1 = "/dubbo/org.apache.dubbo.demo.GreeterService/configurators"
        // 2 = "/dubbo/org.apache.dubbo.demo.GreeterService/routers"
        for (String path : toCategoriesPath(url)) {
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());

            // 这里的 listener 是实现了 NotifyListener 接口的 RegistryDirectory 实例
            // 有了事件通知,目录订阅内容实时刷新就变成了可能
            ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));
            if (zkListener instanceof RegistryChildListenerImpl) {
                // 设置等待锁
                ((RegistryChildListenerImpl) zkListener).setLatch(latch);
            }
            // 创建订阅目录,此处是 
            // /dubbo/org.apache.dubbo.demo.GreeterService/providers
            zkClient.create(path, false);
            // 在 ZooKeeper 内使用了 watch 特性,当订阅数据产生变化的时候,最终会触发到 RegistryDirectory#notify 方法,此时是可以获取到注册中心的 providers 配置的。
            // 底层对应到 curator.getChildren().usingWatcher(listener).forPath(path);
            List<String> children = zkClient.addChildListener(path, zkListener);
            // children 返回是为了解决 prvoders 先启动情况下的初始化问题
            if (children != null) {
                // 如果配置不存在会出现 empty:// 协议
                urls.addAll(toUrlsWithEmpty(url, path, children));
            }
        }

        // 手动触发 RegistryDirectory#notify,进行 invokers 的初始化
        // 通知前在 org.apache.dubbo.registry.support.AbstractRegistry#notify 内会检查 providers 和 consumers 的接口是否匹配,具体方法为:
        // org.apache.dubbo.common.utils.UrlUtils#isMatch
        notify(url, listener, urls);

        // 假如此处不用 countDown,也可以用 for 循环触发函数实现,但不够优雅。
        // tells the listener to run only after the sync notification of main thread finishes.
        latch.countDown();
    }
    // ......
}


// => org.apache.dubbo.registry.integration.RegistryDirectory#notify
public synchronized void notify(List<URL> urls) {
    // 两种触发方式:
    // 初始化的时候,代码手动触发
    // 完成初始化之后,监听配置中心变更触发

    // ......

    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
    // 尝试获取配置,忽略 empty://
    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

    // 尝试获取路由,忽略 empty://
    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
    toRouters(routerURLs).ifPresent(this::addRouters);

    // 尝试获取服务提供者
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());

    // 这里会涉及 3.0 新增的 MeshRuleAddressListenerInterceptor,暂时跳过
    ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
    List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
    if (supportedListeners != null && !supportedListeners.isEmpty()) {
        for (AddressListener addressListener : supportedListeners) {
            providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
        }
    }

    // 得到服务提供者远端访问路由,进行刷新和存储
    // 会依据 Protocol.refer 生成具体的 Consumer Invoker
    // 与 Protocol.export 生成具体的 Provider Exporter 相呼应
    refreshOverrideAndInvoker(providerURLs);
}

生成 InvocationHandler 代理

和 Provider 生成动态代理 Invoker 呼应,Consumer 端也需要生产动态代理 Invoker,可以通过下图一览两端之间的关系:
image.png

当用户调用 ReferenceConfig#get 的最后,最终会返回一个代理对象,由方法在 init 阶段提前生成:

// 调用 createProxy 并赋值给 ref 变量
// => org.apache.dubbo.config.ReferenceConfig#init
protected synchronized void init() {
    // ......
    ref = createProxy(map);
    // ......
}

// => org.apache.dubbo.config.ReferenceConfig#createProxy
private T createProxy(Map<String, String> map) {
    // ......

    // 调用代理工厂方法,默认使用 Javassist
    return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}


// => org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getProxy
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

在存在接口定义的情况下,个人还是建议使用 JDK Proxy:一是因为通用逻辑简单易懂,降低维护成本;二是永远不要低估了那群开发 Java 的计算机科学家的能力,虽然此刻 JDK Proxy 的性能略低于三方的字节码注入框架,但长远来看 JVM 往往会性能更甚一筹,而且 JVM 本身也是开放的,一直在吸纳开源社区的精华。总之,做时间的朋友。

重点关注下 InvokerInvocationHandler 的触发逻辑:

// => org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    // 过滤一些没必要触发 RPC 远程调用的方法
    if (parameterTypes.length == 0) {
        if ("toString".equals(methodName)) {
            return invoker.toString();
        } else if ("$destroy".equals(methodName)) {
            invoker.destroy();
            return null;
        } else if ("hashCode".equals(methodName)) {
            return invoker.hashCode();
        }
    } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
        return invoker.equals(args[0]);
    }

    // 新建 RPC 远程调用访问参数
    RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);
    // 服务名
    String serviceKey = url.getServiceKey();
    rpcInvocation.setTargetServiceUniqueName(serviceKey);

    // invoker.getUrl() returns consumer url.
    RpcServiceContext.setRpcContext(url);


    if (consumerModel != null) {
        // 设置消费者的一些信息
        rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
        rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
    }

    // 触发 invoker 调用,此处是 MigrationInvoker
    return invoker.invoke(rpcInvocation).recreate();
}

// invoke 内部流转过程中,有一个关键节点与 “获取远端可访问服务” 绑定
// => org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke
public Result invoke(final Invocation invocation) throws RpcException {
    // ......

    // 会调用 getDirectory().list(invocation) 获取远端可访问服务
    List<Invoker<T>> invokers = list(invocation);
    // 负载均衡策略
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // 执行触发
    return doInvoke(invocation, invokers, loadbalance);
}

RPC 调用过程

至此,万事俱备,只欠东风,我们来看下 RPC 调用过程。

进入 doInvoke 方法:

// => org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker#doInvoke
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    // 从 URL 获取方法重试次数,默认 3 次
    int len = calculateInvokeTimes(methodName);
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            checkWhetherDestroyed();
            copyInvokers = list(invocation);
            // check again
            checkInvokers(copyInvokers, invocation);
        }

        // 客户端实现负载均衡,默认是 RandomLoadBalance
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);
        RpcContext.getServiceContext().setInvokers((List) invoked);
        try {
            // Invoke 为 TripleInvoker 实例,Invocation 为 RpcInvocation 对象
            Result result = invokeWithContext(invoker, invocation);
            // ......
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // ......
}

针对 Triple 协议,在 TripleHttp2Protocol 内有两个方法,默认采用 Protobuf 进行序列化:

  • configServerPipeline - 通过在 Netty 的 Pipline 上添加 Handler,以绑定服务端编解码逻辑
  • configClientPipeline - 通过在 Netty 的 Pipline 上添加 Handler,以绑定客户端编解码逻辑

剩下就是基于 HTTP/2 的网络通讯和序列化的编解码,更多的是关于 Netty 的相关知识,此处不做展开。

Provider 侧路由