概述

【Dubbo】服务暴露机制 - 图1

首先,看一下服务暴露的整体流程,如上图所示。Dubbo框架将服务暴露分为两大部分,第一步将持有的服务实例通过代理转换成Invoker,第二步会把Invoker通过具体的协议(比如Dubbo )转换成Exporter。接下来会具体分析一下每个步骤的流程以及源码。

源码解析

ServiceConfig类

ServiceBean类

在看ServiceConfig之前,先来看一下其子类ServiceBeanonApplicationEvent方法,该方法监听了Spring容器的上下文刷新事件,当收到该事件时会触发服务的暴露工作,具体代码如下所示:

  1. @Override
  2. //添加了上下文刷新监听,用于暴露服务使用。
  3. public void onApplicationEvent(ContextRefreshedEvent event) {
  4. //是否延迟导出 && 是否已经导出 && 是不是已经取消导出
  5. if (isDelay() && !isExported() && !isUnexported()) {
  6. //省略日志输出代码
  7. export();
  8. }
  9. }

至于ServiceBean的话,是通过配置或者注解解析出来的Bean,本文不做解析。

export方法

接下来看一下ServiceConfig#export方法,该方法主要对exportdelay配置进行检查,如果export=false就不做暴露,如果delay=true则进行延迟暴露。

  1. public synchronized void export() {
  2. if (provider != null) {
  3. //获取export和delay配置。
  4. if (export == null) {
  5. export = provider.getExport();
  6. }
  7. if (delay == null) {
  8. delay = provider.getDelay();
  9. }
  10. }
  11. if (export != null && !export) {
  12. return;
  13. }
  14. //延迟暴露
  15. if (delay != null && delay > 0) {
  16. delayExportExecutor.schedule(new Runnable() {
  17. @Override
  18. public void run() {
  19. doExport();
  20. }
  21. }, delay, TimeUnit.MILLISECONDS);
  22. } else {
  23. //暴露服务
  24. doExport();
  25. }
  26. }

接下来看一下ServiceConfig#doExport方法,该方法主要是进行配置检查,设置缺省值或者抛出异常等。配置检查部分本文不做解析,主要看一下该方法中调用的ServiceConfig#doExportUrls方法。

doExportUrls方法

ServiceConfig#doExportUrls方法会加载注册中心的URL,然后遍历协议列表,在每个协议下面都导出服务。代码如下所示:

  1. //多协议多注册中心导出。
  2. private void doExportUrls() {
  3. //加载注册中心URL
  4. List<URL> registryURLs = loadRegistries(true);
  5. for (ProtocolConfig protocolConfig : protocols) {
  6. // 遍历 protocols,并在每个协议下导出服务
  7. doExportUrlsFor1Protocol(protocolConfig, registryURLs);
  8. }
  9. }

loadRegistries方法

loadRegistries方法主要包含以下逻辑:

  1. 检测是否存在注册中心配置类,不存在则抛出异常
  2. 遍历注册中心配置类,构建注册中心URL列表
  3. 判断是否加入到registryList

代码如下所示:

  1. protected List<URL> loadRegistries(boolean provider) {
  2. //检查是否存在注册中心配置类,不存在则抛出异常。
  3. checkRegistry();
  4. List<URL> registryList = new ArrayList<URL>();
  5. if (registries != null && !registries.isEmpty()) {
  6. for (RegistryConfig config : registries) {
  7. String address = config.getAddress();
  8. //默认值
  9. if (address == null || address.length() == 0) {
  10. address = Constants.ANYHOST_VALUE;
  11. }
  12. //配置值
  13. String sysaddress = System.getProperty("dubbo.registry.address");
  14. if (sysaddress != null && sysaddress.length() > 0) {
  15. address = sysaddress;
  16. }
  17. if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
  18. //将构建URL所需的参数加入到map里。
  19. Map<String, String> map = new HashMap<String, String>();
  20. appendParameters(map, application);
  21. appendParameters(map, config);
  22. map.put("path", RegistryService.class.getName());
  23. map.put("dubbo", Version.getProtocolVersion());
  24. map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
  25. if (ConfigUtils.getPid() > 0) {
  26. map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
  27. }
  28. if (!map.containsKey("protocol")) {
  29. if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
  30. map.put("protocol", "remote");
  31. } else {
  32. map.put("protocol", "dubbo");
  33. }
  34. }
  35. //解析得到URL列表,address可能包含多个注册中心ip
  36. List<URL> urls = UrlUtils.parseURLs(address, map);
  37. for (URL url : urls) {
  38. url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
  39. //将URL协议头设置为registry
  40. url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
  41. //(服务提供者 && 注册) || (非服务提供者 && 订阅)
  42. if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
  43. || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
  44. registryList.add(url);
  45. }
  46. }
  47. }
  48. }
  49. }
  50. return registryList;
  51. }

doExportUrlsFor1Protocol方法

doExportUrlsFor1Protocol方法主要包含以下逻辑:

  1. 组装URL
  2. 导出服务

这里主要分析一下导出服务的代码,导出服务部分代码主要包含以下逻辑:

  1. 如果scope!=remote,则导出服务到本地
  2. 如果scope!=local,则导出服务到远程
    1. 遍历注册中心URL,依次导出
  3. 其实一般来说scope是为空的,上面两个都会执行

代码如下所示:

  1. private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
  2. //省略无关代码
  3. // don't export when none is configured
  4. if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
  5. // scope != remote,导出到本地
  6. if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
  7. exportLocal(url);
  8. }
  9. // scope != local,导出到远程
  10. if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
  11. //省略日志输出代码
  12. if (registryURLs != null && !registryURLs.isEmpty()) {
  13. for (URL registryURL : registryURLs) {
  14. url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
  15. //加载监视器链接,将监视器链接作为参数添加到 url 中
  16. URL monitorUrl = loadMonitor(registryURL);
  17. if (monitorUrl != null) {
  18. url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
  19. }
  20. //省略日志输出代码
  21. // For providers, this is used to enable custom proxy to generate invoker
  22. String proxy = url.getParameter(Constants.PROXY_KEY);
  23. if (StringUtils.isNotEmpty(proxy)) {
  24. registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
  25. }
  26. //生成invoker
  27. Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
  28. //DelegateProviderMetaDataInvoker用于持有Invoker和ServiceConfig
  29. DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
  30. //导出服务,将exporter加入到exporters
  31. Exporter<?> exporter = protocol.export(wrapperInvoker);
  32. exporters.add(exporter);
  33. }
  34. }
  35. // 不存在注册中心,仅导出服务
  36. else {
  37. Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
  38. DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
  39. Exporter<?> exporter = protocol.export(wrapperInvoker);
  40. exporters.add(exporter);
  41. }
  42. }
  43. }
  44. this.urls.add(url);
  45. }

exportLocal方法

exportLocal方法用于导出本地服务,主要包含以下逻辑:

  1. 判断当前协议是否已经是injvm,如果是不作处理
  2. 设置URL的协议、hostport为本地
  3. 创建invoker,导出服务
  1. private void exportLocal(URL url) {
  2. //判断当前协议是否为injvm,如果是就无需再次导出了。
  3. if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
  4. URL local = URL.valueOf(url.toFullString())
  5. //将协议设置成injvm,host设置为本地,port=0
  6. .setProtocol(Constants.LOCAL_PROTOCOL)
  7. .setHost(LOCALHOST)
  8. .setPort(0);
  9. StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).put(url.getServiceKey(), getServiceClass(ref));
  10. //创建invoker,导出服务
  11. //这里的protocol是一个自适应扩展类,会根据url的协议去调用对应扩展类的export方法
  12. //此处调用的是InjvmProtocol#export方法
  13. Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
  14. exporters.add(exporter);
  15. logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
  16. }
  17. }

getInvoker方法

getInvokerProxyFactor接口提供的,默认的实现类是JavassistProxyFactoryJavassistProxyFactory#getInvoker方法主要包含以下逻辑:

  1. 创建包装类
  2. 创建invoker,对invoker的方法调用最终会调用到包装类的invokeMethod

代码如下所示:

  1. public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
  2. //创建包装类
  3. final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
  4. //创建匿名invoker类,实现doInvoke方法,doInvoke方法是一个抽象法法
  5. return new AbstractProxyInvoker<T>(proxy, type, url) {
  6. @Override
  7. protected Object doInvoke(T proxy, String methodName,
  8. Class<?>[] parameterTypes,
  9. Object[] arguments) throws Throwable {
  10. return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
  11. }
  12. };
  13. }

getWrapper方法主要逻辑是对传入的类进行解析,生成invokeMethod代码以及一些其他代码,然后通过Javassist生成class对象并通过反射创建实例,具体代码不做解析。

Protocol

RegistryProtocol#export

RegistryProtocol#export方法主要包含以下逻辑:

  1. 委托具体协议(Dubbo)进行服务暴露,创建NettyServer监听端口和保存服务实例。
  2. 创建注册中心对象,与注册中心创建TCP连接。
  3. 注册服务元数据到注册中心。
  4. 订阅configurators节点,监听服务动态属性变更事件。
  5. 服务销毁收尾工作,比如关闭端口、反注册服务信息等

代码如下所示:

  1. public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
  2. //本地暴露服务,这个方法会取出export参数的值作为url,并根据url的协议调用对应的protocol(比如dubbo)进行export
  3. final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
  4. //获取注册中心URL,以zookeeper注册中心为例,得到的示例 URL 如下:
  5. //zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.18.39.113%3A20880%2Ftop.fuyuaaa.api.TestService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bean.name%3Dtop.fuyuaaa.api.TestService%26bind.ip%3D172.18.39.113%26bind.port%3D20880%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dtop.fuyuaaa.api.TestService%26methods%3Ddemo%26pid%3D72080%26qos.port%3D22222%26revision%3D1.0%26side%3Dprovider%26timestamp%3D1599729148679%26transporter%3Dnetty%26version%3D1.0&pid=72080&qos.port=22222&timestamp=1599729148633
  6. URL registryUrl = getRegistryUrl(originInvoker);
  7. //根据URL加载Registry实现类,比如 ZookeeperRegistry
  8. final Registry registry = getRegistry(originInvoker);
  9. //获取已注册的服务提供者URL,比如:
  10. //dubbo://172.18.39.113:20880/top.fuyuaaa.api.TestService?anyhost=true&application=demo-provider&bean.name=top.fuyuaaa.api.TestService&dubbo=2.0.2&generic=false&interface=top.fuyuaaa.api.TestService&methods=demo&pid=72080&revision=1.0&side=provider&timestamp=1599729148679&transporter=netty&version=1.0
  11. final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
  12. //to judge to delay publish whether or not
  13. //获取 register 参数
  14. boolean register = registeredProviderUrl.getParameter("register", true);
  15. //向服务提供者与消费者注册表中注册服务提供者
  16. ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
  17. if (register) {
  18. //向注册中心注册服务
  19. register(registryUrl, registeredProviderUrl);
  20. ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
  21. }
  22. //获取订阅 URL,比如:
  23. //provider://172.18.39.113:20880/top.fuyuaaa.api.TestService?anyhost=true&application=demo-provider&bean.name=top.fuyuaaa.api.TestService&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=top.fuyuaaa.api.TestService&methods=demo&pid=72080&revision=1.0&side=provider&timestamp=1599729148679&transporter=netty&version=1.0
  24. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
  25. //创建监听器,监听服务接口下configurators节点,用于处理动态配置,比如dubbo-admin对集群的服务治理。
  26. final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
  27. overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
  28. //向注册中心进行订阅override数据
  29. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
  30. //创建并返回 DestroyableExporter
  31. return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
  32. }

DubboProtocol#export

DubboProtocol#export方法主要包含以下逻辑:

  1. 缓存servicekeyexporter
  2. 创建服务器

代码如下所示:

  1. public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
  2. URL url = invoker.getUrl();
  3. // export service.
  4. //缓存key和对应的export
  5. //🌰:top.fuyuaaa.api.TestService:1.0:20880
  6. String key = serviceKey(url);
  7. DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
  8. exporterMap.put(key, exporter);
  9. //省略本地存根相关代码
  10. //创建服务器实例
  11. openServer(url);
  12. //优化序列化
  13. optimizeSerialization(url);
  14. return exporter;
  15. }
  16. private void openServer(URL url) {
  17. // find server.
  18. // 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例
  19. String key = url.getAddress();
  20. boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
  21. if (isServer) {
  22. ExchangeServer server = serverMap.get(key);
  23. if (server == null) {
  24. serverMap.put(key, createServer(url));
  25. } else {
  26. // 服务器已创建,则根据 url 中的配置重置服务器
  27. server.reset(url);
  28. }
  29. }
  30. }

关于服务注册以及创建netty服务器具体流程不做解析。

流程图

【Dubbo】服务暴露机制 - 图2

参考

Dubbo官网

《深入理解Apache Dubbo与实战》