一、服务发布时序图
二、服务发布过程代码追踪
1、ServiceConfig类:服务发布的入口
public synchronized void export() {
//配置检查
checkAndUpdateSubConfigs();
//是否需要导出,不需要的话,直接结束
if (!shouldExport()) {
return;
}
//是否需要延迟发布
if (shouldDelay()) {
//通过调度线程池完成延迟发布
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
//发布
doExport();
}
}
private void doExportUrls() {
//加载业务方配置的所有注册信息,zookeeper、redis、multicast、simple。通常使用的都是zk
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
//服务发布
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//构造应用、模块、提供者的信息,解析MethodConfig配置
...
//判断是否是泛型调用
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
//正常调用设置拼接url的参数
...
}
//拼接url对象
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
//导出服务,本地发布、远程发布
//服务的作用域有三种:none、local、remote。为none标识不发布,通常默认是null,既发布本地又发布远程
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
//如果scope作用域不是remote,则发布本地服务,发布本地服务并不会开启端口,但是依然会走调用链。
//invoker在export过程中,会被Protocol的包装类进行层层包装。
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
//如果scope作用域不是local,则发布远程服务,同时开启端口。开启端口是为了监听rpc消费者调用
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (!isOnlyInJvm() && logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
//注册地址是否为空,为空则说明是直连方式,否则不是
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
//For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
//将服务实现类封装成Invoker,PROXY_FACTORY的对应实例是ProxyFactory$Adaptive代理类,默认是JavassistProxyFactory
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
//包装Invoker
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//远程发布,protocol对应的实例是Protocol$Adaptive,这里的url是registry,所以会选
//择RegistryProtocol实例,我们看一下调用过程
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
//元数据存储
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
}
2、Protocol$Adaptive:协议适配器类,只要看export方法
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
...其他方法省略
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
//这里传进来的url是registry,所以会得到RegistryProtocol的实例
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
//获取具体的Protocol实现类,ExtensionLoader之前分析过,它会加载SPI接口的实现类,构造Class实例,
//但不会实例化对象。实例化对象的过程是在getExtension()方法中完成的,详情见ExtensionLoader
//源码分析
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
3、RegistryProtocol:注册协议类
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//发布服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
//将服务信息注册到注册中心
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
//protocol是Protocol$Adaptive实例,invoker中的url已经是dubbo协议了,所以最终会调用到DubboProtocol
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
//registryFactory指向的实例类是RegistryFactory$Adaptive,我们配置的是zookeeper,所以最终选择
//ZookeeperRegistryFactory,这里就不详细介绍了,看下面的时序图
public void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
dubbo服务注册时序图:
4、DubboProtocol:dubbo协议类,默认实现
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
//exporter存储在map中,等到服务消费调用时,也会从该map中获取exporter执行业务逻辑
exporterMap.put(key, exporter);
//export an stub service for dispatching event
...代码省略
//开启端口,监听消费
openServer(url);
optimizeSerialization(url);
return exporter;
}
private void openServer(URL url) {
//key的格式,ip:port,比如:192.168.56.1:20880
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
//通过加锁和缓存,保证同一个ip:port只开启一次服务端口监听
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
//拼接url参数
url = URLBuilder.from(url).addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME).build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
//绑定端口,继续深入代码,这个requestHandler跟服务响应过程密切相关哦
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
5、Exchangers:交换机
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
...代码省略
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
public static Exchanger getExchanger(URL url) {
//默认情况下使用DEFAULT_EXCHANGER,即header
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
//看到这里又是ExtensionLoader,是不是有点晕呢?继续深入看代码。Exchanger接口下只有一个实现类:
//HeaderExchanger,所以代码跳转至HeaderExchanger中
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
6、HeaderExchanger:头部交换机
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
//Transporters是要进行选择通信中间件了,netty、mina等,默认是netty。最终会实例化NettyServer类,
//并将NettyServer作为参数传给HeaderExchangeServer。HeaderExchangeServer就是dubbo服务发布期间
//唯一的开启的通信服务类。
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
7、NettyTransporter:netty
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
//到这一步就是精华了啊!这里将会涉及到dubbo线程池的选择。为什么dubbo要设置线程池呢,直接用netty
//的io线程不好吗?netty在接收到消费者的调用请求之后,是自己处理还是交给业务线程池(dubbo线程池)
//处理,有一套策略可配置,视应用而定。要是业务逻辑处理极快,直接使用netty的io线程未必不可,但是
//若业务逻辑处理较慢,使用netty的io线程将会导致io线程阻塞,无法处理更多的连接,应用的并发量会很低。
//这里就不着重介绍NettyServer了,总之到了这一步,就已经启动端口监听了,监听来自消费者的调用请求。
//待会再写一篇着重NettyServer功能的。
return new NettyServer(url, listener);
}
8、NettyServer:负责Netty服务的创建、关闭、通道管理
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
super是指AbstractServer类,ChannelHandlers.wrap(...)是用来选择、实例化dubbo线程池的。在dubbo中,
线程池也被封装成了ChannelHandler,顾名思义,通道处理器。
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
//开启服务监听,由子类实现,若是netty的话,就有NettyServer实现
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
//NettyServer开启端口监听
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
//netty的boss/worker线程组,前者负责socket的连接事件,后者负责socket的读写事件
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
//netty通道的处理器,通道上的读写连接事件均由该类处理
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) //选择io模式,这里选择Nio
//tcp消息延迟设置,为true表示应用层一有消息便发送出去,而不必等到socket的输出缓冲区满了之后再发送
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
//??含义记不清了,待查证
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
//内存使用策略,池化的直接内存,池化:相当于缓存池,内存使用之后不会立即销毁,提高性能
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
//消息编解码
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
//设置业务处理器,nettyServerHandler是实际处理socket事件的地方
.addLast("handler", nettyServerHandler);
}
});
//绑定接口,服务端开启了监听
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
9、ChannelHandlers:通道处理器,用以选择、封装具体的dubbo线程池
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
可以看出,一个线程池最终会经过HeartbeatHandler、MultiMessageHandler的包装。
ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension()会得到Dispatcher$Adaptive类,
默认的线程策略是all,所以最终调用AllDispatcher类。
10、AllDispatcher
public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
//实例化AllChannelHandler
return new AllChannelHandler(handler, url);
}
}
11、AllChannelHandler
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
//super指WrappedChannelHandler
super(handler, url);
}
...
}
AllChannelHandler是dubbo默认的线程池策略,但最终所有的任务都要交由线程池处理。那么AllChannelHandler
的线程池实例在哪呢?在其父类WrappedChannelHandler的构造函数中
12、WrappedChannelHandler
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
//获取线程池实例,线程池默认策略是固定线程池fixed,即核心线程数和最大线程数相同
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
componentKey = CONSUMER_SIDE;
}
//将executor实例存在dataStore中,在dubbo服务优雅停机时,会从dataStore中取出executor关闭
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}