一、服务发布时序图
    dubbo之提供者服务发布过程详解 - 图1

    二、服务发布过程代码追踪
    1、ServiceConfig类:服务发布的入口

    1. public synchronized void export() {
    2. //配置检查
    3. checkAndUpdateSubConfigs();
    4. //是否需要导出,不需要的话,直接结束
    5. if (!shouldExport()) {
    6. return;
    7. }
    8. //是否需要延迟发布
    9. if (shouldDelay()) {
    10. //通过调度线程池完成延迟发布
    11. DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    12. } else {
    13. //发布
    14. doExport();
    15. }
    16. }
    17. private void doExportUrls() {
    18. //加载业务方配置的所有注册信息,zookeeper、redis、multicast、simple。通常使用的都是zk
    19. List<URL> registryURLs = loadRegistries(true);
    20. for (ProtocolConfig protocolConfig : protocols) {
    21. String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
    22. ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
    23. ApplicationModel.initProviderModel(pathKey, providerModel);
    24. //服务发布
    25. doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    26. }
    27. }
    28. private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    29. //构造应用、模块、提供者的信息,解析MethodConfig配置
    30. ...
    31. //判断是否是泛型调用
    32. if (ProtocolUtils.isGeneric(generic)) {
    33. map.put(GENERIC_KEY, generic);
    34. map.put(METHODS_KEY, ANY_VALUE);
    35. } else {
    36. //正常调用设置拼接url的参数
    37. ...
    38. }
    39. //拼接url对象
    40. String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    41. Integer port = this.findConfigedPorts(protocolConfig, name, map);
    42. URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
    43. if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
    44. url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
    45. .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    46. }
    47. //导出服务,本地发布、远程发布
    48. //服务的作用域有三种:none、local、remote。为none标识不发布,通常默认是null,既发布本地又发布远程
    49. if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
    50. //如果scope作用域不是remote,则发布本地服务,发布本地服务并不会开启端口,但是依然会走调用链。
    51. //invoker在export过程中,会被Protocol的包装类进行层层包装。
    52. if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
    53. exportLocal(url);
    54. }
    55. //如果scope作用域不是local,则发布远程服务,同时开启端口。开启端口是为了监听rpc消费者调用
    56. if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
    57. if (!isOnlyInJvm() && logger.isInfoEnabled()) {
    58. logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
    59. }
    60. //注册地址是否为空,为空则说明是直连方式,否则不是
    61. if (CollectionUtils.isNotEmpty(registryURLs)) {
    62. for (URL registryURL : registryURLs) {
    63. //if protocol is only injvm ,not register
    64. if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
    65. continue;
    66. }
    67. url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
    68. URL monitorUrl = loadMonitor(registryURL);
    69. if (monitorUrl != null) {
    70. url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
    71. }
    72. if (logger.isInfoEnabled()) {
    73. logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
    74. }
    75. //For providers, this is used to enable custom proxy to generate invoker
    76. String proxy = url.getParameter(PROXY_KEY);
    77. if (StringUtils.isNotEmpty(proxy)) {
    78. registryURL = registryURL.addParameter(PROXY_KEY, proxy);
    79. }
    80. //将服务实现类封装成Invoker,PROXY_FACTORY的对应实例是ProxyFactory$Adaptive代理类,默认是JavassistProxyFactory
    81. Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
    82. //包装Invoker
    83. DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    84. //远程发布,protocol对应的实例是Protocol$Adaptive,这里的url是registry,所以会选
    85. //择RegistryProtocol实例,我们看一下调用过程
    86. Exporter<?> exporter = protocol.export(wrapperInvoker);
    87. exporters.add(exporter);
    88. }
    89. } else {
    90. Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
    91. DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    92. Exporter<?> exporter = protocol.export(wrapperInvoker);
    93. exporters.add(exporter);
    94. }
    95. //元数据存储
    96. MetadataReportService metadataReportService = null;
    97. if ((metadataReportService = getMetadataReportService()) != null) {
    98. metadataReportService.publishProvider(url);
    99. }
    100. }
    101. }
    102. }

    2、Protocol$Adaptive:协议适配器类,只要看export方法

    1. public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
    2. ...其他方法省略
    3. public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
    4. if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
    5. if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
    6. org.apache.dubbo.common.URL url = arg0.getUrl();
    7. //这里传进来的url是registry,所以会得到RegistryProtocol的实例
    8. String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    9. if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
    10. //获取具体的Protocol实现类,ExtensionLoader之前分析过,它会加载SPI接口的实现类,构造Class实例,
    11. //但不会实例化对象。实例化对象的过程是在getExtension()方法中完成的,详情见ExtensionLoader
    12. //源码分析
    13. org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
    14. return extension.export(arg0);
    15. }
    16. }

    3、RegistryProtocol:注册协议类

    1. @Override
    2. public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    3. URL registryUrl = getRegistryUrl(originInvoker);
    4. // url to export locally
    5. URL providerUrl = getProviderUrl(originInvoker);
    6. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    7. final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    8. overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    9. providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    10. //发布服务
    11. final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    12. // url to registry
    13. final Registry registry = getRegistry(originInvoker);
    14. final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
    15. ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
    16. registryUrl, registeredProviderUrl);
    17. //to judge if we need to delay publish
    18. boolean register = registeredProviderUrl.getParameter("register", true);
    19. if (register) {
    20. //将服务信息注册到注册中心
    21. register(registryUrl, registeredProviderUrl);
    22. providerInvokerWrapper.setReg(true);
    23. }
    24. // Deprecated! Subscribe to override rules in 2.6.x or before.
    25. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    26. exporter.setRegisterUrl(registeredProviderUrl);
    27. exporter.setSubscribeUrl(overrideSubscribeUrl);
    28. //Ensure that a new exporter instance is returned every time export
    29. return new DestroyableExporter<>(exporter);
    30. }
    31. private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    32. String key = getCacheKey(originInvoker);
    33. return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
    34. Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
    35. //protocol是Protocol$Adaptive实例,invoker中的url已经是dubbo协议了,所以最终会调用到DubboProtocol
    36. return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    37. });
    38. }
    39. //registryFactory指向的实例类是RegistryFactory$Adaptive,我们配置的是zookeeper,所以最终选择
    40. //ZookeeperRegistryFactory,这里就不详细介绍了,看下面的时序图
    41. public void register(URL registryUrl, URL registeredProviderUrl) {
    42. Registry registry = registryFactory.getRegistry(registryUrl);
    43. registry.register(registeredProviderUrl);
    44. }

    dubbo服务注册时序图:
    dubbo之提供者服务发布过程详解 - 图2

    4、DubboProtocol:dubbo协议类,默认实现

    1. @Override
    2. public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    3. URL url = invoker.getUrl();
    4. // export service.
    5. String key = serviceKey(url);
    6. DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    7. //exporter存储在map中,等到服务消费调用时,也会从该map中获取exporter执行业务逻辑
    8. exporterMap.put(key, exporter);
    9. //export an stub service for dispatching event
    10. ...代码省略
    11. //开启端口,监听消费
    12. openServer(url);
    13. optimizeSerialization(url);
    14. return exporter;
    15. }
    16. private void openServer(URL url) {
    17. //key的格式,ip:port,比如:192.168.56.1:20880
    18. String key = url.getAddress();
    19. //client can export a service which's only for server to invoke
    20. boolean isServer = url.getParameter(IS_SERVER_KEY, true);
    21. if (isServer) {
    22. ExchangeServer server = serverMap.get(key);
    23. if (server == null) {
    24. //通过加锁和缓存,保证同一个ip:port只开启一次服务端口监听
    25. synchronized (this) {
    26. server = serverMap.get(key);
    27. if (server == null) {
    28. serverMap.put(key, createServer(url));
    29. }
    30. }
    31. } else {
    32. server.reset(url);
    33. }
    34. }
    35. }
    36. private ExchangeServer createServer(URL url) {
    37. //拼接url参数
    38. url = URLBuilder.from(url).addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
    39. .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
    40. .addParameter(CODEC_KEY, DubboCodec.NAME).build();
    41. String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
    42. if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
    43. throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    44. }
    45. ExchangeServer server;
    46. try {
    47. //绑定端口,继续深入代码,这个requestHandler跟服务响应过程密切相关哦
    48. server = Exchangers.bind(url, requestHandler);
    49. } catch (RemotingException e) {
    50. throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    51. }
    52. str = url.getParameter(CLIENT_KEY);
    53. if (str != null && str.length() > 0) {
    54. Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
    55. if (!supportedTypes.contains(str)) {
    56. throw new RpcException("Unsupported client type: " + str);
    57. }
    58. }
    59. return server;
    60. }

    5、Exchangers:交换机

    1. public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    2. ...代码省略
    3. url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    4. return getExchanger(url).bind(url, handler);
    5. }
    6. public static Exchanger getExchanger(URL url) {
    7. //默认情况下使用DEFAULT_EXCHANGER,即header
    8. String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
    9. return getExchanger(type);
    10. }
    11. //看到这里又是ExtensionLoader,是不是有点晕呢?继续深入看代码。Exchanger接口下只有一个实现类:
    12. //HeaderExchanger,所以代码跳转至HeaderExchanger中
    13. public static Exchanger getExchanger(String type) {
    14. return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    15. }

    dubbo之提供者服务发布过程详解 - 图3

    6、HeaderExchanger:头部交换机

    1. @Override
    2. public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    3. //Transporters是要进行选择通信中间件了,netty、mina等,默认是netty。最终会实例化NettyServer类,
    4. //并将NettyServer作为参数传给HeaderExchangeServer。HeaderExchangeServer就是dubbo服务发布期间
    5. //唯一的开启的通信服务类。
    6. return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    7. }

    7、NettyTransporter:netty

    1. @Override
    2. public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    3. //到这一步就是精华了啊!这里将会涉及到dubbo线程池的选择。为什么dubbo要设置线程池呢,直接用netty
    4. //的io线程不好吗?netty在接收到消费者的调用请求之后,是自己处理还是交给业务线程池(dubbo线程池)
    5. //处理,有一套策略可配置,视应用而定。要是业务逻辑处理极快,直接使用netty的io线程未必不可,但是
    6. //若业务逻辑处理较慢,使用netty的io线程将会导致io线程阻塞,无法处理更多的连接,应用的并发量会很低。
    7. //这里就不着重介绍NettyServer了,总之到了这一步,就已经启动端口监听了,监听来自消费者的调用请求。
    8. //待会再写一篇着重NettyServer功能的。
    9. return new NettyServer(url, listener);
    10. }

    8、NettyServer:负责Netty服务的创建、关闭、通道管理

    1. public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    2. // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
    3. // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
    4. super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    5. }
    6. super是指AbstractServer类,ChannelHandlers.wrap(...)是用来选择、实例化dubbo线程池的。在dubbo中,
    7. 线程池也被封装成了ChannelHandler,顾名思义,通道处理器。
    8. public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    9. super(url, handler);
    10. localAddress = getUrl().toInetSocketAddress();
    11. String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    12. int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    13. if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
    14. bindIp = ANYHOST_VALUE;
    15. }
    16. bindAddress = new InetSocketAddress(bindIp, bindPort);
    17. this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
    18. this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
    19. try {
    20. //开启服务监听,由子类实现,若是netty的话,就有NettyServer实现
    21. doOpen();
    22. if (logger.isInfoEnabled()) {
    23. logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
    24. }
    25. } catch (Throwable t) {
    26. throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
    27. + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    28. }
    29. //fixme replace this with better method
    30. DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    31. executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    32. }
    33. //NettyServer开启端口监听
    34. protected void doOpen() throws Throwable {
    35. bootstrap = new ServerBootstrap();
    36. //netty的boss/worker线程组,前者负责socket的连接事件,后者负责socket的读写事件
    37. bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    38. workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
    39. new DefaultThreadFactory("NettyServerWorker", true));
    40. //netty通道的处理器,通道上的读写连接事件均由该类处理
    41. final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    42. channels = nettyServerHandler.getChannels();
    43. bootstrap.group(bossGroup, workerGroup)
    44. .channel(NioServerSocketChannel.class) //选择io模式,这里选择Nio
    45. //tcp消息延迟设置,为true表示应用层一有消息便发送出去,而不必等到socket的输出缓冲区满了之后再发送
    46. .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
    47. //??含义记不清了,待查证
    48. .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
    49. //内存使用策略,池化的直接内存,池化:相当于缓存池,内存使用之后不会立即销毁,提高性能
    50. .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
    51. .childHandler(new ChannelInitializer<NioSocketChannel>() {
    52. @Override
    53. protected void initChannel(NioSocketChannel ch) throws Exception {
    54. // FIXME: should we use getTimeout()?
    55. int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
    56. //消息编解码
    57. NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
    58. ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
    59. .addLast("decoder", adapter.getDecoder())
    60. .addLast("encoder", adapter.getEncoder())
    61. .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
    62. //设置业务处理器,nettyServerHandler是实际处理socket事件的地方
    63. .addLast("handler", nettyServerHandler);
    64. }
    65. });
    66. //绑定接口,服务端开启了监听
    67. ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    68. channelFuture.syncUninterruptibly();
    69. channel = channelFuture.channel();
    70. }

    9、ChannelHandlers:通道处理器,用以选择、封装具体的dubbo线程池

    1. protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    2. return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
    3. .getAdaptiveExtension().dispatch(handler, url)));
    4. }
    5. 可以看出,一个线程池最终会经过HeartbeatHandlerMultiMessageHandler的包装。
    6. ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension()会得到Dispatcher$Adaptive类,
    7. 默认的线程策略是all,所以最终调用AllDispatcher类。

    10、AllDispatcher

    1. public class AllDispatcher implements Dispatcher {
    2. public static final String NAME = "all";
    3. @Override
    4. public ChannelHandler dispatch(ChannelHandler handler, URL url) {
    5. //实例化AllChannelHandler
    6. return new AllChannelHandler(handler, url);
    7. }
    8. }

    11、AllChannelHandler

    1. public class AllChannelHandler extends WrappedChannelHandler {
    2. public AllChannelHandler(ChannelHandler handler, URL url) {
    3. //super指WrappedChannelHandler
    4. super(handler, url);
    5. }
    6. ...
    7. }
    8. AllChannelHandlerdubbo默认的线程池策略,但最终所有的任务都要交由线程池处理。那么AllChannelHandler
    9. 的线程池实例在哪呢?在其父类WrappedChannelHandler的构造函数中

    12、WrappedChannelHandler

    1. public WrappedChannelHandler(ChannelHandler handler, URL url) {
    2. this.handler = handler;
    3. this.url = url;
    4. //获取线程池实例,线程池默认策略是固定线程池fixed,即核心线程数和最大线程数相同
    5. executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
    6. String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
    7. if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
    8. componentKey = CONSUMER_SIDE;
    9. }
    10. //将executor实例存在dataStore中,在dubbo服务优雅停机时,会从dataStore中取出executor关闭
    11. DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    12. dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    13. }