image.png

服务消费端发起调用

服务消费端通过 ReferenceConfig 的 get 方法返回的是一个代理类,并且方法拦截器为 InvokerInvocationHandler。所以当消费方调用了服务的接口方法后会被 InvokerInvocationHandler 拦截,执行如下逻辑:

  1. @Override
  2. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  3. ......
  4. // 创建RpcInvocation,封装一些服务接口相关的信息
  5. // method为调用的接口方法,args为方法入参信息
  6. RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);
  7. String serviceKey = invoker.getUrl().getServiceKey();
  8. rpcInvocation.setTargetServiceUniqueName(serviceKey);
  9. // 发起方法调用
  10. return invoker.invoke(rpcInvocation).recreate();
  11. }

这里实际调用的 invoker 对象就是服务消费端启动时初始化的 FailoverClusterInvoker 对象。注意,这个 invoker 对象在创建时会有一些 AOP 的增强类对其进行包装,这里不讨论增强逻辑,它最终会调用 FailoverClusterInvoker 的 invoke 方法进行远程方法调用。

1. FailoverClusterInvoker

Dubbo 提供了多种集群容错策略,FailoverClusterInvoker 是默认的集群容错策略。
image.png

  • FailoverClusterInvoker:失败重试。当服务消费方调用服务提供者失败后,会自动切换到其他服务提供者服务器进行重试,这通常用于读操作或具有零等的写操作。

  • FailfastClusterInvoker:快速失败。当服务消费方调用服务提供者失败后,立即报错,也就是只调用一次。通常,这种模式用于非幂等性的写操作。

  • FailsafeClusterInvoker:安全失败。当服务消费者调用服务出现异常时,直接忽略异常。这种模式通常用于写入审计日志等不重要的操作。

  • FailbackClusterInvoker:失败自动恢复。当服务消费端调用服务出现异常后,在后台记录失败的请求,并按照一定的策略后期再进行重试。这种模式通常用于消息通知操作。

  • ForkingClusterInvoker:并行调用。当消费方调用一个接口方法后,Dubbo Client 会并行调用多个服务提供者的服务,只要其中有一个成功即返回。这种模式通常用于实时性要求较高的读操作,但需要浪费更多的服务资源。

  • BroadcastClusterInvoker:广播调用。当消费者调用一个接口方法后,Dubbo Client 会逐个调用所有服务提供者,任意一台服务器调用异常则这次调用就标志失败,这种模式通常用于通知所有提供者更新缓存或日志等本地资源信息。

FailoverClusterInvoker 继承了 AbstractClusterInvoker 类,在该父类中提供了 invoke 方法的具体实现:

  1. @Override
  2. public Result invoke(final Invocation invocation) throws RpcException {
  3. // 获取所有远程服务调用列表
  4. List<Invoker<T>> invokers = list(invocation);
  5. // 初始化负载均衡策略
  6. LoadBalance loadbalance = initLoadBalance(invokers, invocation);
  7. return doInvoke(invocation, invokers, loadbalance);
  8. }

其中,doInvoke 方法是由具体的子类实现的,FailoverClusterInvoker 实现的 doInvoke 逻辑如下:

  1. public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
  2. List<Invoker<T>> copyInvokers = invokers;
  3. String methodName = RpcUtils.getMethodName(invocation);
  4. // 根据 retries 参数确定重试次数
  5. int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
  6. if (len <= 0) {
  7. len = 1;
  8. }
  9. // 进行重试循环
  10. RpcException le = null; // last exception.
  11. List<Invoker<T>> invoked = new ArrayList<>(copyInvokers.size());
  12. Set<String> providers = new HashSet<>(len);
  13. for (int i = 0; i < len; i++) {
  14. // 根据负载均衡策略选择一个调用者
  15. Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
  16. invoked.add(invoker);
  17. RpcContext.getContext().setInvokers((List) invoked);
  18. try {
  19. // 发起远程调用
  20. Result result = invoker.invoke(invocation);
  21. return result;
  22. } catch (RpcException e) {
  23. // 根据异常类型判断是直接抛出异常还是进行重试
  24. if (e.isBiz()) {
  25. throw e;
  26. }
  27. le = e;
  28. } catch (Throwable e) {
  29. // 记录异常信息
  30. le = new RpcException(e.getMessage(), e);
  31. } finally {
  32. providers.add(invoker.getUrl().getAddress());
  33. }
  34. }
  35. // 如果执行了指定重试次数,仍未执行成功,则抛出异常
  36. throw new RpcException(le.getCode(), "Failed to invoke the method "
  37. + methodName + " in the service " + getInterface().getName()
  38. + ". Tried " + len + " times of the providers " + providers
  39. + " (" + providers.size() + "/" + copyInvokers.size()
  40. + ") from the registry " + directory.getUrl().getAddress()
  41. + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
  42. + Version.getVersion() + ". Last error is: "
  43. + le.getMessage(), le.getCause() != null ? le.getCause() : le);
  44. }

默认的集群容错策略 FailoverClusterInvoker,其内部首先根据设置的负载均衡策略 LoadBalance 的扩展实现,从 Directory 返回的 Invoker 列表中选择一个 Invoker 作为 FailoverClusterInvoker 具体的远程调用者,如果调用发生异常,则重新选择一个 Invoker 进行调用。当达到最大重试次数后,不再重试,抛出调用异常。

2. LoadBalance

当服务提供方是集群时,为了避免大量请求一直集中在一个或者几个服务提供方机器上,从而使这些机器负载很高,其至导致服务不可用,就需要做一定的负载均衡策略。Dubbo 提供了多种负载均衡策略,默认为 random,也就是每次随机调用一台服务提供者的服务。Dubbo 提供的负载均衡策略有如下几种:
image.png

  • RandomLoadBalance:随机策略。按照概率设置权重,比较均匀,并且可以动态调节提供者的权重。


  • RoundRobinLoadBalance:轮训策略。按公约后的权重设置轮训比例,会存在执行比较慢的服务提供者堆积请求的情況,比如一个机器执行得非常慢,但是机器没有宕机,当很多新的请求到达该机器后,由于之前的请求还没处理完,会导致新的请求被堆积,久而久之,消费者调用这台机器上的所有请求都会被阻塞。


  • LeastActiveLoadBalance:最少活跃调用数。如果每个提供者的活跃数相同,则随机选择一个。在每个服务提供者里维护着一个活跃数计数器,用来记录当前同时处理请求的个数,也就是并发处理任务的个数。这个值越小,说明当前服务提供者处理的速度越快或者当前机器的负载此较低,所以路由选择时就选择该活跃度最低的机器。如果一个服务提供者处理速度很慢,由于堆积,那么同时处理的请求就比较多,也就是活跃调用数较大(活跃度较高),这时,处理速度慢的提供者将收到更少的请求。


  • ConsistentHashLoadBalance:一致性 Hash 策略。一致性 Hash 可以保证相同参数的请求总是发到同一提供者,当某一台提供者机器宕机时,原本发往该提供者的请求,将基于虚拟节点平推给其他提供者,这样就不会引起剧烈变动。

AbstractLoadBalance 实现了 LoadBalance 接口,上面的各种负载均衡策略实际上就是继承了这个抽象类,但重写了其 doSelect 方法,下面我们看下默认的 RandomLoadBalance 策略的 doSelect 方法实现:

  1. @Override
  2. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  3. // 服务提供者的总个数
  4. int length = invokers.size();
  5. // 所有服务提供者的权重是否都是一样的
  6. boolean sameWeight = true;
  7. // 存放所有服务提供者设置的权重
  8. int[] weights = new int[length];
  9. // 总权重,用来进行单个Invoker的权重比例计算
  10. int totalWeight = 0;
  11. for (int i = 0; i < length; i++) {
  12. // 累加所有服务提供者设置的权重
  13. int weight = getWeight(invokers.get(i), invocation);
  14. totalWeight += weight;
  15. weights[i] = totalWeight;
  16. if (sameWeight && totalWeight != weight * (i + 1)) {
  17. sameWeight = false;
  18. }
  19. }
  20. // 如果所有服务提供者的权重并不都是一样的,并且至少有一个提供者的权重大于0,则基于总权重随机选择一个
  21. if (totalWeight > 0 && !sameWeight) {
  22. // 基于随机值返回一个Invoker,判断这个随机值落在哪个权重区间内就选择哪个Invoker
  23. int offset = ThreadLocalRandom.current().nextInt(totalWeight);
  24. for (int i = 0; i < length; i++) {
  25. if (offset < weights[i]) {
  26. return invokers.get(i);
  27. }
  28. }
  29. }
  30. // 随机选择
  31. return invokers.get(ThreadLocalRandom.current().nextInt(length));
  32. }

这里生成随机数采用的是 ThreadLocalRandom 类,这是出于性能上的考虑,因为 Random 在高并发下会导致大量线程竞争同一个原子变量,导致大量线程原地自旋,从而浪费 CPU 资源。

3. DubboInvoker

在上面代码第 20 行进行远程调用时使用的 invoker 对象,也是一个经过层层增强的包装类,但最终调用到的是原生的 DubboInvoker 中的 invoke 方法,其使用 NettyClient 与服务提供者进行交互,其核心逻辑主要在 DubboInvoker 中的 doInvoke 方法中,具体逻辑如下:

  1. @Override
  2. protected Result doInvoke(final Invocation invocation) throws Throwable {
  3. // 设置Invocation附加属性
  4. RpcInvocation inv = (RpcInvocation) invocation;
  5. final String methodName = RpcUtils.getMethodName(invocation);
  6. inv.setAttachment(PATH_KEY, getUrl().getPath());
  7. inv.setAttachment(VERSION_KEY, version);
  8. // 获取用于远程调用的NettyClient
  9. ExchangeClient currentClient;
  10. if (clients.length == 1) {
  11. currentClient = clients[0];
  12. } else {
  13. currentClient = clients[index.getAndIncrement() % clients.length];
  14. }
  15. // 执行远程调用
  16. try {
  17. boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
  18. // 超时等待时间
  19. int timeout = calculateTimeout(invocation, methodName);
  20. invocation.put(TIMEOUT_KEY, timeout);
  21. // 是否为Oneway,也就是不需要响应结果的请求
  22. if (isOneway) {
  23. // 不需要响应的请求
  24. boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
  25. currentClient.send(inv, isSent);
  26. return AsyncRpcResult.newDefaultAsyncResult(invocation);
  27. } else {
  28. // 获取消费端线程池,用来执行CompletableFuture的回调逻辑
  29. ExecutorService executor = getCallbackExecutor(getUrl(), inv);
  30. // request方法用来通过NettyClient与服务提供者进行请求,这里不会阻塞获取响应结果
  31. CompletableFuture<AppResponse> appResponseFuture =
  32. currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
  33. AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
  34. return result;
  35. }
  36. } catch (TimeoutException e) {
  37. throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
  38. } catch (RemotingException e) {
  39. throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
  40. }
  41. }

request 方法的核心逻辑如下:

  1. @Override
  2. public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
  3. // 创建请求信息
  4. Request req = new Request();
  5. req.setVersion(Version.getProtocolVersion());
  6. req.setTwoWay(true);
  7. req.setData(request);
  8. // 将channel封装到Future中,后续接收响应信息都在这个Future中
  9. DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
  10. try {
  11. // 发送出去后,主线程就直接返回了
  12. channel.send(req);
  13. } catch (RemotingException e) {
  14. future.cancel();
  15. throw e;
  16. }
  17. return future;
  18. }

可以看到,服务消费者并不是直接阻塞等待提供者的响应的,而是通过回调建立了一个异步逻辑,避免了主线程一直阻塞。并且,doInvoke 方法最终返回的是一个 AsyncRpcResult 对象,这个对象只有在真正获取结果信息时才会去尝试阻塞获取 Future 的值。当服务消费端需要将调用结果返回给调用方时,会执行 Result 接口的 getValue 方法,而 AsyncRpcResult 重写了该方法:

  1. public class AsyncRpcResult implements Result {
  2. @Override
  3. public Object getValue() {
  4. return getAppResponse().getValue();
  5. }
  6. }
  7. public Result getAppResponse() {
  8. try {
  9. if (responseFuture.isDone()) {
  10. // 调用Future的get方法阻塞主线程直到获取了响应结果
  11. return responseFuture.get();
  12. }
  13. } catch (Exception e) {
  14. throw new RpcException(e);
  15. }
  16. return createDefaultValue(invocation);
  17. }

服务提供端处理请求

服务提供端在暴露服务时,会启动一个 NettyServer 服务器监听指定端口的连接,当接收到服务消费端发来的请求时,服务提供方的 NettyServer 的 connect 方法会被激活,该方法的执行是在 Netty 的 I/O 线程上执行的,为了可以及时释放 I/O 线程,Netty 默认的线程模型为 ALL,即所有的消息都派发到 Dubbo 内部的业务线程池,这些消息包括请求事件、响应事件、连接事件、断开事件、心跳事件等。

1. Dubbo 线程模型

Dubbo 默认的底层网络通信使用的是 Netty,服务提供方 Netty Server 使用两级线程池,其中 EventLoopGroup(boss) 主要用来接收客户端的连接请求,并把完成 TCP 三次握手的连接分发给 EventLoopGroup(worker) 来处理,我们把 boss 和 worker 线程组称为 IO 线程。

如果服务提供方的逻辑处理能迅速完成,并且不会发起新的 IO 请求,那么直接在 IO 线程上处理会更快,因为这样减少了线程池调度与上下文切换的开销。但如果处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则 IO 线程必须派发请求到新的线程池进行处理,否则 IO 线程会被阻寒,导致不能接收其他请求,根据请求的消息类是被 IO 线程处理还是被业务线程池处理,Dubbo 提供了下面几种线程模型:
image.png

  • all(AllDispatcher 类):所有的消息都派发到 Dubbo 内部的业务线程池,这些消息包括请求、响应、连接事件、断开事件、心跳事件等。它是默认的线程模型。

  • direct(DirectDispatcher 类):所有的消息都不派发到 Dubbo 内部的业务线程池,全部在 IO 线程上直接执行。

  • message(MessageOnlyDispatcher 类):只有请求响应消息派发到 Dubbo 内部的业务线程池,其他消息如连接事件、断开事件、心跳事件等,直接在 IO 线程上执行。

  • execution(ExecutionDispatcher 类):只把请求类消息派发到 Dubbo 内部的业务线程池,其他消息如响应、连接事件、断开事件、心跳事件等,直接在 IO 线程上执行。

  • connection(ConnectionOrderedDispatcher 类):在 IO 线程上将连接事件、断开事件放入队列,有序地逐个执行,其他消息派发到 Dubbo 内部的业务线程池。

2. AllChannelHandler

AllDispatcher 类的具体实现在 AllChannelHandler 中。所以,NettyServer 的 connect 方法被激活后,最终执行的是 AllChannelHandler 中的 connect 方法。AllChannelHandler 类会把 I/O 线程接收到的所有消息包装为 ChannelEventRunnable 任务并投递到线程池。

  1. @Override
  2. public void connected(Channel channel) throws RemotingException {
  3. ExecutorService executor = getExecutorService();
  4. try {
  5. // 交给Dubbo内部的业务线程池处理,不阻塞IO线程
  6. executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
  7. } catch (Throwable t) {
  8. throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
  9. }
  10. }

线程池里的任务被执行后,最终会调用到 DubboProtocol 的 connected 方法:

  1. @Override
  2. public void connected(Channel channel) throws RemotingException {
  3. invoke(channel, ON_CONNECT_KEY);
  4. }

3. DubboProtocol

DubboProtocol 的 invoke 方法是一个通用方法,其逻辑如下:

  1. private void invoke(Channel channel, String methodKey) {
  2. // 创建Invocation对象
  3. Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
  4. if (invocation != null) {
  5. try {
  6. // 接收服务消费端传过来的数据
  7. received(channel, invocation);
  8. } catch (Throwable t) {
  9. logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
  10. }
  11. }
  12. }
  13. @Override
  14. public void received(Channel channel, Object message) throws RemotingException {
  15. // 可以看到,入参message的类型为Invocation,所以这里会调用reply方法
  16. if (message instanceof Invocation) {
  17. reply((ExchangeChannel) channel, message);
  18. } else {
  19. super.received(channel, message);
  20. }
  21. }

reply 方法逻辑如下:

  1. @Override
  2. public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
  3. Invocation inv = (Invocation) message;
  4. // 获取该调用服务对应的 Invoker 对象
  5. Invoker<?> invoker = getInvoker(channel, inv);
  6. // 判断调用的方法是否存在
  7. ......
  8. // 将对端地址放到上下文对象中
  9. RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
  10. // 执行invoke调用链
  11. Result result = invoker.invoke(inv);
  12. return result.thenApply(Function.identity());
  13. }

在 getInvoker 方法中通过 DubboExporter 对象来获取对应 Invoker 对象:

  1. Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
  2. String serviceKey = serviceKey(
  3. port, path,
  4. (String) inv.getObjectAttachments().get(VERSION_KEY),
  5. (String) inv.getObjectAttachments().get(GROUP_KEY)
  6. );
  7. // 在服务提供方暴露服务时,会将DubboExporter缓存在exporterMap中,所以这里直接从缓冲中获取
  8. DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
  9. return exporter.getInvoker();
  10. }

这里导出的 Invoker 内部有个调用链,经过调用链后最终调用了服务提供方启动时 AbstractProxyInvoker 代理类的 invoke 方法,其代码如下:

  1. public Result invoke(Invocation invocation) throws RpcException {
  2. try {
  3. // 具体执行本地服务调用
  4. Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
  5. CompletableFuture<Object> future = wrapWithFuture(value);
  6. // 将结果封装成AsyncRpcResult,用以异步获取方法调用结果
  7. CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
  8. AppResponse result = new AppResponse(invocation);
  9. if (t != null) {
  10. if (t instanceof CompletionException) {
  11. result.setException(t.getCause());
  12. } else {
  13. result.setException(t);
  14. }
  15. } else {
  16. result.setValue(obj);
  17. }
  18. return result;
  19. });
  20. return new AsyncRpcResult(appResponseFuture, invocation);
  21. }
  22. ......
  23. }

4. 执行 Invoker

在服务提供者暴露服务时,创建的 Invoker 逻辑如下:

  1. public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
  2. final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
  3. return new AbstractProxyInvoker<T>(proxy, type, url) {
  4. @Override
  5. protected Object doInvoke(T proxy, String methodName,
  6. Class<?>[] parameterTypes,
  7. Object[] arguments) throws Throwable {
  8. return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
  9. }
  10. };
  11. }

所以,上面调用的 doInvoke 方法实际调用的是 Wrapper 包装类的 invokeMethod 方法。这个 Wrapper 实例是通过 Javassist 动态代理生成的类,具体逻辑就是调用服务提供方接口的实现类来执行本地服务。