要分析Ribbon的执行流程,也就是分析Ribbon如何加载服务列表,如何选择服务的过程。
根据算法选择服务节点
先从我们使用端选择服务开始,业务代码中我们这样使用Ribbon客户端
@Autowired
private LoadBalancerClient loadBalancerClient;
@GetMapping("/allOrders/1")
public Object getOrders1() {
ServiceInstance serviceInstance = loadBalancerClient.choose("order-service");
String url = String.format("http://%s:%s/orders", serviceInstance.getHost(), serviceInstance.getPort());
return restTemplate.getForObject(url, String.class);
}
我们就从loadBalancerClient.choose("order-service")
作为分析入口。
首先进入到的是RibbonLoadBalancerClient#choose
@Override
public ServiceInstance choose(String serviceId) {
// 选择服务
Server server = getServer(serviceId);
if (server == null) {
return null;
}
return new RibbonServer(serviceId, server, isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
}
protected Server getServer(String serviceId) {
// 负载均衡器
return getServer(getLoadBalancer(serviceId));
}
// 获取负载均衡器
protected ILoadBalancer getLoadBalancer(String serviceId) {
return this.clientFactory.getLoadBalancer(serviceId);
}
// 根据负载均衡器获取
protected Server getServer(ILoadBalancer loadBalancer) {
if (loadBalancer == null) {
return null;
}
return loadBalancer.chooseServer("default"); // TODO: better handling of key
}
clientFactory.getLoadBalancer(serviceId)
才是重点,这里的clientFactory
就是SpringClientFactory,可见是从每个客户端定制的IOC上下文中去获取定制的负载均衡器。
ILoadBalancer#chooseServer()有多个实现,最基础的是BaseLoadBalancer
public Server chooseServer(Object key) {
// 计数器
if (counter == null) {
counter = createCounter();
}
counter.increment();
// 调用规则实现进行选择
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
rule.choose(key)
参见具体的实现分析。
加载服务节点数列表
服务节点列表是存储在LoadBalancer遍历中,服务节点列表的获取就在其实现的构造方法中执行。
public interface ILoadBalancer {
public void addServers(List<Server> newServers);
// 选择服务
public Server chooseServer(Object key);
public void markServerDown(Server server);
// 获取服务列表
public List<Server> getReachableServers();
public List<Server> getAllServers();
}
// 抽象实现没有实现任何东西
public abstract class AbstractLoadBalancer implements ILoadBalancer {
// 服务节点状态类型枚举
public enum ServerGroup{
ALL,// 所有
STATUS_UP,// 可用
STATUS_NOT_UP // 不可用
}
public Server chooseServer() {
return chooseServer(null);
}
public abstract List<Server> getServerList(ServerGroup serverGroup);
public abstract LoadBalancerStats getLoadBalancerStats();
}
BaseLoadBalancer中定义了基本需要的字段
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
private final static IRule DEFAULT_RULE = new RoundRobinRule();
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
private static final String DEFAULT_NAME = "default";
private static final String PREFIX = "LoadBalancer_";
protected IRule rule = DEFAULT_RULE;
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
protected IPing ping = null;
// 服务列表保存
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());
// 更新时使用锁
protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();
protected ReadWriteLock upServerLock = new ReentrantReadWriteLock();
protected String name = DEFAULT_NAME;
protected Timer lbTimer = null;
protected int pingIntervalSeconds = 10;// 心跳检查间隔
protected int maxTotalPingTimeSeconds = 5;//
protected Comparator<Server> serverComparator = new ServerComparator();
protected AtomicBoolean pingInProgress = new AtomicBoolean(false);
protected LoadBalancerStats lbStats;// 统计信息
private volatile Counter counter = Monitors.newCounter("LoadBalancer_ChooseServer");
private PrimeConnections primeConnections;
private volatile boolean enablePrimingConnections = false;
private IClientConfig config;// 配置
// 监听器
private List<ServerListChangeListener> changeListeners = new CopyOnWriteArrayList<ServerListChangeListener>();
private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();
}
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);
boolean isSecure = false;
boolean useTunnel = false;
// to keep track of modification of server lists
protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);
volatile ServerList<T> serverListImpl;
volatile ServerListFilter<T> filter;
// 更新服务节点列表
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
protected volatile ServerListUpdater serverListUpdater;
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
// 初始化
restOfInit(clientConfig);
}
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
// 设置任务
enableAndInitLearnNewServersFeature();
// 更新列表
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections().primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
}
}
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
// 最终调用serverList的实现类进行获取
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
}
}
updateAllServerList(servers);
}
protected void updateAllServerList(List<T> ls) {
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true); // set so that clients can start using these
// servers right away instead
// of having to wait out the ping cycle.
}
// 更新列表
setServersList(ls);
super.forceQuickPing();
} finally {
serverListUpdateInProgress.set(false);
}
}
}
在全局配置中会注册BeanServerList<Server>
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
// 构造方法中设值
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
// ServerList
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerList<Server> ribbonServerList(IClientConfig config) {
if (this.propertiesFactory.isSet(ServerList.class, name)) {
return this.propertiesFactory.get(ServerList.class, config, name);
}
// 默认使用配置文件中的
ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
serverList.initWithNiwsConfig(config);
return serverList;
}
如果添加Eureka客户端在maven中
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
会导入默认注解配置EurekaRibbonClientConfiguration
@Configuration
@EnableConfigurationProperties
@ConditionalOnRibbonAndEurekaEnabled
@AutoConfigureAfter(RibbonAutoConfiguration.class)
// 默认配置EurekaRibbonClientConfiguration,可覆盖RibbonClientConfiguration
@RibbonClients(defaultConfiguration = EurekaRibbonClientConfiguration.class)
public class RibbonEurekaAutoConfiguration {
}
Eureka动态获取服务列表的实现
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
// 动态查找服务列表实现
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
具体覆盖原理逻辑参见