概述

【Dubbo】服务引用机制 - 图1

首先,看一下服务引用的整体流程,如上图所示。Dubbo框架将服务消费分为两大部分,第一步通过持有远程服务实例生成Invoker,这个Invoker在客户端是核心的远程代理对象。第二步会把Invoker通过动态代理转换成实现用户接口的动态代理引用。

源码解析

ReferenceConfig类

服务引用的逻辑是从ReferenceBean#getObject开始的,ReferenceBean实现了FactoryBeanFactoryBean是一个工厂Bean,可以生成某一个类型Bean实例,它最大的一个作用是:可以让我们自定义Bean的创建过程。

如以下代码所示,当调用getBean方法时,最终会调用到ReferenceBean#getObject,并触发ReferenceConfig#init方法,生成TestService的代理对象并返回。

  1. ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"/dubbo-demo-consumer.xml"});
  2. context.start();
  3. TestService testService = (TestService) context.getBean("testService");

ReferenceConfig#createProxy方法

ReferenceBean#getObject被调用时,会调用ReferenceConfig#get方法,此时会检查当前引用对象是否已生成,如果未生成则会触发init方法,生成引用对象。init方法会对配置进行检查和处理,保证配置的正确性,然后会调用createProxy方法创建代理对象。

createProxy方法主要包含以下逻辑:

  1. 处理本地调用的情(injvm=true || scope=local
  2. 处理远程调用的情况(分以下两种)
    1. 点对点调用
    2. 通过注册中心
  3. 可用性检查
  4. 创建代理类

createProxy方法代码比较多,这里分开解析,首先来看一下处理本地调用的逻辑。代码如下所示:

  1. final boolean isJvmRefer;
  2. //没有配置injvm=true
  3. //ps 本地调用有两种配置方式 injvm=true 以及 scope=local,前者已经被@Deprecated。
  4. if (isInjvm() == null) {
  5. //url配置被指定,则不做本地引用
  6. if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
  7. isJvmRefer = false;
  8. }
  9. //如果用户显式配置了scope=local或者injvm=true,此时isInjvmRefer返回true
  10. else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
  11. // by default, reference local service if there is
  12. isJvmRefer = true;
  13. } else {
  14. isJvmRefer = false;
  15. }
  16. }
  17. //配置了injvm
  18. else {
  19. //获取injvm配置值
  20. isJvmRefer = isInjvm().booleanValue();
  21. }
  22. //本地引用
  23. if (isJvmRefer) {
  24. //生成本地引用URL,协议为injvm
  25. URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
  26. //调用refer方法构建InjvmInvoker实例,根据url的协议会去调用InjvmProtocol#refer
  27. invoker = refprotocol.refer(interfaceClass, url);
  28. //省略日志输出代码
  29. }

本地调用逻辑比较简单,看注释就可。接下来看一下远程调用的具体流程,主要包含以下逻辑:

  1. 判断有无指定url,指定了url说明是想要进行点对点调用
  2. 未指定url,则加载注册中心的url
  3. url搞成invoker,多个url则通过cluster合并

代码如下所示:

  1. //url不为空,说明在配置中指定了url的值,表示想进行点对点调用
  2. //比如<dubbo:reference version="1.0" id="xx" interface="xx" url="xxx"/>
  3. if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
  4. //当配置多个url时,可用分号进行分割,这里会进行切分
  5. String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
  6. if (us != null && us.length > 0) {
  7. //遍历多个url,加入到urls列表
  8. for (String u : us) {
  9. URL url = URL.valueOf(u);
  10. if (url.getPath() == null || url.getPath().length() == 0) {
  11. //设置url路径=接口全限定名
  12. url = url.setPath(interfaceName);
  13. }
  14. //检测url协议是否为registry,若是,表明用户想使用指定的注册中心
  15. if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
  16. //将map转换为查询字符串,并作为refer参数的值添加到url中
  17. urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
  18. } else {
  19. //合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
  20. //比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
  21. //最后将合并后的配置设置为 url 查询字符串中。
  22. urls.add(ClusterUtils.mergeUrl(url, map));
  23. }
  24. }
  25. }
  26. }
  27. //url为空,表示通过注册中心获取url
  28. else { // assemble URL from register center's configuration
  29. // 加载注册中心 url
  30. List<URL> us = loadRegistries(false);
  31. if (us != null && !us.isEmpty()) {
  32. for (URL u : us) {
  33. URL monitorUrl = loadMonitor(u);
  34. if (monitorUrl != null) {
  35. map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
  36. }
  37. urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
  38. }
  39. }
  40. //未配置注册中心,抛出异常
  41. if (urls.isEmpty()) {
  42. //省略异常抛出代码
  43. }
  44. }
  45. //单个注册中心或服务提供者
  46. if (urls.size() == 1) {
  47. invoker = refprotocol.refer(interfaceClass, urls.get(0));
  48. }
  49. //多个注册中心或多个服务提供者,或者两者混合
  50. else {
  51. List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
  52. URL registryURL = null;
  53. for (URL url : urls) {
  54. //根据url协议头加载指定的Protocol实例,并调用实例的refer方法构建invoker,比如registry就会调用RegistryProtocol#refer
  55. invokers.add(refprotocol.refer(interfaceClass, url));
  56. if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
  57. registryURL = url; // use last registry url
  58. }
  59. }
  60. //通过cluster合并多个invoker
  61. //如果存在注册中心的url,则会使用AvailableCluster,AvailableCluster会先判断是否可用,再进行invoke调用。
  62. if (registryURL != null) { // registry url is available
  63. // use AvailableCluster only when register's cluster is available
  64. URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
  65. invoker = cluster.join(new StaticDirectory(u, invokers));
  66. } else { // not a registry url
  67. invoker = cluster.join(new StaticDirectory(invokers));
  68. }
  69. }

createProxy方法还有最后一份逻辑是关于可用性检查以及使用invoker创建代理类,代码如下所示:

  1. //如果为配置check=false(默认true),则会进行可用性检查,未通过则会抛出异常。
  2. Boolean c = check;
  3. if (c == null && consumer != null) {
  4. c = consumer.isCheck();
  5. }
  6. if (c == null) {
  7. c = true; // default true
  8. }
  9. //可用性检查
  10. if (c && !invoker.isAvailable()) {
  11. // make it possible for consumer to retry later if provider is temporarily unavailable
  12. initialized = false;
  13. //省略异常抛出代码
  14. }
  15. if (logger.isInfoEnabled()) {
  16. logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
  17. }
  18. // create service proxy
  19. //调用JavassistProxyFactory#getProxy生成代理类。
  20. return (T) proxyFactory.getProxy(invoker);

Protocol#refer方法

无论是本地调用还是远程调用,都会使用Protocol#refer方法来创建invoker。本地调用使用的InjvmProtocol#refer逻辑比较简单,就是创建了一个InjvmInvoker对象并返回。这里主要看一下RegistryProtocol以及DubboProtocolrefer方法。

RegistryProtocol#refer方法

RegistryProtocol#refer方法的逻辑比较简单,先获取url的协议参数并设置成协议头,再调用doRefer方法生成invoker并返回。

代码如下所示:

  1. public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
  2. //取registry参数值,并将其设置为协议头
  3. url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
  4. //获取注册中心实例,如果协议为zookeeper则获取到ZookeeperRegistry
  5. Registry registry = registryFactory.getRegistry(url);
  6. if (RegistryService.class.equals(type)) {
  7. return proxyFactory.getInvoker((T) registry, type, url);
  8. }
  9. //省略group配置处理相关代码
  10. return doRefer(cluster, registry, type, url);
  11. }

接下来看一下doRefer方法,该方法主要包含一下逻辑:

  1. 创建RegistryDirectory实例
  2. 注册服务消费者,在consumers目录下新建节点
  3. 订阅providers、configurators、routers等节点数据
  4. 通过cluster创建invoker

代码如下所示:

  1. private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
  2. //创建RegistryDirectory实例
  3. RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
  4. directory.setRegistry(registry);
  5. directory.setProtocol(protocol);
  6. // all attributes of REFER_KEY
  7. Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
  8. URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
  9. //注册服务消费者,在consumers目录下新节点
  10. if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
  11. && url.getParameter(Constants.REGISTER_KEY, true)) {
  12. URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
  13. registry.register(registeredConsumerUrl);
  14. directory.setRegisteredConsumerUrl(registeredConsumerUrl);
  15. }
  16. //订阅providers、configurators、routers等节点数据
  17. directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
  18. Constants.PROVIDERS_CATEGORY
  19. + "," + Constants.CONFIGURATORS_CATEGORY
  20. + "," + Constants.ROUTERS_CATEGORY));
  21. //一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
  22. Invoker invoker = cluster.join(directory);
  23. ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
  24. return invoker;
  25. }

DubboProtocol#refer方法

DubboProtocol#refer方法逻辑比较简单,主要就是创建一个DubboInvoker并返回。代码如下所示:

  1. public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
  2. optimizeSerialization(url);
  3. //创建DubboInvoker
  4. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
  5. invokers.add(invoker);
  6. return invoker;
  7. }

这里主要看一下getClients方法,该方法用来获取客户端实例。代码如下所示:

  1. private ExchangeClient[] getClients(URL url) {
  2. // whether to share connection
  3. // 是否共享连接
  4. boolean service_share_connect = false;
  5. // 获取connections参数,connections表示该服务对每个提供者建立的长连接数。
  6. // 如果未配置或配置了0,则默认共用一个长连接。否则一个service使用connections个连接。
  7. int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
  8. // if not configured, connection is shared, otherwise, one connection for one service
  9. if (connections == 0) {
  10. service_share_connect = true;
  11. connections = 1;
  12. }
  13. //初始化客户端实例
  14. ExchangeClient[] clients = new ExchangeClient[connections];
  15. for (int i = 0; i < clients.length; i++) {
  16. if (service_share_connect) {
  17. //获取共享客户端
  18. clients[i] = getSharedClient(url);
  19. } else {
  20. clients[i] = initClient(url);
  21. }
  22. }
  23. return clients;
  24. }

这里解释一下connections参数的含义:connections参数表示最大的长连接数量,如果connections未配置或者值为0,则共享一个长连接(这里的共享指的是consumer与某个provider只建立一个长连接);如果connection配置了值且不为0,则表示每个service最大可创建connections个长连接。

接下来看一下getSharedClient方法,该方法用来获取共享客户端,代码如下所示:

  1. private ExchangeClient getSharedClient(URL url) {
  2. //key为address,所以是相同地址共享一个连接
  3. String key = url.getAddress();
  4. //从缓存里获取客户端实例,如果未获取到,则新建一个ExchangeClient实例
  5. //ReferenceCountExchangeClient就是个带引用计数功能的ExchangeClient,表示该连接实例被多少个service共享了
  6. ReferenceCountExchangeClient client = referenceClientMap.get(key);
  7. if (client != null) {
  8. if (!client.isClosed()) {
  9. client.incrementAndGetCount();
  10. return client;
  11. } else {
  12. referenceClientMap.remove(key);
  13. }
  14. }
  15. locks.putIfAbsent(key, new Object());
  16. synchronized (locks.get(key)) {
  17. if (referenceClientMap.containsKey(key)) {
  18. return referenceClientMap.get(key);
  19. }
  20. //初始化客户端实例,然后丢到map里
  21. ExchangeClient exchangeClient = initClient(url);
  22. client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
  23. referenceClientMap.put(key, client);
  24. ghostClientMap.remove(key);
  25. locks.remove(key);
  26. return client;
  27. }
  28. }

getSharedClient方法在未查询到缓存时,会调用initClient方法创建新的客户端实例,initClient代码如下所示:

  1. private ExchangeClient initClient(URL url) {
  2. // client type setting.
  3. // 获取客户端类型,默认为netty
  4. String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
  5. url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
  6. // enable heartbeat by default
  7. url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
  8. // BIO is not allowed since it has severe performance issue.
  9. //判断客户端类型是否支持
  10. if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
  11. //省略异常抛出代码
  12. }
  13. ExchangeClient client;
  14. try {
  15. // connection should be lazy
  16. //创建客户端实例,懒加载(request调用时才进行创建)&普通
  17. if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
  18. client = new LazyConnectExchangeClient(url, requestHandler);
  19. } else {
  20. client = Exchangers.connect(url, requestHandler);
  21. }
  22. } //省略异常抛出代码
  23. return client;

ProxyFactory#getProxy方法

ReferenceConfig#createProxy在创建invoker之后会进行可用性检查,然后会调用ProxyFactory.getProxy创建代理类。

代码如下所示:

  1. public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
  2. Class<?>[] interfaces = null;
  3. //获取接口列表
  4. String config = invoker.getUrl().getParameter("interfaces");
  5. if (config != null && config.length() > 0) {
  6. String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
  7. if (types != null && types.length > 0) {
  8. interfaces = new Class<?>[types.length + 2];
  9. interfaces[0] = invoker.getInterface();
  10. interfaces[1] = EchoService.class;
  11. for (int i = 0; i < types.length; i++) {
  12. interfaces[i + 2] = ReflectUtils.forName(types[i]);
  13. }
  14. }
  15. }
  16. //创建接口数组
  17. if (interfaces == null) {
  18. interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
  19. }
  20. //省略泛化调用代码
  21. //生成代理类
  22. return getProxy(invoker, interfaces);
  23. }

回声测试用于检测服务是否可用,回声测试按照正常请求流程执行,能够测试整个调用是否通畅,可用于监控。所有服务自动实现 EchoService 接口,只需将任意服务引用强制转型为 EchoService,即可使用。

  1. // 远程服务引用
  2. MemberService memberService = ctx.getBean("memberService");
  3. EchoService echoService = (EchoService) memberService; // 强制转型为EchoService
  4. // 回声测试可用性
  5. String status = echoService.$echo("OK");
  6. assert(status.equals("OK"));

在获取interfaces数组之后,会调用JavassistProxyFactory#getProxy方法生成代理类对象,该方法不做解析,这里通过反编译看一下生成的代理类的具体代码。

  1. /*
  2. * Decompiled with CFR.
  3. *
  4. * Could not load the following classes:
  5. * top.fuyuaaa.api.TestService
  6. */
  7. package com.alibaba.dubbo.common.bytecode;
  8. import com.alibaba.dubbo.common.bytecode.ClassGenerator;
  9. import com.alibaba.dubbo.rpc.service.EchoService;
  10. import java.lang.reflect.InvocationHandler;
  11. import java.lang.reflect.Method;
  12. import top.fuyuaaa.api.TestService;
  13. public class proxy0 implements ClassGenerator.DC,TestService,EchoService {
  14. public static Method[] methods;
  15. private InvocationHandler handler;
  16. public String demo(String string) {
  17. Object[] arrobject = new Object[]{string};
  18. Object object = this.handler.invoke(this, methods[0], arrobject);
  19. return (String)object;
  20. }
  21. @Override
  22. public Object $echo(Object object) {
  23. Object[] arrobject = new Object[]{object};
  24. Object object2 = this.handler.invoke(this, methods[1], arrobject);
  25. return object2;
  26. }
  27. public proxy0() {
  28. }
  29. public proxy0(InvocationHandler invocationHandler) {
  30. this.handler = invocationHandler;
  31. }
  32. }

参考

Dubbo官网

《深入理解Apache Dubbo与实战》