Dubbo四种负载均衡实现

1、Random LoadBalance(随机均衡算法)

  • 随机,按权重设置随机概率。
  • 在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

    2、RoundRobin LoadBalance(权重轮循均衡算法)

  • 轮循,按公约后的权重设置轮循比率。

  • 存在慢的提供者累积请求问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。(针对此种情况,需要降低该服务的权值,以减少对其调用)

    3、LeastAction LoadBalance(最少活跃调用数均衡算法)

  • 最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。

  • 使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

    4、ConsistentHash LoadBalance(一致性Hash均衡算法)

  • 一致性Hash,相同参数的请求总是发到同一提供者。

  • 当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

    源码分析:

    初始接口:LoadBalance

    ```java

@SPI(RandomLoadBalance.NAME) public interface LoadBalance {

  1. /**
  2. * select one invoker in list.
  3. *select方法作用是从invokers选出下一个被调用的invoker
  4. * @param invokers invokers.
  5. * @param url refer url
  6. * @param invocation invocation.
  7. * @return selected invoker.
  8. */
  9. @Adaptive("loadbalance")
  10. <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

<a name="SjsNp"></a>
## 抽象类:AbstractLoadBalance
```java
public abstract class AbstractLoadBalance implements LoadBalance {
    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }

    @Override
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (invokers == null || invokers.isEmpty())
            return null;
        if (invokers.size() == 1)
            return invokers.get(0);
        return doSelect(invokers, url, invocation);
    }

    protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

    protected int getWeight(Invoker<?> invoker, Invocation invocation) {
        //获取provider的权重
        int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
        if (weight > 0) {
            //provider的启动时间戳
            long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
            if (timestamp > 0L) {
                //计算启动时长
                int uptime = (int) (System.currentTimeMillis() - timestamp);
                int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
                //如果启动时间小于预热时间,默认是10min,则重新计算权重
                if (uptime > 0 && uptime < warmup) {
                    weight = calculateWarmupWeight(uptime, warmup, weight);
                }
            }
        }
        return weight;
    }
}

TIPS:为什么要预热?
privoder刚启动的字节码肯定不是最优的,JVM需要对字节码进行优化。预热保证了调用的体验,谨防由此引发的调用超时问题。

DUBBO用到的四种负载均衡算法分析:

Random LoadBalance(随机均衡算法)

  • 随机,按权重设置随机概率。
  • 在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

    public class RandomLoadBalance extends AbstractLoadBalance {
    
      public static final String NAME = "random";
    
      private final Random random = new Random();
    
      @Override
      protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
          //provider 的数量
          int length = invokers.size(); // Number of invokers
          int totalWeight = 0; // The sum of weights
          boolean sameWeight = true; // Every invoker has the same weight?
          for (int i = 0; i < length; i++) {
              int weight = getWeight(invokers.get(i), invocation);
              totalWeight += weight; // Sum
              if (sameWeight && i > 0
                      && weight != getWeight(invokers.get(i - 1), invocation)) {
                  sameWeight = false;
              }
          }
          if (totalWeight > 0 && !sameWeight) {
              // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
              int offset = random.nextInt(totalWeight);
              // Return a invoker based on the random value.
              // 可以理解成:[0,totalWeight)取随机数,看这个随机数(每比较一次,减去响应的权重)
              // 落在了以权重为刻度的数轴哪个区间内,落在那个区间即返回哪个provider
              for (int i = 0; i < length; i++) {
                  offset -= getWeight(invokers.get(i), invocation);
                  if (offset < 0) {
                      return invokers.get(i);
                  }
              }
          }
          // If all invokers have the same weight value or totalWeight=0, return evenly.
          return invokers.get(random.nextInt(length));
      }
    }
    

    RoundRobin LoadBalance(权重轮循均衡算法)

  • 轮循,按公约后的权重设置轮循比率。

  • 存在慢的提供者累积请求问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。(针对此种情况,需要降低该服务的权值,以减少对其调用)

      @Override
      protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
          String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
          int length = invokers.size(); // Number of invokers
          int maxWeight = 0; // The maximum weight
          int minWeight = Integer.MAX_VALUE; // The minimum weight
          final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
          int weightSum = 0;
          //初始化maxWeight,minWeight,weightSum,invokerToWeightMap
          for (int i = 0; i < length; i++) {
              int weight = getWeight(invokers.get(i), invocation);
              maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
              minWeight = Math.min(minWeight, weight); // Choose the minimum weight
              if (weight > 0) {
                  invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
                  weightSum += weight;
              }
          }
          // 获取自增调用次数
          AtomicPositiveInteger sequence = sequences.get(key);
          if (sequence == null) {
              sequences.putIfAbsent(key, new AtomicPositiveInteger());
              sequence = sequences.get(key);
          }
          // ?个人理解为当前调用总次数
          int currentSequence = sequence.getAndIncrement();
          //当权重不一样的时候,通过加权轮询获取到invoker,权值越大,则被选中的几率也越大
          if (maxWeight > 0 && minWeight < maxWeight) {
              int mod = currentSequence % weightSum;
              for (int i = 0; i < maxWeight; i++) {
                  //遍历invoker的数量
                  for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                      final Invoker<T> k = each.getKey();
                      //invoker的权重
                      final IntegerWrapper v = each.getValue();
                      if (mod == 0 && v.getValue() > 0) {
                          return k;
                      }
                      if (v.getValue() > 0) {
                          //当前invoker的可调用次数减1
                          v.decrement();
                          mod--;
                      }
                  }
              }
          }
          // Round robin 权重一样的情况下,就取余的方式获取到invoker
          return invokers.get(currentSequence % length);
      }
    
      private static final class IntegerWrapper {
          private int value;
    
          public IntegerWrapper(int value) {
              this.value = value;
          }
    
          public int getValue() {
              return value;
          }
    
          public void setValue(int value) {
              this.value = value;
          }
    
          public void decrement() {
              this.value--;
          }
      }
    

    对于加权轮询,如果不了解算法的话,看起来还是很绕的,可以单独将算法代码拷出来进行分析,如下:

    public static void main(String[] args) throws IOException {
          //默认invoker{"1":"1","2":"2","3":"3","4","4"}
          //循环调用1000次的结果
          for(int i = 0; i < 1000; i ++){
              int mod = i % 10;
              Map<String,IntegerWrapper> invokerToWeightMap = new LinkedHashMap<String, IntegerWrapper>();
              invokerToWeightMap.put("1", new IntegerWrapper(1));
              invokerToWeightMap.put("2", new IntegerWrapper(2));
              invokerToWeightMap.put("3", new IntegerWrapper(3));
              invokerToWeightMap.put("4", new IntegerWrapper(4));
              for (int j = 0; j < 4; j++) {
                  //遍历invoker的数量
                  for (Map.Entry<String, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                      final String k = each.getKey();
                      //invoker的权重
                      final IntegerWrapper v = each.getValue();
                      //通过 (i+1) *  invokerToWeightMap.size轮获取invoker
                      if (mod == 0 && v.getValue() > 0) {
                          System.out.println("服务:"+k);
                          return ;
                      }
                      if (v.getValue() > 0) {
                          //当前invoker的可调用次数减1
                          v.decrement();
                          mod--;
                      }
                  }
              }
          }
      }
    
      private static final class IntegerWrapper {
          private int value;
    
          public IntegerWrapper(int value) {
              this.value = value;
          }
    
          public int getValue() {
              return value;
          }
    
          public void setValue(int value) {
              this.value = value;
          }
    
          public void decrement() {
              this.value--;
          }
      }
    

    LeastAction LoadBalance(最少活跃调用数均衡算法)

  • 最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。

  • 使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

    ConsistentHash LoadBalance(一致性Hash均衡算法)

  • 一致性Hash,相同参数的请求总是发到同一提供者。

  • 当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
  • 哈希一致性算法参看另一篇文章:https://blog.csdn.net/piqianming/article/details/79670051

    @SuppressWarnings("unchecked")
      @Override
      protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
          String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
          int identityHashCode = System.identityHashCode(invokers);
          ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
          if (selector == null || selector.identityHashCode != identityHashCode) {
              selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
              selector = (ConsistentHashSelector<T>) selectors.get(key);
          }
          return selector.select(invocation);
      }
    
      private static final class ConsistentHashSelector<T> {
    
          private final TreeMap<Long, Invoker<T>> virtualInvokers;
    
          private final int replicaNumber;
    
          private final int identityHashCode;
    
          private final int[] argumentIndex;
    
          ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
              this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
              this.identityHashCode = identityHashCode;
              URL url = invokers.get(0).getUrl();
              this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
              String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
              argumentIndex = new int[index.length];
              for (int i = 0; i < index.length; i++) {
                  argumentIndex[i] = Integer.parseInt(index[i]);
              }
              for (Invoker<T> invoker : invokers) {
                  String address = invoker.getUrl().getAddress();
                  for (int i = 0; i < replicaNumber / 4; i++) {
                      byte[] digest = md5(address + i);
                      for (int h = 0; h < 4; h++) {
                          long m = hash(digest, h);
                          virtualInvokers.put(m, invoker);
                      }
                  }
              }
          }
    
          public Invoker<T> select(Invocation invocation) {
              String key = toKey(invocation.getArguments());
              byte[] digest = md5(key);
              return selectForKey(hash(digest, 0));
          }
    
          private String toKey(Object[] args) {
              StringBuilder buf = new StringBuilder();
              for (int i : argumentIndex) {
                  if (i >= 0 && i < args.length) {
                      buf.append(args[i]);
                  }
              }
              return buf.toString();
          }
    
          private Invoker<T> selectForKey(long hash) {
              Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
              if (entry == null) {
                  entry = virtualInvokers.firstEntry();
              }
              return entry.getValue();
          }
    
          private long hash(byte[] digest, int number) {
              return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                      | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                      | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                      | (digest[number * 4] & 0xFF))
                      & 0xFFFFFFFFL;
          }
    
          private byte[] md5(String value) {
              MessageDigest md5;
              try {
                  md5 = MessageDigest.getInstance("MD5");
              } catch (NoSuchAlgorithmException e) {
                  throw new IllegalStateException(e.getMessage(), e);
              }
              md5.reset();
              byte[] bytes;
              try {
                  bytes = value.getBytes("UTF-8");
              } catch (UnsupportedEncodingException e) {
                  throw new IllegalStateException(e.getMessage(), e);
              }
              md5.update(bytes);
              return md5.digest();
          }
    
      }