要分析Ribbon的执行流程,也就是分析Ribbon如何加载服务列表,如何选择服务的过程。

根据算法选择服务节点

先从我们使用端选择服务开始,业务代码中我们这样使用Ribbon客户端

  1. @Autowired
  2. private LoadBalancerClient loadBalancerClient;
  3. @GetMapping("/allOrders/1")
  4. public Object getOrders1() {
  5. ServiceInstance serviceInstance = loadBalancerClient.choose("order-service");
  6. String url = String.format("http://%s:%s/orders", serviceInstance.getHost(), serviceInstance.getPort());
  7. return restTemplate.getForObject(url, String.class);
  8. }

我们就从loadBalancerClient.choose("order-service")作为分析入口。
首先进入到的是RibbonLoadBalancerClient#choose

@Override
public ServiceInstance choose(String serviceId) {
    // 选择服务
    Server server = getServer(serviceId);
    if (server == null) {
        return null;
    }
    return new RibbonServer(serviceId, server, isSecure(server, serviceId),
            serverIntrospector(serviceId).getMetadata(server));
}

protected Server getServer(String serviceId) {
    // 负载均衡器
    return getServer(getLoadBalancer(serviceId));
}

// 获取负载均衡器
protected ILoadBalancer getLoadBalancer(String serviceId) {
    return this.clientFactory.getLoadBalancer(serviceId);
}
// 根据负载均衡器获取
protected Server getServer(ILoadBalancer loadBalancer) {
    if (loadBalancer == null) {
        return null;
    }
    return loadBalancer.chooseServer("default"); // TODO: better handling of key
}

clientFactory.getLoadBalancer(serviceId)才是重点,这里的clientFactory就是SpringClientFactory,可见是从每个客户端定制的IOC上下文中去获取定制的负载均衡器。
image.png
ILoadBalancer#chooseServer()有多个实现,最基础的是BaseLoadBalancer

public Server chooseServer(Object key) {
    // 计数器
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    // 调用规则实现进行选择
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

rule.choose(key)参见具体的实现分析

加载服务节点数列表

服务节点列表是存储在LoadBalancer遍历中,服务节点列表的获取就在其实现的构造方法中执行。
image.png

public interface ILoadBalancer {
    public void addServers(List<Server> newServers);
    // 选择服务
    public Server chooseServer(Object key);
    public void markServerDown(Server server);
    // 获取服务列表
    public List<Server> getReachableServers();
    public List<Server> getAllServers();
}

// 抽象实现没有实现任何东西
public abstract class AbstractLoadBalancer implements ILoadBalancer {
    // 服务节点状态类型枚举
    public enum ServerGroup{
        ALL,// 所有
        STATUS_UP,// 可用
        STATUS_NOT_UP    // 不可用    
    }
    public Server chooseServer() {
        return chooseServer(null);
    }
    public abstract List<Server> getServerList(ServerGroup serverGroup);
    public abstract LoadBalancerStats getLoadBalancerStats();    
}

BaseLoadBalancer中定义了基本需要的字段

public class BaseLoadBalancer extends AbstractLoadBalancer implements
        PrimeConnections.PrimeConnectionListener, IClientConfigAware {
    private final static IRule DEFAULT_RULE = new RoundRobinRule();
    private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
    private static final String DEFAULT_NAME = "default";
    private static final String PREFIX = "LoadBalancer_";

    protected IRule rule = DEFAULT_RULE;
    protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
    protected IPing ping = null;
    // 服务列表保存
    @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
    @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());
    // 更新时使用锁
    protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();
    protected ReadWriteLock upServerLock = new ReentrantReadWriteLock();

    protected String name = DEFAULT_NAME;
    protected Timer lbTimer = null;
    protected int pingIntervalSeconds = 10;// 心跳检查间隔
    protected int maxTotalPingTimeSeconds = 5;// 
    protected Comparator<Server> serverComparator = new ServerComparator();
    protected AtomicBoolean pingInProgress = new AtomicBoolean(false);
    protected LoadBalancerStats lbStats;// 统计信息
    private volatile Counter counter = Monitors.newCounter("LoadBalancer_ChooseServer");
    private PrimeConnections primeConnections;
    private volatile boolean enablePrimingConnections = false;

    private IClientConfig config;// 配置
    // 监听器
    private List<ServerListChangeListener> changeListeners = new CopyOnWriteArrayList<ServerListChangeListener>();
    private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();
}
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);

    boolean isSecure = false;
    boolean useTunnel = false;

    // to keep track of modification of server lists
    protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);

    volatile ServerList<T> serverListImpl;

    volatile ServerListFilter<T> filter;
    // 更新服务节点列表
    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };
    protected volatile ServerListUpdater serverListUpdater;

    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        // 初始化
        restOfInit(clientConfig);
    }

    void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        // 设置任务
        enableAndInitLearnNewServersFeature();
        // 更新列表
        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections().primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
    }
}
public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    // 最终调用serverList的实现类进行获取
    if (serverListImpl != null) {
        servers = serverListImpl.getUpdatedListOfServers();

        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
        }
    }
    updateAllServerList(servers);
}

protected void updateAllServerList(List<T> ls) {
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        try {
            for (T s : ls) {
                s.setAlive(true); // set so that clients can start using these
                                  // servers right away instead
                                  // of having to wait out the ping cycle.
            }
            // 更新列表
            setServersList(ls);
            super.forceQuickPing();
        } finally {
            serverListUpdateInProgress.set(false);
        }
    }
}

在全局配置中会注册BeanServerList<Server>

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
        // 构造方法中设值
        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
            serverListFilter, serverListUpdater);
}

// ServerList
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("unchecked")
public ServerList<Server> ribbonServerList(IClientConfig config) {
    if (this.propertiesFactory.isSet(ServerList.class, name)) {
        return this.propertiesFactory.get(ServerList.class, config, name);
    }
    // 默认使用配置文件中的
    ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
    serverList.initWithNiwsConfig(config);
    return serverList;
}

如果添加Eureka客户端在maven中

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

会导入默认注解配置EurekaRibbonClientConfiguration

@Configuration
@EnableConfigurationProperties
@ConditionalOnRibbonAndEurekaEnabled
@AutoConfigureAfter(RibbonAutoConfiguration.class)
// 默认配置EurekaRibbonClientConfiguration,可覆盖RibbonClientConfiguration
@RibbonClients(defaultConfiguration = EurekaRibbonClientConfiguration.class)
public class RibbonEurekaAutoConfiguration {
}

Eureka动态获取服务列表的实现

@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
    if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
        return this.propertiesFactory.get(ServerList.class, config, serviceId);
    }
    // 动态查找服务列表实现
    DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
            config, eurekaClientProvider);

    DomainExtractingServerList serverList = new DomainExtractingServerList(
            discoveryServerList, config, this.approximateZoneFromHostname);
    return serverList;
}

具体覆盖原理逻辑参见