Dubbo四种负载均衡实现
1、Random LoadBalance(随机均衡算法)
- 随机,按权重设置随机概率。
在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
2、RoundRobin LoadBalance(权重轮循均衡算法)
轮循,按公约后的权重设置轮循比率。
存在慢的提供者累积请求问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。(针对此种情况,需要降低该服务的权值,以减少对其调用)
3、LeastAction LoadBalance(最少活跃调用数均衡算法)
最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。
使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。
4、ConsistentHash LoadBalance(一致性Hash均衡算法)
一致性Hash,相同参数的请求总是发到同一提供者。
- 当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
源码分析:
初始接口:LoadBalance
```java
@SPI(RandomLoadBalance.NAME) public interface LoadBalance {
/*** select one invoker in list.*select方法作用是从invokers选出下一个被调用的invoker* @param invokers invokers.* @param url refer url* @param invocation invocation.* @return selected invoker.*/@Adaptive("loadbalance")<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
<a name="SjsNp"></a>
## 抽象类:AbstractLoadBalance
```java
public abstract class AbstractLoadBalance implements LoadBalance {
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers == null || invokers.isEmpty())
return null;
if (invokers.size() == 1)
return invokers.get(0);
return doSelect(invokers, url, invocation);
}
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
//获取provider的权重
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
//provider的启动时间戳
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
//计算启动时长
int uptime = (int) (System.currentTimeMillis() - timestamp);
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
//如果启动时间小于预热时间,默认是10min,则重新计算权重
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
}
TIPS:为什么要预热?
privoder刚启动的字节码肯定不是最优的,JVM需要对字节码进行优化。预热保证了调用的体验,谨防由此引发的调用超时问题。
DUBBO用到的四种负载均衡算法分析:
Random LoadBalance(随机均衡算法)
- 随机,按权重设置随机概率。
在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
public class RandomLoadBalance extends AbstractLoadBalance { public static final String NAME = "random"; private final Random random = new Random(); @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { //provider 的数量 int length = invokers.size(); // Number of invokers int totalWeight = 0; // The sum of weights boolean sameWeight = true; // Every invoker has the same weight? for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); totalWeight += weight; // Sum if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; } } if (totalWeight > 0 && !sameWeight) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offset = random.nextInt(totalWeight); // Return a invoker based on the random value. // 可以理解成:[0,totalWeight)取随机数,看这个随机数(每比较一次,减去响应的权重) // 落在了以权重为刻度的数轴哪个区间内,落在那个区间即返回哪个provider for (int i = 0; i < length; i++) { offset -= getWeight(invokers.get(i), invocation); if (offset < 0) { return invokers.get(i); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(random.nextInt(length)); } }RoundRobin LoadBalance(权重轮循均衡算法)
轮循,按公约后的权重设置轮循比率。
存在慢的提供者累积请求问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。(针对此种情况,需要降低该服务的权值,以减少对其调用)
@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); int length = invokers.size(); // Number of invokers int maxWeight = 0; // The maximum weight int minWeight = Integer.MAX_VALUE; // The minimum weight final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>(); int weightSum = 0; //初始化maxWeight,minWeight,weightSum,invokerToWeightMap for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight minWeight = Math.min(minWeight, weight); // Choose the minimum weight if (weight > 0) { invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight)); weightSum += weight; } } // 获取自增调用次数 AtomicPositiveInteger sequence = sequences.get(key); if (sequence == null) { sequences.putIfAbsent(key, new AtomicPositiveInteger()); sequence = sequences.get(key); } // ?个人理解为当前调用总次数 int currentSequence = sequence.getAndIncrement(); //当权重不一样的时候,通过加权轮询获取到invoker,权值越大,则被选中的几率也越大 if (maxWeight > 0 && minWeight < maxWeight) { int mod = currentSequence % weightSum; for (int i = 0; i < maxWeight; i++) { //遍历invoker的数量 for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { final Invoker<T> k = each.getKey(); //invoker的权重 final IntegerWrapper v = each.getValue(); if (mod == 0 && v.getValue() > 0) { return k; } if (v.getValue() > 0) { //当前invoker的可调用次数减1 v.decrement(); mod--; } } } } // Round robin 权重一样的情况下,就取余的方式获取到invoker return invokers.get(currentSequence % length); } private static final class IntegerWrapper { private int value; public IntegerWrapper(int value) { this.value = value; } public int getValue() { return value; } public void setValue(int value) { this.value = value; } public void decrement() { this.value--; } }对于加权轮询,如果不了解算法的话,看起来还是很绕的,可以单独将算法代码拷出来进行分析,如下:
public static void main(String[] args) throws IOException { //默认invoker{"1":"1","2":"2","3":"3","4","4"} //循环调用1000次的结果 for(int i = 0; i < 1000; i ++){ int mod = i % 10; Map<String,IntegerWrapper> invokerToWeightMap = new LinkedHashMap<String, IntegerWrapper>(); invokerToWeightMap.put("1", new IntegerWrapper(1)); invokerToWeightMap.put("2", new IntegerWrapper(2)); invokerToWeightMap.put("3", new IntegerWrapper(3)); invokerToWeightMap.put("4", new IntegerWrapper(4)); for (int j = 0; j < 4; j++) { //遍历invoker的数量 for (Map.Entry<String, IntegerWrapper> each : invokerToWeightMap.entrySet()) { final String k = each.getKey(); //invoker的权重 final IntegerWrapper v = each.getValue(); //通过 (i+1) * invokerToWeightMap.size轮获取invoker if (mod == 0 && v.getValue() > 0) { System.out.println("服务:"+k); return ; } if (v.getValue() > 0) { //当前invoker的可调用次数减1 v.decrement(); mod--; } } } } } private static final class IntegerWrapper { private int value; public IntegerWrapper(int value) { this.value = value; } public int getValue() { return value; } public void setValue(int value) { this.value = value; } public void decrement() { this.value--; } }LeastAction LoadBalance(最少活跃调用数均衡算法)
最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。
使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。
ConsistentHash LoadBalance(一致性Hash均衡算法)
一致性Hash,相同参数的请求总是发到同一提供者。
- 当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
哈希一致性算法参看另一篇文章:https://blog.csdn.net/piqianming/article/details/79670051
@SuppressWarnings("unchecked") @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); int identityHashCode = System.identityHashCode(invokers); ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); if (selector == null || selector.identityHashCode != identityHashCode) { selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } return selector.select(invocation); } private static final class ConsistentHashSelector<T> { private final TreeMap<Long, Invoker<T>> virtualInvokers; private final int replicaNumber; private final int identityHashCode; private final int[] argumentIndex; ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { this.virtualInvokers = new TreeMap<Long, Invoker<T>>(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } for (Invoker<T> invoker : invokers) { String address = invoker.getUrl().getAddress(); for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(address + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } } public Invoker<T> select(Invocation invocation) { String key = toKey(invocation.getArguments()); byte[] digest = md5(key); return selectForKey(hash(digest, 0)); } private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); } private Invoker<T> selectForKey(long hash) { Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry(); if (entry == null) { entry = virtualInvokers.firstEntry(); } return entry.getValue(); } private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[number * 4] & 0xFF)) & 0xFFFFFFFFL; } private byte[] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } md5.reset(); byte[] bytes; try { bytes = value.getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new IllegalStateException(e.getMessage(), e); } md5.update(bytes); return md5.digest(); } }
