一、服务发布时序图
二、服务发布过程代码追踪
1、ServiceConfig类:服务发布的入口
public synchronized void export() {//配置检查checkAndUpdateSubConfigs();//是否需要导出,不需要的话,直接结束if (!shouldExport()) {return;}//是否需要延迟发布if (shouldDelay()) {//通过调度线程池完成延迟发布DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);} else {//发布doExport();}}private void doExportUrls() {//加载业务方配置的所有注册信息,zookeeper、redis、multicast、simple。通常使用的都是zkList<URL> registryURLs = loadRegistries(true);for (ProtocolConfig protocolConfig : protocols) {String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);ApplicationModel.initProviderModel(pathKey, providerModel);//服务发布doExportUrlsFor1Protocol(protocolConfig, registryURLs);}}private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {//构造应用、模块、提供者的信息,解析MethodConfig配置...//判断是否是泛型调用if (ProtocolUtils.isGeneric(generic)) {map.put(GENERIC_KEY, generic);map.put(METHODS_KEY, ANY_VALUE);} else {//正常调用设置拼接url的参数...}//拼接url对象String host = this.findConfigedHosts(protocolConfig, registryURLs, map);Integer port = this.findConfigedPorts(protocolConfig, name, map);URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);}//导出服务,本地发布、远程发布//服务的作用域有三种:none、local、remote。为none标识不发布,通常默认是null,既发布本地又发布远程if (!SCOPE_NONE.equalsIgnoreCase(scope)) {//如果scope作用域不是remote,则发布本地服务,发布本地服务并不会开启端口,但是依然会走调用链。//invoker在export过程中,会被Protocol的包装类进行层层包装。if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {exportLocal(url);}//如果scope作用域不是local,则发布远程服务,同时开启端口。开启端口是为了监听rpc消费者调用if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {if (!isOnlyInJvm() && logger.isInfoEnabled()) {logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);}//注册地址是否为空,为空则说明是直连方式,否则不是if (CollectionUtils.isNotEmpty(registryURLs)) {for (URL registryURL : registryURLs) {//if protocol is only injvm ,not registerif (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {continue;}url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));URL monitorUrl = loadMonitor(registryURL);if (monitorUrl != null) {url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());}if (logger.isInfoEnabled()) {logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);}//For providers, this is used to enable custom proxy to generate invokerString proxy = url.getParameter(PROXY_KEY);if (StringUtils.isNotEmpty(proxy)) {registryURL = registryURL.addParameter(PROXY_KEY, proxy);}//将服务实现类封装成Invoker,PROXY_FACTORY的对应实例是ProxyFactory$Adaptive代理类,默认是JavassistProxyFactoryInvoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));//包装InvokerDelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);//远程发布,protocol对应的实例是Protocol$Adaptive,这里的url是registry,所以会选//择RegistryProtocol实例,我们看一下调用过程Exporter<?> exporter = protocol.export(wrapperInvoker);exporters.add(exporter);}} else {Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);Exporter<?> exporter = protocol.export(wrapperInvoker);exporters.add(exporter);}//元数据存储MetadataReportService metadataReportService = null;if ((metadataReportService = getMetadataReportService()) != null) {metadataReportService.publishProvider(url);}}}}
2、Protocol$Adaptive:协议适配器类,只要看export方法
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {...其他方法省略public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");org.apache.dubbo.common.URL url = arg0.getUrl();//这里传进来的url是registry,所以会得到RegistryProtocol的实例String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");//获取具体的Protocol实现类,ExtensionLoader之前分析过,它会加载SPI接口的实现类,构造Class实例,//但不会实例化对象。实例化对象的过程是在getExtension()方法中完成的,详情见ExtensionLoader//源码分析org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);return extension.export(arg0);}}
3、RegistryProtocol:注册协议类
@Overridepublic <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {URL registryUrl = getRegistryUrl(originInvoker);// url to export locallyURL providerUrl = getProviderUrl(originInvoker);final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);//发布服务final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);// url to registryfinal Registry registry = getRegistry(originInvoker);final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,registryUrl, registeredProviderUrl);//to judge if we need to delay publishboolean register = registeredProviderUrl.getParameter("register", true);if (register) {//将服务信息注册到注册中心register(registryUrl, registeredProviderUrl);providerInvokerWrapper.setReg(true);}// Deprecated! Subscribe to override rules in 2.6.x or before.registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);exporter.setRegisterUrl(registeredProviderUrl);exporter.setSubscribeUrl(overrideSubscribeUrl);//Ensure that a new exporter instance is returned every time exportreturn new DestroyableExporter<>(exporter);}private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {String key = getCacheKey(originInvoker);return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);//protocol是Protocol$Adaptive实例,invoker中的url已经是dubbo协议了,所以最终会调用到DubboProtocolreturn new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);});}//registryFactory指向的实例类是RegistryFactory$Adaptive,我们配置的是zookeeper,所以最终选择//ZookeeperRegistryFactory,这里就不详细介绍了,看下面的时序图public void register(URL registryUrl, URL registeredProviderUrl) {Registry registry = registryFactory.getRegistry(registryUrl);registry.register(registeredProviderUrl);}
dubbo服务注册时序图:
4、DubboProtocol:dubbo协议类,默认实现
@Overridepublic <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {URL url = invoker.getUrl();// export service.String key = serviceKey(url);DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);//exporter存储在map中,等到服务消费调用时,也会从该map中获取exporter执行业务逻辑exporterMap.put(key, exporter);//export an stub service for dispatching event...代码省略//开启端口,监听消费openServer(url);optimizeSerialization(url);return exporter;}private void openServer(URL url) {//key的格式,ip:port,比如:192.168.56.1:20880String key = url.getAddress();//client can export a service which's only for server to invokeboolean isServer = url.getParameter(IS_SERVER_KEY, true);if (isServer) {ExchangeServer server = serverMap.get(key);if (server == null) {//通过加锁和缓存,保证同一个ip:port只开启一次服务端口监听synchronized (this) {server = serverMap.get(key);if (server == null) {serverMap.put(key, createServer(url));}}} else {server.reset(url);}}}private ExchangeServer createServer(URL url) {//拼接url参数url = URLBuilder.from(url).addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()).addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)).addParameter(CODEC_KEY, DubboCodec.NAME).build();String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {throw new RpcException("Unsupported server type: " + str + ", url: " + url);}ExchangeServer server;try {//绑定端口,继续深入代码,这个requestHandler跟服务响应过程密切相关哦server = Exchangers.bind(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);}str = url.getParameter(CLIENT_KEY);if (str != null && str.length() > 0) {Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();if (!supportedTypes.contains(str)) {throw new RpcException("Unsupported client type: " + str);}}return server;}
5、Exchangers:交换机
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {...代码省略url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");return getExchanger(url).bind(url, handler);}public static Exchanger getExchanger(URL url) {//默认情况下使用DEFAULT_EXCHANGER,即headerString type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);return getExchanger(type);}//看到这里又是ExtensionLoader,是不是有点晕呢?继续深入看代码。Exchanger接口下只有一个实现类://HeaderExchanger,所以代码跳转至HeaderExchanger中public static Exchanger getExchanger(String type) {return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);}
6、HeaderExchanger:头部交换机
@Overridepublic ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {//Transporters是要进行选择通信中间件了,netty、mina等,默认是netty。最终会实例化NettyServer类,//并将NettyServer作为参数传给HeaderExchangeServer。HeaderExchangeServer就是dubbo服务发布期间//唯一的开启的通信服务类。return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));}
7、NettyTransporter:netty
@Overridepublic Server bind(URL url, ChannelHandler listener) throws RemotingException {//到这一步就是精华了啊!这里将会涉及到dubbo线程池的选择。为什么dubbo要设置线程池呢,直接用netty//的io线程不好吗?netty在接收到消费者的调用请求之后,是自己处理还是交给业务线程池(dubbo线程池)//处理,有一套策略可配置,视应用而定。要是业务逻辑处理极快,直接使用netty的io线程未必不可,但是//若业务逻辑处理较慢,使用netty的io线程将会导致io线程阻塞,无法处理更多的连接,应用的并发量会很低。//这里就不着重介绍NettyServer了,总之到了这一步,就已经启动端口监听了,监听来自消费者的调用请求。//待会再写一篇着重NettyServer功能的。return new NettyServer(url, listener);}
8、NettyServer:负责Netty服务的创建、关闭、通道管理
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handlersuper(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));}super是指AbstractServer类,ChannelHandlers.wrap(...)是用来选择、实例化dubbo线程池的。在dubbo中,线程池也被封装成了ChannelHandler,顾名思义,通道处理器。public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);localAddress = getUrl().toInetSocketAddress();String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {bindIp = ANYHOST_VALUE;}bindAddress = new InetSocketAddress(bindIp, bindPort);this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);try {//开启服务监听,由子类实现,若是netty的话,就有NettyServer实现doOpen();if (logger.isInfoEnabled()) {logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());}} catch (Throwable t) {throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);}//fixme replace this with better methodDataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));}//NettyServer开启端口监听protected void doOpen() throws Throwable {bootstrap = new ServerBootstrap();//netty的boss/worker线程组,前者负责socket的连接事件,后者负责socket的读写事件bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),new DefaultThreadFactory("NettyServerWorker", true));//netty通道的处理器,通道上的读写连接事件均由该类处理final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);channels = nettyServerHandler.getChannels();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) //选择io模式,这里选择Nio//tcp消息延迟设置,为true表示应用层一有消息便发送出去,而不必等到socket的输出缓冲区满了之后再发送.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)//??含义记不清了,待查证.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)//内存使用策略,池化的直接内存,池化:相当于缓存池,内存使用之后不会立即销毁,提高性能.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// FIXME: should we use getTimeout()?int idleTimeout = UrlUtils.getIdleTimeout(getUrl());//消息编解码NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug.addLast("decoder", adapter.getDecoder()).addLast("encoder", adapter.getEncoder()).addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))//设置业务处理器,nettyServerHandler是实际处理socket事件的地方.addLast("handler", nettyServerHandler);}});//绑定接口,服务端开启了监听ChannelFuture channelFuture = bootstrap.bind(getBindAddress());channelFuture.syncUninterruptibly();channel = channelFuture.channel();}
9、ChannelHandlers:通道处理器,用以选择、封装具体的dubbo线程池
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));}可以看出,一个线程池最终会经过HeartbeatHandler、MultiMessageHandler的包装。ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension()会得到Dispatcher$Adaptive类,默认的线程策略是all,所以最终调用AllDispatcher类。
10、AllDispatcher
public class AllDispatcher implements Dispatcher {public static final String NAME = "all";@Overridepublic ChannelHandler dispatch(ChannelHandler handler, URL url) {//实例化AllChannelHandlerreturn new AllChannelHandler(handler, url);}}
11、AllChannelHandler
public class AllChannelHandler extends WrappedChannelHandler {public AllChannelHandler(ChannelHandler handler, URL url) {//super指WrappedChannelHandlersuper(handler, url);}...}AllChannelHandler是dubbo默认的线程池策略,但最终所有的任务都要交由线程池处理。那么AllChannelHandler的线程池实例在哪呢?在其父类WrappedChannelHandler的构造函数中
12、WrappedChannelHandler
public WrappedChannelHandler(ChannelHandler handler, URL url) {this.handler = handler;this.url = url;//获取线程池实例,线程池默认策略是固定线程池fixed,即核心线程数和最大线程数相同executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {componentKey = CONSUMER_SIDE;}//将executor实例存在dataStore中,在dubbo服务优雅停机时,会从dataStore中取出executor关闭DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();dataStore.put(componentKey, Integer.toString(url.getPort()), executor);}
