示意图

11:Ribbon实现负载均衡 - 图1

增加db数据库

image.png

增加服务提供者

springcloud-provider-dept-8082
springcloud-provider-dept-8083
通过springcloud父工程创建服务提供者子工程
image.pngimage.png
保持和springcloud-provider-dept-8081 一致,只修改yml文件中的数据库链接

  1. #spring配置
  2. spring:
  3. application:
  4. name: springcloud-provider-dept #三个服务一致
  5. datasource:
  6. # 数据库名称
  7. type: com.alibaba.druid.pool.DruidDataSource
  8. driver-class-name: com.mysql.jdbc.Driver
  9. url: jdbc:mysql://localhost:3306/db03?useUncode=true&characterEncoding=utf8&verifyServerCertificate=false&useSSL=false&allowMultiQueries=true
  10. username: root
  11. password: root

启动eureka集群

image.png

启动服务提供者

image.png
同一个微服务下有俩个实例
image.png

启动服务消费者

image.png

测试Ribbon负载均衡

第一次访问,调用db01数据库
image.png
第二次访问,调用db01数据库
image.png
Ribbon默认轮询调用所以每次调用得到数据库不一样
ribbon有7种负载均衡策略可供选择

策略类 命名 描述
RandomRule 随机策略 随机选择server
RoundRobinRule 轮询策略 按照顺序选择server(ribbon默认策略)
RetryRule 重试策略 在一个配置时间段内,当选择server不成功,则一直尝试选择一个可用的server
BestAvailableRule 最低并发策略 逐个考察server,如果server断路器打开,则忽略,再选择其中并发链接最低的server
AvailabilityFilteringRule 可用过滤策略 过滤掉一直失败并被标记为circuit tripped的server,过滤掉那些高并发链接的server(active connections超过配置的阈值)
ResponseTimeWeightedRule 响应时间加权重策略 根据server的响应时间分配权重,响应时间越长,权重越低,被选择到的概率也就越低。响应时间越短,权重越高,被选中的概率越高,这个策略很贴切,综合了各种因素,比如:网络,磁盘,io等,都直接影响响应时间
ZoneAvoidanceRule 区域权重策略 综合判断server所在区域的性能,和server的可用性,轮询选择server并且判断一个AWS Zone的运行性能是否可用,剔除不可用的Zone中的所有server

自定义负载均衡算法

IRul实现

负载均衡的核心实现,IRul路由网关,项目网关
image.png
IRule接口有许多实现类
11:Ribbon实现负载均衡 - 图12

1、AbstractLoadBalancerRule

AbstractLoadBalancerRule类是负载均衡策略IRule的抽象实现类,在该抽象类中定义了负载均衡器ILoadBalancer对象,该对象能够在具体实现选择服务策略时,获取到一些负载均衡器中维护的信息来作为分配依据,并依次设计一些算法来针对特定场景的高级策略。

  1. public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {
  2. private ILoadBalancer lb;
  3. @Override
  4. public void setLoadBalancer(ILoadBalancer lb){
  5. this.lb = lb;
  6. }
  7. @Override
  8. public ILoadBalancer getLoadBalancer(){
  9. return lb;
  10. }
  11. }

2、RandomRule

随机
该策略实现了从服务实例清单中随机选择一个服务实例的功能。从下面的源码可以看到,该实现类的choose方法传入了一个负载均衡器,并且使用负载均衡器获取对应的可用服务列表和全部服务列表,并通过chooseRandomInt方法获取一个随机数,该随机数作为可用服务列表的索引来获取具体的实例。这里有个问题,选择服务实例时使用的是while获取,正常情况下,每次选择都应该能选择一个实例进行返回,但是如果出现异常导致每一次都获取步到可用的实例,那么如果出现死循环而获取不到服务实例时,则很有可能存在并发的BUG。

  1. public Server choose(ILoadBalancer lb, Object key) {
  2. if (lb == null) {
  3. return null;
  4. }
  5. Server server = null;
  6. while (server == null) {
  7. if (Thread.interrupted()) {
  8. return null;
  9. }
  10. List<Server> upList = lb.getReachableServers();
  11. List<Server> allList = lb.getAllServers();
  12. int serverCount = allList.size();
  13. if (serverCount == 0) {
  14. /*
  15. * No servers. End regardless of pass, because subsequent passes
  16. * only get more restrictive.
  17. */
  18. return null;
  19. }
  20. int index = chooseRandomInt(serverCount);
  21. server = upList.get(index);
  22. if (server == null) {
  23. /*
  24. * The only time this should happen is if the server list were
  25. * somehow trimmed. This is a transient condition. Retry after
  26. * yielding.
  27. */
  28. Thread.yield();
  29. continue;
  30. }
  31. if (server.isAlive()) {
  32. return (server);
  33. }
  34. // Shouldn't actually happen.. but must be transient or a bug.
  35. server = null;
  36. Thread.yield();
  37. }
  38. return server;
  39. }

3、RoundRobinRule

轮询
该策略实现了按照轮询的方式依次选择每个服务实例的功能。该实现和上述的RandomRule类似,只是获取逻辑不同,该负载均衡策略实现逻辑是直接获取下一个可用实例,如果超过10次没有获取到可用的实例,则返回空且打印异常信息。

  1. public Server choose(ILoadBalancer lb, Object key) {
  2. if (lb == null) {
  3. log.warn("no load balancer");
  4. return null;
  5. }
  6. Server server = null;
  7. int count = 0;
  8. while (server == null && count++ < 10) {
  9. List<Server> reachableServers = lb.getReachableServers();
  10. List<Server> allServers = lb.getAllServers();
  11. int upCount = reachableServers.size();
  12. int serverCount = allServers.size();
  13. if ((upCount == 0) || (serverCount == 0)) {
  14. log.warn("No up servers available from load balancer: " + lb);
  15. return null;
  16. }
  17. int nextServerIndex = incrementAndGetModulo(serverCount);
  18. server = allServers.get(nextServerIndex);
  19. if (server == null) {
  20. /* Transient. */
  21. Thread.yield();
  22. continue;
  23. }
  24. if (server.isAlive() && (server.isReadyToServe())) {
  25. return (server);
  26. }
  27. // Next.
  28. server = null;
  29. }
  30. if (count >= 10) {
  31. log.warn("No available alive servers after 10 tries from load balancer: "
  32. + lb);
  33. }
  34. return server;
  35. }

4、RetryRule

会先按照轮询获取服务,如果服务失败,则会选择指定时间内选择重试
该策略实现了一个具备重试机制的实力选择功能。重下述源码可以看出,其选择服务实例使用的是轮询选择策略RoundRobinRule,然后在获取不到服务实例的情况下,则反复尝试获取,直到调用时间超过设置的阈值,则返回空。

  1. IRule subRule = new RoundRobinRule();
  2. long maxRetryMillis = 500;
  3. public Server choose(ILoadBalancer lb, Object key) {
  4. long requestTime = System.currentTimeMillis();
  5. long deadline = requestTime + maxRetryMillis;
  6. Server answer = null;
  7. answer = subRule.choose(key);
  8. if (((answer == null) || (!answer.isAlive()))
  9. && (System.currentTimeMillis() < deadline)) {
  10. InterruptTask task = new InterruptTask(deadline
  11. - System.currentTimeMillis());
  12. while (!Thread.interrupted()) {
  13. answer = subRule.choose(key);
  14. if (((answer == null) || (!answer.isAlive()))
  15. && (System.currentTimeMillis() < deadline)) {
  16. /* pause and retry hoping it's transient */
  17. Thread.yield();
  18. } else {
  19. break;
  20. }
  21. }
  22. task.cancel();
  23. }
  24. if ((answer == null) || (!answer.isAlive())) {
  25. return null;
  26. } else {
  27. return answer;
  28. }
  29. }


5、WeightedResponseTimeRule

该策略继承自RoundRobinRule,增加了根据实例的运行情况来计算权重,并根据权重来挑选实例,以达到更优的分配效果,其核心内容分为三块:定时任务、权重计算、实例选择

(1)定时任务

  1. @Override
  2. public void setLoadBalancer(ILoadBalancer lb) {
  3. super.setLoadBalancer(lb);
  4. if (lb instanceof BaseLoadBalancer) {
  5. name = ((BaseLoadBalancer) lb).getName();
  6. }
  7. initialize(lb);
  8. }
  9. public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;
  10. private int serverWeightTaskTimerInterval = DEFAULT_TIMER_INTERVAL;
  11. void initialize(ILoadBalancer lb) {
  12. if (serverWeightTimer != null) {
  13. serverWeightTimer.cancel();
  14. }
  15. serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
  16. + name, true);
  17. serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
  18. serverWeightTaskTimerInterval);
  19. // do a initial run
  20. ServerWeight sw = new ServerWeight();
  21. sw.maintainWeights();
  22. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
  23. public void run() {
  24. logger
  25. .info("Stopping NFLoadBalancer-serverWeightTimer-"
  26. + name);
  27. serverWeightTimer.cancel();
  28. }
  29. }));
  30. }
  31. class DynamicServerWeightTask extends TimerTask {
  32. public void run() {
  33. ServerWeight serverWeight = new ServerWeight();
  34. try {
  35. serverWeight.maintainWeights();
  36. } catch (Exception e) {
  37. logger.error("Error running DynamicServerWeightTask for {}", name, e);
  38. }
  39. }
  40. }

从上述源码可见,在设置负载均衡策略对应的负载均衡器时,调用了initialize方法,而该方法创建了一个定时任务来计算权重(最终调用的serverWeight.maintainWeights()方法),每30秒执行一次。

(2)权重计算

  1. private volatile List<Double> accumulatedWeights = new ArrayList<Double>();
  2. class ServerWeight {
  3. public void maintainWeights() {
  4. ILoadBalancer lb = getLoadBalancer();
  5. if (lb == null) {
  6. return;
  7. }
  8. if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
  9. return;
  10. }
  11. try {
  12. logger.info("Weight adjusting job started");
  13. AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
  14. LoadBalancerStats stats = nlb.getLoadBalancerStats();
  15. if (stats == null) {
  16. // no statistics, nothing to do
  17. return;
  18. }
  19. double totalResponseTime = 0;
  20. // find maximal 95% response time
  21. for (Server server : nlb.getAllServers()) {
  22. // this will automatically load the stats if not in cache
  23. ServerStats ss = stats.getSingleServerStat(server);
  24. totalResponseTime += ss.getResponseTimeAvg();
  25. }
  26. // weight for each server is (sum of responseTime of all servers - responseTime)
  27. // so that the longer the response time, the less the weight and the less likely to be chosen
  28. Double weightSoFar = 0.0;
  29. // create new list and hot swap the reference
  30. List<Double> finalWeights = new ArrayList<Double>();
  31. for (Server server : nlb.getAllServers()) {
  32. ServerStats ss = stats.getSingleServerStat(server);
  33. double weight = totalResponseTime - ss.getResponseTimeAvg();
  34. weightSoFar += weight;
  35. finalWeights.add(weightSoFar);
  36. }
  37. setWeights(finalWeights);
  38. } catch (Exception e) {
  39. logger.error("Error calculating server weights", e);
  40. } finally {
  41. serverWeightAssignmentInProgress.set(false);
  42. }
  43. }
  44. }
  45. void setWeights(List<Double> weights) {
  46. this.accumulatedWeights = weights;
  47. }

通过源码可见,代码中维护一个用于存储权重的List集合accumulatedWeights,同时,通过maintainWeights方法做了权重计算,该计算主要分为两步,第一步,根据LoadBalancerStatus中记录的每个实例的统计信息,累加所有实例的平均响应时间,得到总的响应时间totalResponseTime;第二步,为负载均衡器中维护的实例清单逐个计算权重(从第一个开始),计算规则为weightSoFar+totalResponseTime-实例的平均响应时间,其中weightSoFar的初始值为0。
举个例子,如果有ABCD4个实例,他们的平均响应时间是10、40、80、100,那么总的相应时间就是230,那么计算出4个实例的权重分别为:
A:230-10 = 220
B:220+(230-40) = 410
C:410+(230-80) = 560
D:560+(230-100) = 690
权重区间是左开右闭,但是第一个和最后一个比较特殊,由于在后续选择实例时会用随机数从区间中获取,但是随机数最小值可以是0,但是不会到达随机数的最大值,因此第一个左边的0是闭区间,而最后一个的右侧是开区间,因此这4个实例对应的权重区间即为:
A:[0,220]
B:(220,410]
C:(410,560]
D:(560,690)
不难发现,区间的宽度就是总的平均响应时间-实例的平均响应时间,因此实例的平均响应时间越短,那么权重的区间就越大,那么被选中的几率就越大。

(3)实例选择

  1. public Server choose(ILoadBalancer lb, Object key) {
  2. if (lb == null) {
  3. return null;
  4. }
  5. Server server = null;
  6. while (server == null) {
  7. // get hold of the current reference in case it is changed from the other thread
  8. List<Double> currentWeights = accumulatedWeights;
  9. if (Thread.interrupted()) {
  10. return null;
  11. }
  12. List<Server> allList = lb.getAllServers();
  13. int serverCount = allList.size();
  14. if (serverCount == 0) {
  15. return null;
  16. }
  17. int serverIndex = 0;
  18. // last one in the list is the sum of all weights
  19. double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
  20. // No server has been hit yet and total weight is not initialized
  21. // fallback to use round robin
  22. if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
  23. server = super.choose(getLoadBalancer(), key);
  24. if(server == null) {
  25. return server;
  26. }
  27. } else {
  28. // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
  29. double randomWeight = random.nextDouble() * maxTotalWeight;
  30. // pick the server index based on the randomIndex
  31. int n = 0;
  32. for (Double d : currentWeights) {
  33. if (d >= randomWeight) {
  34. serverIndex = n;
  35. break;
  36. } else {
  37. n++;
  38. }
  39. }
  40. server = allList.get(serverIndex);
  41. }
  42. if (server == null) {
  43. /* Transient. */
  44. Thread.yield();
  45. continue;
  46. }
  47. if (server.isAlive()) {
  48. return (server);
  49. }
  50. // Next.
  51. server = null;
  52. }
  53. return server;
  54. }

通过上述源码可见,其首先生成了一个 [0,最大权重值) 区间内的随机数,然后循环权重区间,如果该随机数在权限区间内,则就拿当前权重列表的索引去服务实例获取对应的服务。还是以上面的ABCD四个实例来说明,那么随机数就是从 [0,690) 的区间中获取,如果获取的随机数数230,那么该随机数在实例B的权重区间内,因此就会选择B实例。

6、ClientConfigEnabledRoundRobinRule

该策略比较特殊,一般不会使用它。因为它本身没有什么特殊的处理逻辑,正如下面源码所示,该策略在内部定义了一个RoundRobinRule策略,而choose函数调用的就是RoundRobinRule的choose函数。该类主要的作用就是通过继承该类,在子类中做一些其他的策略时,如果条件不满足,则会使用父类的策略。

  1. public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {
  2. RoundRobinRule roundRobinRule = new RoundRobinRule();
  3. @Override
  4. public void initWithNiwsConfig(IClientConfig clientConfig) {
  5. roundRobinRule = new RoundRobinRule();
  6. }
  7. @Override
  8. public void setLoadBalancer(ILoadBalancer lb) {
  9. super.setLoadBalancer(lb);
  10. roundRobinRule.setLoadBalancer(lb);
  11. }
  12. @Override
  13. public Server choose(Object key) {
  14. if (roundRobinRule != null) {
  15. return roundRobinRule.choose(key);
  16. } else {
  17. throw new IllegalArgumentException(
  18. "This class has not been initialized with the RoundRobinRule class");
  19. }
  20. }
  21. }

7、BestAvailableRule

该策略会选出负载最低的实例。
BestAvailableRule继承自ClientConfigEnabledRoundRobinRule,从choose方法看,会循环所有Server实例,过滤掉故障实例并选出负载最低的Server。同时我们可以发现,如果没有选择到Server的话,就会调用父类的choose方法,那么就会使用到上面说的 “通过继承该类,在子类中做一些其他的策略时,如果条件不满足,则会使用父类的策略” 。

  1. public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {
  2. private LoadBalancerStats loadBalancerStats;
  3. @Override
  4. public Server choose(Object key) {
  5. if (loadBalancerStats == null) {
  6. return super.choose(key);
  7. }
  8. List<Server> serverList = getLoadBalancer().getAllServers();
  9. int minimalConcurrentConnections = Integer.MAX_VALUE;
  10. long currentTime = System.currentTimeMillis();
  11. Server chosen = null;
  12. for (Server server: serverList) {
  13. ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
  14. if (!serverStats.isCircuitBreakerTripped(currentTime)) {
  15. int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
  16. if (concurrentConnections < minimalConcurrentConnections) {
  17. minimalConcurrentConnections = concurrentConnections;
  18. chosen = server;
  19. }
  20. }
  21. }
  22. if (chosen == null) {
  23. return super.choose(key);
  24. } else {
  25. return chosen;
  26. }
  27. }
  28. @Override
  29. public void setLoadBalancer(ILoadBalancer lb) {
  30. super.setLoadBalancer(lb);
  31. if (lb instanceof AbstractLoadBalancer) {
  32. loadBalancerStats = ((AbstractLoadBalancer) lb).getLoadBalancerStats();
  33. }
  34. }
  35. }


8、PredicateBasedRule

该策略实现了先通过子类获取一部分实例,然后通过线性轮询的方式从该部分实例中获取一个实例。
PredicateBasedRule继承自ClientConfigEnabledRoundRobinRule,是一个抽象类,它首先使用getPredicate方法获取一个AbstractServerPredicate的实现。而choose方法则是调用AbstractServerPredicate类的chooseRoundRobinAfterFiltering方法获取对应的Server实例并返回。

  1. public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
  2. public abstract AbstractServerPredicate getPredicate();
  3. @Override
  4. public Server choose(Object key) {
  5. ILoadBalancer lb = getLoadBalancer();
  6. Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
  7. if (server.isPresent()) {
  8. return server.get();
  9. } else {
  10. return null;
  11. }
  12. }
  13. }

通过chooseRoundRobinAfterFiltering方法可以看到,其先是调用getEligibleServers方法获取了一部分实例,然后又调用了eligible.get(incrementAndGetModulo(eligible.size()))方法从该部分实例中动态获取了一个Server。其中getEligibleServers方法是根据this.apply(new PredicateKey(loadBalancerKey, server))进行过滤的,如果满足,就添加到返回的集合中,而apply方法,在AbstractServerPredicate中并不存在,因此需要子类实现;而incrementAndGetModulo方法则是直接返回了下一个整数(索引值),通过该索引值从返回的实例列表中取得Server实例。

  1. public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
  2. List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
  3. if (eligible.size() == 0) {
  4. return Optional.absent();
  5. }
  6. return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
  7. }
  8. public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
  9. if (loadBalancerKey == null) {
  10. return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
  11. } else {
  12. List<Server> results = Lists.newArrayList();
  13. for (Server server: servers) {
  14. if (this.apply(new PredicateKey(loadBalancerKey, server))) {
  15. results.add(server);
  16. }
  17. }
  18. return results;
  19. }
  20. }
  21. private int incrementAndGetModulo(int modulo) {
  22. for (;;) {
  23. int current = nextIndex.get();
  24. int next = (current + 1) % modulo;
  25. if (nextIndex.compareAndSet(current, next) && current < modulo)
  26. return current;
  27. }
  28. }

9、AvailabilityFilteringRule

会先过滤掉跳闸的,访问故障的访问,对剩下的访问进行轮询,对轮询进行优化

  1. public class AvailabilityFilteringRule extends PredicateBasedRule {
  2. private AbstractServerPredicate predicate;
  3. public AvailabilityFilteringRule() {
  4. super();
  5. predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
  6. .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
  7. .build();
  8. }
  9. @Override
  10. public void initWithNiwsConfig(IClientConfig clientConfig) {
  11. predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig))
  12. .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
  13. .build();
  14. }
  15. @Override
  16. public Server choose(Object key) {
  17. int count = 0;
  18. Server server = roundRobinRule.choose(key);
  19. while (count++ <= 10) {
  20. if (predicate.apply(new PredicateKey(server))) {
  21. return server;
  22. }
  23. server = roundRobinRule.choose(key);
  24. }
  25. return super.choose(key);
  26. }
  27. @Override
  28. public AbstractServerPredicate getPredicate() {
  29. return predicate;
  30. }
  31. }

该策略实现了轮询获取Server并校验Server状态的功能。
AvailabilityFilteringRule继承自PredicateBasedRule,从其choose方法可见,其并没有完全使用父类的实现方式,而是先轮询获取一个Server,然后判断该Server是否满足需要,如果满足,直接返回;如果不满足,就继续获取下一个Server,如果一直轮询10次还没有符合要求的Server,那么再使用父类的实现方式(先获取所有满足需求的Server列表,然后从该Server列表中轮询获取一个Server对象)

同时从AvailabilityFilteringRule构造函数中可以看到,AvailabilityFilteringRule使用的是AvailabilityPredicate,根据上面讲述的PredicateBasedRule,其必须要实现apply方法,从下述源码可见,apply方法主要是通过shouldSkipServer方法进行判断的,在该方法中,有两个判断维度:是否故障(断路器是否断开)、实例的并发请求数是否大于阈值(int的最大值)

  1. private static final DynamicBooleanProperty CIRCUIT_BREAKER_FILTERING =
  2. DynamicPropertyFactory.getInstance().getBooleanProperty("niws.loadbalancer.availabilityFilteringRule.filterCircuitTripped", true);
  3. private static final DynamicIntProperty ACTIVE_CONNECTIONS_LIMIT =
  4. DynamicPropertyFactory.getInstance().getIntProperty("niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit", Integer.MAX_VALUE);
  5. private ChainedDynamicProperty.IntProperty activeConnectionsLimit = new ChainedDynamicProperty.IntProperty(ACTIVE_CONNECTIONS_LIMIT);
  6. @Override
  7. public boolean apply(@Nullable PredicateKey input) {
  8. LoadBalancerStats stats = getLBStats();
  9. if (stats == null) {
  10. return true;
  11. }
  12. return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
  13. }
  14. private boolean shouldSkipServer(ServerStats stats) {
  15. if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
  16. || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
  17. return true;
  18. }
  19. return false;
  20. }


10、ZoneAvoidanceRule

ZoneAvoidanceRule同样继承自PredicateBasedRule,同时ZoneAvoidanceRule中没有choose方法,说明完全复用了父类中的策略(先过滤所有可用的实例,然后使用轮询从满足需要的实例清单中获取一个Server)。同时通过ZoneAvoidanceRule的构造函数可见,使用的是CompositePredicate进行的过滤,CompositePredicate的构造函数传入了两个AbstractServerPredicate的子类,分别是主过滤条件ZoneAvoidancePredicate和次过滤条件AvailabilityPredicate(其实次过滤条件可以传入多个)

  1. public ZoneAvoidanceRule() {
  2. super();
  3. ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
  4. AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
  5. compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
  6. }
  7. private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
  8. return CompositePredicate.withPredicates(p1, p2)
  9. .addFallbackPredicate(p2)
  10. .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
  11. .build();
  12. }

首先可以看下CompositePredicate的构造函数相关,可以看到,上一步在创建CompositePredicate对象时:
首先调用了withPredicates方法,该方法调用了Builder(primaryPredicates),最后调用了Builder(AbstractServerPredicate …primaryPredicates)方法,在该方法中,将第一个过滤对象(ZoneAvoidancePredicate)赋值给delegate属性;
其次又调用了addFallbackPredicate方法,在该方法中,将第二个过滤对象(AvailabilityPredicate)赋值给了fallbacks属性

  1. private AbstractServerPredicate delegate;
  2. private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();
  3. private int minimalFilteredServers = 1;
  4. private float minimalFilteredPercentage = 0;
  5. public static class Builder {
  6. private CompositePredicate toBuild;
  7. Builder(AbstractServerPredicate primaryPredicate) {
  8. toBuild = new CompositePredicate();
  9. toBuild.delegate = primaryPredicate;
  10. }
  11. Builder(AbstractServerPredicate ...primaryPredicates) {
  12. toBuild = new CompositePredicate();
  13. Predicate<PredicateKey> chain = Predicates.<PredicateKey>and(primaryPredicates);
  14. toBuild.delegate = AbstractServerPredicate.ofKeyPredicate(chain);
  15. }
  16. public Builder addFallbackPredicate(AbstractServerPredicate fallback) {
  17. toBuild.fallbacks.add(fallback);
  18. return this;
  19. }
  20. public Builder setFallbackThresholdAsMinimalFilteredNumberOfServers(int number) {
  21. toBuild.minimalFilteredServers = number;
  22. return this;
  23. }
  24. public Builder setFallbackThresholdAsMinimalFilteredPercentage(float percent) {
  25. toBuild.minimalFilteredPercentage = percent;
  26. return this;
  27. }
  28. public CompositePredicate build() {
  29. return toBuild;
  30. }
  31. }
  32. public static Builder withPredicates(AbstractServerPredicate ...primaryPredicates) {
  33. return new Builder(primaryPredicates);
  34. }
  35. public static Builder withPredicate(AbstractServerPredicate primaryPredicate) {
  36. return new Builder(primaryPredicate);
  37. }

然后可以看到CompositePredicate重写了父类中的getEligibleServers方法,因此,在获取满足条件Server集合时,就会调用CompositePredicate中的getEligibleServers方法,在该方法中,首先调用super.getEligibleServers(servers, loadBalancerKey),那么就会调用到CompositePredicate实现的apply方法,通过源码可以看到,这里直接调用了delegate.apply(input),也就是直接使用了主过滤类ZoneAvoidancePredicate的apply方法,获取到可用的服务列表后,在依次调用次过滤类(次过滤类可以是多个,CompositePredicate里只有一个AvailabilityPredicate)的getEligibleServers方法进行过滤。
CompositePredicate的总体处理逻辑如下:
(1)使用主过滤类对所有实例过滤并返回过滤后的清单
(2)依次使用次过滤类对已筛选出的清单进行再次过滤
(3)每次过滤之后,判断如果满足下面两个条件的话,就不再过滤:
过滤后的实例总数 >= 最小过滤实例数(默认值为1)
过滤后的实例比例 > 最小过滤百分比(默认值为0)
主过滤类ZoneAvoidancePredicate的apply方法在讲述SpringCloud—Ribbon—源码解析—IloadBalancer&ServerListUpdater&ServerListFilter实现的ZoneAwareLoadBalancer过滤器的时候,已经解析过源码,这里就不再赘述。

  1. @Override
  2. public boolean apply(@Nullable PredicateKey input) {
  3. return delegate.apply(input);
  4. }
  5. @Override
  6. public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
  7. List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
  8. Iterator<AbstractServerPredicate> i = fallbacks.iterator();
  9. while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
  10. && i.hasNext()) {
  11. AbstractServerPredicate predicate = i.next();
  12. result = predicate.getEligibleServers(servers, loadBalancerKey);
  13. }
  14. return result;
  15. }

自定义轮询

源码分析
image.png
更改ribbonconfig 类 实现随机访问

  1. package org.springcloud.config;
  2. import org.springframework.cloud.client.loadbalancer.LoadBalanced;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.web.client.RestTemplate;
  6. import com.netflix.loadbalancer.IRule;
  7. import com.netflix.loadbalancer.RandomRule;
  8. // 相当于spring 中的 applicationContext.xml
  9. //指示一个类声明一个或多个@Bean方法,并且可以由Spring容器处理,以便在运行时为这些bean生成BeanDefinition和服务请求
  10. @Configuration
  11. public class ConfigBean {
  12. // 原来是<bean></bean>
  13. // 现在使用spring注解
  14. /**
  15. * RestTemplate中没有@Bean,容器中没有该类,需要new一个新类,
  16. * 所以需要手动配置一个bean,这样容器中就有改类,就可以使用@Autowired进行注入 也就是说通过@Bean 将该类交给spring进行管理及ioc
  17. */
  18. // 配置负载均衡实现,现在是通过RestTemplate进行调用所以将它实现负载均衡就可以
  19. @Bean
  20. // @LoadBalanced本身本没有做做什么,只是开启了负载均衡
  21. @LoadBalanced // ribbon基于客服端实现负载均衡
  22. public RestTemplate getRestTemplate() {
  23. return new RestTemplate();
  24. }
  25. // 负载均衡的核心实现,IRul路由网关,项目网关
  26. //
  27. @Bean
  28. public IRule myIRule() {
  29. // 使用随机
  30. return new RandomRule();
  31. }
  32. }

测试结果则为随机访问服务提供者,结果也是随机的数据库。
编辑ribbon config 类
主要启动类中添加 @RibbonClient(name=”SPRINGCLOUD-PROVIDER-DEPT”) 告诉程序我们要对SPRINGCLOUD-PROVIDER-DEPT进行改造

  1. package org.springcloud;
  2. import org.springcloud.config.RibbonConfig;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
  5. import org.springframework.boot.autoconfigure.SpringBootApplication;
  6. import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
  7. import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
  8. import org.springframework.cloud.netflix.ribbon.RibbonClient;
  9. @SpringBootApplication
  10. @EnableAutoConfiguration(exclude = { DataSourceAutoConfiguration.class })
  11. @EnableDiscoveryClient
  12. // 在微服务启动时候就对SPRINGCLOUD-PROVIDER-DEPT进行Ribbon负载均衡改造,改造配置类RibbonConfig.class
  13. @RibbonClient(name = "SPRINGCLOUD-PROVIDER-DEPT", configuration = RibbonConfig.class)
  14. public class DeptConsumer_8082 {
  15. public static void main(String[] args) {
  16. SpringApplication.run(DeptConsumer_8082.class, args);
  17. }
  18. }

自定义Ribbon客户端
image.png
image.png
仿写ribbon本身实现类,在改造成自己的负载均衡实现

  1. package org.springcloud.config;
  2. import java.util.List;
  3. import java.util.concurrent.ThreadLocalRandom;
  4. import com.netflix.client.config.IClientConfig;
  5. import com.netflix.loadbalancer.AbstractLoadBalancerRule;
  6. import com.netflix.loadbalancer.ILoadBalancer;
  7. import com.netflix.loadbalancer.Server;
  8. public class MyRandomRule extends AbstractLoadBalancerRule {
  9. // 自定义负载均衡策略需求
  10. // 每个服务,访问3次,换下一个访问(服务器为2)
  11. // 需要一个值来记录访问次数,total,默认为0,如果当total=3,就下一个服务
  12. // index = 0 ,默认为0,如果total=3,则index+1
  13. /**
  14. * 被调用的次数
  15. */
  16. private int total = 0;
  17. /**
  18. * 当前服务
  19. */
  20. private int divIndex = 0;
  21. /**
  22. * Randomly choose from all living servers
  23. */
  24. // @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
  25. // "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
  26. public Server choose(ILoadBalancer lb, Object key) {
  27. if (lb == null) {
  28. return null;
  29. }
  30. Server server = null;
  31. while (server == null) {
  32. if (Thread.interrupted()) {
  33. return null;
  34. }
  35. // 获得可获得的服务器 翻译直译
  36. // get获取,Reachable 可获得的,Servers 服务
  37. List<Server> upList = lb.getReachableServers();
  38. // 获取所有的服务
  39. List<Server> allList = lb.getAllServers();
  40. int serverCount = allList.size();
  41. if (serverCount == 0) {
  42. return null;
  43. }
  44. // 注释掉随机算法,编辑自定义算法
  45. // 从所有的服务中数量中, 生成区间随机数
  46. // int index = chooseRandomInt(serverCount);
  47. // 从可获得活着的服务中,以随机数获取改服务,进行服务判断
  48. // server = upList.get(index);
  49. // ---------------------------------------------
  50. // div 算法,每个服务访问3次,换下一个
  51. if (total < 3) {
  52. // 如果次数小于3,则返回当前,并次数+1
  53. server = upList.get(divIndex);
  54. total++;
  55. }else {
  56. // 如果次数大于3则,重置次数为0;选择下一个服务divIndex+1
  57. total=0;
  58. divIndex++;
  59. // 如果当前服务大于可选择服务数量,则从0从新开始
  60. if(divIndex>upList.size()) {
  61. divIndex=0;
  62. }
  63. // 从活着的服务中获取调用服务
  64. server = upList.get(divIndex);
  65. }
  66. // ------------------------------------------
  67. if (server == null) {
  68. Thread.yield();
  69. continue;
  70. }
  71. if (server.isAlive()) {
  72. return (server);
  73. }
  74. server = null;
  75. Thread.yield();
  76. }
  77. return server;
  78. }
  79. protected int chooseRandomInt(int serverCount) {
  80. return ThreadLocalRandom.current().nextInt(serverCount);
  81. }
  82. public Server choose(Object key) {
  83. return choose(getLoadBalancer(), key);
  84. }
  85. public void initWithNiwsConfig(IClientConfig clientConfig) {
  86. // TODO Auto-generated method stub
  87. }
  88. }

RibbonConfig调用自定义算法

  1. package org.springcloud.config;
  2. import java.util.List;
  3. import java.util.concurrent.ThreadLocalRandom;
  4. import com.netflix.client.config.IClientConfig;
  5. import com.netflix.loadbalancer.AbstractLoadBalancerRule;
  6. import com.netflix.loadbalancer.ILoadBalancer;
  7. import com.netflix.loadbalancer.Server;
  8. public class MyRandomRule extends AbstractLoadBalancerRule {
  9. // 自定义负载均衡策略需求
  10. // 每个服务,访问3次,换下一个访问(服务器为2)
  11. // 需要一个值来记录访问次数,total,默认为0,如果当total=3,就下一个服务
  12. // index = 0 ,默认为0,如果total=3,则index+1
  13. /**
  14. * 被调用的次数
  15. */
  16. private int total = 0;
  17. /**
  18. * 当前服务
  19. */
  20. private int divIndex = 0;
  21. /**
  22. * Randomly choose from all living servers
  23. */
  24. // @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
  25. // "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
  26. public Server choose(ILoadBalancer lb, Object key) {
  27. if (lb == null) {
  28. return null;
  29. }
  30. Server server = null;
  31. while (server == null) {
  32. if (Thread.interrupted()) {
  33. return null;
  34. }
  35. // 获得可获得的服务器 翻译直译
  36. // get获取,Reachable 可获得的,Servers 服务
  37. List<Server> upList = lb.getReachableServers();
  38. // 获取所有的服务
  39. List<Server> allList = lb.getAllServers();
  40. int serverCount = allList.size();
  41. if (serverCount == 0) {
  42. return null;
  43. }
  44. // 注释掉随机算法,编辑自定义算法
  45. // 从所有的服务中数量中, 生成区间随机数
  46. // int index = chooseRandomInt(serverCount);
  47. // 从可获得活着的服务中,以随机数获取改服务,进行服务判断
  48. // server = upList.get(index);
  49. // ---------------------------------------------
  50. // div 算法,每个服务访问3次,换下一个
  51. if (total < 3) {
  52. // 如果次数小于3,则返回当前,并次数+1
  53. server = upList.get(divIndex);
  54. total++;
  55. }else {
  56. // 如果次数大于3则,重置次数为0;选择下一个服务divIndex+1
  57. total=0;
  58. divIndex++;
  59. // 如果当前服务大于可选择服务数量,则从0从新开始
  60. if(divIndex>upList.size()-1) {
  61. divIndex=0;
  62. }
  63. // 从活着的服务中获取调用服务
  64. server = upList.get(divIndex);
  65. }
  66. // ------------------------------------------
  67. if (server == null) {
  68. Thread.yield();
  69. continue;
  70. }
  71. if (server.isAlive()) {
  72. return (server);
  73. }
  74. server = null;
  75. Thread.yield();
  76. }
  77. return server;
  78. }
  79. protected int chooseRandomInt(int serverCount) {
  80. return ThreadLocalRandom.current().nextInt(serverCount);
  81. }
  82. public Server choose(Object key) {
  83. return choose(getLoadBalancer(), key);
  84. }
  85. public void initWithNiwsConfig(IClientConfig clientConfig) {
  86. // TODO Auto-generated method stub
  87. }
  88. }

测试服务访问,如愿得到结果