IRule接口定义了各种负载均衡策略,下面分析其中几种实现。
/**
 * Common base class for the load-balancing rules shown below. It holds the
 * {@link ILoadBalancer} that a rule consults when choosing a server.
 */
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {
// The load balancer this rule operates on.
private ILoadBalancer lb;
// Getter and setter for lb are omitted from this excerpt.
}
随机选择RandomRule
/**
 * Load-balancing rule that picks a server at random from the servers the load
 * balancer currently reports as reachable.
 */
public class RandomRule extends AbstractLoadBalancerRule {
    /** Source of random indexes; created once per rule instance. */
    Random rand;

    public RandomRule() {
        rand = new Random();
    }

    /**
     * Chooses a server using the load balancer configured on this rule.
     *
     * @param key selection key; not used by this rule
     * @return a live server, or {@code null} if none could be chosen
     */
    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    /**
     * Randomly chooses a live server from {@code lb}.
     *
     * @param lb  the load balancer to pick from; {@code null} yields {@code null}
     * @param key selection key; not used by this rule
     * @return a server whose {@code isAlive()} is true, or {@code null} when the
     *         load balancer is {@code null}, the calling thread was interrupted,
     *         or there is no registered or no reachable server
     */
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;
        while (server == null) {
            // Stop retrying if the caller was interrupted (this also clears the flag).
            if (Thread.interrupted()) {
                return null;
            }
            // Servers currently considered reachable.
            List<Server> upList = lb.getReachableServers();
            // Every registered server, reachable or not.
            List<Server> allList = lb.getAllServers();
            if (allList.isEmpty()) {
                // Nothing is registered at all.
                return null;
            }
            int upCount = upList.size();
            if (upCount == 0) {
                // Servers are registered but none is reachable; same outcome as
                // RoundRobinRule, which also returns null in this situation.
                return null;
            }
            // The index must be bounded by the list it is used on. Bounding it by
            // allList.size() overruns upList whenever some servers are unreachable.
            int index = rand.nextInt(upCount);
            server = upList.get(index);
            if (server == null) {
                // Slot was empty; give other threads a chance and pick again.
                Thread.yield();
                continue;
            }
            if (server.isAlive()) {
                return (server);
            }
            // Picked a server that is not alive; pick again.
            server = null;
            Thread.yield();
        }
        return server;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        // No client configuration is read by this rule.
    }
}
轮询RoundRobinRule
/**
 * Load-balancing rule that walks the list of all servers in cyclic order and
 * returns the first one that is alive and ready to serve.
 */
public class RoundRobinRule extends AbstractLoadBalancerRule {
    /** Index of the most recently handed-out position in the server list. */
    private AtomicInteger nextServerCyclicCounter;
    private static final boolean AVAILABLE_ONLY_SERVERS = true;
    private static final boolean ALL_SERVERS = false;

    public RoundRobinRule() {
        nextServerCyclicCounter = new AtomicInteger(0);
    }

    /**
     * Picks the next server in rotation, trying at most ten positions.
     *
     * @param lb  the load balancer to pick from; {@code null} yields {@code null}
     * @param key selection key; not used by this rule
     * @return an alive, ready server, or {@code null} if there are no
     *         reachable/registered servers or ten attempts found none
     */
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        for (int attempt = 0; attempt < 10; attempt++) {
            List<Server> reachable = lb.getReachableServers();
            List<Server> everyServer = lb.getAllServers();
            int reachableTotal = reachable.size();
            int registeredTotal = everyServer.size();
            // Give up immediately when nothing is reachable or registered.
            if (reachableTotal == 0 || registeredTotal == 0) {
                return null;
            }
            Server candidate = everyServer.get(incrementAndGetModulo(registeredTotal));
            if (candidate == null) {
                // Empty slot: let other threads run, then try the next position.
                Thread.yield();
                continue;
            }
            if (candidate.isAlive() && candidate.isReadyToServe()) {
                return candidate;
            }
            // Candidate is not usable; fall through to the next attempt.
        }
        // Reaching this point means all ten attempts were used up.
        log.warn("No available alive servers after 10 tries from load balancer: " + lb);
        return null;
    }

    /**
     * Atomically advances the cyclic counter and returns its new value.
     *
     * @param modulo number of positions in the cycle
     * @return the new counter value, in {@code [0, modulo)}
     */
    private int incrementAndGetModulo(int modulo) {
        int seen;
        int advanced;
        do {
            seen = nextServerCyclicCounter.get();
            advanced = (seen + 1) % modulo;
            // Retry when another thread moved the counter in between.
        } while (!nextServerCyclicCounter.compareAndSet(seen, advanced));
        return advanced;
    }

    /**
     * Chooses a server using the load balancer configured on this rule.
     */
    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}
按响应时间分配权重WeightedResponseTimeRule
/**
 * Rule that extends {@link RoundRobinRule} with per-server weights which are
 * recomputed periodically by a background timer.
 */
public class WeightedResponseTimeRule extends RoundRobinRule {
    /** Client-config key named {@code ServerWeightTaskTimerInterval}. */
    public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = new IClientConfigKey<Integer>() {
        @Override
        public String key() {
            return "ServerWeightTaskTimerInterval";
        }
    };

    /** Default period between weight recalculations: 30 seconds, in milliseconds. */
    public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;

    /** Period handed to the timer when scheduling the recalculation task. */
    private int serverWeightTaskTimerInterval = DEFAULT_TIMER_INTERVAL;

    /** Running (cumulative) weights; the last entry is the total of all weights. */
    private volatile List<Double> accumulatedWeights = new ArrayList<Double>();

    /** Random source for weighted selection. */
    private final Random random = new Random();

    /** Timer that periodically runs the weight recalculation. */
    protected Timer serverWeightTimer = null;

    /** Guard flag so that only one weight recalculation runs at a time. */
    protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);

    /** Load balancer name used in the timer thread name and log messages. */
    String name = "unknown";

    /**
     * Stores the load balancer, records its name when it is a
     * {@code BaseLoadBalancer}, and (re)starts the weight timer.
     * Note: this is a setter override, not a constructor.
     *
     * @param lb the load balancer this rule should use
     */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        if (lb instanceof BaseLoadBalancer) {
            BaseLoadBalancer named = (BaseLoadBalancer) lb;
            name = named.getName();
        }
        initialize(lb);
    }

    /**
     * Cancels any existing timer, schedules a new daemon timer that runs
     * {@code DynamicServerWeightTask} at the configured interval, and then
     * performs one weight calculation synchronously on the calling thread.
     *
     * @param lb the load balancer being configured; not read directly here
     */
    void initialize(ILoadBalancer lb) {
        Timer previous = serverWeightTimer;
        if (previous != null) {
            previous.cancel();
        }
        String timerName = "NFLoadBalancer-serverWeightTimer-" + name;
        serverWeightTimer = new Timer(timerName, true);
        serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval);
        // Initial run: same logic the timer task executes.
        new ServerWeight().maintainWeights();
    }
}
内部类的定时任务
/**
 * Timer task that triggers one weight recalculation per run and logs, rather
 * than propagates, any exception raised by it.
 */
class DynamicServerWeightTask extends TimerTask {
    @Override
    public void run() {
        ServerWeight recalculation = new ServerWeight();
        try {
            recalculation.maintainWeights();
        } catch (Exception e) {
            // Log and swallow so the failure does not escape into the timer thread.
            logger.error("Error running DynamicServerWeightTask for {}", name, e);
        }
    }
}
/**
 * Recomputes the cumulative weight list from the average response time of
 * each server and publishes it through {@code setWeights}.
 */
class ServerWeight {
    /**
     * Rebuilds the cumulative weights. Returns without doing anything when no
     * load balancer is set, when another recalculation is already in progress,
     * or when the load balancer has no statistics. Any exception raised during
     * the calculation is logged and not rethrown.
     */
    public void maintainWeights() {
        ILoadBalancer lb = getLoadBalancer();
        if (lb == null) {
            return;
        }
        // Only one recalculation at a time; concurrent callers simply skip.
        if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
            return;
        }
        try {
            logger.info("Weight adjusting job started");
            AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
            LoadBalancerStats stats = nlb.getLoadBalancerStats();
            if (stats == null) {
                // no statistics, nothing to do
                return;
            }
            // Take ONE snapshot of the server list and ONE reading of each average,
            // so the total and the per-server weights are derived from the same data.
            List<Server> servers = new ArrayList<Server>(nlb.getAllServers());
            double[] avgResponseTimes = new double[servers.size()];
            // Sum of the average response times of all servers.
            double totalResponseTime = 0;
            for (int i = 0; i < avgResponseTimes.length; i++) {
                // getSingleServerStat is called once per server per run.
                ServerStats ss = stats.getSingleServerStat(servers.get(i));
                avgResponseTimes[i] = ss.getResponseTimeAvg();
                totalResponseTime += avgResponseTimes[i];
            }
            // weight for each server is (sum of responseTime of all servers - responseTime)
            // so that the longer the response time, the less the weight and the less likely to be chosen
            double weightSoFar = 0.0;
            // Cumulative weight per server, in server-list order.
            List<Double> finalWeights = new ArrayList<Double>(avgResponseTimes.length);
            for (double avg : avgResponseTimes) {
                weightSoFar += totalResponseTime - avg;
                finalWeights.add(weightSoFar);
            }
            // Publish the new weights.
            setWeights(finalWeights);
        } catch (Exception e) {
            logger.error("Error calculating server weights", e);
        } finally {
            // Always release the guard, including on early return and on failure.
            serverWeightAssignmentInProgress.set(false);
        }
    }
}
权重保存在一个列表中
// 权重累计 private volatile List<Double> accumulatedWeights = new ArrayList<Double>();
计算方式:
假设服务有4个节点,每个节点的平均响应时间是A(10ms),B(20ms),C(30ms),D(40ms)
totalResponseTime=10+20+30+40 = 100
A权重:wt_A=100-10 = 90
B权重:wt_B=100-20 + wt_A = 100-20+90=170
C权重:wt_C=100-30 + wt_B = 100-30+170=240
D权重:wt_D=100-40 + wt_C = 100-40+240=300
权重区间:A,B,C,D分别是
【[0-90),[90-170),[170-240),[240-300)】
选择服务时,先生成一个[0,300)范围内的随机数(即 random.nextDouble() * 300,不需要取模),得到的数落在哪个区间就调用哪个服务。
accumulatedWeights中存放的是累计权重accumulatedWeights=[90, 170, 240, 300]
/**
 * Chooses a live server, favouring servers with larger weights. Falls back to
 * the parent round-robin selection while no usable weights are available.
 *
 * @param lb  the load balancer to pick from; {@code null} yields {@code null}
 * @param key selection key, passed through to the round-robin fallback
 * @return a server whose {@code isAlive()} is true, or {@code null} when the
 *         load balancer is {@code null}, the thread was interrupted, no server
 *         is registered, or the round-robin fallback found nothing
 */
public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        return null;
    }
    // Keep trying until a live server is found or one of the exit conditions hits.
    for (;;) {
        // Read the volatile field once so this pass works on one consistent list.
        List<Double> weights = accumulatedWeights;
        if (Thread.interrupted()) {
            return null;
        }
        List<Server> servers = lb.getAllServers();
        int registered = servers.size();
        if (registered == 0) {
            return null;
        }
        // The last cumulative entry is the sum of all weights.
        double maxTotalWeight = weights.isEmpty() ? 0 : weights.get(weights.size() - 1);
        boolean weightsUnusable = maxTotalWeight < 0.001d || registered != weights.size();

        Server candidate;
        if (weightsUnusable) {
            // No weights yet, or they do not match the server list: use round robin.
            candidate = super.choose(getLoadBalancer(), key);
            if (candidate == null) {
                return null;
            }
        } else {
            // Draw a point in [0, maxTotalWeight) and take the first cumulative
            // weight that reaches it; index 0 is used if none does.
            double randomWeight = random.nextDouble() * maxTotalWeight;
            int picked = 0;
            for (int i = 0; i < weights.size(); i++) {
                if (weights.get(i) >= randomWeight) {
                    picked = i;
                    break;
                }
            }
            candidate = servers.get(picked);
        }

        if (candidate == null) {
            // Transient empty slot: yield and pick again.
            Thread.yield();
            continue;
        }
        if (candidate.isAlive()) {
            return candidate;
        }
        // Not alive: loop and pick again.
    }
}