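Ribbon provides client-side load balancing. It works by reading the service registration information from Eureka and using it to spread requests across the registered instances.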
@Configuration
public class AppConfig {
    @Bean
    @LoadBalanced // enable Ribbon load balancing for this RestTemplate
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

@RequestMapping("/order/get/{id}")
public R getOrder(@PathVariable String id) {
    return restTemplate.getForObject("http://server-order/order/get/" + id, R.class);
}
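The LoadBalancerAutoConfiguration configuration class
Ribbon implements load balancing by adding an interceptor to the RestTemplate. The interceptor intercepts each request and replaces the service name server-order with the IP address and port of a concrete instance.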
public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) {
    if (this.interceptors != interceptors) {
        this.interceptors.clear();
        this.interceptors.addAll(interceptors);
        AnnotationAwareOrderComparator.sort(this.interceptors);
    }
}
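To make the host replacement concrete, here is a minimal sketch in plain Java, assuming a fixed instance list and simple round-robin selection. ServiceUriRewriter, chooseInstance, rewrite, and the addresses are hypothetical names for illustration only; the real interceptor delegates to LoadBalancerClient and a configurable rule.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not a Ribbon class.
class ServiceUriRewriter {
    private final List<String> instances; // "host:port" entries of one service
    private final AtomicInteger counter = new AtomicInteger();

    ServiceUriRewriter(List<String> instances) {
        this.instances = List.copyOf(instances);
    }

    // Pick the next instance in round-robin order.
    String chooseInstance() {
        if (instances.isEmpty()) {
            throw new IllegalStateException("No instances available");
        }
        int index = Math.floorMod(counter.getAndIncrement(), instances.size());
        return instances.get(index);
    }

    // Replace the logical service name in the URI with a concrete host and port.
    URI rewrite(URI original) {
        String[] hostPort = chooseInstance().split(":");
        try {
            return new URI(original.getScheme(), original.getUserInfo(), hostPort[0],
                    Integer.parseInt(hostPort[1]), original.getPath(),
                    original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        ServiceUriRewriter rewriter = new ServiceUriRewriter(
                List.of("10.0.0.1:8080", "10.0.0.2:8080"));
        URI logical = URI.create("http://server-order/order/get/42");
        System.out.println(rewriter.rewrite(logical)); // http://10.0.0.1:8080/order/get/42
        System.out.println(rewriter.rewrite(logical)); // http://10.0.0.2:8080/order/get/42
    }
}
```

The path, query, and fragment pass through unchanged; only the host and port are swapped, which is why the calling code can keep using the service name in its URL.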
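Spring Cloud integrates Ribbon through auto-configuration. The spring.factories entry below registers RibbonAutoConfiguration: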
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration
@Configuration
@Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
@RibbonClients
@AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
@AutoConfigureBefore({ LoadBalancerAutoConfiguration.class,
AsyncLoadBalancerAutoConfiguration.class })
@EnableConfigurationProperties({ RibbonEagerLoadProperties.class,
ServerIntrospectorProperties.class })
public class RibbonAutoConfiguration {
}
Open the LoadBalancerAutoConfiguration class. It collects every RestTemplate annotated with @LoadBalanced into a list, then iterates over the list and adds the custom LoadBalancerInterceptor to each RestTemplate.
@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
    // Only RestTemplate beans annotated with @LoadBalanced are injected here
    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();
    @Configuration
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    static class LoadBalancerInterceptorConfig {
        // The custom interceptor
        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }
        // Adds the custom interceptor to a RestTemplate
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }
    }
}
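The excerpt shows the customizer but not the loop that applies it to each collected RestTemplate. A rough plain-Java sketch of that wiring follows, assuming a stand-in Client class in place of RestTemplate and interceptor names in place of interceptor objects. InterceptorWiringSketch, Client, customizer, and applyToAll are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the "collect, then customize each" pattern.
class InterceptorWiringSketch {
    // Stand-in for RestTemplate: it only holds a list of interceptor names.
    static class Client {
        private final List<String> interceptors = new ArrayList<>();

        List<String> getInterceptors() {
            return interceptors;
        }

        // Same shape as the setInterceptors method quoted earlier, minus sorting.
        void setInterceptors(List<String> updated) {
            if (this.interceptors != updated) {
                this.interceptors.clear();
                this.interceptors.addAll(updated);
            }
        }
    }

    // Build a customizer that appends one interceptor to a client.
    static Consumer<Client> customizer(String interceptor) {
        return client -> {
            List<String> list = new ArrayList<>(client.getInterceptors());
            list.add(interceptor);
            client.setInterceptors(list);
        };
    }

    // Apply every customizer to every collected client.
    static void applyToAll(List<Client> clients, List<Consumer<Client>> customizers) {
        for (Client client : clients) {
            for (Consumer<Client> c : customizers) {
                c.accept(client);
            }
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        applyToAll(List.of(client), List.of(customizer("loadBalancerInterceptor")));
        System.out.println(client.getInterceptors()); // [loadBalancerInterceptor]
    }
}
```

Copying the existing interceptor list before adding to it keeps any interceptors that were registered earlier, which matches what the customizer in the excerpt does.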
The LoadBalancerInterceptor interceptor
When the RestTemplate sends a request, the call enters the intercept method of LoadBalancerInterceptor.
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
private LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null,
"Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName,
this.requestFactory.createRequest(request, body, execution));
}
}
Before the request goes out, the interceptor reads serviceName from the request URI and then calls this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution)):
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
throws IOException {
return execute(serviceId, request, null);
}
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
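ILoadBalancer: the load balancer
getLoadBalancer(serviceId) looks up the load balancer by microservice name. If one already exists it is returned, otherwise it is built.
Calling getLoadBalancer(serviceId) does the following:
- Builds a separate Spring container per microservice name, so that each service's configuration is isolated
- Builds the load balancer
- Refreshes the cached registration information on a timer, once every 30s
- Pings the servers on a timer to check whether they are available, once every 10s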
public ILoadBalancer getLoadBalancer(String name) {
    return getInstance(name, ILoadBalancer.class);
}
This returns a DynamicServerListLoadBalancer object. Using the microservice name passed in, it fetches all addresses of that microservice from the local EurekaClient.
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };
    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
            ServerList<T> serverList, ServerListFilter<T> filter,
            ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        // Initialization
        restOfInit(clientConfig);
    }
    void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();
        // Fetch the server list from the registry client
        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}",
                clientConfig.getClientName(), this.toString());
    }
    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        // serverListImpl is the ServerList bound to this microservice (not its name)
        if (serverListImpl != null) {
            // Fetch all server addresses of the microservice
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);
            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }
}
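The "return it if it exists, otherwise build it" lookup by microservice name can be sketched in plain Java as below. This is only an illustration, assuming a simple map keyed by service name; PerServiceRegistry and its factory function are hypothetical, whereas the real getInstance resolves the bean from a per-service Spring container.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch: one lazily built object per service name.
class PerServiceRegistry<T> {
    private final Map<String, T> instances = new ConcurrentHashMap<>();
    private final Function<String, T> factory;

    PerServiceRegistry(Function<String, T> factory) {
        this.factory = factory;
    }

    // Return the cached instance for this service, building it on first use.
    T getInstance(String serviceId) {
        return instances.computeIfAbsent(serviceId, factory);
    }

    public static void main(String[] args) {
        AtomicInteger builds = new AtomicInteger();
        PerServiceRegistry<String> registry = new PerServiceRegistry<>(name -> {
            builds.incrementAndGet();
            return "load-balancer-for-" + name;
        });
        registry.getInstance("server-order");
        registry.getInstance("server-order"); // served from the cache
        registry.getInstance("server-power");
        System.out.println(builds.get()); // 2
    }
}
```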
Timer
The timer's main job is to refresh the cache of registration information.

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
protected void updateAllServerList(List<T> ls) {
// other threads might be doing this - in which case, we pass
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true); // set so that clients can start using these
// servers right away instead
// of having to wait out the ping cycle.
}
setServersList(ls);
super.forceQuickPing();
} finally {
serverListUpdateInProgress.set(false);
}
}
}
}
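The refresh cycle can be approximated with a scheduled task that re-reads the server list and swaps in a fresh snapshot. The sketch below is a simplified illustration using java.util.concurrent; ServerListRefresher, refreshOnce, and the Supplier standing in for the registry lookup are hypothetical names, and the period is whatever the caller passes in.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical sketch of a periodically refreshed server list.
class ServerListRefresher {
    private final Supplier<List<String>> source; // stands in for the registry lookup
    private final AtomicBoolean updateInProgress = new AtomicBoolean(false);
    private volatile List<String> servers = List.of();
    private ScheduledExecutorService scheduler;

    ServerListRefresher(Supplier<List<String>> source) {
        this.source = source;
    }

    // Take a new snapshot unless another thread is already refreshing.
    void refreshOnce() {
        if (updateInProgress.compareAndSet(false, true)) {
            try {
                servers = List.copyOf(source.get());
            } finally {
                updateInProgress.set(false);
            }
        }
    }

    // Start refreshing in the background on a daemon thread.
    synchronized void start(long periodSeconds) {
        if (scheduler != null) {
            return;
        }
        scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "server-list-refresher");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                refreshOnce();
            } catch (RuntimeException e) {
                // Swallow the error so that later runs are not cancelled.
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
    }

    synchronized void stop() {
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }
    }

    List<String> getServers() {
        return servers;
    }

    public static void main(String[] args) {
        ServerListRefresher refresher =
                new ServerListRefresher(() -> List.of("10.0.0.1:8080", "10.0.0.2:8080"));
        refresher.refreshOnce();
        System.out.println(refresher.getServers()); // [10.0.0.1:8080, 10.0.0.2:8080]
        refresher.start(30); // keep refreshing every 30 seconds
        refresher.stop();
    }
}
```

The compareAndSet guard plays the same role as serverListUpdateInProgress in updateAllServerList: if a refresh is already running, a concurrent caller simply skips its turn.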
PING logic
The ping logic lives in BaseLoadBalancer, the parent class of DynamicServerListLoadBalancer.
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
IPing ping, IPingStrategy pingStrategy) {
logger.debug("LoadBalancer [{}]: initialized", name);
this.name = name;
this.ping = ping;
this.pingStrategy = pingStrategy;
setRule(rule);
setupPingTask();
lbStats = stats;
init();
}
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
lbTimer.schedule(new PingTask(), 0, 10 * 1000);
forceQuickPing();
}
class PingTask extends TimerTask {
public void run() {
try {
new Pinger(pingStrategy).runPinger();
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error pinging", name, e);
}
}
}
class Pinger {
private final IPingStrategy pingerStrategy;
public Pinger(IPingStrategy pingerStrategy) {
this.pingerStrategy = pingerStrategy;
}
public void runPinger() throws Exception {
if (!pingInProgress.compareAndSet(false, true)) {
return; // Ping in progress - nothing to do
}
// we are "in" - we get to Ping
Server[] allServers = null;
boolean[] results = null;
Lock allLock = null;
Lock upLock = null;
try {
/*
* The readLock should be free unless an addServer operation is
* going on...
*/
allLock = allServerLock.readLock();
allLock.lock();
allServers = allServerList.toArray(new Server[allServerList.size()]);
allLock.unlock();
int numCandidates = allServers.length;
results = pingerStrategy.pingServers(ping, allServers);
final List<Server> newUpList = new ArrayList<Server>();
final List<Server> changedServers = new ArrayList<Server>();
for (int i = 0; i < numCandidates; i++) {
boolean isAlive = results[i];
Server svr = allServers[i];
boolean oldIsAlive = svr.isAlive();
svr.setAlive(isAlive);
if (oldIsAlive != isAlive) {
changedServers.add(svr);
logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
}
if (isAlive) {
newUpList.add(svr);
}
}
upLock = upServerLock.writeLock();
upLock.lock();
upServerList = newUpList;
upLock.unlock();
notifyServerStatusChangeListener(changedServers);
} finally {
pingInProgress.set(false);
}
}
}
private static class SerialPingStrategy implements IPingStrategy {
@Override
public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);
for (int i = 0; i < numCandidates; i++) {
results[i] = false; /* Default answer is DEAD. */
try {
// NOTE: IFF we were doing a real ping
// assuming we had a large set of servers (say 15)
// the logic below will run them serially
// hence taking 15 times the amount of time it takes
// to ping each server
// A better method would be to put this in an executor
// pool
// But, at the time of this writing, we dont REALLY
// use a Real Ping (its mostly in memory eureka call)
// hence we can afford to simplify this design and run
// this
// serially
if (ping != null) {
results[i] = ping.isAlive(servers[i]);
}
} catch (Exception e) {
logger.error("Exception while pinging Server: '{}'", servers[i], e);
}
}
return results;
}
}
}

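Ribbon provides several IPing implementations:
- DummyPing: always returns true
- NIWSDiscoveryPing (default): reads the locally cached status of the given microservice instance and checks whether it is UP
- NoOpPing: always returns true
- PingConstant: returns a fixed constant, true by default
- PingUrl: builds an HTTP GET request from the Server's address and performs a real ping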
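The difference between these strategies, and the way one ping pass turns them into a list of live servers, can be sketched in plain Java. This is a simplified model under stated assumptions: PingSketch, Ping, alwaysUp, fromLocalStatus, and upServers are hypothetical names, servers are plain strings, and no network call is made.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of pluggable ping strategies and one serial ping pass.
class PingSketch {
    interface Ping {
        boolean isAlive(String server);
    }

    // Like the implementations that always answer true.
    static Ping alwaysUp() {
        return server -> true;
    }

    // Like a status-based check: alive only if the locally known status is UP.
    static Ping fromLocalStatus(Map<String, String> statusByServer) {
        return server -> "UP".equals(statusByServer.get(server));
    }

    // Ping every server in turn and collect the ones that answered alive.
    static List<String> upServers(Ping ping, List<String> allServers) {
        List<String> up = new ArrayList<>();
        for (String server : allServers) {
            boolean alive = false; // default answer is DEAD
            try {
                alive = ping != null && ping.isAlive(server);
            } catch (RuntimeException e) {
                // A failing check leaves the server marked as dead.
            }
            if (alive) {
                up.add(server);
            }
        }
        return up;
    }

    public static void main(String[] args) {
        List<String> all = List.of("10.0.0.1:8080", "10.0.0.2:8080");
        Map<String, String> status = Map.of("10.0.0.1:8080", "UP", "10.0.0.2:8080", "DOWN");
        System.out.println(upServers(alwaysUp(), all));              // both servers
        System.out.println(upServers(fromLocalStatus(status), all)); // [10.0.0.1:8080]
    }
}
```

A strategy that always returns true never removes a server from rotation, so a check that looks at real state is what makes the ping cycle useful.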
Switching to the PingUrl component
RibbonEurekaAutoConfiguration registers EurekaRibbonClientConfiguration as the default Ribbon client configuration:
@Configuration
@EnableConfigurationProperties
@ConditionalOnRibbonAndEurekaEnabled
@AutoConfigureAfter(RibbonAutoConfiguration.class)
@RibbonClients(defaultConfiguration = EurekaRibbonClientConfiguration.class)
public class RibbonEurekaAutoConfiguration {
}
EurekaRibbonClientConfiguration checks whether the container already holds an IPing bean. Only if none exists does it run the method marked with @Bean:
@Configuration
public class EurekaRibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
return this.propertiesFactory.get(IPing.class, config, serviceId);
}
NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
ping.initWithNiwsConfig(config);
return ping;
}
}
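So all we need to do is define the IPing we want in AppConfig. Because AppConfig belongs to the main application context, this IPing applies to every Ribbon client: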
@Configuration
public class AppConfig {
@Bean
public IPing iPing() {
return new PingUrl();
}
}
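Using different IPing rules for different microservices
When there are several microservices and each needs its own IPing strategy, follow the rules below. The examples define an IRule, and the same approach applies to IPing.
- To give each service its own strategy, the configuration class that defines the IRule must not sit in a package that componentScan can reach
- On the startup class, declare @RibbonClients with one @RibbonClient per service, and name the configuration class each one should use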
@Configuration // must not be picked up by componentScan
public class OrderRuleConfig {
    /*
     To give each service its own strategy, the IRule must not be placed in a
     package that componentScan can reach, and a matching @RibbonClient must be
     declared for each service.
     */
    @Bean
    public IRule iRule() {
        return new RetryRule();
    }
}
@Configuration // must not be picked up by componentScan
public class PowerRuleConfig {
    /*
     To give each service its own strategy, the IRule must not be placed in a
     package that componentScan can reach, and a matching @RibbonClient must be
     declared for each service.
     */
    @Bean
    public IRule iRule() {
        // return new MyRule();
        return new RetryRule();
    }
}
@SpringBootApplication
@EnableEurekaClient
@RibbonClients({
@RibbonClient(name = "server-order", configuration = OrderRuleConfig.class),
@RibbonClient(name = "server-power", configuration = PowerRuleConfig.class)
})
@EnableFeignClients
@EnableHystrix
public class AppUserClient {
public static void main(String[] args) {
SpringApplication.run(AppUserClient.class);
}
}
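The effect of the per-service declarations is that a named client gets its own setting while every other client falls back to the shared default. A minimal plain-Java model of that resolution order is sketched below; PerClientSettings, override, and resolve are hypothetical names, and strings stand in for rule objects.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: per-service overrides with a shared default.
class PerClientSettings<T> {
    private final T defaultValue;
    private final Map<String, T> overrides = new HashMap<>();

    PerClientSettings(T defaultValue) {
        this.defaultValue = defaultValue;
    }

    // Register a service-specific value, similar in spirit to one @RibbonClient entry.
    PerClientSettings<T> override(String serviceId, T value) {
        overrides.put(serviceId, value);
        return this;
    }

    // Use the service-specific value if present, otherwise the default.
    T resolve(String serviceId) {
        return overrides.getOrDefault(serviceId, defaultValue);
    }

    public static void main(String[] args) {
        PerClientSettings<String> rules = new PerClientSettings<>("DefaultRule")
                .override("server-order", "RetryRule");
        System.out.println(rules.resolve("server-order")); // RetryRule
        System.out.println(rules.resolve("server-user"));  // DefaultRule
    }
}
```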
