一、启动流程概述
启动的入口在ReferenceConfig类的get()方法,消费者启动流程包含以下几步:
1. 构建Registry实例、RegistryDirectory实例
2. 向注册中心注册消费者信息
3. 构建路由规则链
4. 向注册中心订阅提供者信息
5. 将订阅的提供者信息转换为DubboInvoker,并且开启通信服务(默认是netty,消费者在启动时会与多个提供者之间都建立连接,但缺省情况下同一个提供者下的所有rpc调用共用一个连接,但是也可以配置为不共用)
6.将构建好的Invoker实例封装进集群容错策略类中
7.将构建好的集群容错策略实例封装在InvokerInvocationHandler类中
consumer://192.168.56.1/service.GreetingServiceAsync?application=first-dubbo-consumer&async=true&dubbo=2.0.2&group=dubbo&interface=service.GreetingServiceAsync&lazy=false&methods=sayHello,testGeneric&pid=13912&release=2.7.3&revision=1.0.0&side=consumer&sticky=false&timeout=3000×tamp=1581562335685&version=1.0.0
二、启动时序图
图1.0
三、源码分析
1、ReferenceConfig
代码1.0
public synchronized T get() {
checkAndUpdateSubConfigs();
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
//初始化
init();
}
return ref;
}
private void init() {
//省略的代码是用来构造参数map的,包括消费者的应用信息、模块信息、要调用的接口
...
//创建的代理类才是调用具体方法时的入口。订阅的提供者信息将会被转化为DubboInvoker,
//DubboInvoker会被层层包装,最后封装在集群容错策略类中。dubbo最后对集群容错策略
//实例类进行代理封装,代理类才是实际的调用入口。细节随后再说
ref = createProxy(map);
...
}
private T createProxy(Map<String, String> map) {
//判断是否是本地引用,如果是的话,就不用发起远程调用了。但是本地调用dubbo服务,也不是直接调用
//实现类,消费者和提供者还是会经过各种包装类、过滤类,最终才调用到具体实现类
if (shouldJvmRefer(map)) {
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
urls.clear(); // reference retry init will add url to urls, lead to OOM
//直连方式
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
// if protocols not injvm checkRegistry
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){
checkRegistry();
//加载注册地址
List<URL> us = loadRegistries(false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
//把map转为字符串添加进url中
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}
//如果注册中心只有一个则走这段代码,否则走下面代码。但是流程和原理是相通的。
if (urls.size() == 1) {
//REF_PROTOCOL引用的是Protocol$Adaptive,这里获取的是RegistryProtocol,但是在获取
//RegistryProtocol的过程中,ExtensionLoader会对RegistryProtocol实例进行包装,包装
//类就是ProtocolListenerWrapper、QosProtocolWrapper、ProtocolFilterWrapper。在
//ProtocolFilterWrapper中还会经过filter过滤器的封装
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use RegistryAwareCluster only when register's CLUSTER is available
URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}
if (shouldCheck() && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
//根据invoker创建代理类,PROXY_FACTORY引用的是ProxyFactory$Adaptive,默认是JavassistProxyFactory
//1)invoker实例指向的是MockClusterInvoker
//2)PROXY_FACTORY.getProxy(invoker)生成的实例指向的是InvokerInvocationHandler。为什么
//是InvokerInvocationHandler呢,自己去看一眼JavassistProxyFactory的代码便知
//参见图1.1
return (T) PROXY_FACTORY.getProxy(invoker);
}
图1.1
2、RegistryProtocol
我们看一下这个实例:
图2.0
包装Wrapper类代码:
代码2.0
public class ProtocolListenerWrapper implements Protocol {
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//这时的url信息是注册信息,以registry开头
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
//protocol指向QosProtocolWrapper
return protocol.refer(type, url);
}
return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
Collections.unmodifiableList(
ExtensionLoader.getExtensionLoader(InvokerListener.class)
.getActivateExtension(url, INVOKER_LISTENER_KEY)));
}
}
public class QosProtocolWrapper implements Protocol {
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
startQosServer(url);
//protocol指向ProtocolFilterWrapper
return protocol.refer(type, url);
}
return protocol.refer(type, url);
}
}
public class ProtocolFilterWrapper implements Protocol {
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
//protocol指向RegistryProtocol
return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
}
//该方法用来构造过滤器,将invoker实例用各种filter封装。在filter中完成相应功能
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
// onError callback
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
}
}
throw e;
}
return asyncResult;
}
public void destroy() {
invoker.destroy();
}
public String toString() {
return invoker.toString();
}
};
}
}
return new CallbackRegistrationInvoker<>(last, filters);
}
static class CallbackRegistrationInvoker<T> implements Invoker<T> {
private final Invoker<T> filterInvoker;
private final List<Filter> filters;
public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) {
this.filterInvoker = filterInvoker;
this.filters = filters;
}
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult = filterInvoker.invoke(invocation);
asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {
for (int i = filters.size() - 1; i >= 0; i--) {
Filter filter = filters.get(i);
// onResponse callback
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
if (t == null) {
listener.onResponse(r, filterInvoker, invocation);
} else {
listener.onError(t, filterInvoker, invocation);
}
}
} else {
filter.onResponse(r, filterInvoker, invocation);
}
}
});
return asyncResult;
}
public Class<T> getInterface() {
return filterInvoker.getInterface();
}
public URL getUrl() {
return filterInvoker.getUrl();
}
public boolean isAvailable() {
return filterInvoker.isAvailable();
}
public void destroy() {
filterInvoker.destroy();
}
}
}
RegistryProtocol代码:
代码2.1
public class RegistryProtocol implements Protocol {
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = URLBuilder.from(url)
.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
.removeParameter(REGISTRY_KEY)
.build();
//获取注册实例,如果是zk的话,registry指向ZookeeperRegistry
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
//cluster指向的是Cluster$Adaptive
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//directory是目录,什么意思呢?通常微服务应用会部署多个实例,也就是说同一个api有多个服务者,
//那么怎么管理这些服务者呢?自然是通过相应Directory实例管理了
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
//向注册中心注册消费者信息
registry.register(directory.getRegisteredConsumerUrl());
}
//构建路由链,什么是路由呢?一个接口有十个提供者,业务上要求消费者只调用某几个提供者或者来自某
//个消费者的调用只调用某几个消费者,如何完成这些需求?通过路由规则
directory.buildRouterChain(subscribeUrl);
//订阅提供者信息。
//subscribeUrl.addParameter增加了类别参数,分别是providers、configurators、routers,将会
//从注册中心上订阅这三种类型的数据,并将订阅的信息转换成Invoker,还会开启NettyClient客户端,
//与服务者通信。这里的代码很绕,调用链参见图2.1,代码参加2.2
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
//cluster指向Cluster$Adaptive,就是把directory封装在集群容错类中而已。看图就行,参见图2.0
//invoker指向的是MockClusterInvoker实例
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
}
图2.0
图2.1
RegistryDirectory:
代码2.2
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
" in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(DISABLED_KEY)) {
enabled = !url.getParameter(DISABLED_KEY, false);
} else {
enabled = url.getParameter(ENABLED_KEY, true);
}
if (enabled) {
//这里是重点,protocol指向的是Protocol$Adaptive,到了这一步,url的协议变
//成了dubbo协议,所以最终会调用DubboProtocol,生成DubboInvoker。
//但是这个DubboInvoker肯定是要被ProtocolListenerWrapper、QosProtocolWrapper、
//ProtocolFilterWrapper以及Filter过滤类进行封装的,这个过程参见RegistryProtocol
//实例化的过程。我们来看一下DubboProtocol的代码
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
DubboProtocol:
代码2.3
public abstract class AbstractProtocol implements Protocol {
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//protocolBindingRefer由具体子类实现,从这里可以看出,从dubbo2.7.3分支开始,
//消费者端的调用都是异步的了。消费者发起rpc之后,异步获取调用结果
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
}
public class DubboProtocol extends AbstractProtocol {
//DubboProtocol的refer方法继承自父类
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
//创建DubboInvoker,创建实例的同时还要创建netty客户端,开启通信
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
//1、微服务通常会部署多个实例,每个实例都是一个提供者
//2、消费者在启动时,会与每个提供者都建立连接,所以消费者会维护多个连接
//3、一个提供者尝尝会提供多个接口,消费者是针对每个接口都建立一个连接还是所有接口共用一个连接?
//缺省情况下,是同一个提供者下的所有接口共用一个连接。当然也可以设置不共享
private ExchangeClient[] getClients(URL url) {
//同一个提供者下的所有rpc调用是否共用一个连接
boolean useShareConnect = false;
int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
useShareConnect = true;
//xml的配置方式优先级最高
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
//获取共享连接
shareClients = getSharedClient(url, connections);
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) {
clients[i] = shareClients.get(i);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
//获取共享链接
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
String key = url.getAddress();
List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
if (checkClientCanUse(clients)) {
batchClientRefIncr(clients);
return clients;
}
locks.putIfAbsent(key, new Object());
synchronized (locks.get(key)) {
clients = referenceClientMap.get(key);
// dubbo check
if (checkClientCanUse(clients)) {
batchClientRefIncr(clients);
return clients;
}
// connectNum must be greater than or equal to 1
connectNum = Math.max(connectNum, 1);
// If the clients is empty, then the first initialization is
if (CollectionUtils.isEmpty(clients)) {
clients = buildReferenceCountExchangeClientList(url, connectNum);
referenceClientMap.put(key, clients);
} else {
for (int i = 0; i < clients.size(); i++) {
ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
// If there is a client in the list that is no longer available, create a new one to replace him.
if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
clients.set(i, buildReferenceCountExchangeClient(url));
continue;
}
referenceCountExchangeClient.incrementAndGetCount();
}
}
locks.remove(key);
return clients;
}
}
//构建共享连接
private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
List<ReferenceCountExchangeClient> clients = new ArrayList<>();
for (int i = 0; i < connectNum; i++) {
clients.add(buildReferenceCountExchangeClient(url));
}
return clients;
}
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
ExchangeClient exchangeClient = initClient(url);
return new ReferenceCountExchangeClient(exchangeClient);
}
//初始化一个链接
private ExchangeClient initClient(URL url) {
// client type setting.
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// connection should be lazy
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
//创建netty客户端,这部分代码参见代码2.4
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
}
Exchangers:
代码2.4
public class Exchangers {
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
}
public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
//哈哈,都是dubbo spi。得到的实例是HeaderExchanger
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
}
public class HeaderExchanger implements Exchanger {
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
//封装链如下:requestHandler -> HeaderExchangeHandler -> DecodeHandler ->
// --> AllChannelHandler --> HeartbeatHandler --> MultiMessageHandler,
//MultiMessageHandler最终赋值给了NettyClient的handler属性,NettyClient也作为参数
//传给了NettyClientHandler。
//requestHandler是DubboProtocol中的属性,也是最终通道事件处理的地方
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
}
public class Transporters {
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
//getTransporter()得到的是NettyTransporter实例
return getTransporter().connect(url, handler);
}
public static Transporter getTransporter() {
//默认是NettyTransporter
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
}
public class NettyTransporter implements Transporter {
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
//实例化NettyClient的内容也很精彩,参加代码2.5
return new NettyClient(url, listener);
}
}
代码2.5
public class NettyClient extends AbstractClient {
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
//wrapChannelHandler方法会创建dubbo线程池策略实例,用以处理netty通道上的连接、读、写等事件。
//线程池策略实例将会被MultiMessageHandler->HeartbeatHandler包装
super(url, wrapChannelHandler(url, handler));
}
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
return ChannelHandlers.wrap(handler, url);
}
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(NioSocketChannel.class);
if (getConnectTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
//nettyClientHandler这是通信的入口
.addLast("handler", nettyClientHandler);
String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
if(socksProxyHost != null) {
int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
ch.pipeline().addFirst(socks5ProxyHandler);
}
}
});
}
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.channel();
try {
// Close old channel
// copy reference
Channel oldChannel = NettyClient.this.channel;
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.cause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
// just add new valid channel to NettyChannel's cache
if (!isConnected()) {
//future.cancel(true);
}
}
}
}
public abstract class AbstractClient extends AbstractEndpoint implements Client {
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
try {
doOpen();
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
// connect.
connect();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().get(CONSUMER_SIDE, Integer.toString(url.getPort()));
ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().remove(CONSUMER_SIDE, Integer.toString(url.getPort()));
}
}
public class ChannelHandlers {
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
//spi机制创建线程池策略实例,默认是all,即AllChannelHandler
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
//super指WrappedChannelHandler
super(handler, url);
}
}
public class WrappedChannelHandler implements ChannelHandlerDelegate {
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
//既然是线程池策略那当然要创建线程池了,又是spi,默认是fixed线程池,线程池的核心线程数和
//最大线程数相同,默认是200
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
componentKey = CONSUMER_SIDE;
}
//线程池实例存储起来,优雅关机时要关闭线程池
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
}