Dubbo官网负载均衡介绍

负载均衡策略

目前 Dubbo 内置了如下负载均衡算法,用户可直接配置使用:

算法 特性 备注
RandomLoadBalance 加权随机 默认算法,默认权重相同
RoundRobinLoadBalance 加权轮询 借鉴于 Nginx 的平滑加权轮询算法,默认权重相同,
LeastActiveLoadBalance 最少活跃优先 + 加权随机 背后是能者多劳的思想
ShortestResponseLoadBalance 最短响应优先 + 加权随机 更加关注响应速度
ConsistentHashLoadBalance 一致性 Hash 确定的入参,确定的提供者,适用于有状态请求

源码版本

查看Dubbo源码版本是2.77,maven依赖如下:

  1. <dependency>
  2. <groupId>org.apache.dubbo</groupId>
  3. <artifactId>dubbo-spring-boot-starter</artifactId>
  4. <version>${dubbo-starter.version}</version>
  5. </dependency>

负载均衡的抽象父类

  1. public abstract class AbstractLoadBalance implements LoadBalance {
  2. static int calculateWarmupWeight(int uptime, int warmup, int weight) {
  3. //uptime:启动时间 warmup:invoker的预热时间,weight:invoker配置的权重
  4. //权重=uptime/(warmup/weight)
  5. int ww = (int) ( uptime / ((float) warmup / weight));
  6. //计算后的权重小于1则返回1,否则去计算后的权重和配置的权重中较小那个
  7. return ww < 1 ? 1 : (Math.min(ww, weight));
  8. }
  9. @Override
  10. public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  11. if (CollectionUtils.isEmpty(invokers)) {
  12. return null;
  13. }
  14. if (invokers.size() == 1) {
  15. return invokers.get(0);
  16. }
  17. return doSelect(invokers, url, invocation);
  18. }
  19. //抽象方法,由负载均衡实现类实现方法
  20. protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
  21. int getWeight(Invoker<?> invoker, Invocation invocation) {
  22. int weight;
  23. URL url = invoker.getUrl();
  24. // Multiple registry scenario, load balance among multiple registries
  25. if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) {
  26. weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
  27. } else {
  28. // 获取invoker配置的权重,默认权重(int DEFAULT_WEIGHT = 100;)
  29. weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
  30. if (weight > 0) {
  31. // 获取invoker的启动时间戳
  32. long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
  33. if (timestamp > 0L) {
  34. //invoker 启动时间
  35. long uptime = System.currentTimeMillis() - timestamp;
  36. if (uptime < 0) {
  37. return 1;
  38. }
  39. // invoker的预热时间,默认10分钟(int DEFAULT_WARMUP = 10 * 60 * 1000;)
  40. int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
  41. // 如果服务启动时间小于预热时间,则重新计算权重
  42. if (uptime > 0 && uptime < warmup) {
  43. //计算算法
  44. weight = calculateWarmupWeight((int)uptime, warmup, weight);
  45. }
  46. }
  47. }
  48. }
  49. return Math.max(weight, 0);
  50. }
  51. }

加权随机(默认)

源码

  1. public class RandomLoadBalance extends AbstractLoadBalance {
  2. public static final String NAME = "random";
  3. /**
  4. * Select one invoker between a list using a random criteria
  5. * @param invokers List of possible invokers
  6. * @param url URL
  7. * @param invocation Invocation
  8. * @param <T>
  9. * @return The selected invoker
  10. */
  11. @Override
  12. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  13. // invoker集合长度
  14. int length = invokers.size();
  15. // 先假设所有invoker的权重都一样
  16. boolean sameWeight = true;
  17. // invokers是个集合,weights记录各个invoker的权重
  18. int[] weights = new int[length];
  19. // 先获取第一个invoker的权重
  20. int firstWeight = getWeight(invokers.get(0), invocation);
  21. weights[0] = firstWeight;
  22. // 所有invoker总权重,
  23. int totalWeight = firstWeight;
  24. // 循环更新weights,累加totalWeight,并根据权重修改sameWeight
  25. for (int i = 1; i < length; i++) {
  26. int weight = getWeight(invokers.get(i), invocation);
  27. // save for later use
  28. weights[i] = weight;
  29. // Sum
  30. totalWeight += weight;
  31. if (sameWeight && weight != firstWeight) {
  32. sameWeight = false;
  33. }
  34. }
  35. // 存在invoker权重不一致,总权重大于0
  36. if (totalWeight > 0 && !sameWeight) {
  37. // 随机获取[0,totalWeight)范围的数值,取得一个随机权重
  38. int offset = ThreadLocalRandom.current().nextInt(totalWeight);
  39. // 下面的循环是从权重数组里获取对应invoker的权重
  40. for (int i = 0; i < length; i++) {
  41. // 随机权重减去invoker的权重值
  42. offset -= weights[i];
  43. if (offset < 0) {
  44. // 如果随机权重见到小于0后,返回当前下标的invoker
  45. return invokers.get(i);
  46. }
  47. }
  48. }
  49. // 如果总权重为0,或者所有权重一样,随机返回一个
  50. return invokers.get(ThreadLocalRandom.current().nextInt(length));
  51. }
  52. }

最短优先+加权轮询随机、最少活跃优先+加权随机策略最后的实现与加权随机基本一致。

加权轮询

官网介绍

  • 加权轮询,按公约后的权重设置轮询比率,循环调用节点
  • 缺点:同样存在慢的提供者累积请求的问题。

加权轮询过程过程中,如果某节点权重过大,会存在某段时间内调用过于集中的问题。
例如 ABC 三节点有如下权重:{A: 3, B: 2, C: 1}
那么按照最原始的轮询算法,调用过程将变成:A A A B B C

对此,Dubbo 借鉴 Nginx 的平滑加权轮询算法,对此做了优化,调用过程可抽象成下表:

轮前加和权重 本轮胜者 合计权重 轮后权重(胜者减去合计权重)
起始轮 \ \ A(0), B(0), C(0)
A(3), B(2), C(1) A 6 A(-3), B(2), C(1)
A(0), B(4), C(2) B 6 A(0), B(-2), C(2)
A(3), B(0), C(3) A 6 A(-3), B(0), C(3)
A(0), B(2), C(4) C 6 A(0), B(2), C(-2)
A(3), B(4), C(-1) B 6 A(3), B(-2), C(-1)
A(6), B(0), C(0) A 6 A(0), B(0), C(0)

我们发现经过合计权重(3+2+1)轮次后,循环又回到了起点,整个过程中节点流量是平滑的,且哪怕在很短的时间周期内,概率都是按期望分布的。

如果用户有加权轮询的需求,可放心使用该算法。

源码

  1. public class RoundRobinLoadBalance extends AbstractLoadBalance {
  2. public static final String NAME = "roundrobin";
  3. private static final int RECYCLE_PERIOD = 60000;
  4. protected static class WeightedRoundRobin {
  5. // 服务提供者权重
  6. private int weight;
  7. // 当前权重
  8. private AtomicLong current = new AtomicLong(0);
  9. // 最后一次更新时间
  10. private long lastUpdate;
  11. public int getWeight() {
  12. return weight;
  13. }
  14. public void setWeight(int weight) {
  15. this.weight = weight;
  16. // 初始值为0
  17. current.set(0);
  18. }
  19. public long increaseCurrent() {
  20. // CAS操作 current = current + weight
  21. return current.addAndGet(weight);
  22. }
  23. public void sel(int total) {
  24. // CAS操作 current = current - total
  25. current.addAndGet(-1 * total);
  26. }
  27. public long getLastUpdate() {
  28. return lastUpdate;
  29. }
  30. public void setLastUpdate(long lastUpdate) {
  31. this.lastUpdate = lastUpdate;
  32. }
  33. }
  34. private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
  35. /**
  36. * get invoker addr list cached for specified invocation
  37. * <p>
  38. * <b>for unit test only</b>
  39. *
  40. * @param invokers
  41. * @param invocation
  42. * @return
  43. */
  44. protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
  45. String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
  46. Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
  47. if (map != null) {
  48. return map.keySet();
  49. }
  50. return null;
  51. }
  52. @Override
  53. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  54. // 获取url到weightRoundRobin映射表,如果为空,则创建一个新的
  55. String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
  56. ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
  57. int totalWeight = 0;
  58. long maxCurrent = Long.MIN_VALUE;
  59. long now = System.currentTimeMillis();
  60. Invoker<T> selectedInvoker = null;
  61. WeightedRoundRobin selectedWRR = null;
  62. for (Invoker<T> invoker : invokers) {
  63. String identifyString = invoker.getUrl().toIdentityString();
  64. int weight = getWeight(invoker, invocation);
  65. //检测当前Invoker是否有相应的WeightedRoundRobin,没有则创建
  66. WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
  67. WeightedRoundRobin wrr = new WeightedRoundRobin();
  68. // 设置Invoker 权重
  69. wrr.setWeight(weight);
  70. return wrr;
  71. });
  72. // Invoker 权重不等于weightedRoundRobin中保存的权重,说明权重变化,进行更新
  73. if (weight != weightedRoundRobin.getWeight()) {
  74. //weight changed
  75. weightedRoundRobin.setWeight(weight);
  76. }
  77. // CAS操作 cur = current+weight
  78. long cur = weightedRoundRobin.increaseCurrent();
  79. // 设置 lastUpdate,表示近期更新过
  80. weightedRoundRobin.setLastUpdate(now);
  81. if (cur > maxCurrent) {
  82. maxCurrent = cur;
  83. // 将具有最大current 权重的Invoker赋值给selectedInvoker
  84. selectedInvoker = invoker;
  85. // 将Invoker对应的weightedRoundRobin赋值给selectedWRR,后面用到
  86. selectedWRR = weightedRoundRobin;
  87. }
  88. //计算权重之和
  89. totalWeight += weight;
  90. }
  91. // 如果Invoker个数与ConcurrentMap<String, WeightedRoundRobin>的key个数不一致
  92. if (invokers.size() != map.size()) {
  93. //去除长时间未被更新的节点(60s)
  94. map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
  95. }
  96. if (selectedInvoker != null) {
  97. // CAS操作,将最大权重的Invoker减去权重总和
  98. selectedWRR.sel(totalWeight);
  99. // 返回具有最大current的Invoker
  100. return selectedInvoker;
  101. }
  102. // should not happen here
  103. return invokers.get(0);
  104. }
  105. }

最短响应优先 + 加权随机策略

  1. org.apache.dubbo.rpc.cluster.loadbalance.ShortestResponseLoadBalance

主要流程如下:

  1. 从多个服务提供者中选择出调用成功的且响应时间最短的服务提供者,由于满足这样条件的服务提供者有可能有多个。所以当选择出多个服务提供者后要根据他们的权重做分析
  2. 如果只选择出来了一个,直接用选出来这个
  3. 如果真的有多个,看它们的权重是否一样,如果不一样,则走加权随机算法的逻辑
  4. 如果它们的权重是一样的,则随机调用一个

负载均衡实现代码如下:

  1. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  2. //服务提供者的数量
  3. int length = invokers.size();
  4. //所有服务提供者的估计最短响应时间(只是一个临时变量,在循环中存储当前最短响应时间是多少)
  5. long shortestResponse = Long.MAX_VALUE;
  6. //具有相同最短响应时间的服务提供者个数,初始化为 0
  7. int shortestCount = 0;
  8. //数组里面放的是具有相同最短响应时间的服务提供者的下标
  9. int[] shortestIndexes = new int[length];
  10. //每一个服务提供者的权重
  11. int[] weights = new int[length];
  12. //多个具有相同最短响应时间的服务提供者对应的预热
  13. int totalWeight = 0;
  14. //第一个具有最短响应时间的服务提供者的权重
  15. int firstWeight = 0;
  16. //多个满足条件的提供者的权重是否一致
  17. boolean sameWeight = true;
  18. // Filter out all the shortest response invokers
  19. for (int i = 0; i < length; i++) {
  20. Invoker<T> invoker = invokers.get(i);
  21. RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
  22. //获取调用成功的平均时间
  23. //getSucceededAverageElapsed():调用成功的请求数总数对应的总耗时 / 调用成功的请求数总数
  24. long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed();
  25. int active = rpcStatus.getActive();
  26. long estimateResponse = succeededAverageElapsed * active;
  27. //获取预热后的权重
  28. int afterWarmup = getWeight(invoker, invocation);
  29. weights[i] = afterWarmup;
  30. // Same as LeastActiveLoadBalance
  31. if (estimateResponse < shortestResponse) {
  32. //如果出现有更短的响应时间的服务提供者,记录更短的响应时间和当前服务者的下标
  33. shortestResponse = estimateResponse;
  34. shortestCount = 1;
  35. shortestIndexes[0] = i;
  36. totalWeight = afterWarmup;
  37. firstWeight = afterWarmup;
  38. sameWeight = true;
  39. } else if (estimateResponse == shortestResponse) {
  40. //如果出现时间一样长的服务提供者,更新shortestIndexes下标,计算总权重,判断权重是否相等
  41. shortestIndexes[shortestCount++] = i;
  42. totalWeight += afterWarmup;
  43. if (sameWeight && i > 0
  44. && afterWarmup != firstWeight) {
  45. sameWeight = false;
  46. }
  47. }
  48. }
  49. if (shortestCount == 1) {
  50. //经过选择后只有一个服务提供者,直接返回这个服务提供者
  51. return invokers.get(shortestIndexes[0]);
  52. }
  53. //加权随机负载均衡的实现
  54. if (!sameWeight && totalWeight > 0) {
  55. int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
  56. for (int i = 0; i < shortestCount; i++) {
  57. int shortestIndex = shortestIndexes[i];
  58. offsetWeight -= weights[shortestIndex];
  59. if (offsetWeight < 0) {
  60. return invokers.get(shortestIndex);
  61. }
  62. }
  63. }
  64. //从多个满足条件且权重一样的服务提供者中随机选择一个
  65. return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]);
  66. }

最少活跃优先 + 加权随机策略

  1. org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance

代码如下:

  1. public class LeastActiveLoadBalance extends AbstractLoadBalance {
  2. public static final String NAME = "leastactive";
  3. @Override
  4. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  5. //首先获取服务提供者(invoker)的数量,目前我们的服务提供者大小为3
  6. int length = invokers.size();
  7. //初始化最小活跃次数(minActive)
  8. //活跃次数一定是一个大于等于0的数,所以这里设置为-1就是为了minActive绐替換掉
  9. int leastActive = -1;
  10. // minactive相同的invoker的数量
  11. int leastCount = 0;
  12. // invokers是个集合,leastIndexes的作用是记录minActive相同的invoker下标
  13. int[] leastIndexes = new int[length];
  14. // invokers是个集合,weights记录各个invoker的权重,当前权重是[5,10,15]
  15. int[] weights = new int[length];
  16. // 用于记录所有invoker的权重之和,当前权重是5+10+15;
  17. int totalWeight = 0;
  18. // 记录第一个minActive的invoker的权重
  19. // 在下面的循环中和其他的具有相同minActive的invoker进行对比
  20. // 作用是判断是否所有有相同minActive的invoker的权重都一样
  21. // 例子:A,8两个服务器权重都配置的200,minActive也相等,则随机选择一个
  22. int firstWeight = 0;
  23. // 先假设所有invoker的权重都一样
  24. boolean sameWeight = true;
  25. // Filter out all the least active invokers
  26. for (int i = 0; i < length; i++) {
  27. // 遍历出每个invoker
  28. Invoker<T> invoker = invokers.get(i);
  29. // 读取当前invoker的活跃数
  30. int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
  31. // 获取当前invoker的权重
  32. int afterWarmup = getWeight(invoker, invocation);
  33. // 将invoker的权重存储到weights对应的下标
  34. weights[i] = afterWarmup;
  35. // 第一次循环的时候minActive从leastActive=-1变成了leastActive = active;(当前invoker活跃数)
  36. // 后续循环,找到活跃数更小的进行替换
  37. // leastCount、leastIndexs、totalWeight、firstWeight、sameWeight都重新开始
  38. if (leastActive == -1 || active < leastActive) {
  39. // 替换活跃度更小的值
  40. leastActive = active;
  41. // 最小活跃度的invoker重置为1
  42. leastCount = 1;
  43. // 记录活跃度更小的invoker的下标到leastIndexes
  44. leastIndexes[0] = i;
  45. // Reset totalWeight
  46. totalWeight = afterWarmup;
  47. // Record the weight the first least active invoker
  48. firstWeight = afterWarmup;
  49. // Each invoke has the same weight (only one invoker here)
  50. sameWeight = true;
  51. // If current invoker's active value equals with leaseActive, then accumulating.
  52. } else if (active == leastActive) {
  53. // 当前invoker的活跃数如果和minActive相等,在leastIndexes记录当前下标,leastCount+1
  54. leastIndexes[leastCount++] = i;
  55. // 更新总权重
  56. totalWeight += afterWarmup;
  57. // 判断这个具有相同active的invoker,和之前记录的invoker权重是否相等
  58. if (sameWeight && afterWarmup != firstWeight) {
  59. sameWeight = false;
  60. }
  61. }
  62. }
  63. // 最小活跃数的invoker为1,那么直接取leastIndexes第一个
  64. if (leastCount == 1) {
  65. // If we got exactly one invoker having the least active value, return this invoker directly.
  66. return invokers.get(leastIndexes[0]);
  67. }
  68. // 走到这里肯定是leastCount>1
  69. // 加权随机负载均衡的实现
  70. if (!sameWeight && totalWeight > 0) {
  71. int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
  72. for (int i = 0; i < leastCount; i++) {
  73. int leastIndex = leastIndexes[i];
  74. offsetWeight -= weights[leastIndex];
  75. if (offsetWeight < 0) {
  76. return invokers.get(leastIndex);
  77. }
  78. }
  79. }
  80. // 两个及以上权重相同或总权重为0,随机返回一个invoker
  81. return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
  82. }
  83. }

想要模拟最小活跃数,先把生产者里三个服务提供者修改修改不同的权重,provider01、provider02、provider03权重分别设置5、10、15,为了让多个消费者都有一些调用次数,生产者所有接口响应都进行休眠10分钟,并修改接口超时时间,其中以provider03代码示例如下

  1. /**
  2. * 功能描述:
  3. *
  4. * @param:
  5. * @return:
  6. * @auther: Zywoo Lee
  7. * @date: 2021/11/20 3:02 下午
  8. */
  9. @DubboService(weight = 15,timeout = 700000, loadbalance = "leastactive")
  10. public class DemoServiceImpl implements DemoService {
  11. private final Logger logger = LoggerFactory.getLogger(getClass());
  12. @Value("${dubbo.application.name}")
  13. private String serviceName;
  14. @Override
  15. public String sayHello(String name) {
  16. System.out.println("权重为15的服务器接受到请求了:" + name);
  17. try {
  18. TimeUnit.MINUTES.sleep(10);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. return String.format("[%s] : Hello, %s", serviceName, name);
  23. }
  24. }

然后服务消费者进行改造,一共发送21个请求,前20请求分别落在三台服务提供者上,然后最后debug第21个线程,代码如下:

  1. /**
  2. * @Auther: Zywoo Lee
  3. * @Date: 2021/11/20 15:35
  4. * @Description:
  5. */
  6. @RestController
  7. @RequestMapping("/test")
  8. @Api(tags = "Dubbo服务测试类")
  9. public class DemoController {
  10. @DubboReference(loadbalance = "leastactive",filter = "activelimit")
  11. private DemoService demoService;
  12. @ApiOperation(value = "服务调用测试", notes = "")
  13. @GetMapping("sayHello")
  14. public void sayHello() {
  15. for (int i = 0; i < 20; i++) {
  16. int finalI = i;
  17. new Thread(() -> {
  18. demoService.sayHello("ZywooLee-" + finalI);
  19. }).start();
  20. }
  21. try {
  22. TimeUnit.SECONDS.sleep(10);
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. demoService.sayHello("ZywooLee-debug");
  27. }
  28. }

active为0的情况

请注意:@DubboReference一定要配置filter = “activelimit”,不然在负载均衡的doSelect()方法会出现active为0的情况。原因在于客户端没有配置ActiveLimitFilter

  1. int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();

测试结果

image.png

image.png

image.png

然后开始打断点,进入第21个请求。最终可以看到请求落向选择最少连接数的提供者

image.png

一致性哈希负载均衡

  1. org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance

因为原代码debug查看hash环生成不方便,所以copy源码加入一些输出打印,并继承AbstractLoadBalance,然后指定自定义负载均衡

  1. package com.dubbo.loadbalance;
  2. import com.alibaba.dubbo.rpc.support.RpcUtils;
  3. import org.apache.dubbo.common.URL;
  4. import org.apache.dubbo.rpc.Invocation;
  5. import org.apache.dubbo.rpc.Invoker;
  6. import org.apache.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance;
  7. import org.springframework.stereotype.Component;
  8. import java.nio.charset.StandardCharsets;
  9. import java.security.MessageDigest;
  10. import java.security.NoSuchAlgorithmException;
  11. import java.util.List;
  12. import java.util.Map;
  13. import java.util.TreeMap;
  14. import java.util.concurrent.ConcurrentHashMap;
  15. import java.util.concurrent.ConcurrentMap;
  16. import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
  17. /**
  18. * @Auther: Zywoo Lee
  19. * @Date: 2021/11/20 18:00
  20. * @Description:
  21. */
  22. public class ZywooLeeConsistentHashLoadBalance extends AbstractLoadBalance {
  23. public static final String NAME = "consistenthash";
  24. /**
  25. * Hash nodes name
  26. */
  27. public static final String HASH_NODES = "hash.nodes";
  28. /**
  29. * Hash arguments name
  30. */
  31. public static final String HASH_ARGUMENTS = "hash.arguments";
  32. private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
  33. @SuppressWarnings("unchecked")
  34. @Override
  35. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  36. String methodName = RpcUtils.getMethodName(invocation);
  37. String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
  38. System.out.println("selectors中获取value的key="+key);
  39. // 获取invokers的hashCode
  40. int invokersHashCode = invokers.hashCode();
  41. ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
  42. // 如果invokers是一个新的List对象,意味着服务提供者数量发生变化,可能新增也可能减少
  43. // 此时selector.identityHashCode != invokersHashCode条件成立
  44. // 如果第一次调用 selector == null成立
  45. if (selector == null || selector.identityHashCode != invokersHashCode) {
  46. System.out.println("新的invokers:"+ invokersHashCode+",原"+(selector==null ? "null" : selector.identityHashCode));
  47. selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));
  48. selector = (ConsistentHashSelector<T>) selectors.get(key);
  49. System.out.println("--哈希环构建完成--");
  50. for (Map.Entry<Long, Invoker<T>> entry : selector.virtualInvokers.entrySet()) {
  51. System.out.println("key(哈希值)="+entry.getKey()+", value(虚拟节点)="+entry.getValue());
  52. }
  53. }
  54. System.out.println("---执行select方法选择invoker---");
  55. // 创建新的ConsistentHashSelector的select方法选择invoker
  56. return selector.select(invocation);
  57. }
  58. private static final class ConsistentHashSelector<T> {
  59. // 使用TreeMap存储Invoker的虚拟节点
  60. private final TreeMap<Long, Invoker<T>> virtualInvokers;
  61. // 虚拟节点数
  62. private final int replicaNumber;
  63. // hashcode
  64. private final int identityHashCode;
  65. // 请求中的参数下标
  66. // 需要对请求中对应的下标的参数进行哈希计算
  67. private final int[] argumentIndex;
  68. ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
  69. this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
  70. this.identityHashCode = identityHashCode;
  71. URL url = invokers.get(0).getUrl();
  72. // 即时启动多个invoker,每个invoker对应的url上的虚拟节点数配置的都是一样的
  73. // 这里默认160个,可以在服务提供者配置parameters->hash.nodes,我这边设置为4个便于debug
  74. this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
  75. // 获取参与哈希计算的参数下标值,默认对第一个参数进行哈希计算
  76. String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
  77. argumentIndex = new int[index.length];
  78. // for循环对argumentIndex赋值
  79. for (int i = 0; i < index.length; i++) {
  80. argumentIndex[i] = Integer.parseInt(index[i]);
  81. }
  82. // 遍历服务提供者
  83. for (Invoker<T> invoker : invokers) {
  84. // 获取每个invoker的地址
  85. String address = invoker.getUrl().getAddress();
  86. for (int i = 0; i < replicaNumber / 4; i++) {
  87. // adress+i进行md5运算得到一个长度为16的字节数组
  88. byte[] digest = md5(address + i);
  89. // 对digest部分字节进行4次hash运算得到四个不同的long型正整数
  90. for (int h = 0; h < 4; h++) {
  91. // h==0:digest中下标0~3的4个字节进行位运算
  92. // h==1:digest中下标4~7的4个字节进行位运算
  93. // h==2:digest中下标8~11的4个字节进行位运算
  94. // h==3:digest中下标12~15的4个字节进行位运算
  95. long m = hash(digest, h);
  96. // 将hash到invoker的映射关系存储到virtualInvokers中
  97. // virtualInvokers需要提供高效查询操作,使用TreeMap数据结构
  98. virtualInvokers.put(m, invoker);
  99. }
  100. }
  101. }
  102. }
  103. public Invoker<T> select(Invocation invocation) {
  104. String key = toKey(invocation.getArguments());
  105. byte[] digest = md5(key);
  106. // 取digest数组的前四个字节进行hash运算,再将hash值传给selectForKey()
  107. // 寻找合适的invoker
  108. long hash = hash(digest, 0);
  109. System.out.println("参与hash计算的key:"+"经过hash计算后hash="+hash);
  110. return selectForKey(hash);
  111. }
  112. // argumentIndex转换成key
  113. private String toKey(Object[] args) {
  114. StringBuilder buf = new StringBuilder();
  115. for (int i : argumentIndex) {
  116. if (i >= 0 && i < args.length) {
  117. buf.append(args[i]);
  118. }
  119. }
  120. return buf.toString();
  121. }
  122. private Invoker<T> selectForKey(long hash) {
  123. //到TreeMap中找到第一个节点值大于或等于当前hash的Invoker
  124. Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
  125. //如果hash大于Invoker在圆环上最大位置,此时entry=null
  126. if (entry == null) {
  127. // 将TreeMap头节点赋值给entry
  128. entry = virtualInvokers.firstEntry();
  129. }
  130. System.out.println("根据key计算出hash="+hash+",获取出来的invoker是:"+entry.getValue());
  131. return entry.getValue();
  132. }
  133. private long hash(byte[] digest, int number) {
  134. return (((long) (digest[3 + number * 4] & 0xFF) << 24)
  135. | ((long) (digest[2 + number * 4] & 0xFF) << 16)
  136. | ((long) (digest[1 + number * 4] & 0xFF) << 8)
  137. | (digest[number * 4] & 0xFF))
  138. & 0xFFFFFFFFL;
  139. }
  140. private byte[] md5(String value) {
  141. MessageDigest md5;
  142. try {
  143. md5 = MessageDigest.getInstance("MD5");
  144. } catch (NoSuchAlgorithmException e) {
  145. throw new IllegalStateException(e.getMessage(), e);
  146. }
  147. md5.reset();
  148. byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
  149. md5.update(bytes);
  150. return md5.digest();
  151. }
  152. }
  153. }

测试结果

服务提供者代码如下:
parameters如果不指定的话,默认为160个,在源码里可以看到

  1. @DubboService(timeout = 700000, loadbalance = "zywooLeeConsistentHash",parameters={"hash.nodes","4"})
  2. public class DemoServiceImpl implements DemoService {
  3. private final Logger logger = LoggerFactory.getLogger(getClass());
  4. @Value("${dubbo.application.name}")
  5. private String serviceName;
  6. @Override
  7. public String sayHello(String name) {
  8. return String.format("[%s] : Hello, %s", serviceName, name);
  9. }
  10. }

服务消费者代码如下:

  1. /**
  2. * @Auther: Zywoo Lee
  3. * @Date: 2021/11/20 15:35
  4. * @Description:
  5. */
  6. @RestController
  7. @RequestMapping("/test")
  8. @Api(tags = "Dubbo服务测试类")
  9. public class DemoController {
  10. @DubboReference(loadbalance = "zywooLeeConsistentHash",filter = "activelimit")
  11. private DemoService demoService;
  12. @ApiOperation(value = "服务调用测试", notes = "")
  13. @GetMapping("sayHello")
  14. public void sayHello() {
  15. String name = demoService.sayHello("ZywooLee-debug");
  16. System.out.println(name);
  17. }
  18. }

首先先启动两个服务提供者,调用方法控制台输出如下:

image.png

服务提供者数量不变,第二次调用,可以看到selector不为null,invokers没有变化,自然selector.identityHashCode != invokersHashCode为false,直接进入selector.select(invocation)逻辑。

image.png

第二次调用控制台输出如下:

image.png

然后开启第三个服务提供者,此时invokers自然发生了变化,再次发起服务请求,控制台打印如下:

image.png