一、时序图
二、源码分析
1、InvokerInvocationHandler
public class InvokerInvocationHandler implements InvocationHandler {public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {String methodName = method.getName();Class<?>[] parameterTypes = method.getParameterTypes();if (method.getDeclaringClass() == Object.class) {return method.invoke(invoker, args);}if ("toString".equals(methodName) && parameterTypes.length == 0) {return invoker.toString();}if ("hashCode".equals(methodName) && parameterTypes.length == 0) {return invoker.hashCode();}if ("equals".equals(methodName) && parameterTypes.length == 1) {return invoker.equals(args[0]);}//invoker指向的是MockClusterInvoker,将方法和参数封装成RpcInvocationreturn invoker.invoke(new RpcInvocation(method, args)).recreate();}}
2、MockClusterInvoker
public class MockClusterInvoker<T> implements Invoker<T> {public Result invoke(Invocation invocation) throws RpcException {Result result = null;String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();if (value.length() == 0 || value.equalsIgnoreCase("false")) {//no mock,invoker指向的是FailoverClusterInvokerresult = this.invoker.invoke(invocation);} else if (value.startsWith("force")) {if (logger.isWarnEnabled()) {logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());}//force:direct mockresult = doMockInvoke(invocation, null);} else {//fail-mocktry {result = this.invoker.invoke(invocation);} catch (RpcException e) {if (e.isBiz()) {throw e;}if (logger.isWarnEnabled()) {logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);}result = doMockInvoke(invocation, e);}}return result;}}
3、FailoverClusterInvoker
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {public Result invoke(final Invocation invocation) throws RpcException {checkWhetherDestroyed();// binding attachments into invocation.Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();if (contextAttachments != null && contextAttachments.size() != 0) {((RpcInvocation) invocation).addAttachments(contextAttachments);}List<Invoker<T>> invokers = list(invocation);//选择负载均衡处理器LoadBalance loadbalance = initLoadBalance(invokers, invocation);RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);//doInvokereturn doInvoke(invocation, invokers, loadbalance);}//spi加载默认负载均衡处理器,默认是random,随机选择protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {if (CollectionUtils.isNotEmpty(invokers)) {return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));} else {return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);}}protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {if (CollectionUtils.isEmpty(invokers)) {return null;}String methodName = invocation == null ? StringUtils.EMPTY : invocation.getMethodName();boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);//ignore overloaded methodif (stickyInvoker != null && !invokers.contains(stickyInvoker)) {stickyInvoker = null;}//ignore concurrency problemif (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {if (availablecheck && stickyInvoker.isAvailable()) {return stickyInvoker;}}//负载均衡选择Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);if (sticky) {stickyInvoker = invoker;}return invoker;}private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {if (CollectionUtils.isEmpty(invokers)) {return null;}if (invokers.size() == 1) {return invokers.get(0);}Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);//If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.if ((selected != null && selected.contains(invoker))|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {try {Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);if (rInvoker != null) {invoker = rInvoker;} else {//Check the index of current selected invoker, if it's not the last one, choose the one at index+1.int index = invokers.indexOf(invoker);try {//Avoid collisioninvoker = invokers.get((index + 1) % invokers.size());} catch (Exception e) {logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);}}} catch (Throwable t) {logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);}}return invoker;}}public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {//父类invoke方法调用子类doInvoke方法public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {List<Invoker<T>> copyInvokers = invokers;checkInvokers(copyInvokers, invocation);String methodName = RpcUtils.getMethodName(invocation);//获取重试次数,默认三次int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;if (len <= 0) {len = 1;}// retry loop.RpcException le = null; // last exception.List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.Set<String> providers = new HashSet<String>(len);for (int i = 0; i < len; i++) {//Reselect before retry to avoid a change of candidate `invokers`.//NOTE: if `invokers` changed, then `invoked` also lose accuracy.if (i > 0) {checkWhetherDestroyed();copyInvokers = list(invocation);// check againcheckInvokers(copyInvokers, invocation);}//调用父类的方法Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);invoked.add(invoker);RpcContext.getContext().setInvokers((List) invoked);try {//invoker指向的是InvokerDelegate,它是RegistryDirectory的内部类Result result = invoker.invoke(invocation);if (le != null && logger.isWarnEnabled()) {logger.warn("...");}return result;} catch (RpcException e) {if (e.isBiz()) { // biz exception.throw e;}le = e;} catch (Throwable e) {le = new RpcException(e.getMessage(), e);} finally {providers.add(invoker.getUrl().getAddress());}}throw new RpcException(le.getCode(), ...);}}
4、InvokerDelegate
private static class InvokerDelegate<T> extends InvokerWrapper<T> {private URL providerUrl;public InvokerDelegate(Invoker<T> invoker, URL url, URL providerUrl) {super(invoker, url);this.providerUrl = providerUrl;}public URL getProviderUrl() {return providerUrl;}}public class InvokerWrapper<T> implements Invoker<T> {...private final Invoker<T> invoker;private final URL url;public InvokerWrapper(Invoker<T> invoker, URL url) {this.invoker = invoker;this.url = url;}public Result invoke(Invocation invocation) throws RpcException {//invoker指向的是ListenerInvokerWrapperreturn invoker.invoke(invocation);}...}
5、ListenerInvokerWrapper
public class ListenerInvokerWrapper<T> implements Invoker<T> {public Result invoke(Invocation invocation) throws RpcException {//invoker指向的是ProtocolFilterWrapperreturn invoker.invoke(invocation);}}
6、ProtocolFilterWrapper$CallbackRegistrationInvoker
static class CallbackRegistrationInvoker<T> implements Invoker<T> {private final Invoker<T> filterInvoker;private final List<Filter> filters;public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) {this.filterInvoker = filterInvoker;this.filters = filters;}public Result invoke(Invocation invocation) throws RpcException {//filterInvoker是一个责任链模式,将DubboInvoker封装在了filter过滤中,//参见图1.0,filter过滤器的代码就不细致看了,直接看DubboInvoker代码Result asyncResult = filterInvoker.invoke(invocation);asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {for (int i = filters.size() - 1; i >= 0; i--) {Filter filter = filters.get(i);// onResponse callbackif (filter instanceof ListenableFilter) {Filter.Listener listener = ((ListenableFilter) filter).listener();if (listener != null) {if (t == null) {listener.onResponse(r, filterInvoker, invocation);} else {listener.onError(t, filterInvoker, invocation);}}} else {filter.onResponse(r, filterInvoker, invocation);}}});return asyncResult;}...}

图1.0
7、DubboInvoker
public class DubboInvoker<T> extends AbstractInvoker<T> {protected Result doInvoke(final Invocation invocation) throws Throwable {RpcInvocation inv = (RpcInvocation) invocation;final String methodName = RpcUtils.getMethodName(invocation);inv.setAttachment(PATH_KEY, getUrl().getPath());inv.setAttachment(VERSION_KEY, version);ExchangeClient currentClient;//clients是在DubboProcotol类的protocolBindingRefer方法中构造DubboInvoker时生成的//里面的元素是ReferenceCountExchangeClient,共享连接实例if (clients.length == 1) {currentClient = clients[0];} else {currentClient = clients[index.getAndIncrement() % clients.length];}try {boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);//oneway:只发送不接收; 从dubbo2.7.3可以看出,dubbo的rpc调用现在只有两种方式:oneway和//异步调用,没有同步调用了if (isOneway) {boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);currentClient.send(inv, isSent);return AsyncRpcResult.newDefaultAsyncResult(invocation);} else {AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);//currentClient指ReferenceCountExchangeClientCompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);asyncRpcResult.subscribeTo(responseFuture);// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapterFutureContext.getContext().setCompatibleFuture(responseFuture);return asyncRpcResult;}} catch (TimeoutException e) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (RemotingException e) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);}}}
8、ReferenceCountExchangeClient
final class ReferenceCountExchangeClient implements ExchangeClient {public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {//client指向的是HeaderExchangeClientreturn client.request(request, timeout);}}
9、HeaderExchangeClient
public class HeaderExchangeClient implements ExchangeClient {public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {//channel指向的是HeaderExchangeChannelreturn channel.request(request, timeout);}}
10、HeaderExchangeChannel
final class HeaderExchangeChannel implements ExchangeChannel {public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {if (closed) {throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}// create request.Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setData(request);DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);try {//channel指向的是NettyClientchannel.send(req);} catch (RemotingException e) {future.cancel();throw e;}return future;}}
11、NettyClient
public abstract class AbstractClient extends AbstractEndpoint implements Client {public void send(Object message, boolean sent) throws RemotingException {if (needReconnect && !isConnected()) {connect();}Channel channel = getChannel();//TODO Can the value returned by getChannel() be null? need improvement.if (channel == null || !channel.isConnected()) {throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());}//channel指向的是NettyChannelchannel.send(message, sent);}}
