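Ribbon provides client-side load balancing. It works by reading the service registration information held by Eureka and distributing requests across the registered instances.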

@Configuration
public class AppConfig {

    @Bean
    @LoadBalanced // enable Ribbon load balancing for this RestTemplate
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

// In a controller that has the RestTemplate above injected:
@RequestMapping("/order/get/{id}")
public R getOrder(@PathVariable String id) {
    return restTemplate.getForObject("http://server-order/order/get/" + id, R.class);
}

The LoadBalancerAutoConfiguration configuration class

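Ribbon implements load balancing by adding an interceptor to the RestTemplate. The interceptor intercepts each outgoing request and replaces the service name (server-order here) with the IP address and port of one concrete instance.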
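To make the "replace the service name with an IP and port" step concrete, here is a minimal sketch using only the JDK. The class name HostRewriteSketch, the instance addresses, and the round-robin choice are all assumptions made for illustration; this is not Ribbon's actual code, and Ribbon's configured rule may choose instances differently.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration; not one of Ribbon's classes. */
final class HostRewriteSketch {

    // Assumed "host:port" addresses registered for one logical service name.
    private final List<String> instances;
    private final AtomicInteger counter = new AtomicInteger();

    HostRewriteSketch(List<String> instances) {
        this.instances = List.copyOf(instances);
    }

    /** Picks the next instance in round-robin order. */
    String choose() {
        int index = Math.floorMod(counter.getAndIncrement(), instances.size());
        return instances.get(index);
    }

    /** Replaces the logical host in the URI with a concrete host and port. */
    URI rewrite(URI original) {
        String[] hostAndPort = choose().split(":");
        try {
            return new URI(original.getScheme(), original.getUserInfo(),
                    hostAndPort[0], Integer.parseInt(hostAndPort[1]),
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rewrite " + original, e);
        }
    }

    public static void main(String[] args) {
        HostRewriteSketch sketch = new HostRewriteSketch(
                List.of("10.0.0.1:8081", "10.0.0.2:8082"));
        URI logical = URI.create("http://server-order/order/get/42");
        // Each call resolves the same logical URI to the next instance in turn.
        System.out.println(sketch.rewrite(logical));
        System.out.println(sketch.rewrite(logical));
    }
}
```

The caller keeps using the logical name server-order; only the rewritten URI ever reaches the network.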

public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) {
    if (this.interceptors != interceptors) {
        this.interceptors.clear();
        this.interceptors.addAll(interceptors);
        AnnotationAwareOrderComparator.sort(this.interceptors);
    }
}

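How Spring Cloud integrates Ribbon

Spring Cloud integrates Ribbon through auto-configuration: the spring.factories entry below registers RibbonAutoConfiguration, which is ordered after the Eureka client auto-configuration and before LoadBalancerAutoConfiguration.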

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration
@Configuration
@Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
@RibbonClients
@AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
@AutoConfigureBefore({ LoadBalancerAutoConfiguration.class,
        AsyncLoadBalancerAutoConfiguration.class })
@EnableConfigurationProperties({ RibbonEagerLoadProperties.class,
        ServerIntrospectorProperties.class })
public class RibbonAutoConfiguration {
}

Next, look at LoadBalancerAutoConfiguration. It collects every RestTemplate annotated with @LoadBalanced into a list, then iterates over that list and adds the LoadBalancerInterceptor to each RestTemplate.

@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

    // Only RestTemplate beans annotated with @LoadBalanced are injected here
    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();

    @Configuration
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {

        // The load-balancing interceptor
        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        // Appends the load-balancing interceptor to each RestTemplate
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }

    }

}

The LoadBalancerInterceptor interceptor

When the RestTemplate sends a request, the request first enters the intercept method of LoadBalancerInterceptor.

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;

    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
            LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null,
                "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName,
                this.requestFactory.createRequest(request, body, execution));
    }

}

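As the code shows, before the request is sent the interceptor first takes serviceName from the host part of the URI, then calls this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution)). In RibbonLoadBalancerClient that method looks like this: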

@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
    throws IOException {
    return execute(serviceId, request, null);
}

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
            throws IOException {
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    Server server = getServer(loadBalancer, hint);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                                                 isSecure(server, serviceId),
                                                 serverIntrospector(serviceId).getMetadata(server));

    return execute(serviceId, ribbonServer, request);
}

The ILoadBalancer load balancer

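The load balancer is looked up by service name through getLoadBalancer(serviceId): an existing one is returned if present, otherwise a new one is built.

Calling getLoadBalancer(serviceId) does the following:

  • Builds a separate Spring context per service name, so that each service's configuration is isolated
  • Builds the load balancer
  • Refreshes the server list on a timer, once every 30s
  • Pings the servers on a timer to check whether they are available, once every 10s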
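The "return it if it exists, otherwise build it" lookup described above can be sketched with a concurrent map. This is only an illustration under assumptions: BalancerRegistrySketch and its factory function are hypothetical names, and Spring Cloud actually creates a child application context per service rather than a plain map entry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical sketch of a per-service balancer cache; not Spring Cloud's factory. */
final class BalancerRegistrySketch<B> {

    private final Map<String, B> balancers = new ConcurrentHashMap<>();
    private final Function<String, B> factory; // builds a balancer for a service name

    BalancerRegistrySketch(Function<String, B> factory) {
        this.factory = factory;
    }

    /** Returns the balancer for the service, building it only on first use. */
    B getLoadBalancer(String serviceId) {
        return balancers.computeIfAbsent(serviceId, factory);
    }

    public static void main(String[] args) {
        BalancerRegistrySketch<StringBuilder> registry = new BalancerRegistrySketch<>(
                name -> new StringBuilder("balancer for " + name));
        StringBuilder first = registry.getLoadBalancer("server-order");
        StringBuilder again = registry.getLoadBalancer("server-order");
        // Both lookups should yield the same cached object.
        System.out.println(first == again);
    }
}
```

Keying the cache by service name is what lets each service carry its own rule, ping, and server list without affecting the others.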

    public ILoadBalancer getLoadBalancer(String name) {
      return getInstance(name, ILoadBalancer.class);
    }
    

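    This returns a DynamicServerListLoadBalancer object (in Spring Cloud Netflix the default bean is usually its subclass ZoneAwareLoadBalancer). Using the service name passed in, it obtains all addresses of that service from the local EurekaClient.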

    public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    
      protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
          @Override
          public void doUpdate() {
              updateListOfServers();
          }
      };
    
      public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                               ServerList<T> serverList, ServerListFilter<T> filter,
                                               ServerListUpdater serverListUpdater) {
           super(clientConfig, rule, ping);
           this.serverListImpl = serverList;
           this.filter = filter;
           this.serverListUpdater = serverListUpdater;
           if (filter instanceof AbstractServerListFilter) {
               ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
           }
           // finish initialization
           restOfInit(clientConfig);
       }
    
      void restOfInit(IClientConfig clientConfig) {
          boolean primeConnection = this.isEnablePrimingConnections();
          // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
          this.setEnablePrimingConnections(false);
          enableAndInitLearnNewServersFeature();
    
          // fetch the server list from the registry client (Eureka)
          updateListOfServers();
    
          if (primeConnection && this.getPrimeConnections() != null) {
              this.getPrimeConnections()
                  .primeConnections(getReachableServers());
          }
          this.setEnablePrimingConnections(primeConnection);
          LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
      }
    
      @VisibleForTesting
      public void updateListOfServers() {
          List<T> servers = new ArrayList<T>();
          // serverListImpl is the ServerList implementation that looks up the servers of this service
          if (serverListImpl != null) {
              // fetch the addresses of all instances of this service
              servers = serverListImpl.getUpdatedListOfServers();
              LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                      getIdentifier(), servers);
    
              if (filter != null) {
                  servers = filter.getFilteredListOfServers(servers);
                  LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                          getIdentifier(), servers);
              }
          }
          updateAllServerList(servers);
      }
    }
    

The refresh timer

The timer's main job is to refresh the cached registration information.


public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

    protected void updateAllServerList(List<T> ls) {
        // other threads might be doing this - in which case, we pass
        if (serverListUpdateInProgress.compareAndSet(false, true)) {
            try {
                for (T s : ls) {
                    s.setAlive(true); // set so that clients can start using these
                                      // servers right away instead
                                      // of having to wait out the ping cycle.
                }
                setServersList(ls);
                super.forceQuickPing();
            } finally {
                serverListUpdateInProgress.set(false);
            }
        }
    }
}
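The refresh pattern in the listing above (a scheduled task that reloads the server list, guarded by a compare-and-set flag so overlapping updates are skipped) can be sketched with JDK classes only. ServerListCacheSketch, its Supplier-based registry, and the thread name are assumptions for illustration; Ribbon uses its own ServerListUpdater.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical sketch of a periodically refreshed server-list cache. */
final class ServerListCacheSketch {

    private final Supplier<List<String>> registry; // stands in for the Eureka client
    private final AtomicBoolean updateInProgress = new AtomicBoolean(false);
    private volatile List<String> servers = List.of();

    ServerListCacheSketch(Supplier<List<String>> registry) {
        this.registry = registry;
    }

    /** Reloads the cache unless another thread is already reloading it. */
    void refresh() {
        if (updateInProgress.compareAndSet(false, true)) {
            try {
                servers = List.copyOf(registry.get());
            } finally {
                updateInProgress.set(false);
            }
        }
    }

    /** Returns the most recently cached snapshot. */
    List<String> getServers() {
        return servers;
    }

    /** Schedules refresh() with a fixed delay; the caller shuts the executor down. */
    ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "server-list-refresher");
            thread.setDaemon(true);
            return thread;
        });
        executor.scheduleWithFixedDelay(this::refresh, 0, periodSeconds, TimeUnit.SECONDS);
        return executor;
    }

    public static void main(String[] args) {
        ServerListCacheSketch cache =
                new ServerListCacheSketch(() -> List.of("10.0.0.1:8081", "10.0.0.2:8082"));
        cache.refresh(); // refresh once by hand instead of waiting for the timer
        System.out.println(cache.getServers());
    }
}
```

Requests read the cached snapshot, so a slow or unavailable registry delays only the refresh, not the request path.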

PING logic

The ping logic lives in BaseLoadBalancer, the parent class of DynamicServerListLoadBalancer.

public class BaseLoadBalancer extends AbstractLoadBalancer implements
        PrimeConnections.PrimeConnectionListener, IClientConfigAware {

    public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
                IPing ping, IPingStrategy pingStrategy) {

        logger.debug("LoadBalancer [{}]:  initialized", name);

        this.name = name;
        this.ping = ping;
        this.pingStrategy = pingStrategy;
        setRule(rule);
        setupPingTask();
        lbStats = stats;
        init();
    }

    void setupPingTask() {
        if (canSkipPing()) {
            return;
        }
        if (lbTimer != null) {
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
                true);
        lbTimer.schedule(new PingTask(), 0, 10 * 1000);
        forceQuickPing();
    }

    class PingTask extends TimerTask {
        public void run() {
            try {
                new Pinger(pingStrategy).runPinger();
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Error pinging", name, e);
            }
        }
    }

    class Pinger {

        private final IPingStrategy pingerStrategy;

        public Pinger(IPingStrategy pingerStrategy) {
            this.pingerStrategy = pingerStrategy;
        }

        public void runPinger() throws Exception {
            if (!pingInProgress.compareAndSet(false, true)) { 
                return; // Ping in progress - nothing to do
            }

            // we are "in" - we get to Ping

            Server[] allServers = null;
            boolean[] results = null;

            Lock allLock = null;
            Lock upLock = null;

            try {
                /*
                 * The readLock should be free unless an addServer operation is
                 * going on...
                 */
                allLock = allServerLock.readLock();
                allLock.lock();
                allServers = allServerList.toArray(new Server[allServerList.size()]);
                allLock.unlock();

                int numCandidates = allServers.length;
                results = pingerStrategy.pingServers(ping, allServers);

                final List<Server> newUpList = new ArrayList<Server>();
                final List<Server> changedServers = new ArrayList<Server>();

                for (int i = 0; i < numCandidates; i++) {
                    boolean isAlive = results[i];
                    Server svr = allServers[i];
                    boolean oldIsAlive = svr.isAlive();

                    svr.setAlive(isAlive);

                    if (oldIsAlive != isAlive) {
                        changedServers.add(svr);
                        logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                            name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                    }

                    if (isAlive) {
                        newUpList.add(svr);
                    }
                }
                upLock = upServerLock.writeLock();
                upLock.lock();
                upServerList = newUpList;
                upLock.unlock();

                notifyServerStatusChangeListener(changedServers);
            } finally {
                pingInProgress.set(false);
            }
        }
    }

    private static class SerialPingStrategy implements IPingStrategy {

        @Override
        public boolean[] pingServers(IPing ping, Server[] servers) {
            int numCandidates = servers.length;
            boolean[] results = new boolean[numCandidates];

            logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);

            for (int i = 0; i < numCandidates; i++) {
                results[i] = false; /* Default answer is DEAD. */
                try {
                    // NOTE: IFF we were doing a real ping
                    // assuming we had a large set of servers (say 15)
                    // the logic below will run them serially
                    // hence taking 15 times the amount of time it takes
                    // to ping each server
                    // A better method would be to put this in an executor
                    // pool
                    // But, at the time of this writing, we dont REALLY
                    // use a Real Ping (its mostly in memory eureka call)
                    // hence we can afford to simplify this design and run
                    // this
                    // serially
                    if (ping != null) {
                        results[i] = ping.isAlive(servers[i]);
                    }
                } catch (Exception e) {
                    logger.error("Exception while pinging Server: '{}'", servers[i], e);
                }
            }
            return results;
        }
    }

}


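  • DummyPing: always returns true
  • NIWSDiscoveryPing (default): reads the locally cached status of the given service instance and checks whether it is UP
  • NoOpPing: always returns true
  • PingConstant: returns a constant value (true by default)
  • PingUrl: builds an HTTP GET request from the Server's address and performs a real ping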

Switching to the PingUrl component

RibbonEurekaAutoConfiguration imports EurekaRibbonClientConfiguration:

@Configuration
@EnableConfigurationProperties
@ConditionalOnRibbonAndEurekaEnabled
@AutoConfigureAfter(RibbonAutoConfiguration.class)
@RibbonClients(defaultConfiguration = EurekaRibbonClientConfiguration.class)
public class RibbonEurekaAutoConfiguration {

}

EurekaRibbonClientConfiguration checks whether the container already has an IPing bean. If it does not, the @Bean-annotated method below runs:

@Configuration
public class EurekaRibbonClientConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing(IClientConfig config) {
        if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
            return this.propertiesFactory.get(IPing.class, config, serviceId);
        }
        NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
        ping.initWithNiwsConfig(config);
        return ping;
    }

}

So we only need to define the IPing we want in AppConfig:

@Configuration
public class AppConfig {

    @Bean
    public IPing iPing() {
        return new PingUrl();
    }
}

Using different IPing rules for different microservices

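If there are several microservices and each needs its own IPing strategy, follow these rules:

  • The configuration class that defines the strategy bean (IRule in the examples below; the same applies to IPing) must not be in a package that componentScan picks up
  • On the startup class, declare one @RibbonClient per service inside @RibbonClients and point each at the configuration class it should use
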
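@Configuration // must not be picked up by componentScan
public class OrderRuleConfig {

    /*
    To use a different strategy per service, this IRule must not live in a
    package that componentScan picks up, and a separate @RibbonClient must be
    declared for each service.
     */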
    @Bean
    public IRule iRule(){
        return new RetryRule();
    }

}

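@Configuration // must not be picked up by componentScan
public class PowerRuleConfig {

    /*
    To use a different strategy per service, this IRule must not live in a
    package that componentScan picks up, and a separate @RibbonClient must be
    declared for each service.
     */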
    @Bean
    public IRule iRule(){
        // return new MyRule();
        return new RetryRule();
    }
}

@SpringBootApplication
@EnableEurekaClient
@RibbonClients({
        @RibbonClient(name = "server-order", configuration = OrderRuleConfig.class),
        @RibbonClient(name = "server-power", configuration = PowerRuleConfig.class)
})
@EnableFeignClients
@EnableHystrix
public class AppUserClient {

    public static void main(String[] args) {
        SpringApplication.run(AppUserClient.class, args);
    }

}