集群模块简介

集群,是指同一个服务 被部署在了多个服务器上,每个服务器的任务都相同,能够以较高的性价比,提升系统的 性能、可靠性、灵活性,但同时也要面对 集群中会出现的 负载均衡、容错等问题。dubbo 的集群模块,主要涉及以下几部分内容。

  • 负载均衡策略:dubbo 支持的所有负载均衡策略算法;
  • 集群容错:Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个;
  • 路由:dubbo 路由规则,路由规则决定了一次 dubbo 服务调用的目标服务器,路由规则分两种:条件路由规则和脚本路由规则,并且支持可拓展;
  • 配置:根据 url 上的配置规则生成配置信息;
  • 分组聚合:合并返回结果;
  • 本地伪装:mock 通常用于服务降级,mock 只在非业务异常时执行,如 超时、网络异常等。

集群工作过程可分为两个阶段,第一个阶段是在消费者初始化期间,集群 Cluster 为消费者创建 ClusterInvoker 实例。第二个阶段是在消费者进行 RPC 时,以 FailoverClusterInvoker 为例,该实例首先会调用 Directory 的 list()方法 获取 Invoker 列表,然后根据配置的 负载均衡策略,从 Invoker 列表 中选择一个 Inovker,最后将参数传给选择出的 Invoker 实例 进行真正的远程调用。

可将上文中出现的 Invoker 简单理解为服务提供者,Directory 的用途是保存 Invoker 列表,实现类 RegistryDirectory 是一个动态服务目录,可感知注册中心配置的变化,它所持有的 Inovker 列表会随着注册中心内容的变化而变化。每次变化后,RegistryDirectory 会动态增删 Inovker,并调用 Router 的 route 方法进行路由,过滤掉不符合路由规则的 Invoker。

下面我们来看一下 集群模块的项目结构图,结合上文的描述,可以对其有更加深刻的理解。

avatar

集群模块核心 API 源码解析

从上图应该也能看出其核心 API 在哪个包里。

avatar

各核心接口的源码如下。

  1. /**
  2. * 集群接口
  3. */
  4. @SPI(FailoverCluster.NAME)
  5. public interface Cluster {
  6. /**
  7. * 将目录调用程序合并到虚拟调用程序。
  8. * 基于 Directory ,创建 Invoker 对象,实现统一、透明的 Invoker 调用过程
  9. *
  10. * @param directory Directory 对象
  11. * @param <T> 泛型
  12. * @return cluster invoker
  13. * @throws RpcException
  14. */
  15. @Adaptive
  16. <T> Invoker<T> join(Directory<T> directory) throws RpcException;
  17. }
  18. /**
  19. * Configurator 接口
  20. */
  21. public interface Configurator extends Comparable<Configurator> {
  22. /**
  23. * 配置规则,生成url
  24. */
  25. URL getUrl();
  26. /**
  27. * 把规则配置到URL中
  28. */
  29. URL configure(URL url);
  30. }
  31. @SPI
  32. public interface ConfiguratorFactory {
  33. /**
  34. * 获得 Configurator实例
  35. */
  36. @Adaptive("protocol")
  37. Configurator getConfigurator(URL url);
  38. }
  39. public interface Directory<T> extends Node {
  40. /**
  41. * 获得服务类型,例如:com.alibaba.dubbo.demo.DemoService
  42. */
  43. Class<T> getInterface();
  44. /**
  45. * 获得所有服务 Invoker 集合
  46. */
  47. List<Invoker<T>> list(Invocation invocation) throws RpcException;
  48. }
  49. @SPI(RandomLoadBalance.NAME)
  50. public interface LoadBalance {
  51. /**
  52. * 从 Invoker 集合中,选择一个合适的 Invoker
  53. */
  54. @Adaptive("loadbalance")
  55. <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
  56. }
  57. @SPI
  58. public interface Merger<T> {
  59. /**
  60. * 合并 T 数组,返回合并后的 T 对象
  61. *
  62. * @param items T 数组
  63. * @return T 对象
  64. */
  65. T merge(T... items);
  66. }
  67. /**
  68. * 路由规则接口
  69. */
  70. public interface Router extends Comparable<Router> {
  71. /**
  72. * 获得路由规则的url
  73. */
  74. URL getUrl();
  75. /**
  76. * 筛选出跟规则匹配的Invoker集合
  77. *
  78. * @param invokers Invoker 集合
  79. * @param url refer url
  80. * @param invocation
  81. * @return routed invokers 路由后的 Invoker 集合
  82. * @throws RpcException
  83. */
  84. <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
  85. }
  86. @SPI
  87. public interface RouterFactory {
  88. /**
  89. * 创建 Router 对象
  90. */
  91. @Adaptive("protocol")
  92. Router getRouter(URL url);
  93. }