IRule是定义了各种负载均衡,我们分析其中几种
image.png

  1. public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {
  2. // 负载均衡器
  3. private ILoadBalancer lb;
  4. // 省略get/set
  5. }

随机选择RandomRule

  1. public class RandomRule extends AbstractLoadBalancerRule {
  2. Random rand;
  3. public RandomRule() {
  4. rand = new Random();
  5. }
  6. @Override
  7. public Server choose(Object key) {
  8. return choose(getLoadBalancer(), key);
  9. }
  10. public Server choose(ILoadBalancer lb, Object key) {
  11. if (lb == null) {
  12. return null;
  13. }
  14. Server server = null;
  15. while (server == null) {
  16. // 检查中断
  17. if (Thread.interrupted()) {
  18. return null;
  19. }
  20. // 可访问的服务
  21. List<Server> upList = lb.getReachableServers();
  22. // 所有服务
  23. List<Server> allList = lb.getAllServers();
  24. int serverCount = allList.size();
  25. if (serverCount == 0) {
  26. // 没有服务可用
  27. return null;
  28. }
  29. // 生成随机数
  30. int index = rand.nextInt(serverCount);
  31. server = upList.get(index);
  32. if (server == null) {
  33. // 该服务不可访问,重新选择
  34. Thread.yield();
  35. continue;
  36. }
  37. // 可用
  38. if (server.isAlive()) {
  39. return (server);
  40. }
  41. // 重新选择
  42. server = null;
  43. Thread.yield();
  44. }
  45. return server;
  46. }
  47. @Override
  48. public void initWithNiwsConfig(IClientConfig clientConfig) {
  49. // TODO Auto-generated method stub
  50. }
  51. }

轮训RoundRobinRule

  1. public class RoundRobinRule extends AbstractLoadBalancerRule {
  2. private AtomicInteger nextServerCyclicCounter;
  3. private static final boolean AVAILABLE_ONLY_SERVERS = true;
  4. private static final boolean ALL_SERVERS = false;
  5. public RoundRobinRule() {
  6. // 计数器
  7. nextServerCyclicCounter = new AtomicInteger(0);
  8. }
  9. public Server choose(ILoadBalancer lb, Object key) {
  10. if (lb == null) {
  11. return null;
  12. }
  13. Server server = null;
  14. int count = 0;
  15. while (server == null && count++ < 10) {
  16. //
  17. List<Server> reachableServers = lb.getReachableServers();
  18. List<Server> allServers = lb.getAllServers();
  19. int upCount = reachableServers.size();
  20. int serverCount = allServers.size();
  21. // 没有可用的服务
  22. if ((upCount == 0) || (serverCount == 0)) {
  23. return null;
  24. }
  25. //
  26. int nextServerIndex = incrementAndGetModulo(serverCount);
  27. server = allServers.get(nextServerIndex);
  28. // 重新选择
  29. if (server == null) {
  30. Thread.yield();
  31. continue;
  32. }
  33. // 可用服务
  34. if (server.isAlive() && (server.isReadyToServe())) {
  35. return (server);
  36. }
  37. // Next.
  38. server = null;
  39. }
  40. // 重试超过10次
  41. if (count >= 10) {
  42. log.warn("No available alive servers after 10 tries from load balancer: " + lb);
  43. }
  44. return server;
  45. }
  46. // 轮训选择
  47. private int incrementAndGetModulo(int modulo) {
  48. for (;;) {
  49. int current = nextServerCyclicCounter.get();
  50. // 保证原子性
  51. int next = (current + 1) % modulo;
  52. if (nextServerCyclicCounter.compareAndSet(current, next))
  53. return next;
  54. }
  55. }
  56. @Override
  57. public Server choose(Object key) {
  58. return choose(getLoadBalancer(), key);
  59. }
  60. @Override
  61. public void initWithNiwsConfig(IClientConfig clientConfig) {
  62. }
  63. }

响应实现权重分配WeightedResponseTimeRule

  1. public class WeightedResponseTimeRule extends RoundRobinRule {
  2. public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = new IClientConfigKey<Integer>() {
  3. @Override
  4. public String key() {
  5. return "ServerWeightTaskTimerInterval";
  6. }
  7. };
  8. // 默认权重更新实现间隔
  9. public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;
  10. private int serverWeightTaskTimerInterval = DEFAULT_TIMER_INTERVAL;
  11. // 权重累计
  12. private volatile List<Double> accumulatedWeights = new ArrayList<Double>();
  13. // 随机工具
  14. private final Random random = new Random();
  15. // 定时更新权重
  16. protected Timer serverWeightTimer = null;
  17. // 多线程更新权重时,保证只有一个执行就行
  18. protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);
  19. String name = "unknown";
  20. // 构造方法
  21. @Override
  22. public void setLoadBalancer(ILoadBalancer lb) {
  23. super.setLoadBalancer(lb);
  24. if (lb instanceof BaseLoadBalancer) {
  25. name = ((BaseLoadBalancer) lb).getName();
  26. }
  27. // 初始化权重定时计算器
  28. initialize(lb);
  29. }
  30. // 初始化权重计算定时器
  31. void initialize(ILoadBalancer lb) {
  32. if (serverWeightTimer != null) {
  33. serverWeightTimer.cancel();
  34. }
  35. serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + name, true);
  36. serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval);
  37. // do a initial run
  38. // 一个定时任务的执行逻辑
  39. ServerWeight sw = new ServerWeight();
  40. sw.maintainWeights();
  41. }
  42. }

内部类的定时任务

class DynamicServerWeightTask extends TimerTask {

    public void run() {
        // Runable任务
        ServerWeight serverWeight = new ServerWeight();
        try {
            serverWeight.maintainWeights();
        } catch (Exception e) {
            logger.error("Error running DynamicServerWeightTask for {}", name, e);
        }
    }
}

class ServerWeight {

    // 定时任务,重算权重列表
    public void maintainWeights() {
        ILoadBalancer lb = getLoadBalancer();
        if (lb == null) {
            return;
        }
        if (!serverWeightAssignmentInProgress.compareAndSet(false,  true))  {
            return; 
        }
        // 
        try {
            logger.info("Weight adjusting job started");
            AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
            LoadBalancerStats stats = nlb.getLoadBalancerStats();
            if (stats == null) {
                // no statistics, nothing to do
                return;
            }
            // 开始重算
            double totalResponseTime = 0;
            // find maximal 95% response time
            for (Server server : nlb.getAllServers()) {
                // this will automatically load the stats if not in cache
                ServerStats ss = stats.getSingleServerStat(server);
                totalResponseTime += ss.getResponseTimeAvg();
            }
            // weight for each server is (sum of responseTime of all servers - responseTime)
            // so that the longer the response time, the less the weight and the less likely to be chosen
            Double weightSoFar = 0.0;

            // 每个服务的权重计数
            List<Double> finalWeights = new ArrayList<Double>();
            for (Server server : nlb.getAllServers()) {
                ServerStats ss = stats.getSingleServerStat(server);
                double weight = totalResponseTime - ss.getResponseTimeAvg();
                weightSoFar += weight;
                finalWeights.add(weightSoFar);   
            }
            // 重置权重
            setWeights(finalWeights);
        } catch (Exception e) {
            logger.error("Error calculating server weights", e);
        } finally {
            serverWeightAssignmentInProgress.set(false);
        }
    }
}

权重保存在一个列表中

// 权重累计
private volatile List<Double> accumulatedWeights = new ArrayList<Double>();

计算方式:
假设服务有4个节点,每个节点的平均响应时间是A(10ms),B(20ms),C(30ms),D(40ms)
totalResponseTime=10+20+30+40 = 100
A权重:wt_A=100-10 = 90
B权重:wt_B=100-20 + wt_A = 100-20+90=170
C权重:wt_C=100-30 + wt_B = 100-30+170=240
D权重:wt_D=100-30 + wt_C = 100-40+240=300
权重区间:A,B,C,D分别是
【[0-90),[90-170),[170-240],[240-300)】
选择服务时,先生成一个300以内的随机数然后对300取模,得到的数在那个区间就调用哪个服务。
accumulatedWeights中存放的是累计权重accumulatedWeights=[90, 170, 240, 300]

public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        return null;
    }
    Server server = null;
    while (server == null) {
        List<Double> currentWeights = accumulatedWeights;
        if (Thread.interrupted()) {
            return null;
        }
        List<Server> allList = lb.getAllServers();
        int serverCount = allList.size();
        if (serverCount == 0) {
            return null;
        }

        int serverIndex = 0;
        // 最后一个是累计权重之和
        double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); 
        // 还没有权重规则,回退到轮训算法
        if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
            server =  super.choose(getLoadBalancer(), key);
            if(server == null) {
                return server;
            }
        } else {
            // 生成随机数
            double randomWeight = random.nextDouble() * maxTotalWeight;
            // 找到区间
            int n = 0;
            for (Double d : currentWeights) {
                if (d >= randomWeight) {
                    // 找到
                    serverIndex = n;
                    break;
                } else {
                    n++;
                }
            }

            server = allList.get(serverIndex);
        }
        if (server == null) {
            // 重新找
            /* Transient. */
            Thread.yield();
            continue;
        }
        if (server.isAlive()) {
            //返回节点
            return (server);
        }
        // Next.
        server = null;
    }
    return server;
}