一、启动流程概述
启动的入口在ReferenceConfig类的get()方法,消费者启动流程包含以下几步:
1. 构建Registry实例、RegistryDirectory实例
2. 向注册中心注册消费者信息
3. 构建路由规则链
4. 向注册中心订阅提供者信息
5. 将订阅的提供者信息转换为DubboInvoker,并且开启通信服务(默认是netty,消费者在启动时会与多个提供者之间都建立连接,但缺省情况下同一个提供者下的所有rpc调用共用一个连接,但是也可以配置为不共用)
6.将构建好的Invoker实例封装进集群容错策略类中
7.将构建好的集群容错策略实例封装在InvokerInvocationHandler类中
consumer://192.168.56.1/service.GreetingServiceAsync?application=first-dubbo-consumer&async=true&dubbo=2.0.2&group=dubbo&interface=service.GreetingServiceAsync&lazy=false&methods=sayHello,testGeneric&pid=13912&release=2.7.3&revision=1.0.0&side=consumer&sticky=false&timeout=3000×tamp=1581562335685&version=1.0.0
二、启动时序图
图1.0
三、源码分析
1、ReferenceConfig
代码1.0
public synchronized T get() {checkAndUpdateSubConfigs();if (destroyed) {throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");}if (ref == null) {//初始化init();}return ref;}private void init() {//省略的代码是用来构造参数map的,包括消费者的应用信息、模块信息、要调用的接口...//创建的代理类才是调用具体方法时的入口。订阅的提供者信息将会被转化为DubboInvoker,//DubboInvoker会被层层包装,最后封装在集群容错策略类中。dubbo最后对集群容错策略//实例类进行代理封装,代理类才是实际的调用入口。细节随后再说ref = createProxy(map);...}private T createProxy(Map<String, String> map) {//判断是否是本地引用,如果是的话,就不用发起远程调用了。但是本地调用dubbo服务,也不是直接调用//实现类,消费者和提供者还是会经过各种包装类、过滤类,最终才调用到具体实现类if (shouldJvmRefer(map)) {URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);invoker = REF_PROTOCOL.refer(interfaceClass, url);if (logger.isInfoEnabled()) {logger.info("Using injvm service " + interfaceClass.getName());}} else {urls.clear(); // reference retry init will add url to urls, lead to OOM//直连方式if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.String[] us = SEMICOLON_SPLIT_PATTERN.split(url);if (us != null && us.length > 0) {for (String u : us) {URL url = URL.valueOf(u);if (StringUtils.isEmpty(url.getPath())) {url = url.setPath(interfaceName);}if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));} else {urls.add(ClusterUtils.mergeUrl(url, map));}}}} else { // assemble URL from register center's configuration// if protocols not injvm checkRegistryif (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){checkRegistry();//加载注册地址List<URL> us = loadRegistries(false);if (CollectionUtils.isNotEmpty(us)) {for (URL u : us) {URL monitorUrl = loadMonitor(u);if (monitorUrl != null) {map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));}//把map转为字符串添加进url中urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));}}if (urls.isEmpty()) {throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");}}}//如果注册中心只有一个则走这段代码,否则走下面代码。但是流程和原理是相通的。if (urls.size() == 1) {//REF_PROTOCOL引用的是Protocol$Adaptive,这里获取的是RegistryProtocol,但是在获取//RegistryProtocol的过程中,ExtensionLoader会对RegistryProtocol实例进行包装,包装//类就是ProtocolListenerWrapper、QosProtocolWrapper、ProtocolFilterWrapper。在//ProtocolFilterWrapper中还会经过filter过滤器的封装invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));} else {List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();URL registryURL = null;for (URL url : urls) {invokers.add(REF_PROTOCOL.refer(interfaceClass, url));if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {registryURL = url; // use last registry url}}if (registryURL != null) { // registry url is available// use RegistryAwareCluster only when register's CLUSTER is availableURL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invokerinvoker = CLUSTER.join(new StaticDirectory(u, invokers));} else { // not a registry url, must be direct invoke.invoker = CLUSTER.join(new StaticDirectory(invokers));}}}if (shouldCheck() && !invoker.isAvailable()) {throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());}if (logger.isInfoEnabled()) {logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());}//根据invoker创建代理类,PROXY_FACTORY引用的是ProxyFactory$Adaptive,默认是JavassistProxyFactory//1)invoker实例指向的是MockClusterInvoker//2)PROXY_FACTORY.getProxy(invoker)生成的实例指向的是InvokerInvocationHandler。为什么//是InvokerInvocationHandler呢,自己去看一眼JavassistProxyFactory的代码便知//参见图1.1return (T) PROXY_FACTORY.getProxy(invoker);}

图1.1
2、RegistryProtocol
我们看一下这个实例:
图2.0
包装Wrapper类代码:
代码2.0
public class ProtocolListenerWrapper implements Protocol {public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {//这时的url信息是注册信息,以registry开头if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {//protocol指向QosProtocolWrapperreturn protocol.refer(type, url);}return new ListenerInvokerWrapper<T>(protocol.refer(type, url),Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(InvokerListener.class).getActivateExtension(url, INVOKER_LISTENER_KEY)));}}public class QosProtocolWrapper implements Protocol {public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {startQosServer(url);//protocol指向ProtocolFilterWrapperreturn protocol.refer(type, url);}return protocol.refer(type, url);}}public class ProtocolFilterWrapper implements Protocol {public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {return protocol.refer(type, url);}//protocol指向RegistryProtocolreturn buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);}//该方法用来构造过滤器,将invoker实例用各种filter封装。在filter中完成相应功能private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {Invoker<T> last = invoker;List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);if (!filters.isEmpty()) {for (int i = filters.size() - 1; i >= 0; i--) {final Filter filter = filters.get(i);final Invoker<T> next = last;last = new Invoker<T>() {public Class<T> getInterface() {return invoker.getInterface();}public URL getUrl() {return invoker.getUrl();}public boolean isAvailable() {return invoker.isAvailable();}public Result invoke(Invocation invocation) throws RpcException {Result asyncResult;try {asyncResult = filter.invoke(next, invocation);} catch (Exception e) {// onError callbackif (filter instanceof ListenableFilter) {Filter.Listener listener = ((ListenableFilter) filter).listener();if (listener != null) {listener.onError(e, invoker, invocation);}}throw e;}return asyncResult;}public void destroy() {invoker.destroy();}public String toString() {return invoker.toString();}};}}return new CallbackRegistrationInvoker<>(last, filters);}static class CallbackRegistrationInvoker<T> implements Invoker<T> {private final Invoker<T> filterInvoker;private final List<Filter> filters;public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) {this.filterInvoker = filterInvoker;this.filters = filters;}public Result invoke(Invocation invocation) throws RpcException {Result asyncResult = filterInvoker.invoke(invocation);asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {for (int i = filters.size() - 1; i >= 0; i--) {Filter filter = filters.get(i);// onResponse callbackif (filter instanceof ListenableFilter) {Filter.Listener listener = ((ListenableFilter) filter).listener();if (listener != null) {if (t == null) {listener.onResponse(r, filterInvoker, invocation);} else {listener.onError(t, filterInvoker, invocation);}}} else {filter.onResponse(r, filterInvoker, invocation);}}});return asyncResult;}public Class<T> getInterface() {return filterInvoker.getInterface();}public URL getUrl() {return filterInvoker.getUrl();}public boolean isAvailable() {return filterInvoker.isAvailable();}public void destroy() {filterInvoker.destroy();}}}
RegistryProtocol代码:
代码2.1
public class RegistryProtocol implements Protocol {public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {url = URLBuilder.from(url).setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)).removeParameter(REGISTRY_KEY).build();//获取注册实例,如果是zk的话,registry指向ZookeeperRegistryRegistry registry = registryFactory.getRegistry(url);if (RegistryService.class.equals(type)) {return proxyFactory.getInvoker((T) registry, type, url);}// group="a,b" or group="*"Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));String group = qs.get(GROUP_KEY);if (group != null && group.length() > 0) {if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {return doRefer(getMergeableCluster(), registry, type, url);}}//cluster指向的是Cluster$Adaptivereturn doRefer(cluster, registry, type, url);}private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {//directory是目录,什么意思呢?通常微服务应用会部署多个实例,也就是说同一个api有多个服务者,//那么怎么管理这些服务者呢?自然是通过相应Directory实例管理了RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);directory.setRegistry(registry);directory.setProtocol(protocol);// all attributes of REFER_KEYMap<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));//向注册中心注册消费者信息registry.register(directory.getRegisteredConsumerUrl());}//构建路由链,什么是路由呢?一个接口有十个提供者,业务上要求消费者只调用某几个提供者或者来自某//个消费者的调用只调用某几个消费者,如何完成这些需求?通过路由规则directory.buildRouterChain(subscribeUrl);//订阅提供者信息。//subscribeUrl.addParameter增加了类别参数,分别是providers、configurators、routers,将会//从注册中心上订阅这三种类型的数据,并将订阅的信息转换成Invoker,还会开启NettyClient客户端,//与服务者通信。这里的代码很绕,调用链参见图2.1,代码参加2.2directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));//cluster指向Cluster$Adaptive,就是把directory封装在集群容错类中而已。看图就行,参见图2.0//invoker指向的是MockClusterInvoker实例Invoker invoker = cluster.join(directory);ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);return invoker;}}

图2.0

图2.1
RegistryDirectory:
代码2.2
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();if (urls == null || urls.isEmpty()) {return newUrlInvokerMap;}Set<String> keys = new HashSet<>();String queryProtocols = this.queryMap.get(PROTOCOL_KEY);for (URL providerUrl : urls) {// If protocol is configured at the reference side, only the matching protocol is selectedif (queryProtocols != null && queryProtocols.length() > 0) {boolean accept = false;String[] acceptProtocols = queryProtocols.split(",");for (String acceptProtocol : acceptProtocols) {if (providerUrl.getProtocol().equals(acceptProtocol)) {accept = true;break;}}if (!accept) {continue;}}if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {continue;}if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +" in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));continue;}URL url = mergeUrl(providerUrl);String key = url.toFullString(); // The parameter urls are sortedif (keys.contains(key)) { // Repeated urlcontinue;}keys.add(key);// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer againMap<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local referenceInvoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);if (invoker == null) { // Not in the cache, refer againtry {boolean enabled = true;if (url.hasParameter(DISABLED_KEY)) {enabled = !url.getParameter(DISABLED_KEY, false);} else {enabled = url.getParameter(ENABLED_KEY, true);}if (enabled) {//这里是重点,protocol指向的是Protocol$Adaptive,到了这一步,url的协议变//成了dubbo协议,所以最终会调用DubboProtocol,生成DubboInvoker。//但是这个DubboInvoker肯定是要被ProtocolListenerWrapper、QosProtocolWrapper、//ProtocolFilterWrapper以及Filter过滤类进行封装的,这个过程参见RegistryProtocol//实例化的过程。我们来看一下DubboProtocol的代码invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);}} catch (Throwable t) {logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);}if (invoker != null) { // Put new invoker in cachenewUrlInvokerMap.put(key, invoker);}} else {newUrlInvokerMap.put(key, invoker);}}keys.clear();return newUrlInvokerMap;}
DubboProtocol:
代码2.3
public abstract class AbstractProtocol implements Protocol {public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {//protocolBindingRefer由具体子类实现,从这里可以看出,从dubbo2.7.3分支开始,//消费者端的调用都是异步的了。消费者发起rpc之后,异步获取调用结果return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));}}public class DubboProtocol extends AbstractProtocol {//DubboProtocol的refer方法继承自父类public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {optimizeSerialization(url);//创建DubboInvoker,创建实例的同时还要创建netty客户端,开启通信DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);invokers.add(invoker);return invoker;}//1、微服务通常会部署多个实例,每个实例都是一个提供者//2、消费者在启动时,会与每个提供者都建立连接,所以消费者会维护多个连接//3、一个提供者尝尝会提供多个接口,消费者是针对每个接口都建立一个连接还是所有接口共用一个连接?//缺省情况下,是同一个提供者下的所有接口共用一个连接。当然也可以设置不共享private ExchangeClient[] getClients(URL url) {//同一个提供者下的所有rpc调用是否共用一个连接boolean useShareConnect = false;int connections = url.getParameter(CONNECTIONS_KEY, 0);List<ReferenceCountExchangeClient> shareClients = null;// if not configured, connection is shared, otherwise, one connection for one serviceif (connections == 0) {useShareConnect = true;//xml的配置方式优先级最高String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);//获取共享连接shareClients = getSharedClient(url, connections);}ExchangeClient[] clients = new ExchangeClient[connections];for (int i = 0; i < clients.length; i++) {if (useShareConnect) {clients[i] = shareClients.get(i);} else {clients[i] = initClient(url);}}return clients;}//获取共享链接private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {String key = url.getAddress();List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);if (checkClientCanUse(clients)) {batchClientRefIncr(clients);return clients;}locks.putIfAbsent(key, new Object());synchronized (locks.get(key)) {clients = referenceClientMap.get(key);// dubbo checkif (checkClientCanUse(clients)) {batchClientRefIncr(clients);return clients;}// connectNum must be greater than or equal to 1connectNum = Math.max(connectNum, 1);// If the clients is empty, then the first initialization isif (CollectionUtils.isEmpty(clients)) {clients = buildReferenceCountExchangeClientList(url, connectNum);referenceClientMap.put(key, clients);} else {for (int i = 0; i < clients.size(); i++) {ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);// If there is a client in the list that is no longer available, create a new one to replace him.if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {clients.set(i, buildReferenceCountExchangeClient(url));continue;}referenceCountExchangeClient.incrementAndGetCount();}}locks.remove(key);return clients;}}//构建共享连接private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {List<ReferenceCountExchangeClient> clients = new ArrayList<>();for (int i = 0; i < connectNum; i++) {clients.add(buildReferenceCountExchangeClient(url));}return clients;}private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {ExchangeClient exchangeClient = initClient(url);return new ReferenceCountExchangeClient(exchangeClient);}//初始化一个链接private ExchangeClient initClient(URL url) {// client type setting.String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));url = url.addParameter(CODEC_KEY, DubboCodec.NAME);// enable heartbeat by defaulturl = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));// BIO is not allowed since it has severe performance issue.if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {throw new RpcException("Unsupported client type: " + str + "," +" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));}ExchangeClient client;try {// connection should be lazyif (url.getParameter(LAZY_CONNECT_KEY, false)) {client = new LazyConnectExchangeClient(url, requestHandler);} else {//创建netty客户端,这部分代码参见代码2.4client = Exchangers.connect(url, requestHandler);}} catch (RemotingException e) {throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);}return client;}}
Exchangers:
代码2.4
public class Exchangers {public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");return getExchanger(url).connect(url, handler);}public static Exchanger getExchanger(URL url) {String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);return getExchanger(type);}//哈哈,都是dubbo spi。得到的实例是HeaderExchangerpublic static Exchanger getExchanger(String type) {return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);}}public class HeaderExchanger implements Exchanger {public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {//封装链如下:requestHandler -> HeaderExchangeHandler -> DecodeHandler ->// --> AllChannelHandler --> HeartbeatHandler --> MultiMessageHandler,//MultiMessageHandler最终赋值给了NettyClient的handler属性,NettyClient也作为参数//传给了NettyClientHandler。//requestHandler是DubboProtocol中的属性,也是最终通道事件处理的地方return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);}}public class Transporters {public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}ChannelHandler handler;if (handlers == null || handlers.length == 0) {handler = new ChannelHandlerAdapter();} else if (handlers.length == 1) {handler = handlers[0];} else {handler = new ChannelHandlerDispatcher(handlers);}//getTransporter()得到的是NettyTransporter实例return getTransporter().connect(url, handler);}public static Transporter getTransporter() {//默认是NettyTransporterreturn ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();}}public class NettyTransporter implements Transporter {public Server bind(URL url, ChannelHandler listener) throws RemotingException {return new NettyServer(url, listener);}public Client connect(URL url, ChannelHandler listener) throws RemotingException {//实例化NettyClient的内容也很精彩,参加代码2.5return new NettyClient(url, listener);}}
代码2.5
public class NettyClient extends AbstractClient {public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler//wrapChannelHandler方法会创建dubbo线程池策略实例,用以处理netty通道上的连接、读、写等事件。//线程池策略实例将会被MultiMessageHandler->HeartbeatHandler包装super(url, wrapChannelHandler(url, handler));}protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);return ChannelHandlers.wrap(handler, url);}protected void doOpen() throws Throwable {final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);bootstrap = new Bootstrap();bootstrap.group(nioEventLoopGroup).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()).channel(NioSocketChannel.class);if (getConnectTimeout() < 3000) {bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);} else {bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());}bootstrap.handler(new ChannelInitializer() {@Overrideprotected void initChannel(Channel ch) throws Exception {int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug.addLast("decoder", adapter.getDecoder()).addLast("encoder", adapter.getEncoder()).addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))//nettyClientHandler这是通信的入口.addLast("handler", nettyClientHandler);String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);if(socksProxyHost != null) {int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));ch.pipeline().addFirst(socks5ProxyHandler);}}});}protected void doConnect() throws Throwable {long start = System.currentTimeMillis();ChannelFuture future = bootstrap.connect(getConnectAddress());try {boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);if (ret && future.isSuccess()) {Channel newChannel = future.channel();try {// Close old channel// copy referenceChannel oldChannel = NettyClient.this.channel;if (oldChannel != null) {try {if (logger.isInfoEnabled()) {logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);}oldChannel.close();} finally {NettyChannel.removeChannelIfDisconnected(oldChannel);}}} finally {if (NettyClient.this.isClosed()) {try {if (logger.isInfoEnabled()) {logger.info("Close new netty channel " + newChannel + ", because the client closed.");}newChannel.close();} finally {NettyClient.this.channel = null;NettyChannel.removeChannelIfDisconnected(newChannel);}} else {NettyClient.this.channel = newChannel;}}} else if (future.cause() != null) {throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "+ getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());} else {throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "+ getRemoteAddress() + " client-side timeout "+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());}} finally {// just add new valid channel to NettyChannel's cacheif (!isConnected()) {//future.cancel(true);}}}}public abstract class AbstractClient extends AbstractEndpoint implements Client {public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);try {doOpen();} catch (Throwable t) {close();throw new RemotingException(url.toInetSocketAddress(), null,"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);}try {// connect.connect();if (logger.isInfoEnabled()) {logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());}} catch (RemotingException t) {if (url.getParameter(Constants.CHECK_KEY, true)) {close();throw t;} else {logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);}} catch (Throwable t) {close();throw new RemotingException(url.toInetSocketAddress(), null,"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);}executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension().get(CONSUMER_SIDE, Integer.toString(url.getPort()));ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension().remove(CONSUMER_SIDE, Integer.toString(url.getPort()));}}public class ChannelHandlers {public static ChannelHandler wrap(ChannelHandler handler, URL url) {return ChannelHandlers.getInstance().wrapInternal(handler, url);}protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {//spi机制创建线程池策略实例,默认是all,即AllChannelHandlerreturn new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));}}public class AllChannelHandler extends WrappedChannelHandler {public AllChannelHandler(ChannelHandler handler, URL url) {//super指WrappedChannelHandlersuper(handler, url);}}public class WrappedChannelHandler implements ChannelHandlerDelegate {public WrappedChannelHandler(ChannelHandler handler, URL url) {this.handler = handler;this.url = url;//既然是线程池策略那当然要创建线程池了,又是spi,默认是fixed线程池,线程池的核心线程数和//最大线程数相同,默认是200executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {componentKey = CONSUMER_SIDE;}//线程池实例存储起来,优雅关机时要关闭线程池DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();dataStore.put(componentKey, Integer.toString(url.getPort()), executor);}}
