1.阅读Eureka应该由该地方渐入式阅读
1.1 测试代码阅读相关类
- 阅读在eureka-server项目下的test模块下的
com.netflix.eureka.resources.EurekaClientServerRestIntegrationTest:在这个测试里,他会将eureka注册中心启动起来,然后模拟eureka客户端(服务)去发送各种请求到eureka注册中心,去测试各种功能 在eureka-examples中对
com.netflix.eureka.ExampleEurekaClient中对main()添加: ```java // EurekaClientServerRestIntegrationTest 内方法 private static void injectEurekaConfiguration() throws UnknownHostException {String myHostName = InetAddress.getLocalHost().getHostName();String myServiceUrl = "http://" + myHostName + ":8080/v2/";System.setProperty("eureka.region", "default");System.setProperty("eureka.name", "eureka");System.setProperty("eureka.vipAddress", "eureka.mydomain.net");System.setProperty("eureka.port", "8080");System.setProperty("eureka.preferSameZone", "false");System.setProperty("eureka.shouldUseDns", "false");System.setProperty("eureka.shouldFetchRegistry", "false");System.setProperty("eureka.serviceUrl.defaultZone", myServiceUrl);System.setProperty("eureka.serviceUrl.default.defaultZone", myServiceUrl);System.setProperty("eureka.awsAccessId", "fake_aws_access_id");System.setProperty("eureka.awsSecretKey", "fake_aws_secret_key");System.setProperty("eureka.numberRegistrySyncRetries", "0");
}
public static void main(String[] args) throws UnknownHostException { //添加Eureka配置相关配置 injectEurekaConfiguration();
ExampleEurekaClient sampleClient = new ExampleEurekaClient();
// create the client
ApplicationInfoManager applicationInfoManager = initializeApplicationInfoManager(new MyDataCenterInstanceConfig());
EurekaClient client = initializeEurekaClient(applicationInfoManager, new DefaultEurekaClientConfig());
// use the client
sampleClient.sendRequestToServiceUsingEureka(client);
// shutdown the client
eurekaClient.shutdown();
}
> 本章由spring cloud 的 15章开始记录
<a name="u8OWH"></a>
### 1.2 eureka-server模块中文件分析
<a name="9pb0q"></a>
#### 1.2.1 eureka-server 中 build.gradle分析
```bash
eureka-server在该模块的build.gradle中有如下依赖:
compile project(':eureka-client') //eureka-server也是一个eureka-client
compile project(':eureka-core')//扮演了核心的注册中心的角色,接收别人的服务注册请求,提供服务发现的功能,保持心跳(续约请求),摘除故障服务实例。eureka server依赖eureka core的,基于eureka core的功能对外暴露接口,提供注册中心的功能
compile "com.sun.jersey:jersey-server:$jerseyVersion"
compile "com.sun.jersey:jersey-servlet:$jerseyVersion"//eureka server依赖jersey框架,类比于spring web mvc框架,支持mvc模式,支持restful http请求
(eureka client和eureka server之间进行通信,都是基于jersey框架实现http restful接口请求和调用的)
testCompile "org.mockito:mockito-core:${mockitoVersion}" //mock测试框架,用于单元测试
testCompile "org.eclipse.jetty:jetty-server:$jetty_version"
testCompile "org.eclipse.jetty:jetty-webapp:$jetty_version"//测试时候,是会基于jetty直接将eureka server作为一个web应用给跑起来,jetty作为web容器,跑起来eureka server这个web应用
1.2.2 eureka-server 中 web.xml分析
eureka-server在该模块中的${eureka-server}\src\main\webapp\WEB-INF\web.xml
<!-- 负责eureka-server的初始化,在eureka-core里 -->
<filter>
<filter-name>statusFilter</filter-name>
<filter-class>com.netflix.eureka.StatusFilter</filter-class>
</filter>
<!-- 负责状态相关的处理逻辑 -->
<filter>
<filter-name>statusFilter</filter-name>
<filter-class>com.netflix.eureka.StatusFilter</filter-class>
</filter>
<!-- 对请求进行授权认证的处理的 -->
<filter>
<filter-name>requestAuthFilter</filter-name>
<filter-class>com.netflix.eureka.ServerRequestAuthFilter</filter-class>
</filter>
<!-- 负责限流相关的逻辑的 -->
<filter>
<filter-name>rateLimitingFilter</filter-name>
<filter-class>com.netflix.eureka.RateLimitingFilter</filter-class>
</filter>
<!-- 压缩相关 -->
<filter>
<filter-name>gzipEncodingEnforcingFilter</filter-name>
<filter-class>com.netflix.eureka.GzipEncodingEnforcingFilter</filter-class>
</filter>
<!--jersey框架的一个ServletContainer的一个filter-->
<filter>
<filter-name>jersey</filter-name>
<filter-class>com.sun.jersey.spi.container.servlet.ServletContainer</filter-class>
<init-param>
<param-name>com.sun.jersey.config.property.WebPageContentRegex</param-name>
<param-value>/(flex|images|js|css|jsp)/.*</param-value>
</init-param>
<init-param>
<param-name>com.sun.jersey.config.property.packages</param-name>
<param-value>com.sun.jersey;com.netflix</param-value>
</init-param>
<!-- GZIP content encoding/decoding -->
<init-param>
<param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
<param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
</init-param>
<init-param>
<param-name>com.sun.jersey.spi.container.ContainerResponseFilters</param-name>
<param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
</init-param>
</filter>
<!-- statusFilter、requestAuthFilter就是通用的处理逻辑,是对所有的请求都开放的 -->
<filter-mapping>
<filter-name>statusFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<filter-mapping>
<filter-name>requestAuthFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<!-- rateLimitingFilter 默认是不开启的,如果你要打开eureka-server内置的限流功能,
你需要自己把RateLimitingFilter的<filter-mapping>的注释打开,让这个filter生效
<filter-mapping>
<filter-name>rateLimitingFilter</filter-name>
<url-pattern>/v2/apps</url-pattern>
<url-pattern>/v2/apps/*</url-pattern>
</filter-mapping>
-->
<!-- /v2/apps相关的请求,会走这里,仅仅对部分特殊的请求生效 -->
<filter-mapping>
<filter-name>gzipEncodingEnforcingFilter</filter-name>
<url-pattern>/v2/apps</url-pattern>
<url-pattern>/v2/apps/*</url-pattern>
</filter-mapping>
<!-- jersey核心filter,是拦截所有的请求的。 -->
<filter-mapping>
<filter-name>jersey</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
# ps: 请求被jersey拦截之后,就是登录erurka-server服务器地址,
# 而在该控制面板就是在eureka-resources模块的webapp文件夹内,页面配置文件就是一个jsp,
# 当中status.jsp是欢迎页面首页,eureka-server的控制台页面,展示注册服务的信息
2. eureka-core源码阅读[eureka server 如何启动的]
web容器(tomcat还是jetty)启动的时候,把eureka-server作为一个web应用给带起来的时候,eureka-server的初始化的逻辑,是透过监听器拦截,核心在EurekaBootStrap(eureka-server),监听器通过执行初始化方法contextInitialized()方法启动初始化eureka-server一个入口
2.1 EurekaBootStrap#contextInitialized()方法理解
该方法为eureka-server启动的入口,源码如下:
@Override
public void contextInitialized(ServletContextEvent event) {
try {
//配置eureka配置环境
initEurekaEnvironment();
//
initEurekaServerContext();
ServletContext sc = event.getServletContext();
sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
} catch (Throwable e) {
logger.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
2.1.1分析initEurekaEnvironment()方法执行流程
- 分析源码:

归纳:
- 创建了一个ConcurrentCompositeConfiguration实例, 它就是一个所谓的配置器,包括eureka所需要的配置信息。
- 上面的ConcurrentCompositeConfiguration实例加入了一堆别的config,然后搞完了以后,就直接返回了这个实例,就是作为配置的单例
- 初始化数据中心的配置,如果没有配置的话,就是DEFAULT data center
- 初始化eurueka运行的环境,如果你没有配置的话,默认就给你设置为test环境
initEurekaEnvironment的初始化环境的逻辑完毕
2.1.2分析 initEurekaServerContext()方法执行流程
进入EurekaBootStrap#initEurekaServerContext()方法如下:
protected void initEurekaServerContext() throws Exception { //1.第一步.加载eureka-server.properties文件中的配置 EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig(); //第二步,初始化ApplicationInfoManager ApplicationInfoManager applicationInfoManager = null; //第三步,初始化eureka-server内部的一个eureka-client(用来跟其他的eureka-server节点进行注册和通信的) if (eurekaClient == null) { //Eureka服务器注册所需的配置信息instanceConfig的实现类模式是MyDataCenterInstanceConfig EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext()) ? new CloudInstanceConfig() : new MyDataCenterInstanceConfig(); applicationInfoManager = new ApplicationInfoManager( //get()方法构建了instanceInfo instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get()); EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig(); //初始化一个eureka clinet(默认实现是DiscoveryClient) eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig); } else { applicationInfoManager = eurekaClient.getApplicationInfoManager(); } //第四部,处理注册相关事情 PeerAwareInstanceRegistry registry; if (isAws(applicationInfoManager.getInfo())) { registry = new AwsInstanceRegistry( eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, eurekaClient ); awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager); awsBinder.start(); } else { //ps:非AWS的通常走这里!(默认走这里) registry = new PeerAwareInstanceRegistryImpl( eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, eurekaClient ); } //第五部,处理Peer节点相关事情 PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes( registry, eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, applicationInfoManager ); //第六部,完成eureka-server的上下文(context)构建 serverContext = new DefaultEurekaServerContext( eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, applicationInfoManager ); serverContext.initialize(); // 第七部,处理一点善后的事情,从想邻的eureka节点表拷贝注册信息 int registryCount = registry.syncUp(); registry.openForTraffic(applicationInfoManager, registryCount); //第八部,处理一点善后的事情,注册所有监控 EurekaMonitors.registerAllStats(); }
- 源码分析
1、解析 eureka-server.properties
EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
DefaultEurekaServerConfig 实现了接口 EurekaServerConfig。EurekaServerConfig主要是 Eureka Server 逻辑中需要使用的一些变量,它主要包含。上面的代码的逻辑就是通过它的构建器调用 init() 方法。然后调用 ConfigurationManager#loadCascadedPropertiesFromResources 方法法 eureka-server.properties 加载到配置管理器 ConfigurationManager 当中。 ConfigurationManager 这个类的作用就是用来获取 Eureka 服务的配置信息
2、解析 eureka-client.properties,创建 EurekaInstanceConfig 对象
当 Eureka Server 进行启动的时候,eurekaClient 必然是空,而且我们的服务一般也不是部署在 AWS 上面,所以会调用 new MyDataCenterInstanceConfig()。
ApplicationInfoManager applicationInfoManager = null;
if (eurekaClient == null) {
EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
? new CloudInstanceConfig()
//默认基本走这里
: new MyDataCenterInstanceConfig();
applicationInfoManager = new ApplicationInfoManager(
instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
}
MyDataCenterInstanceConfig 的类继承结构如下:
MyDataCenterInstanceConfig 实现了接口 EurekaInstanceConfig 接口。上面已经说过 Eureka Server 在集群环境下,它其实本身也是一个 Eureka Client也可以认为是一个服务实例 ,EurekaInstanceConfig 这个接口它主要是对于服务实例信息的一个抽象。
在初始化 MyDataCenterInstanceConfig的时候会调用 MyDataCenterInstanceConfig 的无参构建器,初始化子类之前会调用它的父类 PropertiesInstanceConfig 的无参构建器。它会通过Archaius1Utils.initConfig(CommonConstants.CONFIG_FILE_NAME) 把 eureka-client.properties 加载解析到 ConfigurationManager 当中,并返回一个 DynamicPropertyFactory 实例对象用于获取配置信息。
3、创建应用信息管理器
ApplicationInfoManager applicationInfoManager = new ApplicationInfoManager(
//get()方法构建了instanceInfo
instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
在创建创建应用信息管理器信息的时候会把第二步创建好的 EurekaInstanceConfig 与 通过 EurekaConfigBasedInstanceInfoProvider#get 创建的 InstanceInfo 信息用来创建应用信息管理器。首先我们来分析一下 InstanceInfo 是如何进行创建的。通过 EurekaConfigBasedInstanceInfoProvider 的构造器传入从 eureka-client.properties 解析来的 Eureka Client 的配置信息。然后通过 EurekaConfigBasedInstanceInfoProvider#get 通过构建器 InstanceInfo.Builder 进行对象构建。
4、解析 eureka-client.properties,创建 EurekaClientConfig 对象
Eureka Server 在集群环境下,它需要同步其它 Eureka Server 的信息。而 Eureka Client提供了很方便的方法接口来调用 Eureka Server 提供的接口。
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
在 EurekaClientConfig 构建器当中它会通过 Archaius1Utils#initConfig 把 eureka-client.properties 加载解析到 ConfigurationManager 当中,并返回一个 DynamicPropertyFactory 实例对象传入 DefaultEurekaTransportConfig 构造器中创建 EurekaTransportConfig 用于获取 Eureka 网络传输的配置信息。
5、创建 DiscoveryClient
EurekaClient eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
上文已经说过了, Eureka Server在集群环境下会同步其它 Eureka Server的注册信息到当前 Eureka Server当中来。到这里就会通过上面构建的 ApplicationInfoManager(应用信息管理器) 与 EurekaClientConfig(Eureka Client 配置信息) 来创建一个 EurekaClient。 EurekaClient的构建过程有点复杂这里就不作过多分析,在下面的一篇文章当中会详细的分析它的初始化过程。
它最主要的功能就是同时发送心跳到 Eureka Server,定时同步最新的注册表到当前 Eureka Client(服务的自动上线与下线)。
6、创建 PeerAwareInstanceRegistry 实例对象
PeerAwareInstanceRegistry registry;
if (isAws(applicationInfoManager.getInfo())) {
registry = new AwsInstanceRegistry(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
awsBinder.start();
} else {
//ps:非AWS的通常走这里!(默认走这里)
registry = new PeerAwareInstanceRegistryImpl(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
}
根据 Eureka Server 配置、Eureka Client 配置以及 EurekaClient 创建 PeerAwareInstanceRegistryImpl 类,这个类实现了 PeerAwareInstanceRegistry、InstanceRegistry 这两个接口。主要具有服务注册以及从其它的 Eureka Server 集群拉取注册信息。
7、创建 PeerEurekaNodes 对象
PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
registry,
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
applicationInfoManager
);
创建 PeerEurekaNodes ,这个类主要是通过 PeerEurekaNodes#start 方法来更新 Eureka Server 集群的服务信息。
8、构建并初始化 EurekaServerContext 对象
EurekaServerContext serverContext = new DefaultEurekaServerContext(
eurekaServerConfig,
serverCodecs,
registry,
peerEurekaNodes,
applicationInfoManager
);
EurekaServerContextHolder.initialize(serverContext);
serverContext.initialize();
把上面创建的所有组件信息用来创建 EurekaServerContext 信息,调用把 EurekaServerContext 保存到 EurekaServerContextHolder 当中,然后进行初始化。主要包括以下步骤:
- 调用
PeerEurekaNodes#start方法定时更新Eureka Server服务信息列表 - 调用
PeerAwareInstanceRegistry#init方法初始化响应缓存(ResponseCache),这个响应缓存主要是缓存服务的注册信息;然后就是启动检测服务的心跳是否过期了(eureka client 会定期向服务器发送心跳,也就是续约[renewal])。9、从其它 Eureka Server 同步注册表
int registryCount = registry.syncUp();
registry.openForTraffic(applicationInfoManager, registryCount);
在这里它会通过 Eureka Client 请求远程相邻的 Eureka Server 获取注册的服务信息,初始化注册列表
10、注册监控信息
EurekaMonitors.registerAllStats()
在这里它会利用 JmxMonitorRegistry 把 EurekaMonitors 所有的枚举类注册到 JMX 监控当中
3.Eureka Client 是如何注册的
在集群环境下 Eureka Server 相互之前需要同步注册表信息。所以不管是微服务当中还是 Eureka Server 当中都需要依赖 eureka-client 这个 Jar 包。它封装 Eureka Server 提供的 Restful 服务,依赖方可以以接口方法的方式方便的进行调用。下面是 Eureka 官网提供的系统架构图。
在这个图中有以下几个角色:
Eureka Server:Eureka 服务器,它是注册中心提供接口给应用把服务实例的信息注册上来Eureka Client:Eureka 客户端,Eureka 包装好了微服务访问 Eureka 服务器的一系列接口。比如:注册,心跳,服务下线等。Application Server:微服务应用服务器,一个单体服务可以按照不同的领域拆分为多个微服务。微服务可以依赖 Eureka 客户端这样就可以很方便的调用 Eureka 服务器上提供的接口进行服务注册与服务发现相关的功能
理解了 Eureka 的整个架构以及每个角色提供的功能,那么下面我们就来分析一下 Eureka Client 的启动流程。
3.1 Eureka Client 启动整体流程
不管 Eureka Server 还是微服务应用里面都会引用并创建 EurekaClient 来访问 Eureka Server,其中 Eureka Server 是配置的其它 Eureka Server 同步服务注册信息。而微服务应用服务器是从 Eureka Server 进行服务的注册与发现。并且微服务会定时向 Eureka Server 发送心跳请求,告诉 Eureka Server 当前应用实例还是存活状态,Eureka Server 在进行服务实例存活判断的时候就不会把当前服务实例剔除。
下面回到我们的主题,就是 Eureka Client 是如何启动的,其实就是如何创建 com.netflix.discovery.EurekaClient。因为 Eureka Client 和 Eureka Server 当中都需要创建 EurekaClient 这个对象实例。我们还是以 Eureka Server 服务中是如何创建 EurekaClient 的逻辑为例来讲解它的启动流程。
EurekaBootStrap#initEurekaServerContext
if (eurekaClient == null) {
EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
? new CloudInstanceConfig()
: new MyDataCenterInstanceConfig();
applicationInfoManager = new ApplicationInfoManager(
instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
} else {
applicationInfoManager = eurekaClient.getApplicationInfoManager();
}
因为 Eureka Server 在启动的时候必然 eurekaClient 对象实例为空,所以就会进到上面的逻辑。其实在 Eureka Server 启动流程分析的时候我们已经大概的分析了这个创建过程。
- 创建
EurekaInstanceConfig对象实例,它主要是一个配置读取接口。在创建EurekaInstanceConfig对象实例的时候,因为我们不是 AWS,所以创建的对象是MyDataCenterInstanceConfig对象实例。在调用MyDataCenterInstanceConfig无参构建器进行初始化的时候首先会调用它的PropertiesInstanceConfig进行初始化,这里它会使用ConfigurationManager加载eureka-client.properties并创建一个DynamicPropertyFactory实例对于读取ConfigurationManager里面管理的配置文件。 - 创建
ApplicationInfoManager对象实例,它主要应用实例管理相关,提供服务注册相关的接口。ApplicationInfoManager对象实例创建本身很简单。主要是通过new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get(),传入服务实例配置信息然后创建当前服务实例对象InstanceInfo。里面的代码其实也是挺简单的,就是通过传入的配置类通过 Builder 模式构建一个InstanceInfo对象 - 创建
EurekaClientConfig对象,它也是一个配置类,上面的EurekaInstanceConfig对象主要是服务实例相关的配置,比如 host、instanceId、port 相关的。而它主要是 EurekaClient 相关的配置,是否从其它 Eureka Server 拉取注册表,是否需要注册到其它 Eureka Server 等待。它也是在自己无参构建器中读取eureka-client.properites文件,创建DynamicPropertyFactory实例用于读取配置文件中的信息;创建EurekaTransportConfig用于 EurekaClient 进行服务的访问。 - 创建
DiscoveryClient对象,它其实是com.netflix.discovery.EurekaClient接口的实例对象。它主要是封装了微服务(或者 Eureka Server 集群环境下) 调用 Eureka Server 的接口,并且内部封装了服务自动注册与发现的功能。
我们首先来看一下 DiscoveryClient 接口的定义,提供了哪些功能:
EurekaClient.java
public interface EurekaClient extends LookupService {
// 获取指定分区下的应用信息
public Applications getApplicationsForARegion(@Nullable String region);
// 获取指定 Eureka Service 中注册的应用信息
public Applications getApplications(String serviceUrl);
// 条件获取服务实例信息
public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure);
// 条件获取服务实例信息
public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure, @Nullable String region);
// 条件获取服务实例信息
public List<InstanceInfo> getInstancesByVipAddressAndAppName(String vipAddress, String appName, boolean secure);
// 获取所有的分区信息
public Set<String> getAllKnownRegions();
// 从 Eureka Server 获取当前自身实例状态
public InstanceInfo.InstanceStatus getInstanceRemoteStatus();
// 废弃方法,可以不关注
public List<String> getDiscoveryServiceUrls(String zone);
// 废弃方法,可以不关注
public List<String> getServiceUrlsFromConfig(String instanceZone, boolean preferSameZone);
// 废弃方法,可以不关注
public List<String> getServiceUrlsFromDNS(String instanceZone, boolean preferSameZone);
// 废弃方法,可以不关注
public void registerHealthCheckCallback(HealthCheckCallback callback);
// 注册心跳检测处理器
public void registerHealthCheck(HealthCheckHandler healthCheckHandler);
// 注册事件监听器
public void registerEventListener(EurekaEventListener eventListener);
// 解除事件监听器
public boolean unregisterEventListener(EurekaEventListener eventListener);
// 获取心跳检测处理类
public HealthCheckHandler getHealthCheckHandler();
// 服务下线
public void shutdown();
// 获取 Eureka Client 相关配置
public EurekaClientConfig getEurekaClientConfig();
// 获取服务应用管理器
public ApplicationInfoManager getApplicationInfoManager();
}
public interface LookupService<T> {
// 根据指定 appName 获取当前 Eureka Server 注册的应用信息
Application getApplication(String appName);
// 获取当前 Eureka Server 注册的应用信息
Applications getApplications();
// 根据 ID 获取服务实例信息列表
List<InstanceInfo> getInstancesById(String id);
// 从 Eureka Server 获取下一个可能的服务器来处理来自注册表的请求
InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}
可以看到 EurekaClient 提供的功能偏向于微服务中依赖的 eureka-client 提供的功能,而 LookupService 比较偏向于 eureka-server 里面的接口,当微服务需要服务发现的时候 Eureka 基本就会调用LookupService 提供的接口。
3.2 Eureka Client 初始化流程
下面就是通过 ApplicationInfoManager 与 EurekaClientConfig 对象创建 DiscoveryClient 的代码。
EurekaBootStrap#initEurekaServerContext
EurekaClient eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
上面的代码看着平平无奇,实际上内含乾坤。下面就是 DiscoveryClient 初始化的流程图:
归纳重点:
- 创建
scheduler、heartbeatExecutor和cacheRefreshExecutor这三个线程池 - 调用
scheduleServerEndpointTask方法,初始化EurekaTransport对象 - 如果需要抓取注册表
clientConfig.shouldFetchRegistry(),从远程中抓取注册表添加到com.netflix.discovery.DiscoveryClient#localRegionApps属性当中。 - 如果需要注册到 Eureka Server【
clientConfig.shouldRegisterWithEureka()】以及在初始化的时候就进行注册【clientConfig.shouldEnforceRegistrationAtInit()】,就把当前服务注册到 Eureka Server 服务器 通过
initScheduledTasks启动一些定时任务,后面具体分析3.2.1 Eureka Client 初始化中的定时任务
在
Eureka Client初始化中创建scheduler、heartbeatExecutor和cacheRefreshExecutor这三个线程池,然后DiscoveryClient#initScheduledTasks就会通过这些线程池启动一些定时任务。如果需要抓取定时任务【
clientConfig.shouldFetchRegistry()】,通过CacheRefreshThread这个定时任务默认 30 秒去 Eureka Server 抓取最新的服务的注册表并且添加到com.netflix.discovery.DiscoveryClient#localRegionApps属性当中- 如果需要把服务注册到注册中心【
clientConfig.shouldRegisterWithEureka()】,通过HeartbeatThread把心跳发送到注册中心。 - 创建
InstanceInfoReplicator任务类,它主要负责将自身的信息周期性的上报到 Eureka server,会调用它的start进行定时调用,同时如果配置了状态变更需要进行通知【clientConfig.shouldOnDemandUpdateStatusChange()】时,会把创建StatusChangeListener并把这个监听器注册到ApplicationInfoManager当中
3.2.2 Eureka client上报Eureka server的注册过程
这里是23.眼花缭乱的代码中找到eureka-client是如何进行服务注册的

对于 Eureka 而言,微服务的提供者和消费者都是它的客户端,其中服务提供者关注服务注册、服务续约 和 服务下线等功能,而服务消费者关注于 服务信息的获取。下面我们来看一下 Eureka Client 的服务注册流程。
- Eureka Client 注册流程
在 DiscoveryClient 类中,服务注册操作由 register 方法完成。下面我们来看一下这个方法的定义:
DiscoveryClient#register
boolean register() throws Throwable {
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
throw e;
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
上述 register 方法会在 InstanceInfoReplicator 类的 run 方法中进行执行。从操作流程上讲,上述代码的逻辑非常简单,即服务提供者先将自己注册到 Eureka 服务器中,然后根据返回的结果确定操作是否成功。显然,这里的重点代码是 eurekaTransport.registrationClient.register() ,DiscoveryClient 通过这行代码发起了远程请求。
- Eureka Client Http
Eureka Client 在进行服务实例注册的时候是调用它的内部类 EurekaTransport 的属性 registrationClient 就是 EurekaHttpClient ,其真实的执行的对象是AbstractJersey2EurekaHttpClient
- Eureka Server 服务注册
Eureka Client 在进行微服务注册的时候,其实是通过 EurekaHttpClient 实现类 AbstractJersey2EurekaHttpClient#register 进行服务注册的
//这里就是eureka client注册信息
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
Response response = null;
try {
//发送请求,http://localhost:8080/v2/apps/ServiceA
//发送一个post请求,服务实例对象为一个json格式发送过去,包含主机、ip,端口号
//Eureka server就会知道这个ServiceA服务,有一个实例,比如192.168.xx.xx,host-01,8761
Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();
addExtraProperties(resourceBuilder);
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.accept(MediaType.APPLICATION_JSON)
.acceptEncoding("gzip")
.post(Entity.json(info));
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey2 HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
在这里它会构建一个类似后面请求地址: http://localhost:8080/v2/apps/APPLICATION0,并且发送一个 post 请求。
- http://localhost:8080 :Eureka Server 服务的域名与端口
- /v2/apps :Eureka Server 处理应用的 uri 地址
- APPLICATION0:微服务的应用名称,要求唯一值
我们在 Eureka Server 引用的 eureka-core 中可以找到 ApplicationsResource#getApplicationResource 来处理注册请求。
3.2.3 eureka server端是如何完成服务注册的
ApplicationsResource#getApplicationResource
@Path("{appId}")
public ApplicationResource getApplicationResource(
@PathParam("version") String version,
@PathParam("appId") String appId) {
CurrentRequestVersion.set(Version.toEnum(version));
try {
return new ApplicationResource(appId, serverConfig, registry);
} finally {
CurrentRequestVersion.remove();
}
}
之前说过 Eureka Server 中使用 Jeysey Restful 来处理 http 请求,而
com.netflix.eureka.resources包下面的 XXXResource 相当于 Spring MVC 中的 Controller。
上面的类中返回 ApplicationResource 说明真正的处理类是在 ApplicationResource中,并且在客户端发送的是 post 请求。那么我们就应该去ApplicationResource 中找 post 处理的方法。最终会找到 ApplicationResource#addInstance。
而服务的最终注册会调用到 AbstractInstanceRegistry#register,在 Eureka Server 当中它是把微服务的注册信息以数据结构 ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> 保存到内存当中。所以后续我们会分析集群环境之后 Eureka Server 注册信息的同步以及 Eureka Client 定时同步 Eureka Server 的注册信息到 Eureka Client 的本地。而这种数据结构保存的数据格式如下所示:
{
"APPLICATION0" : {
"i-00000000" : InstanceInfo 对象1,
"i-00000001" : InstanceInfo 对象2
},
"APPLICATION1" : {
"i-00000000" : InstanceInfo 对象1
}
}
APPLICATION0:表示我们微服务的名称,比如:user-service,order-service
-i-00000000:一个微服务可以有多个实例,而i-00000000就是服务实例的 ID,比如user-service用户服务发布了 8 台机器,i-00000000就表示其中一台具体的机器 ID。InstanceInfo 对象:微服务实例信息对象,包含服务名称、服务域名、服务端口、服务分组以及服务的元数据信息等,Eureka 调用者获取到服务实例信息就可以进行远程调用。
关于服务实例信息对象大家可以参看 com.netflix.appinfo.InstanceInfo 这个对象。这个类其实就是一个 POJO 对象,并不复杂。
下面我们来分析一下服务的注册过程,也就是 com.netflix.eureka.registry.AbstractInstanceRegistry#register
AbstractInstanceRegistry#register
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
read.lock();
try {
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
//TODO-这里eureka-server两级缓存机制中对缓存的信息设置 -- 28
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
read.lock():首先调用读锁,防止注册的时候其它线程调用这个方法出现并发问题。- 根据微服务名称从注册表
registry中获取注册的Map<String, Lease<InstanceInfo>>. - 如果该服务之前没有注册过创建一个
ConcurrentHashMap<String, Lease<InstanceInfo>>()添加到registry注册表当中 - 根据服务实例的 ID 从该服务的注册列表中获取 Lease 对象
- 调用
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);构建一个新的Lease<InstanceInfo>对象,并添加到该服务的注册ConcurrentHashMap<String, Lease<InstanceInfo>>()当中 - 把服务注册的当时时间与服务名+服务应用 ID 添加到 recentRegisteredQueue 当中,把服务应用信息添加到 recentlyChangedQueue 当中(现在可以不考虑这个动作,后续会分析这个队列)
- 清除缓存, Eureka Client 在查询 Eureka Server 的注册表会有一个缓存(现在可以不考虑这个动作,后续会分析这个缓存)
上面就是整个 Eureka Client 的服务注册过程了。
4 Eureka Client 全量拉取注册表
这里是26章的数据
4.1 eureka client 获取全量拉取注册表流程
在微服务中是嵌入了 Eureka Client 的,当服务启动的时候就会从 Eureka Server 中拉取注册的服务信息列表
在微服务中嵌入的 EurekaClient 的实现类 DiscoveryClient 在初始化的时候会通过调用 EurekaHttpClient 去 Eureka Server 拉取全量服务信息列表。
在 DiscoveryClient#initScheduledTasks 会启动定时任务 CacheRefreshThread 增量的拉取 Eureka Server 的注册信息(后续的博客会分析增量拉取注册表)。
在 DiscoveryClient 初始化拉取全量配置中心,然后通过定时任务每 30 秒去 Eureka Server 拉取增量的注册表信息。保证服务调用的是最新服务的注册中心。
4.2 eureka server 处理全量注册表处理分析
当 Eureka Client 初始化时调用 Eureka Client 提供的 http 接口其实是 ApplicationsResource#getContainers
ApplicationsResource#getContainers
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}
if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
}
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
Response response;
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
response = Response.ok(responseCache.get(cacheKey))
.build();
}
CurrentRequestVersion.remove();
return response;
}
上面的代码看着挺多,其实逻辑还是挺简单的。
- 如果你没有指定 regions 信息的话,这个值默认会是空字符串,然后它的
EurekaMonitors.GET_ALL监控指标会+1 PeerAwareInstanceRegistry注册中心会判断当前访问是否可以进行,它主要是通过当前系统时间与启动时间的关系或者判断远程配置中心的数据是否可读。具体可参见:PeerAwareInstanceRegistryImpl#shouldAllowAccess(boolean)- 配置中心中的服务注册信息支持 XML 与 JSON 格式并且可以使用
gzip进行压缩 - 读取的服务注册信息是从
ResponseCache进行读取5 Eureka Server 多级缓存
既然在 Eureka Server 中获取注册信息是从 ResponseCache 进行读取的,那么我们看一下这个缓存是从什么时候进行初始化的
从上面的时序图当中我们可以看到,在 Eureka Server 进行初始化的时候会初始化注册中心PeerAwareInstanceRegistryImpl这个对象会调用它的init方法并最终调用PeerAwareInstanceRegistryImpl#initializedResponseCache初始化注册信息列表的缓存。
PeerAwareInstanceRegistryImpl#initializedResponseCache
@Override
public synchronized void initializedResponseCache() {
if (responseCache == null) {
responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this);
}
}
下面我们来分析一下 ResponseCacheImpl 的初始化过程:
ResponseCacheImpl
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
}
}
- 在 ResponseCacheImpl 初始化的时候通过
ConcurrentHashMap构建一级只读缓存readOnlyCacheMap;通过Guava的LoadingCache构建二级读写缓存。构建 Eureka Server 注册表的多级缓存机制 ConcurrentHashMap构建一级只读缓存readOnlyCacheMap会通过定时任务TimerTask从LoadingCache构建二级读写缓存进行比对更新(每 30 秒执行一次)LoadingCache构建二级读写缓存会在 180 秒钟后就会过期- 在
ResponseCacheImpl中还提供了invalidate方法进行手动过期,当 Eureka Server 发生了服务注册、下线、故障会自动过期该缓存
上面说了 Eureka Server 多级缓存提供了只读缓存、读写缓存以及这两个缓存的过期策略,下面我们来看一下缓存的获取。
ResponseCacheImpl#getValue
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
- 当 Eureka Client 获取注册表的时候,首先会从只读缓存中获取数据
- 如果只读缓存为空就会从读写缓存中获取数据,并把读取到的值添加到只读缓存当中
6.Eureka 之 Eureka Client 续约
6.1 Eureka Client 续约机制
在 Eureka Client 需要定时的向注册中心 Eureka Server 发送续约信息,告诉注册中心当前的微服务处于可用状态,这样注册中心在服务剔除的时候才不会把当前服务实例从注册表中删除。
在 DiscoveryClient 进行初始化的时候,会调用 DiscoveryClient#initScheduledTasks 方法。在这个方向当中不仅会启动定时任务调用 CacheRefreshThread 线程定时从注册中心拉取最新的注册信息;还会启动定时任务调用 HeartbeatThread 向注册中心发送续约信息。
DiscoveryClient#renew
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
return false;
}
}
上面的请求最终会调用到 AbstractJersey2EurekaHttpClient#sendHeartBeat发送类似于:http://localhost:8080/v2/apps/APPLICATION0/i-00000000 这样的 PUT 请求路径到注册中心。
http://localhost:8080:Eureka Server 的请求地址/v2/apps:Restful 处理类ApplicationsResource上面的路径:@Path("/{version}/apps")APPLICATION0:微服务应用的名称,也可以是:user-service,order-service 等i-00000000:微服务应用的名称下具体的服务实例 ID,比如 user-service 下面有 8 台机制实例。i-00000000就是其中一台唯一的 ID 值6.2 Eureka Server 处理续约请求
Eureka Server 处理 Eureka Client 时序图:
首先ApplicationsResource#getApplicationResource会来接收 Eureka Client 发送过来的续约请求,这个接口里面会返回ApplicationResource对象。然后会调用ApplicationResource#getInstanceInfo。因为 Eureka Client 发送的是 PUT 请求,所以请求会转发到InstanceResource标注了@PUT的方法renewLease进行处理。InstanceResource#renewLease
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// Not found in the registry, immediately ask for a register
if (!isSuccess) {
logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
Response response;
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// Store the overridden status since the validation found out the node that replicates wins
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
return response;
}
- 判断是否来自集群节点(如果是集群节点需要同步到 Eureka Server 集群其它节点)
- 调用 Eureka Server 的注册中心
PeerAwareInstanceRegistry进行续约 - 比较注册中心的服务信息中的注册时间是否与传入时间匹配
- Eureka Server 响应 Eureka Client 续约结果
6.3 注册中心续约服务
在PeerAwareInstanceRegistryImpl#renew进行续约的时候,先会调用AbstractInstanceRegistry#renew在当前 Eureka Server 进行续约,如果是 Eureka Server 集群环境下,会把续约请求发送到其它的 Eureka Server 节点上去。PeerAwareInstanceRegistryImpl#renew
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) {
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
下面我们来看一下调用 AbstractInstanceRegistry#renew 进行当前 Eureka Server 节点是如何进行续约的。
AbstractInstanceRegistry#renew
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
leaseToRenew.renew();
return true;
}
}
其实续约的逻辑非常简单。
- 首先从注册列表中找到当前服务的注册信息列表:
Map<String, Lease<InstanceInfo>> gMap. - 然后从注册信息列表中根据服务信息的 ID 找到服务信息:
Lease<InstanceInfo> leaseToRenew. - 如果找不到服务信息,续约失败,返回 false。
- 获取服务信息在
InstanceStatusOverrideRule的中状态 - 如果状态是
InstanceStatus.UNKNOWN),续约失败,返回 false。 - 如果状态与服务实例中的状态不一致,调用
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus)方法 - 最后调用
com.netflix.eureka.lease.Lease#renew进行服务的续约,其实就是更新 Lease 对象的long lastUpdateTimestamp属性为系统当前时间 + duration 时间(默认 90 秒)。
其实 Eureka Client 进行服务续约就是把服务信息里面的 lastUpdateTimestamp 更新一下,防止 Eureka Server 在进行服务拆除的时候把当前服务摘除。
7 Eureka 之 Eureka Client 服务下线
7.1 Eureka Client 服务下线
当 Eureka Client 服务关闭之前会调用 DiscoveryClient#shutdown 方法。因为这个方法上面标注了 @PreDestroy 在对象销毁之前这个方法就会被调用。
@PreDestroy
@Override
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
Monitors.unregisterObject(this);
}
}
- 首先把 AtomicBoolean 类型的
isShutdown设置成 true。 - 接着会解除掉服务应用管理器(
ApplicationInfoManager) 中注册的状态监听器(StatusChangeListener) - 然后会调用
cancelScheduledTasks释放线程池资源,包括:Eureka Server 间的数据同步线程与线程池、心跳处理的线程池与线程以及注册表定时刷新的线程池与线程。 - 设置当前服务应用的状态为下线,并且在
DiscoveryClient#unregister方法当中调用EurekaHttpClient#cancel进行服务下线 - 释放
EurekaTransport远程调用里面的 HTTP 连接资源 - 最后释放一些监控资源
在调用服务下线的时候请求最终会调用到 AbstractJersey2EurekaHttpClient#cancel发送类似于:http://localhost:8080/v2/apps/APPLICATION0/i-00000000 这样的 PUT 请求路径到注册中心。
http://localhost:8080:Eureka Server 的请求地址/v2/apps:Restful 处理类 ApplicationsResource 上面的路径:@Path("/{version}/apps")APPLICATION0:微服务应用的名称,也可以是:user-service,order-service等i-00000000:微服务应用的名称下具体的服务实例 ID,比如user-service下面有 8 台机制实例。i-00000000就是其中一台唯一的 ID 值7.2 Eureka Server 服务下线
Eureka Server 处理 Eureka Client 服务下线时序图:
首先ApplicationsResource#getApplicationResource会来接收 Eureka Client 发送过来的服务下线请求,这个接口里面会返回ApplicationResource对象。然后会调用ApplicationResource#getInstanceInfo。因为 Eureka Client 发送的是DELETE请求,所以请求会转发到InstanceResource标注了@DELETE的方法 cancelLease 进行处理。
InstanceResource#cancelLease
@DELETE
public Response cancelLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
try {
boolean isSuccess = registry.cancel(app.getName(), id,
"true".equals(isReplication));
if (isSuccess) {
return Response.ok().build();
} else {
return Response.status(Status.NOT_FOUND).build();
}
} catch (Throwable e) {
return Response.serverError().build();
}
}
这个方法里面的逻辑非常简单:
- 调用注册中心的服务下线功能
- 然后把服务下线的响应结果返回给 Eureka Client
7.3 注册中心下线服务
下面我们来看一下注册中心服务在服务下线做了哪些事情。下面是 Eureka Server 服务下线的时序图:
上面的时序图有两点比较核心:
- 调用当前 Eureka Server 的
AbstractInstanceRegistry#internalCancel方法进行服务下线 - 如果是集群环境调用
PeerAwareInstanceRegistryImpl#replicateToPeers把当前的服务实例的下线同步到其它 Eureka Servier 当中去(后续分析)。
那我们就来分析一下 AbstractInstanceRegistry#internalCancel 方法是如何进行服务下线的。
AbstractInstanceRegistry#internalCancel
protected boolean internalCancel(String appName, String id, boolean isReplication) {
read.lock();
try {
CANCEL.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
leaseToCancel = gMap.remove(id);
}
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
}
} finally {
read.unlock();
}
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to cancel it, reduce the number of clients to send renews.
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
updateRenewsPerMinThreshold();
}
}
return true;
}
EurekaMonitors#CANCEL服务取消监控进行 +1- 根据微服务的名称获取到当前服务的注册列表
- 如果当前服务的注册列表不为空,根据传入的服务实例 ID 把它从注册列表当中移除
- 向
CircularQueue<Pair<Long, String>> recentCanceledQueue列表当中添加当前服务实例信息(注:这个队列信息暂时没有用到) - 如果移除的服务实例为空,
EurekaMonitors#CANCEL_NOT_FOUND+1 并返回 false. - 调用
Lease<InstanceInfo>#cancel进行下线,其实就是修改 Lease 的 registrationTimestamp 属性为当前系统时间 - 设置当前信息为删除状态,并添加到
ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue当中。(注:这个队列会在 Eureka Client 增量拉取注册表时使用)
其实 Eureka Client 服务下线就是在注册服务中的 Map 当中把当前服务实例的信息删除就行了。
8 Eureka 之 Eureka Server 服务过期
正常情况下,应用实例下线时候会主动向 Eureka-Server 发起下线请求,但实际情况下,应用实例可能异常崩溃,又或者是网络异常等原因,导致下线请求无法被成功提交。
这种情况之后,需要 Eureka Client 定时向 Eureka Server 发送续约配合 Eureka Client 通过定时任务清理超时的租约解决上述异常。
8.1 EvictionTask
EvictionTask 是清理租约过期任务,下面是它的调用时序图:
当 Eureka Server 启动的时候就会调用实现了 Java Servlet 规范 ServletContextListener 监听器的 EurekaBootStrap 到调用初始化注册服务 AbstractInstanceRegistry。然后会启用以下几个定时任务:
MeasuredRate,统计定时任务,在AbstractInstanceRegistry的构建器创建MeasuredRate对象的时候传入1000 * 60 * 1,然后在这里调用它的start方法里面有一个定时任务,每隔 60 秒也就是每隔 1 分钟执行一次。这个定时任务里面有 2 个 AtomicLong 类型的参数。一个是AtomicLong currentBucket每进行一次续约的时候就会调用它 + 1,另一个是AtomicLong lastBucket当MeasuredRate任务每分钟进行执行的时候就会把AtomicLong currentBucket里面的值设置到AtomicLong lastBucket当中去,然后把AtomicLong currentBucket值清空。然后通过获取AtomicLong lastBucket的值就能够得到最近一分钟续约的次数(最近一分钟续约的次数这个在后面要讲的 Eureka Server 自我保护机制当中会使用到)- 另外一个就是定时调用 EvictionTask 任务,过期注册表中超时续约的应用。默认 60 秒执行一次
8.2 服务过期
下面我们来分析一下服务过期的逻辑:EvictionTask
class EvictionTask extends TimerTask {
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
@Override
public void run() {
try {
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}
long getCompensationTimeMs() {
long currNanos = getCurrentTimeNano();
long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
if (lastNanos == 0l) {
return 0l;
}
long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
return compensationTime <= 0l ? 0l : compensationTime;
}
long getCurrentTimeNano() { // for testing
return System.nanoTime();
}
}
compute a compensation time defined as the actual time this task was executed since the prev iteration, vs the configured amount of time for execution. This is useful for cases where changes in time (due to clock skew or gc for example) causes the actual eviction task to execute later than the desired time according to the configured cycle.
首先它会调用 getCompensationTimeMs 方法计算一个补偿时间,由于 JVM GC ,又或是时间偏移( clock skew ) 等原因,定时器执行实际比预期会略有延迟。
传入计算获取到的补偿时间,调用 evict 方法过期超时续约的应用。
AbstractInstanceRegistry#evict
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false);
}
}
}
- 首先会判断 Eureka Server 是否开启的自我保护机制,如果开启了自我保护机制(下一篇博客会讲)
- 首先遍历注册表注册的应用列表,找到所有续约过期的应用实例列表。过期判断条件是:当前时间 > 最后一次续约时间 + 90 秒 + 补偿时间。
- 然后调用
getLocalRegistrySize获取当前注册列表中应用实例的个数 - 接着通过下面的公式获取一个期望过期的个数 evictionLimit。
evictionLimit = 注册列表所有应用实例的个数 - 注册列表所有应用实例的个数 * 获得续订的最小百分比(默认 0.85)。也就是:期望过期的个数 = 应用实例 * 0.15。如果有20 个服务应用那么这个值就是 3. - 然后取第一步获取到续约过期的应用实例个数与期望过期的应用实例个数的最小值
- 如果这个最小值大于 0,就随机过期续约过期的应用实例列表里面的应用实例并且调用内部取消逻辑(之前服务下线分析过)
如果 Eureka Client 默认会每 30 秒发送心跳到 Eureka Server。如果 Eureka Server 在进行服务过期判断的时候也就是:最后一次续约时间 + 90 秒 + 补偿时间小于系统当前时间就会以服务进行过期。
有以下两点需要注意一下:
- Eureka Client 在进行续约的时候是把
lastUpdateTimestamp设置成系统的当前时间 + 90 秒。然后 Eureka Server 在进行服务过期判断的时候是:最后一次续约时间 + 90 秒 + 补偿时间小于系统当前时间就会以服务进行过期。最后在 Eureka Client 拉取注册表是每 30 秒进行拉取。所以一个服务不可用时其它服务需要感知到这个服务不可用就需要:90 秒 + 90 秒 + 30 秒 = 210秒,也就是 3 分半 才能感知到 Eureka Server 在服务过期的时候会根据:应用实例 * 0.15 与 心跳过期应用个数随机顺序将它们剔除出去。因为对于需要大量剔除服务应用时,如果我们不这样做,我们可能会在 Eureka 自我保护开始之前清除整个应用程序。通过随机化,影响应该均匀分布在所有应用程序中。
9 Eureka 之 自我保护机制
9.1 什么是自我保护机制
默认情况下,如果Eureka Server在一定时间内(默认 90 秒,其实不止 90 秒)没有接收到某个微服务实例的心跳,Eureka Server将会移除该实例。但是当网络分区故障发生时,微服务与Eureka Server之间无法正常通信,而微服务本身是正常运行的,此时不应该移除这个微服务,所以引入了自我保护机制。
自我保护机制的工作机制是:如果在15分钟内超过 85% 的客户端节点都没有正常的心跳,那么Eureka就认为客户端与注册中心出现了网络故障,Eureka Server 自动进入自我保护机制,此时会出现以下几种情况:Eureka Server不再从注册列表中移除因为长时间没收到心跳而应该过期的服务。
- Eureka Server仍然能够接受新服务的注册和查询请求,但是不会被同步到其它节点上,保证当前节点依然可用。
- 当网络稳定时,当前Eureka Server新的注册信息会被同步到其它节点中。
因此Eureka Server可以很好的应对因网络故障导致部分节点失联的情况,而不会像 ZK 那样如果有一半不可用的情况会导致整个集群不可用而变成瘫痪。
Eureka Server 自我保护机制,可以通过通过配置 eureka.server.enable-self-preservation 来 true 开启/ false 禁用 自我保护机制,默认打开状态,建议生产环境打开此配置。
9-2 Eureka Server 自我保护机制
上面【 8. Eureka 之 Eureka Server 服务过期 】,在进行服务过期的时候,首先会判断 Eureka Server 是否开启了自我保护机制。
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
......
}
如果开启了自我保护机制也就是 isLeaseExpirationEnabled() 方法返回了 false,就直接返回,不进行服务下线
@Override
public boolean isLeaseExpirationEnabled() {
if (!isSelfPreservationModeEnabled()) {
// The self preservation mode is disabled, hence allowing the instances to expire.
return true;
}
return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
- 首先判断 Eureka Server 是否开启了自我保护机制 eureka.enableSelfPreservation 为 true 开启反之不开启。如果没有开启直接返回 true,可以进行服务过期处理。
- 然后判断每分钟期望的续约数(numberOfRenewsPerMinThreshold) 大于 0 并且实际每分钟的续约数(getNumOfRenewsInLastMin()) 大于每分钟期望的续约数(numberOfRenewsPerMinThreshold)
9-3.每分钟应用的续约数
在 Eureka Server 启动的时候,在注册服务(AbstractInstanceRegistry)中会启动一个定时任务MeasuredRate来计算每分钟应用续约的个数。时序图如下:
MeasuredRate,它是一个统计定时任务,在AbstractInstanceRegistry的构建器创建MeasuredRate对象的时候传入1000 * 60 * 1,然后在这里调用它的start方法里面有一个定时任务,每隔 60 秒也就是每隔 1 分钟执行一次。这个定时任务里面有 2 个AtomicLong 类型的参数。一个是AtomicLong currentBucket每进行一次续约的时候就会调用它 + 1,另一个是AtomicLong lastBucket。
当 MeasuredRate 任务每分钟进行执行的时候就会把 AtomicLong currentBucket 里面的值设置到 AtomicLong lastBucket 当中去,然后把 AtomicLong currentBucket 值清空再次计算。然后通过获取 AtomicLong lastBucket 的值就能够得到最近一分钟续约的次数。
9-4.每分钟期望的续约数
每分钟期望的续约数是 AbstractInstanceRegistry#numberOfRenewsPerMinThreshold , 这个值是动态变化的。它提供了 AbstractInstanceRegistry#updateRenewsPerMinThreshold 来动态的更新这个值
protected void updateRenewsPerMinThreshold() {
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
* serverConfig.getRenewalPercentThreshold());
}
每分钟期望的续约数是根据 expectedNumberOfClientsSendingRenews (期望 Eureka Client 发送的续约数,这个值会根据服务的动作进行更新:服务注册 + 1与服务下线 - 1) 来进行判断的。上面的公式如下:
每分钟期望的续约数 = 期望Eureka Client发送的续约数 * (60 秒 / 预计客户端间隔秒数续约[默认 30 秒]) * 0.85
比如:现在注册中心有 20 个服务
那么:每分钟期望的续约数 = 20 * (60 / 30) * 0.85 = 17
AbstractInstanceRegistry#register方法public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { ...... synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to register it, increase the number of clients sending renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; updateRenewsPerMinThreshold(); } } ...... }服务注册,
**expectedNumberOfClientsSendingRenews**(期望Eureka Client发送的续约数) + 1,并且更新每分钟期望的续约数AbstractInstanceRegistry#internalCancel方法 ```java protected boolean internalCancel(String appName, String id, boolean isReplication) {…….
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to cancel it, reduce the number of clients to send renews. this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1; updateRenewsPerMinThreshold(); }}
….
}
**服务下线, **`**expectedNumberOfClientsSendingRenews**`**(期望Eureka Client发送的续约数) - 1**,并且更新每分钟期望的续约数
<a name="gilJ3"></a>
## 10.Eureka 之 Eureka Server 覆盖状态
<a name="xPOQw"></a>
### 10-1.Eureka Server 覆盖状态概述
在 `InstanceInfo` 服务应用信息对象里面不仅有状态`status` ,还有覆盖状态 `overriddenStatus`
```java
public class InstanceInfo {
// 服务应用信息状态
private volatile InstanceStatus status = InstanceStatus.UP;
// 服务应用信息覆盖状态
private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;
}
可以调用 Eureka-Server 暴露的 HTTP Restful 接口 apps/${APP_NAME}/${INSTANCE_ID}/status 对服务应用实例覆盖状态的变更。从而达到主动的、强制的变更应用实例状态。
这里不会修改 Eureka-Client 应用实例的状态,而是修改在 Eureka-Server 注册的应用实例的状态。
通过这样的方式,Eureka-Client 在获取到注册信息时,并且配置 eureka.shouldFilterOnlyUpInstances 为 true(默认为 true),过滤掉非 InstanceStatus.UP 的应用实例,从而避免调动该实例,以达到应用实例的暂停服务( InstanceStatus.OUT_OF_SERVICE ),而无需关闭应用实例。
因此,大多数情况下,调用该接口的目的,将应用实例状态在 (InstanceStatus.UP ) 和 (InstanceStatus.OUT_OF_SERVICE) 之间切换。
10-2.Eureka Server 覆盖状态操作
在 Eureka Server 中的服务应用处理类 InstanceResource 提供了 3 个接口来修改服务实例中的 overriddenStatus
renewLease: 心跳接口,可以传入overriddenStatus字段来修改服务信息(InstanceInfo) 的覆盖状态statusUpdate:覆盖状态修改接口,可以传入value字段来修改服务信息(InstanceInfo) 的覆盖状态deleteStatusUpdate:删除覆盖状态接口,可以传入value字段来修改服务信息(InstanceInfo) 的覆盖状态续约修改覆盖状态
可以调用InstanceResource#renewLease这个续约接口可以修改服务应用信息的覆盖状态值,它的时序图如下所示:
在续约成功之后,如果传递的 lastDirtyTimestamp字段不为空并且根据时间戳验证服务应用信息状态为Response.Status.NOT_FOUND。就会从ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap这个 Map 根据应用 ID 获取这个应用服务的实例之前是否进行了状态覆盖,如果没有,就会往ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap这个 Map 里面添加服务 ID 的以及对应的覆盖状态并且设置应用信息 InstanceInfo 的覆盖状态在进行续约的时候如果实例的状态与覆盖状态不相等就会把状态设置成覆盖状态
public boolean renew(String appName, String id, boolean isReplication) {
......
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
overriddenInstanceStatus.name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
......
}
覆盖状态修改接口
可以调用 InstanceResource#statusUpdate 这个接口来修改服务应用信息的覆盖状态值,它的时序图如下所示:
- 首先它会根据服务找到对应服务的实例列表,然后根据服务实例的 ID 找到具体的服务实例
- 接着会调用服务实例信息的续约接口
Lease#renew修改最近一次修改时间戳lastUpdateTimestamp为System.currentTimeMillis() + duration(默认90秒); - 如果服务实例的状态与覆盖状态相等就直接返回,否则进入下一个步骤
- 在
ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap添加服务实例的 ID 与覆盖状态的映射 - 修改服务应用信息的覆盖状态为传入的状态
- 往修改队列
recentlyChangedQueue里面添加当前服务应用信息的操作状态为ActionType.MODIFIED -
应用实例覆盖状态映射
在之前的分析中我们可以看到覆盖状态操作其实包含以下两个步骤:
操作注册服务
AbstractInstanceRegistry对象中的ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap服务实例 ID 与覆盖状态的映射- 把服务实例里面的覆盖状态
overriddenStatus设置为传入的状态
上面讲过在在服务实例进行续约的时候就会从 overriddenInstanceStatusMap根据服务实例 ID 获取到覆盖状态把覆盖状态更新到当前服务实例当中。其实对于服务注册也有同样的逻辑:
AbstractInstanceRegistry#register
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
......
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
......
}
- 从
ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap根据服务实例 ID 获取到覆盖状态 - 如果覆盖状态不为空就调用
registrant.setOverriddenStatus(overriddenStatusFromMap)把覆盖状态设置到实例信息InstanceInfo的overriddenStatus覆盖状态当中 - 接着调用
AbstractInstanceRegistry#getOverriddenInstanceStatus这个方法根据InstanceStatusOverrideRule这个实例覆盖状态规则的实现类获取覆盖状态 - 如果覆盖状态的值与实例信息
InstanceInfo registrant里面状态(status)的值不一致就把覆盖状态的值设置为实例信息InstanceInfo registrant的状态值AbstractInstanceRegistry#getOverriddenInstanceStatus
protected InstanceInfo.InstanceStatus getOverriddenInstanceStatus(InstanceInfo r,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
InstanceStatusOverrideRule rule = getInstanceInfoOverrideRule();
logger.debug("Processing override status using rule: {}", rule);
return rule.apply(r, existingLease, isReplication).status();
}
protected abstract InstanceStatusOverrideRule getInstanceInfoOverrideRule();
在方法 AbstractInstanceRegistry#getOverriddenInstanceStatus 中会获取 InstanceStatusOverrideRule 应用覆盖状态的规则。这个规则接口的类结构如下图所示:
StatusOverrideResult:是InstanceStatusOverrideRule定义的接口返回值里面有两个字段:match用于判断是否规则匹配,实例状态InstanceStatus用于调用registrant.setStatusWithoutDirty(overriddenInstanceStatus)设置实例的状态 (status)AsgEnabledRule:是 AWS 环境才会使用这时我们不做讨论AlwaysMatchInstanceStatusRule.java
public class AlwaysMatchInstanceStatusRule implements InstanceStatusOverrideRule {
private static final Logger logger = LoggerFactory.getLogger(AlwaysMatchInstanceStatusRule.class);
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
logger.debug("Returning the default instance status {} for instance {}", instanceInfo.getStatus(),
instanceInfo.getId());
return StatusOverrideResult.matchingStatus(instanceInfo.getStatus());
}
@Override
public String toString() {
return AlwaysMatchInstanceStatusRule.class.getName();
}
}
任何情况的匹配,直接返回 instanceInfo 的状态
DownOrStartingRule.java
public class DownOrStartingRule implements InstanceStatusOverrideRule {
private static final Logger logger = LoggerFactory.getLogger(DownOrStartingRule.class);
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
// ReplicationInstance is DOWN or STARTING - believe that, but when the instance says UP, question that
// The client instance sends STARTING or DOWN (because of heartbeat failures), then we accept what
// the client says. The same is the case with replica as well.
// The OUT_OF_SERVICE from the client or replica needs to be confirmed as well since the service may be
// currently in SERVICE
if ((!InstanceInfo.InstanceStatus.UP.equals(instanceInfo.getStatus()))
&& (!InstanceInfo.InstanceStatus.OUT_OF_SERVICE.equals(instanceInfo.getStatus()))) {
logger.debug("Trusting the instance status {} from replica or instance for instance {}",
instanceInfo.getStatus(), instanceInfo.getId());
return StatusOverrideResult.matchingStatus(instanceInfo.getStatus());
}
return StatusOverrideResult.NO_MATCH;
}
@Override
public String toString() {
return DownOrStartingRule.class.getName();
}
}
上面的逻辑是:服务实例信息的状态既不等于 InstanceStatus.UP 也不等于InstanceStatus.OUT_OF_SERVICE ,就直接返回服务实例信息里面的状态。
代码注释:ReplicationInstance is DOWN or STARTING -相信这个,但是当实例说UP的时候,质疑它客户端实例发送 STARTING或 DOWN``(因为心跳失败),然后我们接受什么客户端说。replica也是如此。客户端或副本的OUT_OF_SERVICE也需要确认,因为服务可能是当前服务。
FirstMatchWinsCompositeRule.java
public class FirstMatchWinsCompositeRule implements InstanceStatusOverrideRule {
private final InstanceStatusOverrideRule[] rules;
private final InstanceStatusOverrideRule defaultRule;
private final String compositeRuleName;
public FirstMatchWinsCompositeRule(InstanceStatusOverrideRule... rules) {
this.rules = rules;
this.defaultRule = new AlwaysMatchInstanceStatusRule();
// Let's build up and "cache" the rule name to be used by toString();
List<String> ruleNames = new ArrayList<>(rules.length+1);
for (int i = 0; i < rules.length; ++i) {
ruleNames.add(rules[i].toString());
}
ruleNames.add(defaultRule.toString());
compositeRuleName = ruleNames.toString();
}
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
for (int i = 0; i < this.rules.length; ++i) {
StatusOverrideResult result = this.rules[i].apply(instanceInfo, existingLease, isReplication);
if (result.matches()) {
return result;
}
}
return defaultRule.apply(instanceInfo, existingLease, isReplication);
}
@Override
public String toString() {
return this.compositeRuleName;
}
}
它的逻辑是里面有规则列表 InstanceStatusOverrideRule[] rules 同时也有默认规则 InstanceStatusOverrideRule defaultRule 首先遍历规则列表,如果有一个匹配就直接返回。反之就使用默认的规则进行匹配
OverrideExistsRule.java
public class OverrideExistsRule implements InstanceStatusOverrideRule {
private static final Logger logger = LoggerFactory.getLogger(OverrideExistsRule.class);
private Map<String, InstanceInfo.InstanceStatus> statusOverrides;
public OverrideExistsRule(Map<String, InstanceInfo.InstanceStatus> statusOverrides) {
this.statusOverrides = statusOverrides;
}
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo, Lease<InstanceInfo> existingLease, boolean isReplication) {
InstanceInfo.InstanceStatus overridden = statusOverrides.get(instanceInfo.getId());
// If there are instance specific overrides, then they win - otherwise the ASG status
if (overridden != null) {
logger.debug("The instance specific override for instance {} and the value is {}",
instanceInfo.getId(), overridden.name());
return StatusOverrideResult.matchingStatus(overridden);
}
return StatusOverrideResult.NO_MATCH;
}
@Override
public String toString() {
return OverrideExistsRule.class.getName();
}
}
它的逻辑是:从服务实例信息 ID 与覆盖状态的映射 Map<String, InstanceInfo.InstanceStatus> statusOverrides 中根据服务实例信息 ID 获取,如果能够获取就返回覆盖状态的状态值,反之就不匹配。
LeaseExistsRule.java
public class LeaseExistsRule implements InstanceStatusOverrideRule {
private static final Logger logger = LoggerFactory.getLogger(LeaseExistsRule.class);
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
// This is for backward compatibility until all applications have ASG
// names, otherwise while starting up
// the client status may override status replicated from other servers
if (!isReplication) {
InstanceInfo.InstanceStatus existingStatus = null;
if (existingLease != null) {
existingStatus = existingLease.getHolder().getStatus();
}
// Allow server to have its way when the status is UP or OUT_OF_SERVICE
if ((existingStatus != null)
&& (InstanceInfo.InstanceStatus.OUT_OF_SERVICE.equals(existingStatus)
|| InstanceInfo.InstanceStatus.UP.equals(existingStatus))) {
logger.debug("There is already an existing lease with status {} for instance {}",
existingLease.getHolder().getStatus().name(),
existingLease.getHolder().getId());
return StatusOverrideResult.matchingStatus(existingLease.getHolder().getStatus());
}
}
return StatusOverrideResult.NO_MATCH;
}
@Override
public String toString() {
return LeaseExistsRule.class.getName();
}
}
- 如果是集群环境下直接返回
StatusOverrideResult.NO_MATCH不匹配。 - 如果实例状态
Lease<InstanceInfo> existingLease中的实例信息是InstanceStatus.OUT_OF_SERVICE或者InstanceStatus.UP直接返回Lease<InstanceInfo> existingLease实例信息里面的状态。
我们将 PeerAwareInstanceRegistryImpl 的应用实例覆盖状态规则进行梳理一下:
| 状态 | DownOrStartingRule | OverrideExistsRule | LeaseExistsRule | AlwaysMatchInstanceStatusRule |
|---|---|---|---|---|
| instanceInfo | STARTING | * 全部状态 | ||
| existingLease | UP or OUT_OF_SERVER(非 Eureka Server 请求) | |||
| statusOverrides | * 全部状态 |
- 应用实例状态是最重要的属性,没有之一,因而在最终实例状态的计算,以可信赖为主。
- DownOrStartingRule ,instanceInfo处于 STARTING或者 DOWN 状态,应用实例可能不适合提供服务( 被请求 ),考虑可信赖,返回 instanceInfo 的状态。
- OverrideExistsRule ,当存在覆盖状态( statusoverrides ) ,使用该状态,比较好理解。
- LeaseExistsRule ,来自 Eureka-Client 的请求( 非 Eureka-Server 集群请求),当 Eureka-Server 的实例状态存在,并且处于 UP 或则 OUT_OF_SERVICE ,保留当前状态。原因,禁止 Eureka-Client主动在这两个状态之间切换。如果要切换,使用应用实例覆盖状态变更与删除接口。
- AlwaysMatchInstanceStatusRule ,使用 instanceInfo 的状态返回,以保证能匹配到状态。
从上面我们可以看到 Eureka Server 是从 AbstractInstanceRegistry#getOverriddenInstanceStatus 获取应用覆盖状态规则 InstanceStatusOverrideRule 对应的实例对象。而 Eureka Server 的注册服务实例是 PeerAwareInstanceRegistryImpl ,在它的初始化过程当中它的构建方法会初始化 InstanceStatusOverrideRule 对象。
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
private final InstanceStatusOverrideRule instanceStatusOverrideRule;
public PeerAwareInstanceRegistryImpl(
EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig,
ServerCodecs serverCodecs,
EurekaClient eurekaClient
) {
super(serverConfig, clientConfig, serverCodecs);
this.eurekaClient = eurekaClient;
this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
// We first check if the instance is STARTING or DOWN, then we check explicit overrides,
// then we check the status of a potentially existing lease.
this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
}
......
它使用的是 FirstMatchWinsCompositeRule 这个规则类:它的逻辑是里面有规则列表 InstanceStatusOverrideRule[] rules 同时也有默认规则 InstanceStatusOverrideRule defaultRule 首先遍历规则列表,如果有一个匹配就直接返回。反之就使用默认的规则进行匹配。然后传入的规则依次是:DownOrStartingRule 、 OverrideExistsRule 和 LeaseExistsRule。
11.Euraka 之 Euraka Client 增量拉取注册表
在【5.Eureka Server 多级缓存】中已经详细说明了在 Eureka Client 启动的时候会去 Eureka Server 里面全量的拉取一下注册到里面的服务信息列表。并且 Eureka Server 里面是使用了多级缓存来保存全量的注册信息列表。
Eureka Client 在启动时获取 Eureka Server 中的注册信息,但是作为分布式环境服务随时时可进行上线、下线操作。所以作为注册中心它需要有服务自动上下线发现的功能。
Eureka Client 在初始化获取了全量的注册信息之后,它会启动定时任务去增量的拉取注册中心当的新的服务列表信息。
Eureka Client 增量拉取注册表的时序图如下:
EurekaClient 在初始化的时候会初始化一序列定时任务,其中一个就是定时的从 Eureka Server 增量的获取服务注册信息。它调用的 http 请求的路径类似于:http://localhost:8080/v2/apps/delta 并且发送的是 get 请求。
http://localhost:8080:Eureka Server 的接收请求的地址/v2/apps:Restful 处理类 ApplicationsResource 上面的路径:@Path(“/{version}/apps”)delta:ApplicationsResource这个处理类中的getContainerDifferential方法用于处理 Eureka Client 的堷拉取注册列表信息。
我们先来分析一下 Eureka Server 中是如果来处理 Eureka Client 发送来的增量拉取注册列表的请求。先来分析一下 Eureka Server 的返回值。然后再来分析一下 Eureka Client 是如何处理从 Eureka Client 的增量服务注册列表的。
