LoadBalance 中文意思为负载均衡,它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况。通过负载均衡,可以让每台服务器获取到适合自己处理能力的负载。在为高负载服务器分流的同时,还可以避免资源浪费image.png
在 Dubbo 中,所有负载均衡实现类均继承自 AbstractLoadBalance,该类实现了 LoadBalance 接口,并封装了一些公共的逻辑。所以在分析负载均衡实现之前,先来看一下 AbstractLoadBalance 的逻辑。首先来看一下负载均衡的入口方法 select,如下:

  1. @Override
  2. public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  3. if (invokers == null || invokers.isEmpty())
  4. return null;
  5. // 如果 invokers 列表中仅有一个 Invoker,直接返回即可,无需进行负载均衡
  6. if (invokers.size() == 1)
  7. return invokers.get(0);
  8. // 调用 doSelect 方法进行负载均衡,该方法为抽象方法,由子类实现
  9. return doSelect(invokers, url, invocation);
  10. }
  11. protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

select 方法的逻辑比较简单,首先会检测 invokers 集合的合法性,然后再检测 invokers 集合元素数量。如果只包含一个 Invoker,直接返回该 Inovker 即可。如果包含多个 Invoker,此时需要通过负载均衡算法选择一个 Invoker。具体的负载均衡算法由子类实现,接下来章节会对这些子类一一进行详细分析。
AbstractLoadBalance 除了实现了 LoadBalance 接口方法,还封装了一些公共逻辑,比如服务提供者权重计算逻辑。具体实现如下:

  1. protected int getWeight(Invoker<?> invoker, Invocation invocation) {
  2. // 从 url 中获取权重 weight 配置值
  3. int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
  4. if (weight > 0) {
  5. // 获取服务提供者启动时间戳
  6. long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
  7. if (timestamp > 0L) {
  8. // 计算服务提供者运行时长
  9. int uptime = (int) (System.currentTimeMillis() - timestamp);
  10. // 获取服务预热时间,默认为10分钟
  11. int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
  12. // 如果服务运行时间小于预热时间,则重新计算服务权重,即降权
  13. if (uptime > 0 && uptime < warmup) {
  14. // 重新计算服务权重
  15. weight = calculateWarmupWeight(uptime, warmup, weight);
  16. }
  17. }
  18. }
  19. return weight;
  20. }
  21. static int calculateWarmupWeight(int uptime, int warmup, int weight) {
  22. // 计算权重,下面代码逻辑上形似于 (uptime / warmup) * weight。
  23. // 随着服务运行时间 uptime 增大,权重计算值 ww 会慢慢接近配置值 weight
  24. int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
  25. return ww < 1 ? 1 : (ww > weight ? weight : ww);
  26. }

1.RandomLoadBalance

RandomLoadBalance 是加权随机算法的具体实现,它的算法思想很简单。假设我们有一组服务器 servers = [A, B, C],他们对应的权重为 weights = [5, 3, 2],权重总和为10。现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。比如,经过一万次选择后,服务器 A 被选中的次数大约为5000次,服务器 B 被选中的次数约为3000次,服务器 C 被选中的次数约为2000次。

  1. public class RandomLoadBalance extends AbstractLoadBalance {
  2. public static final String NAME = "random";
  3. private final Random random = new Random();
  4. @Override
  5. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  6. int length = invokers.size();
  7. int totalWeight = 0;
  8. boolean sameWeight = true;
  9. // 下面这个循环有两个作用,第一是计算总权重 totalWeight,
  10. // 第二是检测每个服务提供者的权重是否相同
  11. for (int i = 0; i < length; i++) {
  12. int weight = getWeight(invokers.get(i), invocation);
  13. // 累加权重
  14. totalWeight += weight;
  15. // 检测当前服务提供者的权重与上一个服务提供者的权重是否相同,
  16. // 不相同的话,则将 sameWeight 置为 false。
  17. if (sameWeight && i > 0
  18. && weight != getWeight(invokers.get(i - 1), invocation)) {
  19. sameWeight = false;
  20. }
  21. }
  22. // 下面的 if 分支主要用于获取随机数,并计算随机数落在哪个区间上
  23. if (totalWeight > 0 && !sameWeight) {
  24. // 随机获取一个 [0, totalWeight) 区间内的数字
  25. int offset = random.nextInt(totalWeight);
  26. // 循环让 offset 数减去服务提供者权重值,当 offset 小于0时,返回相应的 Invoker。
  27. // 举例说明一下,我们有 servers = [A, B, C],weights = [5, 3, 2],offset = 7。
  28. // 第一次循环,offset - 5 = 2 > 0,即 offset > 5,
  29. // 表明其不会落在服务器 A 对应的区间上。
  30. // 第二次循环,offset - 3 = -1 < 0,即 5 < offset < 8,
  31. // 表明其会落在服务器 B 对应的区间上
  32. for (int i = 0; i < length; i++) {
  33. // 让随机值 offset 减去权重值
  34. offset -= getWeight(invokers.get(i), invocation);
  35. if (offset < 0) {
  36. // 返回相应的 Invoker
  37. return invokers.get(i);
  38. }
  39. }
  40. }
  41. // 如果所有服务提供者权重值相同,此时直接随机返回一个即可
  42. return invokers.get(random.nextInt(length));
  43. }
  44. }

2.LeastActiveLoadBalance

LeastActiveLoadBalance 翻译过来是最小活跃数负载均衡。活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。此时应优先将请求分配给该服务提供者。在具体实现中,每个服务提供者对应一个活跃数 active。初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。除了最小活跃数,LeastActiveLoadBalance 在实现上还引入了权重值。所以准确的来说,LeastActiveLoadBalance 是基于加权最小活跃数算法实现的。举个例子说明一下,在一个服务提供者集群中,有两个性能优异的服务提供者。某一时刻它们的活跃数相同,此时 Dubbo 会根据它们的权重去分配请求,权重越大,获取到新请求的概率就越大。如果两个服务提供者权重相同,此时随机选择一个即可。

  1. public class LeastActiveLoadBalance extends AbstractLoadBalance {
  2. public static final String NAME = "leastactive";
  3. private final Random random = new Random();
  4. @Override
  5. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  6. int length = invokers.size();
  7. // 最小的活跃数
  8. int leastActive = -1;
  9. // 具有相同“最小活跃数”的服务者提供者(以下用 Invoker 代称)数量
  10. int leastCount = 0;
  11. // leastIndexs 用于记录具有相同“最小活跃数”的 Invoker 在 invokers 列表中的下标信息
  12. int[] leastIndexs = new int[length];
  13. int totalWeight = 0;
  14. // 第一个最小活跃数的 Invoker 权重值,用于与其他具有相同最小活跃数的 Invoker 的权重进行对比,
  15. // 以检测是否“所有具有相同最小活跃数的 Invoker 的权重”均相等
  16. int firstWeight = 0;
  17. boolean sameWeight = true;
  18. // 遍历 invokers 列表
  19. for (int i = 0; i < length; i++) {
  20. Invoker<T> invoker = invokers.get(i);
  21. // 获取 Invoker 对应的活跃数
  22. int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
  23. // 获取权重 - ⭐️
  24. int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
  25. // 发现更小的活跃数,重新开始
  26. if (leastActive == -1 || active < leastActive) {
  27. // 使用当前活跃数 active 更新最小活跃数 leastActive
  28. leastActive = active;
  29. // 更新 leastCount 为 1
  30. leastCount = 1;
  31. // 记录当前下标值到 leastIndexs 中
  32. leastIndexs[0] = i;
  33. totalWeight = weight;
  34. firstWeight = weight;
  35. sameWeight = true;
  36. // 当前 Invoker 的活跃数 active 与最小活跃数 leastActive 相同
  37. } else if (active == leastActive) {
  38. // 在 leastIndexs 中记录下当前 Invoker 在 invokers 集合中的下标
  39. leastIndexs[leastCount++] = i;
  40. // 累加权重
  41. totalWeight += weight;
  42. // 检测当前 Invoker 的权重与 firstWeight 是否相等,
  43. // 不相等则将 sameWeight 置为 false
  44. if (sameWeight && i > 0
  45. && weight != firstWeight) {
  46. sameWeight = false;
  47. }
  48. }
  49. }
  50. // 当只有一个 Invoker 具有最小活跃数,此时直接返回该 Invoker 即可
  51. if (leastCount == 1) {
  52. return invokers.get(leastIndexs[0]);
  53. }
  54. // 有多个 Invoker 具有相同的最小活跃数,但它们之间的权重不同
  55. if (!sameWeight && totalWeight > 0) {
  56. // 随机生成一个 [0, totalWeight) 之间的数字
  57. int offsetWeight = random.nextInt(totalWeight);
  58. // 循环让随机数减去具有最小活跃数的 Invoker 的权重值,
  59. // 当 offset 小于等于0时,返回相应的 Invoker
  60. for (int i = 0; i < leastCount; i++) {
  61. int leastIndex = leastIndexs[i];
  62. // 获取权重值,并让随机数减去权重值 - ⭐️
  63. offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
  64. if (offsetWeight < 0)
  65. return invokers.get(leastIndex);
  66. }
  67. }
  68. // 如果权重相同或权重为0时,随机返回一个 Invoker
  69. return invokers.get(leastIndexs[random.nextInt(leastCount)]);
  70. }
  71. }

3.ConsistentHashLoadBalance

一致性 hash 算法由麻省理工学院的 Karger 及其合作者于1997年提出的,算法提出之初是用于大规模缓存系统的负载均衡。它的工作过程是这样的,首先根据 ip 或者其他的信息为缓存节点生成一个 hash,并将这个 hash 投射到 [0, 232 - 1] 的圆环上。当有查询或写入请求时,则为缓存项的 key 生成一个 hash 值。然后查找第一个大于或等于该 hash 值的缓存节点,并到这个节点中查询或写入缓存项。如果当前节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其 hash 值的缓存节点即可。大致效果如下图所示,每个缓存节点在圆环上占据一个位置。如果缓存项的 key 的 hash 值小于缓存节点 hash 值,则到该缓存节点中存储或读取缓存项。比如下面绿色点对应的缓存项将会被存储到 cache-2 节点中。由于 cache-3 挂了,原本应该存到该节点中的缓存项最终会存储到 cache-4 节点中。

  1. public class ConsistentHashLoadBalance extends AbstractLoadBalance {
  2. private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors =
  3. new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
  4. @Override
  5. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  6. String methodName = RpcUtils.getMethodName(invocation);
  7. String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
  8. // 获取 invokers 原始的 hashcode
  9. int identityHashCode = System.identityHashCode(invokers);
  10. ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
  11. // 如果 invokers 是一个新的 List 对象,意味着服务提供者数量发生了变化,可能新增也可能减少了。
  12. // 此时 selector.identityHashCode != identityHashCode 条件成立
  13. if (selector == null || selector.identityHashCode != identityHashCode) {
  14. // 创建新的 ConsistentHashSelector
  15. selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
  16. selector = (ConsistentHashSelector<T>) selectors.get(key);
  17. }
  18. // 调用 ConsistentHashSelector 的 select 方法选择 Invoker
  19. return selector.select(invocation);
  20. }
  21. private static final class ConsistentHashSelector<T> {...}
  22. }

如上,doSelect 方法主要做了一些前置工作,比如检测 invokers 列表是不是变动过,以及创建 ConsistentHashSelector。这些工作做完后,接下来开始调用 ConsistentHashSelector 的 select 方法执行负载均衡逻辑。在分析 select 方法之前,我们先来看一下一致性 hash 选择器 ConsistentHashSelector 的初始化过程,如下:

  1. private static final class ConsistentHashSelector<T> {
  2. // 使用 TreeMap 存储 Invoker 虚拟节点
  3. private final TreeMap<Long, Invoker<T>> virtualInvokers;
  4. private final int replicaNumber;
  5. private final int identityHashCode;
  6. private final int[] argumentIndex;
  7. ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
  8. this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
  9. this.identityHashCode = identityHashCode;
  10. URL url = invokers.get(0).getUrl();
  11. // 获取虚拟节点数,默认为160
  12. this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
  13. // 获取参与 hash 计算的参数下标值,默认对第一个参数进行 hash 运算
  14. String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
  15. argumentIndex = new int[index.length];
  16. for (int i = 0; i < index.length; i++) {
  17. argumentIndex[i] = Integer.parseInt(index[i]);
  18. }
  19. for (Invoker<T> invoker : invokers) {
  20. String address = invoker.getUrl().getAddress();
  21. for (int i = 0; i < replicaNumber / 4; i++) {
  22. // 对 address + i 进行 md5 运算,得到一个长度为16的字节数组
  23. byte[] digest = md5(address + i);
  24. // 对 digest 部分字节进行4次 hash 运算,得到四个不同的 long 型正整数
  25. for (int h = 0; h < 4; h++) {
  26. // h = 0 时,取 digest 中下标为 0 ~ 3 的4个字节进行位运算
  27. // h = 1 时,取 digest 中下标为 4 ~ 7 的4个字节进行位运算
  28. // h = 2, h = 3 时过程同上
  29. long m = hash(digest, h);
  30. // 将 hash 到 invoker 的映射关系存储到 virtualInvokers 中,
  31. // virtualInvokers 需要提供高效的查询操作,因此选用 TreeMap 作为存储结构
  32. virtualInvokers.put(m, invoker);
  33. }
  34. }
  35. }
  36. }
  37. }

ConsistentHashSelector 的构造方法执行了一系列的初始化逻辑,比如从配置中获取虚拟节点数以及参与 hash 计算的参数下标,默认情况下只使用第一个参数进行 hash。需要特别说明的是,ConsistentHashLoadBalance 的负载均衡逻辑只受参数值影响,具有相同参数值的请求将会被分配给同一个服务提供者。ConsistentHashLoadBalance 不 关系权重,因此使用时需要注意一下。
在获取虚拟节点数和参数下标配置后,接下来要做的事情是计算虚拟节点 hash 值,并将虚拟节点存储到 TreeMap 中。到此,ConsistentHashSelector 初始化工作就完成了。接下来,我们来看看 select 方法的逻辑。

  1. public Invoker<T> select(Invocation invocation) {
  2. // 将参数转为 key
  3. String key = toKey(invocation.getArguments());
  4. // 对参数 key 进行 md5 运算
  5. byte[] digest = md5(key);
  6. // 取 digest 数组的前四个字节进行 hash 运算,再将 hash 值传给 selectForKey 方法,
  7. // 寻找合适的 Invoker
  8. return selectForKey(hash(digest, 0));
  9. }
  10. private Invoker<T> selectForKey(long hash) {
  11. // 到 TreeMap 中查找第一个节点值大于或等于当前 hash 的 Invoker
  12. Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
  13. // 如果 hash 大于 Invoker 在圆环上最大的位置,此时 entry = null,
  14. // 需要将 TreeMap 的头节点赋值给 entry
  15. if (entry == null) {
  16. entry = virtualInvokers.firstEntry();
  17. }
  18. // 返回 Invoker
  19. return entry.getValue();
  20. }

4.RoundRobinLoadBalance

我们先来了解一下什么是加权轮询。这里从最简单的轮询开始讲起,所谓轮询是指将请求轮流分配给每台服务器。举个例子,我们有三台服务器 A、B、C。我们将第一个请求分配给服务器 A,第二个请求分配给服务器 B,第三个请求分配给服务器 C,第四个请求再次分配给服务器 A。这个过程就叫做轮询。轮询是一种无状态负载均衡算法,实现简单,适用于每台服务器性能相近的场景下。但现实情况下,我们并不能保证每台服务器性能均相近。如果我们将等量的请求分配给性能较差的服务器,这显然是不合理的。因此,这个时候我们需要对轮询过程进行加权,以调控每台服务器的负载。经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。比如服务器 A、B、C 权重比为 5:2:1。那么在8次请求中,服务器 A 将收到其中的5次请求,服务器 B 会收到其中的2次请求,服务器 C 则收到其中的1次请求
为了使每个服务器都能均摊请可以使用平滑加权轮询

4.1平滑加权轮询

服务实例 权重值
192.168.10.1:2202 1
192.168.10.3:2202 1
192.168.10.3:2202 1

算法描述
假设有 N 台实例 S = {S1, S2, …, Sn},配置权重 W = {W1, W2, …, Wn},有效权重 CW = {CW1, CW2, …, CWn}。每个实例 i 除了存在一个配置权重 Wi 外,还存在一个当前有效权重 CWi,且 CWi 初始化为 Wi;指示变量 currentPos 表示当前选择的实例 ID,初始化为 -1;所有实例的配置权重和为 weightSum;

那么,调度算法可以描述为:
1、初始每个实例 i 的 当前有效权重 CWi 为 配置权重 Wi,并求得配置权重和 weightSum;
2、选出 当前有效权重最大 的实例,将 当前有效权重 CWi 减去所有实例的 权重和 weightSum,且变量 currentPos 指向此位置;
3、将每个实例 i 的 当前有效权重 CWi 都加上 配置权重 Wi;
4、取到变量 currentPos 指向的实例;
5、每次调度重复上述步骤 2、3、4;

请求 选中前的当前权重 currentPos 选中的实例 选中后的当前权重
1 {5, 1, 1} 0 192.168.10.1:2202 {-2, 1, 1}
2 {3, 2, 2} 0 192.168.10.1:2202 {-4, 2, 2}
3 {1, 3, 3} 1 192.168.10.2:2202 {1, -4, 3}
4 {6, -3, 4} 0 192.168.10.1:2202 {-1, -3, 4}
5 {4, -2, 5} 2 192.168.10.3:2202 {4, -2, -2}
6 {9, -1, -1} 0 192.168.10.1:2202 {2, -1, -1}
7 {7, 0, 0} 0 192.168.10.1:2202 {0, 0, 0}
8 {5, 1, 1} 0 192.168.10.1:2202 {-2, 1, 1}
  1. public class RoundRobinLoadBalance extends AbstractLoadBalance {
  2. public static final String NAME = "roundrobin";
  3. private static int RECYCLE_PERIOD = 60000;
  4. protected static class WeightedRoundRobin {
  5. // 服务提供者权重
  6. private int weight;
  7. // 当前权重
  8. private AtomicLong current = new AtomicLong(0);
  9. // 最后一次更新时间
  10. private long lastUpdate;
  11. public void setWeight(int weight) {
  12. this.weight = weight;
  13. // 初始情况下,current = 0
  14. current.set(0);
  15. }
  16. public long increaseCurrent() {
  17. // current = current + weight;
  18. return current.addAndGet(weight);
  19. }
  20. public void sel(int total) {
  21. // current = current - total;
  22. current.addAndGet(-1 * total);
  23. }
  24. }
  25. // 嵌套 Map 结构,存储的数据结构示例如下:
  26. // {
  27. // "UserService.query": {
  28. // "url1": WeightedRoundRobin@123,
  29. // "url2": WeightedRoundRobin@456,
  30. // },
  31. // "UserService.update": {
  32. // "url1": WeightedRoundRobin@123,
  33. // "url2": WeightedRoundRobin@456,
  34. // }
  35. // }
  36. // 最外层为服务类名 + 方法名,第二层为 url 到 WeightedRoundRobin 的映射关系。
  37. // 这里我们可以将 url 看成是服务提供者的 id
  38. private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
  39. // 原子更新锁
  40. private AtomicBoolean updateLock = new AtomicBoolean();
  41. @Override
  42. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  43. String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
  44. // 获取 url 到 WeightedRoundRobin 映射表,如果为空,则创建一个新的
  45. ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
  46. int totalWeight = 0;
  47. long maxCurrent = Long.MIN_VALUE;
  48. // 获取当前时间
  49. long now = System.currentTimeMillis();
  50. Invoker<T> selectedInvoker = null;
  51. WeightedRoundRobin selectedWRR = null;
  52. // 下面这个循环主要做了这样几件事情:
  53. // 1. 遍历 Invoker 列表,检测当前 Invoker 是否有
  54. // 相应的 WeightedRoundRobin,没有则创建
  55. // 2. 检测 Invoker 权重是否发生了变化,若变化了,
  56. // 则更新 WeightedRoundRobin 的 weight 字段
  57. // 3. 让 current 字段加上自身权重,等价于 current += weight
  58. // 4. 设置 lastUpdate 字段,即 lastUpdate = now
  59. // 5. 寻找具有最大 current 的 Invoker,以及 Invoker 对应的 WeightedRoundRobin,
  60. // 暂存起来,留作后用
  61. // 6. 计算权重总和
  62. for (Invoker<T> invoker : invokers) {
  63. String identifyString = invoker.getUrl().toIdentityString();
  64. int weight = getWeight(invoker, invocation);
  65. WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
  66. WeightedRoundRobin wrr = new WeightedRoundRobin();
  67. wrr.setWeight(weight);
  68. return wrr;
  69. });
  70. // 检测当前 Invoker 是否有对应的 WeightedRoundRobin,没有则创建
  71. if (weight != weightedRoundRobin.getWeight()) {
  72. // 设置 Invoker 权重
  73. weightedRoundRobin.setWeight(weight);
  74. }
  75. // 让 current 加上自身权重,等价于 current += weight
  76. long cur = weightedRoundRobin.increaseCurrent();
  77. // 设置 lastUpdate,表示近期更新过
  78. weightedRoundRobin.setLastUpdate(now);
  79. // 找出最大的 current
  80. if (cur > maxCurrent) {
  81. maxCurrent = cur;
  82. // 将具有最大 current 权重的 Invoker 赋值给 selectedInvoker
  83. selectedInvoker = invoker;
  84. // 将 Invoker 对应的 weightedRoundRobin 赋值给 selectedWRR,留作后用
  85. selectedWRR = weightedRoundRobin;
  86. }
  87. // 计算权重总和
  88. totalWeight += weight;
  89. }
  90. // 对 <identifyString, WeightedRoundRobin> 进行检查,过滤掉长时间未被更新的节点。
  91. // 该节点可能挂了,invokers 中不包含该节点,所以该节点的 lastUpdate 长时间无法被更新。
  92. // 若未更新时长超过阈值后,就会被移除掉,默认阈值为60秒
  93. if (invokers.size() != map.size()) {
  94. map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
  95. }
  96. if (selectedInvoker != null) {
  97. // 让 current 减去权重总和,等价于 current -= totalWeight
  98. selectedWRR.sel(totalWeight);
  99. // 返回具有最大 current 的 Invoker
  100. return selectedInvoker;
  101. }
  102. // should not happen here
  103. return invokers.get(0);
  104. }
  105. }