IRule是定义了各种负载均衡,我们分析其中几种
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {// 负载均衡器private ILoadBalancer lb;// 省略get/set}
随机选择RandomRule
public class RandomRule extends AbstractLoadBalancerRule {Random rand;public RandomRule() {rand = new Random();}@Overridepublic Server choose(Object key) {return choose(getLoadBalancer(), key);}public Server choose(ILoadBalancer lb, Object key) {if (lb == null) {return null;}Server server = null;while (server == null) {// 检查中断if (Thread.interrupted()) {return null;}// 可访问的服务List<Server> upList = lb.getReachableServers();// 所有服务List<Server> allList = lb.getAllServers();int serverCount = allList.size();if (serverCount == 0) {// 没有服务可用return null;}// 生成随机数int index = rand.nextInt(serverCount);server = upList.get(index);if (server == null) {// 该服务不可访问,重新选择Thread.yield();continue;}// 可用if (server.isAlive()) {return (server);}// 重新选择server = null;Thread.yield();}return server;}@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {// TODO Auto-generated method stub}}
轮训RoundRobinRule
public class RoundRobinRule extends AbstractLoadBalancerRule {private AtomicInteger nextServerCyclicCounter;private static final boolean AVAILABLE_ONLY_SERVERS = true;private static final boolean ALL_SERVERS = false;public RoundRobinRule() {// 计数器nextServerCyclicCounter = new AtomicInteger(0);}public Server choose(ILoadBalancer lb, Object key) {if (lb == null) {return null;}Server server = null;int count = 0;while (server == null && count++ < 10) {//List<Server> reachableServers = lb.getReachableServers();List<Server> allServers = lb.getAllServers();int upCount = reachableServers.size();int serverCount = allServers.size();// 没有可用的服务if ((upCount == 0) || (serverCount == 0)) {return null;}//int nextServerIndex = incrementAndGetModulo(serverCount);server = allServers.get(nextServerIndex);// 重新选择if (server == null) {Thread.yield();continue;}// 可用服务if (server.isAlive() && (server.isReadyToServe())) {return (server);}// Next.server = null;}// 重试超过10次if (count >= 10) {log.warn("No available alive servers after 10 tries from load balancer: " + lb);}return server;}// 轮训选择private int incrementAndGetModulo(int modulo) {for (;;) {int current = nextServerCyclicCounter.get();// 保证原子性int next = (current + 1) % modulo;if (nextServerCyclicCounter.compareAndSet(current, next))return next;}}@Overridepublic Server choose(Object key) {return choose(getLoadBalancer(), key);}@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {}}
响应实现权重分配WeightedResponseTimeRule
public class WeightedResponseTimeRule extends RoundRobinRule {public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = new IClientConfigKey<Integer>() {@Overridepublic String key() {return "ServerWeightTaskTimerInterval";}};// 默认权重更新实现间隔public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;private int serverWeightTaskTimerInterval = DEFAULT_TIMER_INTERVAL;// 权重累计private volatile List<Double> accumulatedWeights = new ArrayList<Double>();// 随机工具private final Random random = new Random();// 定时更新权重protected Timer serverWeightTimer = null;// 多线程更新权重时,保证只有一个执行就行protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);String name = "unknown";// 构造方法@Overridepublic void setLoadBalancer(ILoadBalancer lb) {super.setLoadBalancer(lb);if (lb instanceof BaseLoadBalancer) {name = ((BaseLoadBalancer) lb).getName();}// 初始化权重定时计算器initialize(lb);}// 初始化权重计算定时器void initialize(ILoadBalancer lb) {if (serverWeightTimer != null) {serverWeightTimer.cancel();}serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + name, true);serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval);// do a initial run// 一个定时任务的执行逻辑ServerWeight sw = new ServerWeight();sw.maintainWeights();}}
内部类的定时任务
class DynamicServerWeightTask extends TimerTask {
public void run() {
// Runable任务
ServerWeight serverWeight = new ServerWeight();
try {
serverWeight.maintainWeights();
} catch (Exception e) {
logger.error("Error running DynamicServerWeightTask for {}", name, e);
}
}
}
class ServerWeight {
// 定时任务,重算权重列表
public void maintainWeights() {
ILoadBalancer lb = getLoadBalancer();
if (lb == null) {
return;
}
if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
return;
}
//
try {
logger.info("Weight adjusting job started");
AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats == null) {
// no statistics, nothing to do
return;
}
// 开始重算
double totalResponseTime = 0;
// find maximal 95% response time
for (Server server : nlb.getAllServers()) {
// this will automatically load the stats if not in cache
ServerStats ss = stats.getSingleServerStat(server);
totalResponseTime += ss.getResponseTimeAvg();
}
// weight for each server is (sum of responseTime of all servers - responseTime)
// so that the longer the response time, the less the weight and the less likely to be chosen
Double weightSoFar = 0.0;
// 每个服务的权重计数
List<Double> finalWeights = new ArrayList<Double>();
for (Server server : nlb.getAllServers()) {
ServerStats ss = stats.getSingleServerStat(server);
double weight = totalResponseTime - ss.getResponseTimeAvg();
weightSoFar += weight;
finalWeights.add(weightSoFar);
}
// 重置权重
setWeights(finalWeights);
} catch (Exception e) {
logger.error("Error calculating server weights", e);
} finally {
serverWeightAssignmentInProgress.set(false);
}
}
}
权重保存在一个列表中
// 权重累计 private volatile List<Double> accumulatedWeights = new ArrayList<Double>();
计算方式:
假设服务有4个节点,每个节点的平均响应时间是A(10ms),B(20ms),C(30ms),D(40ms)
totalResponseTime=10+20+30+40 = 100
A权重:wt_A=100-10 = 90
B权重:wt_B=100-20 + wt_A = 100-20+90=170
C权重:wt_C=100-30 + wt_B = 100-30+170=240
D权重:wt_D=100-30 + wt_C = 100-40+240=300
权重区间:A,B,C,D分别是
【[0-90),[90-170),[170-240],[240-300)】
选择服务时,先生成一个300以内的随机数然后对300取模,得到的数在那个区间就调用哪个服务。
accumulatedWeights中存放的是累计权重accumulatedWeights=[90, 170, 240, 300]
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
while (server == null) {
List<Double> currentWeights = accumulatedWeights;
if (Thread.interrupted()) {
return null;
}
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
return null;
}
int serverIndex = 0;
// 最后一个是累计权重之和
double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
// 还没有权重规则,回退到轮训算法
if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
server = super.choose(getLoadBalancer(), key);
if(server == null) {
return server;
}
} else {
// 生成随机数
double randomWeight = random.nextDouble() * maxTotalWeight;
// 找到区间
int n = 0;
for (Double d : currentWeights) {
if (d >= randomWeight) {
// 找到
serverIndex = n;
break;
} else {
n++;
}
}
server = allList.get(serverIndex);
}
if (server == null) {
// 重新找
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive()) {
//返回节点
return (server);
}
// Next.
server = null;
}
return server;
}
