在微服务环境中,为了保证服务的高可用,很少会有单点服务出现,服务通常都是以集群 的形式出现的。当某个服务调用出现异常时,如网络抖动、服务短暂不可用需要自动容错、服务降级,就需要使用到集群容错机制。

Cluster层

Cluster可以看作是一个集群容错层,该层中包含Cluster、Directory、Router、LoadBalance几大核心接口。

【Dubbo】集群容错机制 - 图1

Cluster层的整体工作流程如上图所示,其中第1、2、3步都是在AbstractClusterInvoker类中实现,第4步会调用doInvoke方法,该方法为一个模板方法,交由子类去实现。

容错机制扩展

Dubbo提供了Failover、Failfast、Failsafe、Failback、Forking、Broadcast、AvailableClusterInvoker等容错机制,默认选择Failover机制。下面简单介绍一下这几种容错机制:

Failover

当出现失败时,会重试其他服务器。用户可以通过retries="2",设置重试次数(不包含第一次)。这是Dubbo默认容错机制,会对请求做负载均衡。通常使用在读操作或幂等写操作上, 但重试会导致接口的延迟增大,在下游机器负载已经达到极限时,重试容易加重下游服务的负载。

Failfast

快速失败,当请求失败后,快速返回异常结果,不做任何重试。该容错机制会对请求做负载均衡,通常使用在非幂等接口的调用上,比如新增记录。

Failsafe

当出现异常时,直接忽略异常。会对请求做负载均衡。通常使用在“佛系”调用场景, 即不关心调用是否成功,并且不想抛异常影响外层调用,如某些不重要的日志同步,即使出现异常也无所谓。

Failback

请求失败后,会自动记录在失败队列中,并由一个定时线程池定时重试,适用于一些异步或最终一致性的请求。请求会做负载均。

Forking

同时调用多个相同的服务,只要其中一个返回,则立即返回结果。用户可以配置forks="2"来设置最大并行调用的服务数量。通常使用在对接口实时性要求极高的调用上,但也会浪费更多的资源。

Broadcast

广播调用所有可用的服务,任意一个节点报错则报错。由于是广播,因此请求不需要做负载均衡。通常用于服务状态更新后的广播。

Available

请求不会做负载均衡,遍历所有服务列表,找到第一个可用的节点, 直接请求并返回结果。如果没有可用的节点,则直接抛出异常。

Mock

提供调用失败时,返回伪造的响应结果。或直接强制返回伪造的结果,不会发起远程调用。

Mergeable

可以自动把多个节点请求得到的结果进行合并。

容错接口

容错接口主要分为两大类,Cluester以及ClusterInvoker。先来看一下CluesterClusterInvoker接口的类图,如下图所示:

【Dubbo】集群容错机制 - 图2

【Dubbo】集群容错机制 - 图3

两者的关系如以下代码所示,Cluster接口下面有多种不同的实现,每种实现中都需要实现接口的join方法,在方法中会“new”一个对应的ClusterInvoker

  1. @SPI(FailoverCluster.NAME)
  2. public interface Cluster {
  3. @Adaptive
  4. <T> Invoker<T> join(Directory<T> directory) throws RpcException;
  5. }
  6. public class FailoverCluster implements Cluster {
  7. public final static String NAME = "failover";
  8. @Override
  9. public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
  10. return new FailoverClusterInvoker<T>(directory);
  11. }
  12. }

源码解析

AbstractClusterInvoker#invoke方法

invoke方法主要包含以下逻辑:

  1. 设置attachments
  2. 获取可用的Invoker列表
  3. 获取负载均衡策略(默认random
  4. 调用子类实现的doInvoke方法
  1. public Result invoke(final Invocation invocation) throws RpcException {
  2. checkWhetherDestroyed();
  3. LoadBalance loadbalance = null;
  4. // binding attachments into invocation.
  5. //设置attachments,attachments用来在服务消费方和提供方之间进行参数的隐式传递
  6. //可以看官方文档http://dubbo.apache.org/zh-cn/docs/user/demos/attachment.html
  7. Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
  8. if (contextAttachments != null && contextAttachments.size() != 0) {
  9. ((RpcInvocation) invocation).addAttachments(contextAttachments);
  10. }
  11. //获取可调用的Invoker列表
  12. List<Invoker<T>> invokers = list(invocation);
  13. //获取负载均衡策略,默认random
  14. if (invokers != null && !invokers.isEmpty()) {
  15. loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
  16. }
  17. //幂等操作,如果是异步调用,则在attachments里添加invocationId,每次异步调用id都会+1。
  18. RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
  19. //调用子类实现的doInvoke方法
  20. return doInvoke(invocation, invokers, loadbalance);
  21. }

AbstractClusterInvoker#list方法

list方法的逻辑比较简单,直接调用Directory#list方法获取可用的Invoker列(关于Directory#list方法后续再分析),代码如下所示:

  1. protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
  2. List<Invoker<T>> invokers = directory.list(invocation);
  3. return invokers;
  4. }

FailoverClusterInvoker#doInvoke

FailoverClusterInvoker#doInvoke方法主要包含以下逻辑:

  1. 校验invoker列表是否为空
  2. 获取重试次数
  3. 循环调用
    1. 重新获取invokers(如果是重试阶段),并再次校验invoker列表是否为空
    2. 负载均衡选择一个invoker
    3. 远程调用,成功则返回
    4. 失败则记录异常和providers
  4. 重试完了还没成功,抛出异常

代码如下所示:

  1. public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
  2. List<Invoker<T>> copyinvokers = invokers;
  3. //检测invokers是否为空
  4. checkInvokers(copyinvokers, invocation);
  5. String methodName = RpcUtils.getMethodName(invocation);
  6. //获取重试次数+1,因为设置的值是不包括第一次的
  7. int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
  8. if (len <= 0) {
  9. len = 1;
  10. }
  11. // retry loop.
  12. RpcException le = null; // last exception.
  13. //已经调用过的invoker
  14. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
  15. Set<String> providers = new HashSet<String>(len);
  16. //循环重试
  17. for (int i = 0; i < len; i++) {
  18. //Reselect before retry to avoid a change of candidate `invokers`.
  19. //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
  20. if (i > 0) {
  21. checkWhetherDestroyed();
  22. //在重试之前重新获取invokers列表。
  23. copyinvokers = list(invocation);
  24. // check again
  25. //再次检测invokers是否为空
  26. checkInvokers(copyinvokers, invocation);
  27. }
  28. //负载均衡选择Invoker
  29. Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
  30. //将invoker添加到已经调用过的invoked列表
  31. invoked.add(invoker);
  32. //设置invoker到rpc上下文里
  33. RpcContext.getContext().setInvokers((List) invoked);
  34. try {
  35. //调用
  36. Result result = invoker.invoke(invocation);
  37. //省略日志输出代码
  38. }
  39. return result;
  40. } catch (RpcException e) {
  41. if (e.isBiz()) { // biz exception.
  42. throw e;
  43. }
  44. le = e;
  45. } catch (Throwable e) {
  46. le = new RpcException(e.getMessage(), e);
  47. } finally {
  48. providers.add(invoker.getUrl().getAddress());
  49. }
  50. }
  51. //重试完了,还没成功
  52. throw new RpcException("省略异常抛出内容代码");
  53. }

FailfastClusterInvoker#doInvoke方法

由于前面的介绍可以知道,failfast策略遇到异常会直接抛出。所以该doInvoker方法主要包含以下逻辑:

  1. 检测invokers是否为空
  2. 负载均衡选择Invoker
  3. 调用,成功就返回,遇到异常直接抛出

代码如下所示:

  1. public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
  2. //检测invokers是否为空
  3. checkInvokers(invokers, invocation);
  4. //负载均衡选择Invoker
  5. Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
  6. try {
  7. //调用,遇到异常直接抛出
  8. return invoker.invoke(invocation);
  9. } catch (Throwable e) {
  10. if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
  11. throw (RpcException) e;
  12. }
  13. throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
  14. }
  15. }

由以上两个doInvoke方法的分析可知,ClusterInvoker的大致流程是:检测 -> 负载均衡 -> 调用 -> 异常处理。其他ClusterInvoker的逻辑都与其类似,故此处暂且不做分析。

AbstractClusterInvoker#select方法

我们可以注意到,如果需要负载均衡,在doInvoker方法内部都会调用select方法。AbstractClusterInvoker#select主要是针对粘滞连接做了特定的处理,若粘滞连接为空或不可用则调用doSelect方法重新选取Invoker,具体可看代码注释。这里再讲一下几个参数的含义:

  1. invokers:可用的服务列表
  2. invoked:已经调用过的服务列表(没调成功的)
  3. 粘滞连接:用于有状态服务,尽可能让客户端总是向同一提供者发起调用,除非该提供者挂了,再连另一台。

代码如下所示:

  1. protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
  2. if (invokers == null || invokers.isEmpty())
  3. return null;
  4. //获取方法名
  5. String methodName = invocation == null ? "" : invocation.getMethodName();
  6. //获取sticky,sticky表示粘滞连接。
  7. boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
  8. {
  9. //ignore overloaded method
  10. //如果invokers列表不包括stickyInvoker,则说明stickyInvoker挂了,这里将其置空
  11. if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
  12. stickyInvoker = null;
  13. }
  14. //ignore concurrency problem
  15. //selected是已经调用过的Invoker列表。如果selected包含stickyInvoker,则说明stickyInvoker没调成功。但是如果invokers还是包含stickyInvoker话,说明stickyInvoker没挂。
  16. //判断的含义 : (支持粘滞连接 && 粘滞连接的Invoker不为空 && (粘滞连接未被调用过))
  17. if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
  18. //如果打开了可用性检查,则检查stickyInvoker是否可用,可用则返回
  19. if (availablecheck && stickyInvoker.isAvailable()) {
  20. return stickyInvoker;
  21. }
  22. }
  23. }
  24. //重新选一个
  25. Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
  26. //如果支持粘滞连接
  27. if (sticky) {
  28. stickyInvoker = invoker;
  29. }
  30. return invoker;
  31. }

AbstractClusterInvoker#doSelect方法

AbstractClusterInvoker#doSelect方法主要包含以下逻辑:

  1. 通过负载均衡策略选择Invoker
  2. 如果Invoker已经被调用过了,或者未进行或未通过可用性检查,则进行重选
  3. 重选成功则返回,失败则选(第一步选出来的Invoker所在列表)下一个
  1. private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
  2. if (invokers == null || invokers.isEmpty())
  3. return null;
  4. //如果就一个,选个🔨,直接返回
  5. if (invokers.size() == 1)
  6. return invokers.get(0);
  7. //如果loadbalance,则加载默认的负载均衡策略
  8. if (loadbalance == null) {
  9. loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
  10. }
  11. Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
  12. //selected包含该invoker || (invoker未进行或未通过可用性检查) -> 重选
  13. if ((selected != null && selected.contains(invoker))
  14. || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
  15. try {
  16. Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
  17. //如果重选的rinvoker不为空,则赋值给invoker
  18. if (rinvoker != null) {
  19. invoker = rinvoker;
  20. }
  21. //如果重选的rinvoker为空,则选择 (刚刚选出来的那个invoker所在列表) 的下一个,如果是最后一个则选第一个
  22. else {
  23. int index = invokers.indexOf(invoker);
  24. try {
  25. invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
  26. } //省略异常日志代码
  27. }
  28. } //省略异常日志代码
  29. }
  30. return invoker;
  31. }

AbstractClusterInvoker#reSelect方法

AbstractClusterInvoker#reSelect方法主要进行重选操作,逻辑如下所示:

  1. 找到可用的Invoker,加入到reselectInvokers
  2. 如果reselectInvokers不为空,则通过负载均衡策略再次选择

代码如下所示:

  1. private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
  2. List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
  3. throws RpcException {
  4. //Allocating one in advance, this list is certain to be used.
  5. List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());
  6. //First, try picking a invoker not in `selected`.
  7. //允许可用性检查,遍历invokers列表,找到可用的,且未被调用过的,丢到reselectInvokers列表中,再用reselectInvokers进行负载均衡选择并返回
  8. if (availablecheck) { // invoker.isAvailable() should be checked
  9. for (Invoker<T> invoker : invokers) {
  10. if (invoker.isAvailable()) {
  11. if (selected == null || !selected.contains(invoker)) {
  12. reselectInvokers.add(invoker);
  13. }
  14. }
  15. }
  16. if (!reselectInvokers.isEmpty()) {
  17. return loadbalance.select(reselectInvokers, getUrl(), invocation);
  18. }
  19. }
  20. //不允许可用性检查,遍历invokers列表,找到未被调用过的,丢到reselectInvokers列表中,再用reselectInvokers进行负载均衡选择并返回
  21. else { // do not check invoker.isAvailable()
  22. for (Invoker<T> invoker : invokers) {
  23. if (selected == null || !selected.contains(invoker)) {
  24. reselectInvokers.add(invoker);
  25. }
  26. }
  27. if (!reselectInvokers.isEmpty()) {
  28. return loadbalance.select(reselectInvokers, getUrl(), invocation);
  29. }
  30. }
  31. // Just pick an available invoker using loadbalance policy
  32. //如果执行到这了,则说明reselectInvokers为空,直接在selected里找可用,丢到reselectInvokers列表中,再用reselectInvokers进行负载均衡选择并返回
  33. {
  34. if (selected != null) {
  35. for (Invoker<T> invoker : selected) {
  36. if ((invoker.isAvailable()) // available first
  37. && !reselectInvokers.contains(invoker)) {
  38. reselectInvokers.add(invoker);
  39. }
  40. }
  41. }
  42. if (!reselectInvokers.isEmpty()) {
  43. return loadbalance.select(reselectInvokers, getUrl(), invocation);
  44. }
  45. }
  46. return null;
  47. }

关于LoadBalance#select方法的代码后续再分析。

参考

Dubbo官网

《深入理解Apache Dubbo与实战》