一、启动流程概述
    启动的入口在ReferenceConfig类的get()方法,消费者启动流程包含以下几步:
    1. 构建Registry实例、RegistryDirectory实例
    2. 向注册中心注册消费者信息
    3. 构建路由规则链
    4. 向注册中心订阅提供者信息
    5. 将订阅的提供者信息转换为DubboInvoker,并且开启通信服务(默认是netty)。消费者在启动时会与每个提供者都建立连接;缺省情况下,同一个提供者下的所有rpc调用共用一个连接,也可以配置为不共用
    6. 将构建好的Invoker实例封装进集群容错策略类中
    7. 将构建好的集群容错策略实例封装在InvokerInvocationHandler类中

    启动过程中构建的消费者URL形如:
    consumer://192.168.56.1/service.GreetingServiceAsync?application=first-dubbo-consumer&async=true&dubbo=2.0.2&group=dubbo&interface=service.GreetingServiceAsync&lazy=false&methods=sayHello,testGeneric&pid=13912&release=2.7.3&revision=1.0.0&side=consumer&sticky=false&timeout=3000&timestamp=1581562335685&version=1.0.0

    二、启动时序图
    image.png
    图1.0

    三、源码分析
    1、ReferenceConfig
    代码1.0

    1. public synchronized T get() {
    2. checkAndUpdateSubConfigs();
    3. if (destroyed) {
    4. throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
    5. }
    6. if (ref == null) {
    7. //初始化
    8. init();
    9. }
    10. return ref;
    11. }
    12. private void init() {
    13. //省略的代码是用来构造参数map的,包括消费者的应用信息、模块信息、要调用的接口
    14. ...
    15. //创建的代理类才是调用具体方法时的入口。订阅的提供者信息将会被转化为DubboInvoker,
    16. //DubboInvoker会被层层包装,最后封装在集群容错策略类中。dubbo最后对集群容错策略
    17. //实例类进行代理封装,代理类才是实际的调用入口。细节随后再说
    18. ref = createProxy(map);
    19. ...
    20. }
    21. private T createProxy(Map<String, String> map) {
    22. //判断是否是本地引用,如果是的话,就不用发起远程调用了。但是本地调用dubbo服务,也不是直接调用
    23. //实现类,消费者和提供者还是会经过各种包装类、过滤类,最终才调用到具体实现类
    24. if (shouldJvmRefer(map)) {
    25. URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
    26. invoker = REF_PROTOCOL.refer(interfaceClass, url);
    27. if (logger.isInfoEnabled()) {
    28. logger.info("Using injvm service " + interfaceClass.getName());
    29. }
    30. } else {
    31. urls.clear(); // reference retry init will add url to urls, lead to OOM
    32. //直连方式
    33. if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
    34. String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
    35. if (us != null && us.length > 0) {
    36. for (String u : us) {
    37. URL url = URL.valueOf(u);
    38. if (StringUtils.isEmpty(url.getPath())) {
    39. url = url.setPath(interfaceName);
    40. }
    41. if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    42. urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
    43. } else {
    44. urls.add(ClusterUtils.mergeUrl(url, map));
    45. }
    46. }
    47. }
    48. } else { // assemble URL from register center's configuration
    49. // if protocols not injvm checkRegistry
    50. if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){
    51. checkRegistry();
    52. //加载注册地址
    53. List<URL> us = loadRegistries(false);
    54. if (CollectionUtils.isNotEmpty(us)) {
    55. for (URL u : us) {
    56. URL monitorUrl = loadMonitor(u);
    57. if (monitorUrl != null) {
    58. map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
    59. }
    60. //把map转为字符串添加进url中
    61. urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
    62. }
    63. }
    64. if (urls.isEmpty()) {
    65. throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
    66. }
    67. }
    68. }
    69. //如果urls中只有一个url(例如只配置了一个注册中心)则走这段代码,否则走下面else分支的代码。两者的流程和原理是相通的。
    70. if (urls.size() == 1) {
    71. //REF_PROTOCOL引用的是Protocol$Adaptive,这里获取的是RegistryProtocol,但是在获取
    72. //RegistryProtocol的过程中,ExtensionLoader会对RegistryProtocol实例进行包装,包装
    73. //类就是ProtocolListenerWrapper、QosProtocolWrapper、ProtocolFilterWrapper。在
    74. //ProtocolFilterWrapper中还会经过filter过滤器的封装
    75. invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
    76. } else {
    77. List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
    78. URL registryURL = null;
    79. for (URL url : urls) {
    80. invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
    81. if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    82. registryURL = url; // use last registry url
    83. }
    84. }
    85. if (registryURL != null) { // registry url is available
    86. // use RegistryAwareCluster only when register's CLUSTER is available
    87. URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
    88. // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
    89. invoker = CLUSTER.join(new StaticDirectory(u, invokers));
    90. } else { // not a registry url, must be direct invoke.
    91. invoker = CLUSTER.join(new StaticDirectory(invokers));
    92. }
    93. }
    94. }
    95. if (shouldCheck() && !invoker.isAvailable()) {
    96. throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    97. }
    98. if (logger.isInfoEnabled()) {
    99. logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    100. }
    101. //根据invoker创建代理类,PROXY_FACTORY引用的是ProxyFactory$Adaptive,默认是JavassistProxyFactory
    102. //1)invoker实例指向的是MockClusterInvoker
    103. //2)PROXY_FACTORY.getProxy(invoker)生成的代理实例内部持有的是InvokerInvocationHandler。原因是
    104. //JavassistProxyFactory.getProxy在创建代理对象时传入了new InvokerInvocationHandler(invoker)
    105. //参见图1.1
    106. return (T) PROXY_FACTORY.getProxy(invoker);
    107. }

    image.png
    图1.1

    2、RegistryProtocol
    我们看一下这个实例:
    image.png
    图2.0

    包装Wrapper类代码:
    代码2.0

    1. public class ProtocolListenerWrapper implements Protocol {
    2. public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    3. //这时的url信息是注册信息,以registry开头
    4. if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    5. //protocol指向QosProtocolWrapper
    6. return protocol.refer(type, url);
    7. }
    8. return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
    9. Collections.unmodifiableList(
    10. ExtensionLoader.getExtensionLoader(InvokerListener.class)
    11. .getActivateExtension(url, INVOKER_LISTENER_KEY)));
    12. }
    13. }
    14. public class QosProtocolWrapper implements Protocol {
    15. public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    16. if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    17. startQosServer(url);
    18. //protocol指向ProtocolFilterWrapper
    19. return protocol.refer(type, url);
    20. }
    21. return protocol.refer(type, url);
    22. }
    23. }
    24. public class ProtocolFilterWrapper implements Protocol {
    25. public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    26. if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    27. return protocol.refer(type, url);
    28. }
    29. //url为registry协议时(走上面的分支)protocol指向RegistryProtocol;其他协议(如dubbo)才会走到这里,用filter链封装invoker
    30. return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
    31. }
    32. //该方法用来构造过滤器,将invoker实例用各种filter封装。在filter中完成相应功能
    33. private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    34. Invoker<T> last = invoker;
    35. List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    36. if (!filters.isEmpty()) {
    37. for (int i = filters.size() - 1; i >= 0; i--) {
    38. final Filter filter = filters.get(i);
    39. final Invoker<T> next = last;
    40. last = new Invoker<T>() {
    41. public Class<T> getInterface() {
    42. return invoker.getInterface();
    43. }
    44. public URL getUrl() {
    45. return invoker.getUrl();
    46. }
    47. public boolean isAvailable() {
    48. return invoker.isAvailable();
    49. }
    50. public Result invoke(Invocation invocation) throws RpcException {
    51. Result asyncResult;
    52. try {
    53. asyncResult = filter.invoke(next, invocation);
    54. } catch (Exception e) {
    55. // onError callback
    56. if (filter instanceof ListenableFilter) {
    57. Filter.Listener listener = ((ListenableFilter) filter).listener();
    58. if (listener != null) {
    59. listener.onError(e, invoker, invocation);
    60. }
    61. }
    62. throw e;
    63. }
    64. return asyncResult;
    65. }
    66. public void destroy() {
    67. invoker.destroy();
    68. }
    69. public String toString() {
    70. return invoker.toString();
    71. }
    72. };
    73. }
    74. }
    75. return new CallbackRegistrationInvoker<>(last, filters);
    76. }
    77. static class CallbackRegistrationInvoker<T> implements Invoker<T> {
    78. private final Invoker<T> filterInvoker;
    79. private final List<Filter> filters;
    80. public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) {
    81. this.filterInvoker = filterInvoker;
    82. this.filters = filters;
    83. }
    84. public Result invoke(Invocation invocation) throws RpcException {
    85. Result asyncResult = filterInvoker.invoke(invocation);
    86. asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {
    87. for (int i = filters.size() - 1; i >= 0; i--) {
    88. Filter filter = filters.get(i);
    89. // onResponse callback
    90. if (filter instanceof ListenableFilter) {
    91. Filter.Listener listener = ((ListenableFilter) filter).listener();
    92. if (listener != null) {
    93. if (t == null) {
    94. listener.onResponse(r, filterInvoker, invocation);
    95. } else {
    96. listener.onError(t, filterInvoker, invocation);
    97. }
    98. }
    99. } else {
    100. filter.onResponse(r, filterInvoker, invocation);
    101. }
    102. }
    103. });
    104. return asyncResult;
    105. }
    106. public Class<T> getInterface() {
    107. return filterInvoker.getInterface();
    108. }
    109. public URL getUrl() {
    110. return filterInvoker.getUrl();
    111. }
    112. public boolean isAvailable() {
    113. return filterInvoker.isAvailable();
    114. }
    115. public void destroy() {
    116. filterInvoker.destroy();
    117. }
    118. }
    119. }

    RegistryProtocol代码:
    代码2.1

    1. public class RegistryProtocol implements Protocol {
    2. public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    3. url = URLBuilder.from(url)
    4. .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
    5. .removeParameter(REGISTRY_KEY)
    6. .build();
    7. //获取注册实例,如果是zk的话,registry指向ZookeeperRegistry
    8. Registry registry = registryFactory.getRegistry(url);
    9. if (RegistryService.class.equals(type)) {
    10. return proxyFactory.getInvoker((T) registry, type, url);
    11. }
    12. // group="a,b" or group="*"
    13. Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    14. String group = qs.get(GROUP_KEY);
    15. if (group != null && group.length() > 0) {
    16. if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
    17. return doRefer(getMergeableCluster(), registry, type, url);
    18. }
    19. }
    20. //cluster指向的是Cluster$Adaptive
    21. return doRefer(cluster, registry, type, url);
    22. }
    23. private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    24. //directory是目录,什么意思呢?通常微服务应用会部署多个实例,也就是说同一个api有多个服务者,
    25. //那么怎么管理这些服务者呢?自然是通过相应Directory实例管理了
    26. RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    27. directory.setRegistry(registry);
    28. directory.setProtocol(protocol);
    29. // all attributes of REFER_KEY
    30. Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    31. URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    32. if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
    33. directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
    34. //向注册中心注册消费者信息
    35. registry.register(directory.getRegisteredConsumerUrl());
    36. }
    37. //构建路由链,什么是路由呢?比如一个接口有十个提供者,业务上要求消费者只调用其中某几个提供者,或者来自某
    38. //个消费者的调用只能路由到某几个提供者,如何完成这些需求?通过路由规则
    39. directory.buildRouterChain(subscribeUrl);
    40. //订阅提供者信息。
    41. //subscribeUrl.addParameter增加了类别参数,分别是providers、configurators、routers,将会
    42. //从注册中心上订阅这三种类型的数据,并将订阅的信息转换成Invoker,还会开启NettyClient客户端,
    43. //与服务者通信。这里的代码很绕,调用链参见图2.1,代码参见代码2.2
    44. directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
    45. PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
    46. //cluster指向Cluster$Adaptive,就是把directory封装在集群容错类中而已。看图就行,参见图2.0
    47. //invoker指向的是MockClusterInvoker实例
    48. Invoker invoker = cluster.join(directory);
    49. ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    50. return invoker;
    51. }
    52. }

    image.png
    图2.0

    image.png
    图2.1

    RegistryDirectory:
    代码2.2

    1. private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    2. Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
    3. if (urls == null || urls.isEmpty()) {
    4. return newUrlInvokerMap;
    5. }
    6. Set<String> keys = new HashSet<>();
    7. String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
    8. for (URL providerUrl : urls) {
    9. // If protocol is configured at the reference side, only the matching protocol is selected
    10. if (queryProtocols != null && queryProtocols.length() > 0) {
    11. boolean accept = false;
    12. String[] acceptProtocols = queryProtocols.split(",");
    13. for (String acceptProtocol : acceptProtocols) {
    14. if (providerUrl.getProtocol().equals(acceptProtocol)) {
    15. accept = true;
    16. break;
    17. }
    18. }
    19. if (!accept) {
    20. continue;
    21. }
    22. }
    23. if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
    24. continue;
    25. }
    26. if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
    27. logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
    28. " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
    29. " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
    30. ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
    31. continue;
    32. }
    33. URL url = mergeUrl(providerUrl);
    34. String key = url.toFullString(); // The parameter urls are sorted
    35. if (keys.contains(key)) { // Repeated url
    36. continue;
    37. }
    38. keys.add(key);
    39. // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
    40. Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
    41. Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
    42. if (invoker == null) { // Not in the cache, refer again
    43. try {
    44. boolean enabled = true;
    45. if (url.hasParameter(DISABLED_KEY)) {
    46. enabled = !url.getParameter(DISABLED_KEY, false);
    47. } else {
    48. enabled = url.getParameter(ENABLED_KEY, true);
    49. }
    50. if (enabled) {
    51. //这里是重点,protocol指向的是Protocol$Adaptive,到了这一步,url的协议变
    52. //成了dubbo协议,所以最终会调用DubboProtocol,生成DubboInvoker。
    53. //但是这个DubboInvoker肯定是要被ProtocolListenerWrapper、QosProtocolWrapper、
    54. //ProtocolFilterWrapper以及Filter过滤类进行封装的,这个过程参见RegistryProtocol
    55. //实例化的过程。我们来看一下DubboProtocol的代码
    56. invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
    57. }
    58. } catch (Throwable t) {
    59. logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
    60. }
    61. if (invoker != null) { // Put new invoker in cache
    62. newUrlInvokerMap.put(key, invoker);
    63. }
    64. } else {
    65. newUrlInvokerMap.put(key, invoker);
    66. }
    67. }
    68. keys.clear();
    69. return newUrlInvokerMap;
    70. }

    DubboProtocol:
    代码2.3

    1. public abstract class AbstractProtocol implements Protocol {
    2. public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    3. //protocolBindingRefer由具体子类实现,从这里可以看出,从dubbo2.7.3分支开始,
    4. //消费者端的调用都是异步的了。消费者发起rpc之后,异步获取调用结果
    5. return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    6. }
    7. }
    8. public class DubboProtocol extends AbstractProtocol {
    9. //DubboProtocol的refer方法继承自父类
    10. public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    11. optimizeSerialization(url);
    12. //创建DubboInvoker,创建实例的同时还要创建netty客户端,开启通信
    13. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    14. invokers.add(invoker);
    15. return invoker;
    16. }
    17. //1、微服务通常会部署多个实例,每个实例都是一个提供者
    18. //2、消费者在启动时,会与每个提供者都建立连接,所以消费者会维护多个连接
    19. //3、一个提供者常常会提供多个接口,消费者是针对每个接口都建立一个连接还是所有接口共用一个连接?
    20. //缺省情况下,是同一个提供者下的所有接口共用一个连接。当然也可以设置不共享
    21. private ExchangeClient[] getClients(URL url) {
    22. //同一个提供者下的所有rpc调用是否共用一个连接
    23. boolean useShareConnect = false;
    24. int connections = url.getParameter(CONNECTIONS_KEY, 0);
    25. List<ReferenceCountExchangeClient> shareClients = null;
    26. // if not configured, connection is shared, otherwise, one connection for one service
    27. if (connections == 0) {
    28. useShareConnect = true;
    29. //xml的配置方式优先级最高
    30. String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
    31. connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
    32. DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
    33. //获取共享连接
    34. shareClients = getSharedClient(url, connections);
    35. }
    36. ExchangeClient[] clients = new ExchangeClient[connections];
    37. for (int i = 0; i < clients.length; i++) {
    38. if (useShareConnect) {
    39. clients[i] = shareClients.get(i);
    40. } else {
    41. clients[i] = initClient(url);
    42. }
    43. }
    44. return clients;
    45. }
    46. //获取共享连接
    47. private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
    48. String key = url.getAddress();
    49. List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
    50. if (checkClientCanUse(clients)) {
    51. batchClientRefIncr(clients);
    52. return clients;
    53. }
    54. locks.putIfAbsent(key, new Object());
    55. synchronized (locks.get(key)) {
    56. clients = referenceClientMap.get(key);
    57. // double check
    58. if (checkClientCanUse(clients)) {
    59. batchClientRefIncr(clients);
    60. return clients;
    61. }
    62. // connectNum must be greater than or equal to 1
    63. connectNum = Math.max(connectNum, 1);
    64. // If the clients is empty, then the first initialization is
    65. if (CollectionUtils.isEmpty(clients)) {
    66. clients = buildReferenceCountExchangeClientList(url, connectNum);
    67. referenceClientMap.put(key, clients);
    68. } else {
    69. for (int i = 0; i < clients.size(); i++) {
    70. ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
    71. // If there is a client in the list that is no longer available, create a new one to replace him.
    72. if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
    73. clients.set(i, buildReferenceCountExchangeClient(url));
    74. continue;
    75. }
    76. referenceCountExchangeClient.incrementAndGetCount();
    77. }
    78. }
    79. locks.remove(key);
    80. return clients;
    81. }
    82. }
    83. //构建共享连接
    84. private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
    85. List<ReferenceCountExchangeClient> clients = new ArrayList<>();
    86. for (int i = 0; i < connectNum; i++) {
    87. clients.add(buildReferenceCountExchangeClient(url));
    88. }
    89. return clients;
    90. }
    91. private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
    92. ExchangeClient exchangeClient = initClient(url);
    93. return new ReferenceCountExchangeClient(exchangeClient);
    94. }
    95. //初始化一个连接
    96. private ExchangeClient initClient(URL url) {
    97. // client type setting.
    98. String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
    99. url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
    100. // enable heartbeat by default
    101. url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
    102. // BIO is not allowed since it has severe performance issue.
    103. if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
    104. throw new RpcException("Unsupported client type: " + str + "," +
    105. " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
    106. }
    107. ExchangeClient client;
    108. try {
    109. // connection should be lazy
    110. if (url.getParameter(LAZY_CONNECT_KEY, false)) {
    111. client = new LazyConnectExchangeClient(url, requestHandler);
    112. } else {
    113. //创建netty客户端,这部分代码参见代码2.4
    114. client = Exchangers.connect(url, requestHandler);
    115. }
    116. } catch (RemotingException e) {
    117. throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    118. }
    119. return client;
    120. }
    121. }

    Exchangers:
    代码2.4

    1. public class Exchangers {
    2. public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    3. if (url == null) {
    4. throw new IllegalArgumentException("url == null");
    5. }
    6. if (handler == null) {
    7. throw new IllegalArgumentException("handler == null");
    8. }
    9. url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    10. return getExchanger(url).connect(url, handler);
    11. }
    12. public static Exchanger getExchanger(URL url) {
    13. String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
    14. return getExchanger(type);
    15. }
    16. //哈哈,都是dubbo spi。得到的实例是HeaderExchanger
    17. public static Exchanger getExchanger(String type) {
    18. return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    19. }
    20. }
    21. public class HeaderExchanger implements Exchanger {
    22. public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    23. //封装链(由内到外)如下:requestHandler -> HeaderExchangeHandler -> DecodeHandler
    24. // -> AllChannelHandler -> HeartbeatHandler -> MultiMessageHandler,
    25. //MultiMessageHandler最终赋值给了NettyClient的handler属性,NettyClient也作为参数
    26. //传给了NettyClientHandler。
    27. //requestHandler是DubboProtocol中的属性,也是最终通道事件处理的地方
    28. return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    29. }
    30. }
    31. public class Transporters {
    32. public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    33. if (url == null) {
    34. throw new IllegalArgumentException("url == null");
    35. }
    36. ChannelHandler handler;
    37. if (handlers == null || handlers.length == 0) {
    38. handler = new ChannelHandlerAdapter();
    39. } else if (handlers.length == 1) {
    40. handler = handlers[0];
    41. } else {
    42. handler = new ChannelHandlerDispatcher(handlers);
    43. }
    44. //getTransporter()得到的是自适应扩展Transporter$Adaptive,默认最终委托给NettyTransporter
    45. return getTransporter().connect(url, handler);
    46. }
    47. public static Transporter getTransporter() {
    48. //默认是NettyTransporter
    49. return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    50. }
    51. }
    52. public class NettyTransporter implements Transporter {
    53. public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    54. return new NettyServer(url, listener);
    55. }
    56. public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    57. //实例化NettyClient的内容也很精彩,参见代码2.5
    58. return new NettyClient(url, listener);
    59. }
    60. }

    代码2.5

    1. public class NettyClient extends AbstractClient {
    2. public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
    3. // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
    4. // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
    5. //wrapChannelHandler方法会创建dubbo线程池策略实例,用以处理netty通道上的连接、读、写等事件。
    6. //线程池策略实例将会被MultiMessageHandler->HeartbeatHandler包装
    7. super(url, wrapChannelHandler(url, handler));
    8. }
    9. protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
    10. url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
    11. url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
    12. return ChannelHandlers.wrap(handler, url);
    13. }
    14. protected void doOpen() throws Throwable {
    15. final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    16. bootstrap = new Bootstrap();
    17. bootstrap.group(nioEventLoopGroup)
    18. .option(ChannelOption.SO_KEEPALIVE, true)
    19. .option(ChannelOption.TCP_NODELAY, true)
    20. .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
    21. //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
    22. .channel(NioSocketChannel.class);
    23. if (getConnectTimeout() < 3000) {
    24. bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    25. } else {
    26. bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
    27. }
    28. bootstrap.handler(new ChannelInitializer() {
    29. @Override
    30. protected void initChannel(Channel ch) throws Exception {
    31. int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
    32. NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
    33. ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
    34. .addLast("decoder", adapter.getDecoder())
    35. .addLast("encoder", adapter.getEncoder())
    36. .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
    37. //nettyClientHandler这是通信的入口
    38. .addLast("handler", nettyClientHandler);
    39. String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
    40. if(socksProxyHost != null) {
    41. int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
    42. Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
    43. ch.pipeline().addFirst(socks5ProxyHandler);
    44. }
    45. }
    46. });
    47. }
    48. protected void doConnect() throws Throwable {
    49. long start = System.currentTimeMillis();
    50. ChannelFuture future = bootstrap.connect(getConnectAddress());
    51. try {
    52. boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);
    53. if (ret && future.isSuccess()) {
    54. Channel newChannel = future.channel();
    55. try {
    56. // Close old channel
    57. // copy reference
    58. Channel oldChannel = NettyClient.this.channel;
    59. if (oldChannel != null) {
    60. try {
    61. if (logger.isInfoEnabled()) {
    62. logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
    63. }
    64. oldChannel.close();
    65. } finally {
    66. NettyChannel.removeChannelIfDisconnected(oldChannel);
    67. }
    68. }
    69. } finally {
    70. if (NettyClient.this.isClosed()) {
    71. try {
    72. if (logger.isInfoEnabled()) {
    73. logger.info("Close new netty channel " + newChannel + ", because the client closed.");
    74. }
    75. newChannel.close();
    76. } finally {
    77. NettyClient.this.channel = null;
    78. NettyChannel.removeChannelIfDisconnected(newChannel);
    79. }
    80. } else {
    81. NettyClient.this.channel = newChannel;
    82. }
    83. }
    84. } else if (future.cause() != null) {
    85. throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
    86. + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
    87. } else {
    88. throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
    89. + getRemoteAddress() + " client-side timeout "
    90. + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
    91. + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
    92. }
    93. } finally {
    94. // just add new valid channel to NettyChannel's cache
    95. if (!isConnected()) {
    96. //future.cancel(true);
    97. }
    98. }
    99. }
    100. }
    101. public abstract class AbstractClient extends AbstractEndpoint implements Client {
    102. public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    103. super(url, handler);
    104. needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    105. try {
    106. doOpen();
    107. } catch (Throwable t) {
    108. close();
    109. throw new RemotingException(url.toInetSocketAddress(), null,
    110. "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
    111. + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    112. }
    113. try {
    114. // connect.
    115. connect();
    116. if (logger.isInfoEnabled()) {
    117. logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
    118. }
    119. } catch (RemotingException t) {
    120. if (url.getParameter(Constants.CHECK_KEY, true)) {
    121. close();
    122. throw t;
    123. } else {
    124. logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
    125. + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
    126. }
    127. } catch (Throwable t) {
    128. close();
    129. throw new RemotingException(url.toInetSocketAddress(), null,
    130. "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
    131. + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    132. }
    133. executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
    134. .getDefaultExtension().get(CONSUMER_SIDE, Integer.toString(url.getPort()));
    135. ExtensionLoader.getExtensionLoader(DataStore.class)
    136. .getDefaultExtension().remove(CONSUMER_SIDE, Integer.toString(url.getPort()));
    137. }
    138. }
    139. public class ChannelHandlers {
    140. public static ChannelHandler wrap(ChannelHandler handler, URL url) {
    141. return ChannelHandlers.getInstance().wrapInternal(handler, url);
    142. }
    143. protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    144. //spi机制创建线程池策略实例,默认是all,即AllChannelHandler
    145. return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
    146. .getAdaptiveExtension().dispatch(handler, url)));
    147. }
    148. }
    149. public class AllChannelHandler extends WrappedChannelHandler {
    150. public AllChannelHandler(ChannelHandler handler, URL url) {
    151. //super指WrappedChannelHandler
    152. super(handler, url);
    153. }
    154. }
    155. public class WrappedChannelHandler implements ChannelHandlerDelegate {
    156. public WrappedChannelHandler(ChannelHandler handler, URL url) {
    157. this.handler = handler;
    158. this.url = url;
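    159. //既然是线程池策略那当然要创建线程池了,又是spi。ThreadPool这个spi的默认实现是fixed线程池(核心线程数和最大线程数相同,默认是200);
    160. //但消费者端在NettyClient.wrapChannelHandler中已缺省写入threadpool=DEFAULT_CLIENT_THREADPOOL(2.7.x中为cached),实际以url中的threadpool参数为准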
    161. executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
    162. String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
    163. if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
    164. componentKey = CONSUMER_SIDE;
    165. }
    166. //线程池实例存储起来,优雅关机时要关闭线程池
    167. DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    168. dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    169. }
    170. }