:::warning 针对多实例特性,需要进行二次更新….. :::
Provider:服务暴露流程
先抛开 Dubbo 不谈,从直观上去看,服务暴露应当至少包含 3 个部分:
- 服务初始化和本地注册
- 启动本地 Socker Server,监听端口并分发请求
- 告知注册中心相关服务信息
我们以 Dubbo 源码 里的 org.apache.dubbo.demo.provider.ApiProvider
示例为出发点,去探寻 Dubbo 是如何实现上述 3 点的。
服务初始化
整体流程
该示例没有涉及到对 XML 和注解的解析,这块后面会有专题讲解:
public class ApiProvider {
public static void main(String[] args) throws InterruptedException {
// ServiceConfig 是后续流程流转的中心节点
// 此处直接新建了服务配置实例
ServiceConfig<GreeterService> serviceConfig = new ServiceConfig<>();
serviceConfig.setInterface(GreeterService.class);
serviceConfig.setRef(new GreeterServiceImpl());
DubboBootstrap bootstrap = DubboBootstrap.getInstance();
// 应用信息配置
bootstrap.application(new ApplicationConfig("dubbo-demo-triple-api-provider"))
// 此处以经典的 zookeeper 为例
.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
// 以 Dubbo 3.0 主推的 TRIPLE 协议为例
.protocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051))
// service 方法可以调用多次,注册多个服务
// 注意:同样的服务不能重复注册!(判定条件为:接口类全限定名相同)
.service(serviceConfig)
// 所有配置就绪后,开启启动流程
.start()
// Block 住线程,在 Dubbo 服务关闭之前,
// 这行代码之后的逻辑是没有执行机会的,使用需要慎重。
.await();
}
}
进入 start 方法后,和服务暴露相关的流程如下:
重点理解
doExportUrls
方法:
// => org.apache.dubbo.config.ServiceConfig#doExportUrls
private void doExportUrls() {
// 通过 Dubbo SPI 获取服务仓库实例,注意该实例是单例,多次调用返回的是同一个对象
// repository 可存储服务接口、服务消费者、服务提供者等信息
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
// 注册服务提供者,入参将会被组织成 ProviderModel 实例
repository.registerProvider(
// 通过 interfaceName + group + version 三元组来确定唯一值
getUniqueServiceName(),
// setRef 设置的值,为服务实现类实例
ref,
// 与 serviceMetadata 存在较多重复属性,官方有合并之意
serviceDescriptor,
this,
serviceMetadata
);
// 针对 Provider,ConfigValidationUtils#genCompatibleRegistries 方法会使协议产生裂变
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// In case user specified path, register service one more time to map it to path.
repository.registerService(pathKey, interfaceClass);
// 继续跟进
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
// => org.apache.dubbo.config.utils.ConfigValidationUtils#genCompatibleRegistries
// 输入
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&pid=5258®istry=zookeeper×tamp=1628648579153
// 由于未指定 register-mode 获得默认模式为 all
// 会基于输入,新增注册地址:
service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&pid=5258®istry=zookeeper×tamp=1628648579153
进入 doExportUrlsFor1Protocol
:
// => org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//
Map<String, String> map = buildAttributes(protocolConfig);
//init serviceMetadata attachments
serviceMetadata.getAttachments().putAll(map);
// 构建 tri 协议 URL,示例如下
// tri://172.18.74.250:50051/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&bind.ip=172.18.74.250&bind.port=50051&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&metadata-type=remote&methods=sayHello&pid=5258&release=&side=provider×tamp=1628650332655
URL url = buildUrl(protocolConfig, registryURLs, map);
// 依据 SCOPE_KEY 判断是否需要暴露到本地(injvm 协议)和远程
exportUrl(url, registryURLs);
}
exportUrl
会有两个分支:exportLocal
和 exportRemote
来确定是本地协议还是远程协议,但最终都会回到 doExportUrl
方法,该方法功能几乎都是通过 Dubbo SPI 实现的。
// => org.apache.dubbo.config.ServiceConfig#doExportUrl
private void doExportUrl(URL url, boolean withMetaData) {
// 代理工厂:默认通过 JavassistProxyFactory 生成代理对象
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
if (withMetaData) {
invoker = new DelegateProviderMetaDataInvoker(invoker, this);
}
// 依据协议设计进行暴露
Exporter<?> exporter = PROTOCOL.export(invoker);
exporters.add(exporter);
}
动态生成代理 Invoker
Dubbo 3.0 支持通过 JDKProxy 和 Javassist 动态生成代理 Invoker,默认是 Javassist。
假如没有代理 Invoker 我们会碰到什么问题?这里可暂且抛开动态代理的细节,后文会有专题介绍,我们先只需要记住下图即可。
协议暴露
URL 统一模型
URL 是 Dubbo 中一个重要的领域模型(具象说法:拓展点可以理解的语言描述),了解它可以更加轻松的理解 Dubbo 的设计理念。
Dubbo URL 有如下意义:
- Dubbo 中统一模型和公共契约,降低沟通和理解成本
- Dubbo URL 用于在各个拓展点之间传播数据,同时也方便网络传输和序列化
- 可扩展性强,URL 相当于参数的集合(相当于一个 Map),他所表达的含义比单个参数更丰富,当我们在扩展代码时,可以将新的参数追加到 URL 之中,而不需要改变入参,返参的结构
Dubbo URL 的定义和大家熟知的 RFC1738 并无大区别,一个标准的 URL 格式至多可以包含如下的几个部分:
其中 search 部分也称作 Query,在 Dubbo 中也称作 parameters。
在 Dubbo 中,经常进行协议之间的派生和转换,例如 service-discovery-registry
派生出 zookeeper
,确实方便灵活,但也要求开发者深入了解 Dubbo SPI 机制:
// input
// service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=19477®istry=zookeeper
// 操作,替换协议为 zookeeper,并移除 registry 参数
url = url.setProtocol("zookeeper").removeParameter("registry");
// output
// zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=19477
此外,与 URL 关系最深的是 Dubbo SPI 机制,它依赖 URL 协议进行动态加载和路由,实现各式功能。
例如当 @Adaptive
标注到方法之上的时候:
// => org.apache.dubbo.rpc.ProxyFactory#getProxy(org.apache.dubbo.rpc.Invoker<T>)
@SPI("javassist")
public interface ProxyFactory {
// 其中 PROXY_KEY = "proxy"
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
}
在 Dubbo SPI 动态生成的 ProxyFactory$Adaptive
里,”proxy” 将作为从 URL parameters 部分获取值的 key:
public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
// ......
String extName = url.getParameter("proxy", "javassist");
// ......
}
}
如果没有指定字段,则系统会默认获取协议部分:
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
更多示例,将在后续源码涉及时作出讲解。
重点协议概览
Protocol
类型 | 协议 | 说明 | 格式样例: |
---|---|---|---|
本地协议 | injvm 实现类: org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol |
用于本地调用 export 方法是简单的将 invoker �封装进 InjvmExporter 对象,并存储到 exporterMap 上,方便后期查询 � �该协议也不涉及和远程注册中心的交互 |
injvm://127.0.0.1/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&bind.ip=172.18.74.250&bind.port=50051&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&metadata-type=remote&methods=sayHello&pid=5491&release=&side=provider×tamp=1628651196115 |
远程协议 | service-discovery-registry 实现类: org.apache.dubbo.registry.integration.RegistryProtocol � |
作用总结如下: - 用于服务暴露和引用的本地注册 - 用于 Provider 协议的本地端口监听启动以及路由信息注册,例如 启动默认的 triple 协议 注意:该协议也不涉及和远程注册中心的交互 |
service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&pid=8637®istry=zookeeper×tamp=1628666426705 |
registry 实现类: org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol 继承自 RegistryProtocol |
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-triple-api-provider&dubbo=2.0.2&pid=1900®istry=zookeeper×tamp=1628858542432 | ||
tri 实现类:org.apache.dubbo.rpc.protocol.tri.TripleProtocol � |
Dubbo 3.0 新增,默认&主推协议 后续会有单独文章分析 Triple 协议 |
tri://192.168.31.95:50051/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&bind.ip=192.168.31.95&bind.port=50051&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&metadata-type=remote&methods=sayHello&pid=1900&release=&service-name-mapping=true&side=provider×tamp=1628858542445 | |
? | ? | provider://192.168.31.95:50051/org.apache.dubbo.demo.GreeterService?anyhost=true&application=dubbo-demo-triple-api-provider&bind.ip=192.168.31.95&bind.port=50051&category=configurators&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.GreeterService&metadata-type=remote&methods=sayHello&pid=1928&release=&service-name-mapping=true&side=provider×tamp=1628858898318 |
RegistryFactory
类型 | 协议 | 说明 | 格式样例: |
---|---|---|---|
service-discovery-registry 实现类: org.apache.dubbo.registry.client.ServiceDiscoveryRegistryFactory |
service-discovery-registry 暴露流程
// => org.apache.dubbo.registry.integration.RegistryProtocol#export
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// 重点:这里引出 TripleProtocol,这里的 Local 跟 injvm 协议不是一回事
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// ......
}
Consumer: 服务引用流程
整体流程
先看下粗粒度的流程,相对服务暴露还是简明很多的:
我们重点关注
ReferenceConfig.get
方法(于 ServiceConfig.export
呼应):
其中
MigrationRuleHandler
是为了 Dubbo 3.0 的应用级服务发现保持向下兼容,而设计的迁移类,毕竟 Dubob 3.0 从立项之初,就定下了兼容 2.5~2.7 版本的设计目标,后面会有专文介绍应用级服务发现。
获取远端可访问服务
我们以 APPLICATION_FIRST
分支为例,观察下远端引用的一些细节,先看 migrateToApplicationFirstInvoker
方法:
// => org.apache.dubbo.registry.client.migration.MigrationInvoker#migrateToApplicationFirstInvoker
public void migrateToApplicationFirstInvoker(MigrationRule newRule) {
CountDownLatch latch = new CountDownLatch(0);
// 向远端注册中心订注册 consumer 信息,获取可用的服务提供者信息
refreshInterfaceInvoker(latch);
// ......
}
从 refreshInterfaceInvoker 跟进到 doCreateInvoker
,此步之后,建立了到远端服务提供者的连接。
// => org.apache.dubbo.registry.integration.RegistryProtocol#doCreateInvoker
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
// Directory 顾名思义是目录的意思,ZooKeeper 的数据组织形式是不是目录树的形式
// 所以,我们可以把本地 Directory 实例映射到远端 ZooKeeper 目录下,进行读写操作。
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
URL urlToRegistry = new ServiceConfigURL(
parameters.get(PROTOCOL_KEY) == null ? DUBBO : parameters.get(PROTOCOL_KEY),
parameters.remove(REGISTER_IP_KEY), 0, getPath(parameters, type), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(urlToRegistry);
// 向远端注册订阅者信息,与服务提供者相呼应
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(urlToRegistry);
// 订阅远端服务提供者信息,可以知晓访问地址、可供访问接口等信息,通过 directory.list 接口可以获得可访问的提供者列表
directory.subscribe(toSubscribeUrl(urlToRegistry));
// 使用 join 方法向 Invoker 注入 directory,相当于给 invoker 添加了一幅如何访问服务提供者的地图。
return (ClusterInvoker<T>) cluster.join(directory);
}
再来看看 directory.subscribe
是如何订阅远端目录的,以 ZooKeeper
为例:
// => org.apache.dubbo.registry.integration.RegistryDirectory#subscribe
public void subscribe(URL url) {
setSubscribeUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
referenceConfigurationListener = new ReferenceConfigurationListener(this, url);
// 调用远端注册中心接口进行订阅
registry.subscribe(url, this);
}
// => org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
public void doSubscribe(final URL url, final NotifyListener listener) {
// ......
if (ANY_VALUE.equals(url.getServiceInterface())) {
// ......
} else {
CountDownLatch latch = new CountDownLatch(1);
List<URL> urls = new ArrayList<>();
// toCategoriesPath 最后转换出三个订阅目录
// 0 = "/dubbo/org.apache.dubbo.demo.GreeterService/providers"
// 1 = "/dubbo/org.apache.dubbo.demo.GreeterService/configurators"
// 2 = "/dubbo/org.apache.dubbo.demo.GreeterService/routers"
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
// 这里的 listener 是实现了 NotifyListener 接口的 RegistryDirectory 实例
// 有了事件通知,目录订阅内容实时刷新就变成了可能
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));
if (zkListener instanceof RegistryChildListenerImpl) {
// 设置等待锁
((RegistryChildListenerImpl) zkListener).setLatch(latch);
}
// 创建订阅目录,此处是
// /dubbo/org.apache.dubbo.demo.GreeterService/providers
zkClient.create(path, false);
// 在 ZooKeeper 内使用了 watch 特性,当订阅数据产生变化的时候,最终会触发到 RegistryDirectory#notify 方法,此时是可以获取到注册中心的 providers 配置的。
// 底层对应到 curator.getChildren().usingWatcher(listener).forPath(path);
List<String> children = zkClient.addChildListener(path, zkListener);
// children 返回是为了解决 prvoders 先启动情况下的初始化问题
if (children != null) {
// 如果配置不存在会出现 empty:// 协议
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 手动触发 RegistryDirectory#notify,进行 invokers 的初始化
// 通知前在 org.apache.dubbo.registry.support.AbstractRegistry#notify 内会检查 providers 和 consumers 的接口是否匹配,具体方法为:
// org.apache.dubbo.common.utils.UrlUtils#isMatch
notify(url, listener, urls);
// 假如此处不用 countDown,也可以用 for 循环触发函数实现,但不够优雅。
// tells the listener to run only after the sync notification of main thread finishes.
latch.countDown();
}
// ......
}
// => org.apache.dubbo.registry.integration.RegistryDirectory#notify
public synchronized void notify(List<URL> urls) {
// 两种触发方式:
// 初始化的时候,代码手动触发
// 完成初始化之后,监听配置中心变更触发
// ......
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
// 尝试获取配置,忽略 empty://
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
// 尝试获取路由,忽略 empty://
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// 尝试获取服务提供者
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
// 这里会涉及 3.0 新增的 MeshRuleAddressListenerInterceptor,暂时跳过
ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
}
}
// 得到服务提供者远端访问路由,进行刷新和存储
// 会依据 Protocol.refer 生成具体的 Consumer Invoker
// 与 Protocol.export 生成具体的 Provider Exporter 相呼应
refreshOverrideAndInvoker(providerURLs);
}
生成 InvocationHandler 代理
和 Provider 生成动态代理 Invoker 呼应,Consumer 端也需要生产动态代理 Invoker,可以通过下图一览两端之间的关系:
当用户调用 ReferenceConfig#get
的最后,最终会返回一个代理对象,由方法在 init
阶段提前生成:
// 调用 createProxy 并赋值给 ref 变量
// => org.apache.dubbo.config.ReferenceConfig#init
protected synchronized void init() {
// ......
ref = createProxy(map);
// ......
}
// => org.apache.dubbo.config.ReferenceConfig#createProxy
private T createProxy(Map<String, String> map) {
// ......
// 调用代理工厂方法,默认使用 Javassist
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
// => org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getProxy
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
在存在接口定义的情况下,个人还是建议使用 JDK Proxy:一是因为通用逻辑简单易懂,降低维护成本;二是永远不要低估了那群开发 Java 的计算机科学家的能力,虽然此刻 JDK Proxy 的性能略低于三方的字节码注入框架,但长远来看 JVM 往往会性能更甚一筹,而且 JVM 本身也是开放的,一直在吸纳开源社区的精华。总之,做时间的朋友。
重点关注下 InvokerInvocationHandler
的触发逻辑:
// => org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// 过滤一些没必要触发 RPC 远程调用的方法
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return invoker.toString();
} else if ("$destroy".equals(methodName)) {
invoker.destroy();
return null;
} else if ("hashCode".equals(methodName)) {
return invoker.hashCode();
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return invoker.equals(args[0]);
}
// 新建 RPC 远程调用访问参数
RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);
// 服务名
String serviceKey = url.getServiceKey();
rpcInvocation.setTargetServiceUniqueName(serviceKey);
// invoker.getUrl() returns consumer url.
RpcServiceContext.setRpcContext(url);
if (consumerModel != null) {
// 设置消费者的一些信息
rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
}
// 触发 invoker 调用,此处是 MigrationInvoker
return invoker.invoke(rpcInvocation).recreate();
}
// invoke 内部流转过程中,有一个关键节点与 “获取远端可访问服务” 绑定
// => org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke
public Result invoke(final Invocation invocation) throws RpcException {
// ......
// 会调用 getDirectory().list(invocation) 获取远端可访问服务
List<Invoker<T>> invokers = list(invocation);
// 负载均衡策略
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 执行触发
return doInvoke(invocation, invokers, loadbalance);
}
RPC 调用过程
至此,万事俱备,只欠东风,我们来看下 RPC 调用过程。
进入 doInvoke
方法:
// => org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker#doInvoke
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// 从 URL 获取方法重试次数,默认 3 次
int len = calculateInvokeTimes(methodName);
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
// 客户端实现负载均衡,默认是 RandomLoadBalance
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getServiceContext().setInvokers((List) invoked);
try {
// Invoke 为 TripleInvoker 实例,Invocation 为 RpcInvocation 对象
Result result = invokeWithContext(invoker, invocation);
// ......
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// ......
}
针对 Triple 协议,在 TripleHttp2Protocol
内有两个方法,默认采用 Protobuf 进行序列化:
- configServerPipeline - 通过在 Netty 的 Pipline 上添加 Handler,以绑定服务端编解码逻辑
- configClientPipeline - 通过在 Netty 的 Pipline 上添加 Handler,以绑定客户端编解码逻辑
剩下就是基于 HTTP/2 的网络通讯和序列化的编解码,更多的是关于 Netty 的相关知识,此处不做展开。