一、时序图
    image.png

    二、源码分析
    1、InvokerInvocationHandler

    1. public class InvokerInvocationHandler implements InvocationHandler {
    2. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    3. String methodName = method.getName();
    4. Class<?>[] parameterTypes = method.getParameterTypes();
    5. if (method.getDeclaringClass() == Object.class) {
    6. return method.invoke(invoker, args);
    7. }
    8. if ("toString".equals(methodName) && parameterTypes.length == 0) {
    9. return invoker.toString();
    10. }
    11. if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
    12. return invoker.hashCode();
    13. }
    14. if ("equals".equals(methodName) && parameterTypes.length == 1) {
    15. return invoker.equals(args[0]);
    16. }
    17. //invoker指向的是MockClusterInvoker,将方法和参数封装成RpcInvocation
    18. return invoker.invoke(new RpcInvocation(method, args)).recreate();
    19. }
    20. }

    2、MockClusterInvoker

    1. public class MockClusterInvoker<T> implements Invoker<T> {
    2. public Result invoke(Invocation invocation) throws RpcException {
    3. Result result = null;
    4. String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    5. if (value.length() == 0 || value.equalsIgnoreCase("false")) {
    6. //no mock,invoker指向的是FailoverClusterInvoker
    7. result = this.invoker.invoke(invocation);
    8. } else if (value.startsWith("force")) {
    9. if (logger.isWarnEnabled()) {
    10. logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
    11. }
    12. //force:direct mock
    13. result = doMockInvoke(invocation, null);
    14. } else {
    15. //fail-mock
    16. try {
    17. result = this.invoker.invoke(invocation);
    18. } catch (RpcException e) {
    19. if (e.isBiz()) {
    20. throw e;
    21. }
    22. if (logger.isWarnEnabled()) {
    23. logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
    24. }
    25. result = doMockInvoke(invocation, e);
    26. }
    27. }
    28. return result;
    29. }
    30. }

    3、FailoverClusterInvoker

    1. public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
    2. public Result invoke(final Invocation invocation) throws RpcException {
    3. checkWhetherDestroyed();
    4. // binding attachments into invocation.
    5. Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    6. if (contextAttachments != null && contextAttachments.size() != 0) {
    7. ((RpcInvocation) invocation).addAttachments(contextAttachments);
    8. }
    9. List<Invoker<T>> invokers = list(invocation);
    10. //选择负载均衡处理器
    11. LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    12. RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    13. //doInvoke
    14. return doInvoke(invocation, invokers, loadbalance);
    15. }
    16. //spi加载默认负载均衡处理器,默认是random,随机选择
    17. protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
    18. if (CollectionUtils.isNotEmpty(invokers)) {
    19. return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
    20. .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
    21. } else {
    22. return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
    23. }
    24. }
    25. protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
    26. List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    27. if (CollectionUtils.isEmpty(invokers)) {
    28. return null;
    29. }
    30. String methodName = invocation == null ? StringUtils.EMPTY : invocation.getMethodName();
    31. boolean sticky = invokers.get(0).getUrl()
    32. .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);
    33. //ignore overloaded method
    34. if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
    35. stickyInvoker = null;
    36. }
    37. //ignore concurrency problem
    38. if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
    39. if (availablecheck && stickyInvoker.isAvailable()) {
    40. return stickyInvoker;
    41. }
    42. }
    43. //负载均衡选择
    44. Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
    45. if (sticky) {
    46. stickyInvoker = invoker;
    47. }
    48. return invoker;
    49. }
    50. private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
    51. List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    52. if (CollectionUtils.isEmpty(invokers)) {
    53. return null;
    54. }
    55. if (invokers.size() == 1) {
    56. return invokers.get(0);
    57. }
    58. Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
    59. //If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.
    60. if ((selected != null && selected.contains(invoker))
    61. || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
    62. try {
    63. Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
    64. if (rInvoker != null) {
    65. invoker = rInvoker;
    66. } else {
    67. //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
    68. int index = invokers.indexOf(invoker);
    69. try {
    70. //Avoid collision
    71. invoker = invokers.get((index + 1) % invokers.size());
    72. } catch (Exception e) {
    73. logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
    74. }
    75. }
    76. } catch (Throwable t) {
    77. logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
    78. }
    79. }
    80. return invoker;
    81. }
    82. }
    83. public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
    84. //父类invoke方法调用子类doInvoke方法
    85. public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    86. List<Invoker<T>> copyInvokers = invokers;
    87. checkInvokers(copyInvokers, invocation);
    88. String methodName = RpcUtils.getMethodName(invocation);
    89. //获取重试次数,默认三次
    90. int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    91. if (len <= 0) {
    92. len = 1;
    93. }
    94. // retry loop.
    95. RpcException le = null; // last exception.
    96. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    97. Set<String> providers = new HashSet<String>(len);
    98. for (int i = 0; i < len; i++) {
    99. //Reselect before retry to avoid a change of candidate `invokers`.
    100. //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
    101. if (i > 0) {
    102. checkWhetherDestroyed();
    103. copyInvokers = list(invocation);
    104. // check again
    105. checkInvokers(copyInvokers, invocation);
    106. }
    107. //调用父类的方法
    108. Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
    109. invoked.add(invoker);
    110. RpcContext.getContext().setInvokers((List) invoked);
    111. try {
    112. //invoker指向的是InvokerDelegate,它是RegistryDirectory的内部类
    113. Result result = invoker.invoke(invocation);
    114. if (le != null && logger.isWarnEnabled()) {
    115. logger.warn("...");
    116. }
    117. return result;
    118. } catch (RpcException e) {
    119. if (e.isBiz()) { // biz exception.
    120. throw e;
    121. }
    122. le = e;
    123. } catch (Throwable e) {
    124. le = new RpcException(e.getMessage(), e);
    125. } finally {
    126. providers.add(invoker.getUrl().getAddress());
    127. }
    128. }
    129. throw new RpcException(le.getCode(), ...);
    130. }
    131. }

    4、InvokerDelegate

    1. private static class InvokerDelegate<T> extends InvokerWrapper<T> {
    2. private URL providerUrl;
    3. public InvokerDelegate(Invoker<T> invoker, URL url, URL providerUrl) {
    4. super(invoker, url);
    5. this.providerUrl = providerUrl;
    6. }
    7. public URL getProviderUrl() {
    8. return providerUrl;
    9. }
    10. }
    11. public class InvokerWrapper<T> implements Invoker<T> {
    12. ...
    13. private final Invoker<T> invoker;
    14. private final URL url;
    15. public InvokerWrapper(Invoker<T> invoker, URL url) {
    16. this.invoker = invoker;
    17. this.url = url;
    18. }
    19. public Result invoke(Invocation invocation) throws RpcException {
    20. //invoker指向的是ListenerInvokerWrapper
    21. return invoker.invoke(invocation);
    22. }
    23. ...
    24. }

    5、ListenerInvokerWrapper

    1. public class ListenerInvokerWrapper<T> implements Invoker<T> {
    2. public Result invoke(Invocation invocation) throws RpcException {
    3. //invoker指向的是ProtocolFilterWrapper
    4. return invoker.invoke(invocation);
    5. }
    6. }

    6、ProtocolFilterWrapper$CallbackRegistrationInvoker

    1. static class CallbackRegistrationInvoker<T> implements Invoker<T> {
    2. private final Invoker<T> filterInvoker;
    3. private final List<Filter> filters;
    4. public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) {
    5. this.filterInvoker = filterInvoker;
    6. this.filters = filters;
    7. }
    8. public Result invoke(Invocation invocation) throws RpcException {
    9. //filterInvoker是一个责任链模式,将DubboInvoker封装在了filter过滤中,
    10. //参见图1.0,filter过滤器的代码就不细致看了,直接看DubboInvoker代码
    11. Result asyncResult = filterInvoker.invoke(invocation);
    12. asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {
    13. for (int i = filters.size() - 1; i >= 0; i--) {
    14. Filter filter = filters.get(i);
    15. // onResponse callback
    16. if (filter instanceof ListenableFilter) {
    17. Filter.Listener listener = ((ListenableFilter) filter).listener();
    18. if (listener != null) {
    19. if (t == null) {
    20. listener.onResponse(r, filterInvoker, invocation);
    21. } else {
    22. listener.onError(t, filterInvoker, invocation);
    23. }
    24. }
    25. } else {
    26. filter.onResponse(r, filterInvoker, invocation);
    27. }
    28. }
    29. });
    30. return asyncResult;
    31. }
    32. ...
    33. }

    image.png
    图1.0

    7、DubboInvoker

    1. public class DubboInvoker<T> extends AbstractInvoker<T> {
    2. protected Result doInvoke(final Invocation invocation) throws Throwable {
    3. RpcInvocation inv = (RpcInvocation) invocation;
    4. final String methodName = RpcUtils.getMethodName(invocation);
    5. inv.setAttachment(PATH_KEY, getUrl().getPath());
    6. inv.setAttachment(VERSION_KEY, version);
    7. ExchangeClient currentClient;
    8. //clients是在DubboProcotol类的protocolBindingRefer方法中构造DubboInvoker时生成的
    9. //里面的元素是ReferenceCountExchangeClient,共享连接实例
    10. if (clients.length == 1) {
    11. currentClient = clients[0];
    12. } else {
    13. currentClient = clients[index.getAndIncrement() % clients.length];
    14. }
    15. try {
    16. boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
    17. int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
    18. //oneway:只发送不接收; 从dubbo2.7.3可以看出,dubbo的rpc调用现在只有两种方式:oneway和
    19. //异步调用,没有同步调用了
    20. if (isOneway) {
    21. boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
    22. currentClient.send(inv, isSent);
    23. return AsyncRpcResult.newDefaultAsyncResult(invocation);
    24. } else {
    25. AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
    26. //currentClient指ReferenceCountExchangeClient
    27. CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
    28. asyncRpcResult.subscribeTo(responseFuture);
    29. // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
    30. FutureContext.getContext().setCompatibleFuture(responseFuture);
    31. return asyncRpcResult;
    32. }
    33. } catch (TimeoutException e) {
    34. throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    35. } catch (RemotingException e) {
    36. throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    37. }
    38. }
    39. }

    8、ReferenceCountExchangeClient

    1. final class ReferenceCountExchangeClient implements ExchangeClient {
    2. public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    3. //client指向的是HeaderExchangeClient
    4. return client.request(request, timeout);
    5. }
    6. }

    9、HeaderExchangeClient

    1. public class HeaderExchangeClient implements ExchangeClient {
    2. public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    3. //channel指向的是HeaderExchangeChannel
    4. return channel.request(request, timeout);
    5. }
    6. }

    10、HeaderExchangeChannel

    1. final class HeaderExchangeChannel implements ExchangeChannel {
    2. public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    3. if (closed) {
    4. throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    5. }
    6. // create request.
    7. Request req = new Request();
    8. req.setVersion(Version.getProtocolVersion());
    9. req.setTwoWay(true);
    10. req.setData(request);
    11. DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    12. try {
    13. //channel指向的是NettyClient
    14. channel.send(req);
    15. } catch (RemotingException e) {
    16. future.cancel();
    17. throw e;
    18. }
    19. return future;
    20. }
    21. }

    11、NettyClient

    1. public abstract class AbstractClient extends AbstractEndpoint implements Client {
    2. public void send(Object message, boolean sent) throws RemotingException {
    3. if (needReconnect && !isConnected()) {
    4. connect();
    5. }
    6. Channel channel = getChannel();
    7. //TODO Can the value returned by getChannel() be null? need improvement.
    8. if (channel == null || !channel.isConnected()) {
    9. throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    10. }
    11. //channel指向的是NettyChannel
    12. channel.send(message, sent);
    13. }
    14. }