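• Understand the requirements
  • Follow the main flow first and find the entry point
  • Organize the technical design (cloud notes)

    Dubbo service registration flow

    Service publication steps

  • Annotation

    @DubboService(
        loadbalance = "random",
        cluster = "failover",
        retries = 2)
  • Annotation scanning

    @DubboComponentScan

    Questions this raises

  • Scan the annotations (parse them)

  • URL-driven design: assembling the URL
  • Register with ZooKeeper (?)
  • Start the service (publish it using the protocol and port configured in the URL) (?)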
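
The first two questions above (parse the annotation, then assemble a URL from it) can be sketched in plain Java. This is only an illustration under stated assumptions: `SketchService`, `GreetingService`, `GreetingServiceImpl`, and `AnnotationToUrlSketch` are hypothetical names, the annotation is a stand-in rather than the real `@DubboService`, and the host and port are made up.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical stand-in for @DubboService; not the real Dubbo annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SketchService {
    String loadbalance() default "random";
    String cluster() default "failover";
    int retries() default 2;
}

interface GreetingService {
    String greet(String name);
}

@SketchService(loadbalance = "random", cluster = "failover", retries = 2)
class GreetingServiceImpl implements GreetingService {
    public String greet(String name) {
        return "hello " + name;
    }
}

class AnnotationToUrlSketch {
    // Reads the annotation by reflection and folds its attributes into URL parameters.
    static String buildUrl(Class<?> implClass, String protocol, String host, int port) {
        SketchService meta = implClass.getAnnotation(SketchService.class);
        if (meta == null) {
            throw new IllegalArgumentException(implClass + " is not annotated");
        }
        Map<String, String> params = new TreeMap<>(); // sorted keys give a stable query string
        params.put("loadbalance", meta.loadbalance());
        params.put("cluster", meta.cluster());
        params.put("retries", String.valueOf(meta.retries()));
        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        // The first implemented interface plays the role of the service path.
        String path = implClass.getInterfaces()[0].getName();
        return protocol + "://" + host + ":" + port + "/" + path + "?" + query;
    }

    public static void main(String[] args) {
        System.out.println(buildUrl(GreetingServiceImpl.class, "dubbo", "127.0.0.1", 20880));
    }
}
```

Everything the provider configured ends up as a parameter on one URL string, which is the sense in which the design is "URL-driven".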

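    Dubbo source code

    Dubbo services can be published in two ways

  • XML: dubbo:service

  • @DubboService / @Service

    How Dubbo parses the annotations

    DubboComponentScan

    It boils down to one question: which bean does it register into the Spring IoC container?

    ServiceAnnotationBeanPostProcessor (registered in registerBeanDefinitions below)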

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        Set<String> packagesToScan = getPackagesToScan(importingClassMetadata);
        registerServiceAnnotationBeanPostProcessor(packagesToScan, registry);
        // @since 2.7.6 Register the common beans
        registerCommonBeans(registry);
    }

ServiceAnnotationBeanPostProcessor

    public ServiceAnnotationBeanPostProcessor(Set<String> packagesToScan) {
        super(packagesToScan);
    }

BeanDefinitionRegistryPostProcessor

Once the bean definitions have been loaded, Spring triggers the following method.

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        // @since 2.7.5
        registerInfrastructureBean(registry, DubboBootstrapApplicationListener.BEAN_NAME, DubboBootstrapApplicationListener.class);
        Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);
        if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
            registerServiceBeans(resolvedPackagesToScan, registry);
        } else {
            if (logger.isWarnEnabled()) {
                logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!");
            }
        }
    }

Register a DubboBootstrapApplicationListener

This bean's listener is triggered once the Spring application context has finished loading.

    public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListener
            implements Ordered {
        /**
         * The bean name of {@link DubboBootstrapApplicationListener}
         *
         * @since 2.7.6
         */
        public static final String BEAN_NAME = "dubboBootstrapApplicationListener";
        private final DubboBootstrap dubboBootstrap;

        public DubboBootstrapApplicationListener() {
            this.dubboBootstrap = DubboBootstrap.getInstance();
        }

        @Override
        public void onApplicationContextEvent(ApplicationContextEvent event) {
            if (event instanceof ContextRefreshedEvent) {
                onContextRefreshedEvent((ContextRefreshedEvent) event);
            } else if (event instanceof ContextClosedEvent) {
                onContextClosedEvent((ContextClosedEvent) event);
            }
        }
        // ... remaining methods omitted in these notes
    }

registerServiceBeans

    private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
        DubboClassPathBeanDefinitionScanner scanner =
                new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);
        BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);
        scanner.setBeanNameGenerator(beanNameGenerator);
        // refactor @since 2.7.7
        serviceAnnotationTypes.forEach(annotationType -> {
            scanner.addIncludeFilter(new AnnotationTypeFilter(annotationType));
        });
        for (String packageToScan : packagesToScan) {
            // Registers @Service Bean first
            scanner.scan(packageToScan);
            // Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
            Set<BeanDefinitionHolder> beanDefinitionHolders =
                    findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);
            if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {
                for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
                    registerServiceBean(beanDefinitionHolder, registry, scanner);
                }
                if (logger.isInfoEnabled()) {
                    logger.info(beanDefinitionHolders.size() + " annotated Dubbo's @Service Components { " +
                            beanDefinitionHolders +
                            " } were scanned under package[" + packageToScan + "]");
                }
            } else {
                if (logger.isWarnEnabled()) {
                    logger.warn("No Spring Bean annotating Dubbo's @Service was found under package["
                            + packageToScan + "]");
                }
            }
        }
    }

registerServiceBean

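All service-related information actually lives in the @DubboService annotation we just defined
[DemoService]

  • The protocol the service is published with
  • The service's load-balancing strategy
  • The service's fault-tolerance strategy
  • The port the service is published on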

org.apache.dubbo.config.spring.ServiceBean

    private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry,
                                     DubboClassPathBeanDefinitionScanner scanner) {
        Class<?> beanClass = resolveClass(beanDefinitionHolder);
        Annotation service = findServiceAnnotation(beanClass);
        /**
         * The {@link AnnotationAttributes} of @Service annotation
         */
        AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, false, false);
        Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass);
        String annotatedServiceBeanName = beanDefinitionHolder.getBeanName();
        AbstractBeanDefinition serviceBeanDefinition =
                buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName);
        // ServiceBean Bean name
        String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass);
        if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean
            registry.registerBeanDefinition(beanName, serviceBeanDefinition);
            if (logger.isInfoEnabled()) {
                logger.info("The BeanDefinition[" + serviceBeanDefinition +
                        "] of ServiceBean has been registered with name : " + beanName);
            }
        } else {
            if (logger.isWarnEnabled()) {
                logger.warn("The Duplicated BeanDefinition[" + serviceBeanDefinition +
                        "] of ServiceBean[ bean name : " + beanName +
                        "] was be found , Did @DubboComponentScan scan to same package in many times?");
            }
        }
    }

registry.registerBeanDefinition(beanName, serviceBeanDefinition);

In the end, the line above registers a Dubbo-provided ServiceBean into the Spring IoC container.

ServiceBean initialization phase

    public ServiceBean() {
        super();
        this.service = null;
    }

Once the ServiceBean has been initialized, the following method is called.

    @Override
    public void afterPropertiesSet() throws Exception {
        if (StringUtils.isEmpty(getPath())) {
            if (StringUtils.isNotEmpty(beanName)
                    && StringUtils.isNotEmpty(getInterface())
                    && beanName.startsWith(getInterface())) {
                setPath(beanName);
            }
        }
    }

DubboBootstrapApplicationListener

Start the Dubbo service

    private void onContextRefreshedEvent(ContextRefreshedEvent event) {
        dubboBootstrap.start();
    }
  • Initialize metadata and remote configuration
  • Assemble the URL
  • If the protocol is dubbo, start a Netty server
  • Register the service

    start()

    public DubboBootstrap start() {
        if (started.compareAndSet(false, true)) {
            ready.set(false);
            initialize();
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " is starting...");
            }
            // 1. export Dubbo Services
            exportServices();
            // Not only provider register
            if (!isOnlyRegisterProvider() || hasExportedServices()) {
                // 2. export MetadataService
                exportMetadataService();
                //3. Register the local ServiceInstance if required
                registerServiceInstance();
            }
            referServices();
            if (asyncExportingFutures.size() > 0) {
                new Thread(() -> {
                    try {
                        this.awaitFinish();
                    } catch (Exception e) {
                        logger.warn(NAME + " exportAsync occurred an exception.");
                    }
                    ready.set(true);
                    if (logger.isInfoEnabled()) {
                        logger.info(NAME + " is ready.");
                    }
                }).start();
            } else {
                ready.set(true);
                if (logger.isInfoEnabled()) {
                    logger.info(NAME + " is ready.");
                }
            }
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " has started.");
            }
        }
        return this;
    }
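
The `started.compareAndSet(false, true)` guard is what makes start() safe to call more than once: only the first caller does the export work. A minimal sketch of that idiom, using a hypothetical `OneShotStarter` class in which a counter stands in for `exportServices()`:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class OneShotStarter {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger exportRuns = new AtomicInteger();

    // Only the caller that flips 'started' from false to true performs the work.
    OneShotStarter start() {
        if (started.compareAndSet(false, true)) {
            exportRuns.incrementAndGet(); // stands in for exportServices()
        }
        return this;
    }

    int exportRuns() {
        return exportRuns.get();
    }

    public static void main(String[] args) {
        OneShotStarter starter = new OneShotStarter();
        starter.start().start().start();
        System.out.println("export ran " + starter.exportRuns() + " time(s)");
    }
}
```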

    exportServices

    Publish the Dubbo services

    private void exportServices() {
        configManager.getServices().forEach(sc -> {
            // TODO, compatible with ServiceConfig.export()
            ServiceConfig serviceConfig = (ServiceConfig) sc;
            serviceConfig.setBootstrap(this);
            if (exportAsync) {
                ExecutorService executor = executorRepository.getServiceExporterExecutor();
                Future<?> future = executor.submit(() -> {
                    sc.export();
                    exportedServices.add(sc);
                });
                asyncExportingFutures.add(future);
            } else {
                sc.export();
                exportedServices.add(sc);
            }
        });
    }

Iterate over all Dubbo services and publish each one.
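
The synchronous and asynchronous branches of that loop can be sketched with the JDK's executor API alone. This is an illustration, not Dubbo's code: `ExportLoopSketch` is a hypothetical class, service names stand in for `ServiceConfig` objects, and "exporting" just records the name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExportLoopSketch {
    // Returns the services that were "exported"; thread-safe list because tasks may run concurrently.
    static List<String> exportAll(List<String> services, boolean exportAsync) {
        List<String> exported = new CopyOnWriteArrayList<>();
        if (!exportAsync) {
            for (String service : services) {
                exported.add(service); // stands in for sc.export()
            }
            return exported;
        }
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (String service : services) {
                futures.add(executor.submit(() -> { exported.add(service); }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for every export, similar in spirit to awaitFinish()
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while exporting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Export failed", e.getCause());
        } finally {
            executor.shutdown();
        }
        return exported;
    }

    public static void main(String[] args) {
        List<String> services = List.of("IDemoService", "ISayHelloService");
        System.out.println(exportAll(services, false));
        System.out.println(exportAll(services, true)); // order may differ in async mode
    }
}
```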

    <dubbo:service beanName="ServiceBean:com.msbedu.springboot.dubbo.springbootdubbosampleprovider.services.IDemoService" />
    <dubbo:service beanName="ServiceBean:com.msbedu.springboot.dubbo.ISayHelloService" />
    dubbo://ip:port?com.msbedu.springboot.dubbo.springbootdubbosampleprovider.services.IDemoService&xxx&xxx
    dubbo://ip:port?com.msbedu.springboot.dubbo.ISayHelloService&xxx&xxx

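How many times is one interface published?
The number of times a Dubbo service is published depends on how many protocols are configured. If a service is configured with three protocols (rest, webservice, dubbo), it is published three times, once per protocol:
dubbo://
rest://
webservice://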
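
As a rough illustration of "one publication per protocol", the sketch below produces one URL per configured protocol for a single interface. `PerProtocolExportSketch` is a hypothetical class, and the host and the rest and webservice ports are made-up values.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PerProtocolExportSketch {
    // One URL per configured protocol; the map preserves the configured order.
    static List<String> exportUrls(String serviceInterface, String host, Map<String, Integer> protocolPorts) {
        List<String> urls = new ArrayList<>();
        for (Map.Entry<String, Integer> protocol : protocolPorts.entrySet()) {
            urls.add(protocol.getKey() + "://" + host + ":" + protocol.getValue() + "/" + serviceInterface);
        }
        return urls;
    }

    public static void main(String[] args) {
        Map<String, Integer> protocols = new LinkedHashMap<>();
        protocols.put("dubbo", 20880);
        protocols.put("rest", 8080);       // assumed port
        protocols.put("webservice", 8081); // assumed port
        exportUrls("com.msbedu.springboot.dubbo.ISayHelloService", "127.0.0.1", protocols)
                .forEach(System.out::println);
    }
}
```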

export

    public synchronized void export() {
        if (!shouldExport()) {
            return;
        }
        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.init();
        }
        checkAndUpdateSubConfigs();
        //init serviceMetadata
        serviceMetadata.setVersion(version);
        serviceMetadata.setGroup(group);
        serviceMetadata.setDefaultGroup(group);
        serviceMetadata.setServiceType(getInterfaceClass());
        serviceMetadata.setServiceInterfaceName(getInterface());
        serviceMetadata.setTarget(getRef());
        if (shouldDelay()) {
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
        exported();
    }

doExportUrls

Main flow: iterate over the list of protocols the developer configured and publish the service once for each protocol.

    private void doExportUrls() {
        ServiceRepository repository = ApplicationModel.getServiceRepository();
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
        repository.registerProvider(
                getUniqueServiceName(),
                ref,
                serviceDescriptor,
                this,
                serviceMetadata
        );
        List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getContextPath(protocolConfig)
                    .map(p -> p + "/" + path)
                    .orElse(path), group, version);
            // In case user specified path, register service one more time to map it to path.
            repository.registerService(pathKey, interfaceClass);
            // TODO, uncomment this line once service key is unified
            serviceMetadata.setServiceKey(pathKey);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

doExportUrlsFor1Protocol

  • Build the URL
  • Based on the protocol type in the URL, call that protocol to publish the service

    • Start the service
    • Register the service
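
The idea of "look at the protocol in the URL, then hand the URL to that protocol" can be sketched as a simple lookup table. This is not how Dubbo implements it (Dubbo resolves the protocol through its extension mechanism); `ProtocolDispatchSketch` and its two exporters are hypothetical and only return a description of what a real exporter would do.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class ProtocolDispatchSketch {
    // Hypothetical exporters keyed by protocol name.
    private static final Map<String, Function<URI, String>> EXPORTERS = new HashMap<>();

    static {
        EXPORTERS.put("dubbo", url -> "start TCP server on port " + url.getPort());
        EXPORTERS.put("rest", url -> "start HTTP server on port " + url.getPort());
    }

    // Picks the exporter by URL scheme, "starts" the server, then "registers" the service path.
    static String export(String serviceUrl) {
        URI url = URI.create(serviceUrl);
        Function<URI, String> exporter = EXPORTERS.get(url.getScheme());
        if (exporter == null) {
            throw new IllegalArgumentException("No exporter for protocol: " + url.getScheme());
        }
        String started = exporter.apply(url);
        return started + ", then register " + url.getPath().substring(1);
    }

    public static void main(String[] args) {
        System.out.println(export("dubbo://127.0.0.1:20880/com.example.DemoService"));
        System.out.println(export("rest://127.0.0.1:8080/com.example.DemoService"));
    }
}
```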

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (StringUtils.isEmpty(name)) {
            name = DUBBO;
        }
        // Holds all configuration parameters, collected from
        // dubbo:service, dubbo:method and dubbo:argument
        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, PROVIDER_SIDE);
        ServiceConfig.appendRuntimeParameters(map);
        AbstractConfig.appendParameters(map, getMetrics());
        AbstractConfig.appendParameters(map, getApplication());
        AbstractConfig.appendParameters(map, getModule());
        // remove 'default.' prefix for configs from ProviderConfig
        // appendParameters(map, provider, Constants.DEFAULT_KEY);
        AbstractConfig.appendParameters(map, provider);
        AbstractConfig.appendParameters(map, protocolConfig);
        AbstractConfig.appendParameters(map, this);
        MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
        if (metadataReportConfig != null && metadataReportConfig.isValid()) {
            map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
        }
        if (CollectionUtils.isNotEmpty(getMethods())) {
            for (MethodConfig method : getMethods()) {
                AbstractConfig.appendParameters(map, method, method.getName());
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                List<ArgumentConfig> arguments = method.getArguments();
                if (CollectionUtils.isNotEmpty(arguments)) {
                    for (ArgumentConfig argument : arguments) {
                        // convert argument type
                        if (argument.getType() != null && argument.getType().length() > 0) {
                            Method[] methods = interfaceClass.getMethods();
                            // visit all methods
                            if (methods.length > 0) {
                                for (int i = 0; i < methods.length; i++) {
                                    String methodName = methods[i].getName();
                                    // target the method, and get its signature
                                    if (methodName.equals(method.getName())) {
                                        Class<?>[] argtypes = methods[i].getParameterTypes();
                                        // one callback in the method
                                        if (argument.getIndex() != -1) {
                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                            } else {
                                                throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                            }
                                        } else {
                                            // multiple callbacks in the method
                                            for (int j = 0; j < argtypes.length; j++) {
                                                Class<?> argclazz = argtypes[j];
                                                if (argclazz.getName().equals(argument.getType())) {
                                                    AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
                                                    if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                        throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        } else if (argument.getIndex() != -1) {
                            AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                        } else {
                            throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                        }
                    }
                }
            } // end of methods for
        }
        if (ProtocolUtils.isGeneric(generic)) {
            map.put(GENERIC_KEY, generic);
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        /**
         * Here the token value configured by the provider is used to assign the value to ServiceConfig#token
         */
        if (ConfigUtils.isEmpty(token) && provider != null) {
            token = provider.getToken();
        }
        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put(TOKEN_KEY, UUID.randomUUID().toString());
            } else {
                map.put(TOKEN_KEY, token);
            }
        }
        //init serviceMetadata attachments
        serviceMetadata.getAttachments().putAll(map);
        // export service
        String host = findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = findConfigedPorts(protocolConfig, name, map);
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
        // You can customize Configurator to append extra parameters
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }
        String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
            // export to local if the config is not remote (export to remote only when config is remote)
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        //if protocol is only injvm ,not register
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            if (url.getParameter(REGISTER_KEY, true)) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            } else {
                                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                            }
                        }
                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                    exporters.add(exporter);
                }
                /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
                WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
                if (metadataService != null) {
                    metadataService.publishServiceDefinition(url);
                }
            }
        }
        this.urls.add(url);
    }
    dubbo://192.168.1.104:20880/com.msbedu.springboot.dubbo.springbootdubbosampleprovider.services.IDemoService?anyhost=true&application=spring-boot-dubbo-sample-provider&bind.ip=192.168.1.104&bind.port=20880&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.msbedu.springboot.dubbo.springbootdubbosampleprovider.services.IDemoService&methods=getTxt&pid=225512&qos.accept.foreign.ip=false&qos.enable=true&qos.port=8888&release=2.7.7&side=provider&timestamp=1597498654297
    & loadbalance=msbloadbalance
    consumer://ip:port....&
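
Because every setting travels as a URL parameter, reading configuration back is just query-string parsing. A small sketch using only `java.net.URI` (the hypothetical `ServiceUrlParser` below is not Dubbo's own `URL` class, and it does not URL-decode values); the sample URL is a shortened form of the one above:

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

class ServiceUrlParser {
    // Splits the raw query string into key/value pairs, keeping their original order.
    static Map<String, String> parameters(String url) {
        Map<String, String> params = new LinkedHashMap<>();
        String query = URI.create(url).getRawQuery();
        if (query == null || query.isEmpty()) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                params.put(pair, "");
            } else {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        String url = "dubbo://192.168.1.104:20880/"
                + "com.msbedu.springboot.dubbo.springbootdubbosampleprovider.services.IDemoService"
                + "?anyhost=true&methods=getTxt&side=provider&loadbalance=msbloadbalance";
        URI uri = URI.create(url);
        System.out.println("protocol = " + uri.getScheme());
        System.out.println("port     = " + uri.getPort());
        System.out.println("params   = " + parameters(url));
    }
}
```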

Extension points in Dubbo

    ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())
  • Java SPI
  • SpringFactoriesLoader
  • Dubbo SPI

    Extension points in Dubbo

    Named extension points

    ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("name");
  • Locate the file named after Protocol's fully qualified name under /META-INF/dubbo/internal

  • In that file, find the implementation class mapped to "name".
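
The two lookup steps above can be sketched without Dubbo on the classpath. In this illustration the extension file is simulated as a string of `name=fully.qualified.ClassName` lines, and `SketchProtocol`, `DubboLikeProtocol`, `RestLikeProtocol`, and `NamedExtensionSketch` are hypothetical names. Dubbo's real `ExtensionLoader` also caches instances and handles wrappers and injection, none of which is shown.

```java
import java.util.HashMap;
import java.util.Map;

interface SketchProtocol {
    String name();
}

class DubboLikeProtocol implements SketchProtocol {
    public String name() { return "dubbo"; }
}

class RestLikeProtocol implements SketchProtocol {
    public String name() { return "rest"; }
}

class NamedExtensionSketch {
    // Parses "name=fully.qualified.ClassName" lines; '#' starts a comment.
    static Map<String, String> parse(String content) {
        Map<String, String> mapping = new HashMap<>();
        for (String line : content.split("\n")) {
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash);
            }
            line = line.trim();
            int eq = line.indexOf('=');
            if (eq > 0) {
                mapping.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
            }
        }
        return mapping;
    }

    // Looks up the class registered under 'name' and instantiates it reflectively.
    static <T> T getExtension(Class<T> type, Map<String, String> mapping, String name) {
        String className = mapping.get(name);
        if (className == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        try {
            Class<?> clazz = Class.forName(className);
            return type.cast(clazz.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create extension " + className, e);
        }
    }

    public static void main(String[] args) {
        String fileContent = "# simulated extension file\n"
                + "dubbo=" + DubboLikeProtocol.class.getName() + "\n"
                + "rest=" + RestLikeProtocol.class.getName() + "\n";
        SketchProtocol protocol = getExtension(SketchProtocol.class, parse(fileContent), "rest");
        System.out.println(protocol.name());
    }
}
```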

    Custom extension points

  • Create the file resources/META-INF/dubbo/org.apache.dubbo.rpc.cluster.LoadBalance

  • Implement the extension point

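    import java.util.List;
    import org.apache.dubbo.common.URL;
    import org.apache.dubbo.rpc.Invocation;
    import org.apache.dubbo.rpc.Invoker;
    import org.apache.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance;

    public class msbDefineLoadBalance extends AbstractLoadBalance {
        @Override
        protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            // Placeholder: a real implementation must pick and return one of the invokers.
            return null;
        }
    }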
  • Test

    // Parse the URL
    String loadbalance = "random";
    // If the URL carries loadbalance=msbloadbalance, pass "msbloadbalance" instead
    LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(loadbalance);
    System.out.println(loadBalance);

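    Characteristics of an extension point

    The interface is annotated at the type level with @SPI(RandomLoadBalance.NAME).
    The value in parentheses names the default extension for this extension point.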
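
How an annotation value can act as "the default extension name" is easy to sketch with reflection. Everything below is hypothetical: `SketchSpi` is a stand-in for Dubbo's `@SPI`, and the registry is an in-memory map instead of files under META-INF.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for Dubbo's @SPI; value() names the default extension.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SketchSpi {
    String value() default "";
}

@SketchSpi("random")
interface SketchLoadBalance {
    String select();
}

class DefaultExtensionSketch {
    // In-memory registry of named implementations (assumed for illustration).
    private static final Map<String, Supplier<SketchLoadBalance>> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put("random", () -> () -> "random");
        REGISTRY.put("roundrobin", () -> () -> "roundrobin");
    }

    // Reads the default extension name from the annotation on the extension interface.
    static String defaultName(Class<?> extensionPoint) {
        SketchSpi spi = extensionPoint.getAnnotation(SketchSpi.class);
        if (spi == null || spi.value().isEmpty()) {
            throw new IllegalStateException("No default extension declared on " + extensionPoint);
        }
        return spi.value();
    }

    static SketchLoadBalance getDefaultExtension() {
        return REGISTRY.get(defaultName(SketchLoadBalance.class)).get();
    }

    public static void main(String[] args) {
        System.out.println("default = " + defaultName(SketchLoadBalance.class));
        System.out.println("selected by = " + getDefaultExtension().select());
    }
}
```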

    How named extension points work

  • Think through how the implementation breaks down into steps

    Adaptive extension points

    ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

    Activate extension points

    ExtensionLoader.getExtensionLoader(Protocol.class).getActivateExtension