1.阅读Eureka应该由该地方渐入式阅读

单元测试、集成测试的代码作为入口,来打断点,跟进去调试

1.1 测试代码阅读相关类

  1. 阅读在eureka-server项目下的test模块下的 com.netflix.eureka.resources.EurekaClientServerRestIntegrationTest :在这个测试里,他会将eureka注册中心启动起来,然后模拟eureka客户端(服务)去发送各种请求到eureka注册中心,去测试各种功能
  2. 在eureka-examples中对 com.netflix.eureka.ExampleEurekaClient 中对 main() 添加: ```java // EurekaClientServerRestIntegrationTest 内方法 private static void injectEurekaConfiguration() throws UnknownHostException {

    1. String myHostName = InetAddress.getLocalHost().getHostName();
    2. String myServiceUrl = "http://" + myHostName + ":8080/v2/";
    3. System.setProperty("eureka.region", "default");
    4. System.setProperty("eureka.name", "eureka");
    5. System.setProperty("eureka.vipAddress", "eureka.mydomain.net");
    6. System.setProperty("eureka.port", "8080");
    7. System.setProperty("eureka.preferSameZone", "false");
    8. System.setProperty("eureka.shouldUseDns", "false");
    9. System.setProperty("eureka.shouldFetchRegistry", "false");
    10. System.setProperty("eureka.serviceUrl.defaultZone", myServiceUrl);
    11. System.setProperty("eureka.serviceUrl.default.defaultZone", myServiceUrl);
    12. System.setProperty("eureka.awsAccessId", "fake_aws_access_id");
    13. System.setProperty("eureka.awsSecretKey", "fake_aws_secret_key");
    14. System.setProperty("eureka.numberRegistrySyncRetries", "0");

    }

public static void main(String[] args) throws UnknownHostException { //添加Eureka配置相关配置 injectEurekaConfiguration();

    ExampleEurekaClient sampleClient = new ExampleEurekaClient();

    // create the client
    ApplicationInfoManager applicationInfoManager = initializeApplicationInfoManager(new MyDataCenterInstanceConfig());
    EurekaClient client = initializeEurekaClient(applicationInfoManager, new DefaultEurekaClientConfig());

    // use the client
    sampleClient.sendRequestToServiceUsingEureka(client);

    // shutdown the client
    eurekaClient.shutdown();
}
> 本章由spring cloud 的 15章开始记录

<a name="u8OWH"></a>
### 1.2  eureka-server模块中文件分析
<a name="9pb0q"></a>
#### 1.2.1 eureka-server 中 build.gradle分析
```bash
eureka-server在该模块的build.gradle中有如下依赖:
    compile project(':eureka-client') //eureka-server也是一个eureka-client
  compile project(':eureka-core')//扮演了核心的注册中心的角色,接收别人的服务注册请求,提供服务发现的功能,保持心跳(续约请求),摘除故障服务实例。eureka server依赖eureka core的,基于eureka core的功能对外暴露接口,提供注册中心的功能
  compile "com.sun.jersey:jersey-server:$jerseyVersion" 
  compile "com.sun.jersey:jersey-servlet:$jerseyVersion"//eureka server依赖jersey框架,类比于spring web mvc框架,支持mvc模式,支持restful http请求
  (eureka client和eureka server之间进行通信,都是基于jersey框架实现http restful接口请求和调用的)

  testCompile "org.mockito:mockito-core:${mockitoVersion}" //mock测试框架,用于单元测试
  testCompile "org.eclipse.jetty:jetty-server:$jetty_version"
  testCompile "org.eclipse.jetty:jetty-webapp:$jetty_version"//测试时候,是会基于jetty直接将eureka server作为一个web应用给跑起来,jetty作为web容器,跑起来eureka server这个web应用

1.2.2 eureka-server 中 web.xml分析

eureka-server在该模块中的${eureka-server}\src\main\webapp\WEB-INF\web.xml 
<!-- 负责eureka-server的初始化,在eureka-core里  -->
<filter>
  <filter-name>statusFilter</filter-name>
  <filter-class>com.netflix.eureka.StatusFilter</filter-class> 
</filter>
<!-- 负责状态相关的处理逻辑 -->
<filter>
  <filter-name>statusFilter</filter-name>
  <filter-class>com.netflix.eureka.StatusFilter</filter-class>
</filter>
<!-- 对请求进行授权认证的处理的 -->
<filter>
  <filter-name>requestAuthFilter</filter-name>
  <filter-class>com.netflix.eureka.ServerRequestAuthFilter</filter-class>
</filter>
<!-- 负责限流相关的逻辑的 -->
<filter>
  <filter-name>rateLimitingFilter</filter-name>
  <filter-class>com.netflix.eureka.RateLimitingFilter</filter-class>
</filter>
<!-- 压缩相关 -->
<filter>
  <filter-name>gzipEncodingEnforcingFilter</filter-name>
  <filter-class>com.netflix.eureka.GzipEncodingEnforcingFilter</filter-class>
</filter>

<!--jersey框架的一个ServletContainer的一个filter-->
<filter>
  <filter-name>jersey</filter-name>
  <filter-class>com.sun.jersey.spi.container.servlet.ServletContainer</filter-class>
  <init-param>
    <param-name>com.sun.jersey.config.property.WebPageContentRegex</param-name>
    <param-value>/(flex|images|js|css|jsp)/.*</param-value>
  </init-param>
  <init-param>
    <param-name>com.sun.jersey.config.property.packages</param-name>
    <param-value>com.sun.jersey;com.netflix</param-value>
  </init-param>

  <!-- GZIP content encoding/decoding -->
  <init-param>
    <param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
    <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
  </init-param>
  <init-param>
    <param-name>com.sun.jersey.spi.container.ContainerResponseFilters</param-name>
    <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
  </init-param>
</filter>

<!-- statusFilter、requestAuthFilter就是通用的处理逻辑,是对所有的请求都开放的 -->
<filter-mapping>
  <filter-name>statusFilter</filter-name>
  <url-pattern>/*</url-pattern>
</filter-mapping>
<filter-mapping>
  <filter-name>requestAuthFilter</filter-name>
  <url-pattern>/*</url-pattern>
</filter-mapping>

<!-- rateLimitingFilter 默认是不开启的,如果你要打开eureka-server内置的限流功能,
你需要自己把RateLimitingFilter的<filter-mapping>的注释打开,让这个filter生效
<filter-mapping>
  <filter-name>rateLimitingFilter</filter-name>
  <url-pattern>/v2/apps</url-pattern>
  <url-pattern>/v2/apps/*</url-pattern>
</filter-mapping>
-->

<!-- /v2/apps相关的请求,会走这里,仅仅对部分特殊的请求生效 -->
<filter-mapping>
  <filter-name>gzipEncodingEnforcingFilter</filter-name>
  <url-pattern>/v2/apps</url-pattern>
  <url-pattern>/v2/apps/*</url-pattern>
</filter-mapping>

<!-- jersey核心filter,是拦截所有的请求的。 -->
<filter-mapping>
  <filter-name>jersey</filter-name>
  <url-pattern>/*</url-pattern>
</filter-mapping>
# ps: 请求被jersey拦截之后,就是登录erurka-server服务器地址,
# 而在该控制面板就是在eureka-resources模块的webapp文件夹内,页面配置文件就是一个jsp,
# 当中status.jsp是欢迎页面首页,eureka-server的控制台页面,展示注册服务的信息

2. eureka-core源码阅读[eureka server 如何启动的]

web容器(tomcat还是jetty)启动的时候,把eureka-server作为一个web应用给带起来的时候,eureka-server的初始化的逻辑,是透过监听器拦截,核心在EurekaBootStrap(eureka-server,监听器通过执行初始化方法contextInitialized()方法启动初始化eureka-server一个入口

2.1 EurekaBootStrap#contextInitialized()方法理解

该方法为eureka-server启动的入口,源码如下:

@Override
    public void contextInitialized(ServletContextEvent event) {
        try {
            //配置eureka配置环境
            initEurekaEnvironment();
            //
            initEurekaServerContext();

            ServletContext sc = event.getServletContext();
            sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
        } catch (Throwable e) {
            logger.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }

2.1.1分析initEurekaEnvironment()方法执行流程

  • 分析源码:

initEurekaEnvironment方法源码执行流程图.png
归纳:

  1. 创建了一个ConcurrentCompositeConfiguration实例, 它就是一个所谓的配置器,包括eureka所需要的配置信息。
  2. 上面的ConcurrentCompositeConfiguration实例加入了一堆别的config,然后搞完了以后,就直接返回了这个实例,就是作为配置的单例
  3. 初始化数据中心的配置,如果没有配置的话,就是DEFAULT data center
  4. 初始化eurueka运行的环境,如果你没有配置的话,默认就给你设置为test环境
  5. initEurekaEnvironment的初始化环境的逻辑完毕

    2.1.2分析 initEurekaServerContext()方法执行流程

    进入EurekaBootStrap#initEurekaServerContext()方法如下:

    protected void initEurekaServerContext() throws Exception {
     //1.第一步.加载eureka-server.properties文件中的配置
     EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
    
     //第二步,初始化ApplicationInfoManager
     ApplicationInfoManager applicationInfoManager = null;
     //第三步,初始化eureka-server内部的一个eureka-client(用来跟其他的eureka-server节点进行注册和通信的)
     if (eurekaClient == null) {
         //Eureka服务器注册所需的配置信息instanceConfig的实现类模式是MyDataCenterInstanceConfig
         EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
             ? new CloudInstanceConfig()
             : new MyDataCenterInstanceConfig();
    
         applicationInfoManager = new ApplicationInfoManager(
             //get()方法构建了instanceInfo
             instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
    
         EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
         //初始化一个eureka clinet(默认实现是DiscoveryClient)
         eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
     } else {
         applicationInfoManager = eurekaClient.getApplicationInfoManager();
     }
    
     //第四部,处理注册相关事情
     PeerAwareInstanceRegistry registry;
     if (isAws(applicationInfoManager.getInfo())) {
         registry = new AwsInstanceRegistry(
             eurekaServerConfig,
             eurekaClient.getEurekaClientConfig(),
             serverCodecs,
             eurekaClient
         );
         awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
         awsBinder.start();
     } else {
         //ps:非AWS的通常走这里!(默认走这里)
         registry = new PeerAwareInstanceRegistryImpl(
             eurekaServerConfig,
             eurekaClient.getEurekaClientConfig(),
             serverCodecs,
             eurekaClient
         );
     }
    
     //第五部,处理Peer节点相关事情
     PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
         registry,
         eurekaServerConfig,
         eurekaClient.getEurekaClientConfig(),
         serverCodecs,
         applicationInfoManager
     );
    
     //第六部,完成eureka-server的上下文(context)构建
     serverContext = new DefaultEurekaServerContext(
         eurekaServerConfig,
         serverCodecs,
         registry,
         peerEurekaNodes,
         applicationInfoManager
     );
    
     serverContext.initialize();
    
     // 第七部,处理一点善后的事情,从想邻的eureka节点表拷贝注册信息
     int registryCount = registry.syncUp();
     registry.openForTraffic(applicationInfoManager, registryCount);
    
     //第八部,处理一点善后的事情,注册所有监控
     EurekaMonitors.registerAllStats();
    }
    
  • 源码分析

    1、解析 eureka-server.properties

EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();

DefaultEurekaServerConfig 实现了接口 EurekaServerConfigEurekaServerConfig主要是 Eureka Server 逻辑中需要使用的一些变量,它主要包含。上面的代码的逻辑就是通过它的构建器调用 init() 方法。然后调用 ConfigurationManager#loadCascadedPropertiesFromResources 方法法 eureka-server.properties 加载到配置管理器 ConfigurationManager 当中。 ConfigurationManager 这个类的作用就是用来获取 Eureka 服务的配置信息

2、解析 eureka-client.properties,创建 EurekaInstanceConfig 对象

当 Eureka Server 进行启动的时候,eurekaClient 必然是空,而且我们的服务一般也不是部署在 AWS 上面,所以会调用 new MyDataCenterInstanceConfig()

ApplicationInfoManager applicationInfoManager = null;
if (eurekaClient == null) {
    EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
        ? new CloudInstanceConfig()
        //默认基本走这里
        : new MyDataCenterInstanceConfig();

    applicationInfoManager = new ApplicationInfoManager(
        instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

    EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
    eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
}

MyDataCenterInstanceConfig 的类继承结构如下:
image.png
MyDataCenterInstanceConfig 实现了接口 EurekaInstanceConfig 接口。上面已经说过 Eureka Server 在集群环境下,它其实本身也是一个 Eureka Client也可以认为是一个服务实例 ,EurekaInstanceConfig 这个接口它主要是对于服务实例信息的一个抽象。
在初始化 MyDataCenterInstanceConfig的时候会调用 MyDataCenterInstanceConfig 的无参构建器,初始化子类之前会调用它的父类 PropertiesInstanceConfig 的无参构建器。它会通过Archaius1Utils.initConfig(CommonConstants.CONFIG_FILE_NAME)eureka-client.properties 加载解析到 ConfigurationManager 当中,并返回一个 DynamicPropertyFactory 实例对象用于获取配置信息。

3、创建应用信息管理器

ApplicationInfoManager applicationInfoManager = new ApplicationInfoManager(
                                    //get()方法构建了instanceInfo
                    instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

在创建创建应用信息管理器信息的时候会把第二步创建好的 EurekaInstanceConfig 与 通过 EurekaConfigBasedInstanceInfoProvider#get 创建的 InstanceInfo 信息用来创建应用信息管理器。首先我们来分析一下 InstanceInfo 是如何进行创建的。通过 EurekaConfigBasedInstanceInfoProvider 的构造器传入从 eureka-client.properties 解析来的 Eureka Client 的配置信息。然后通过 EurekaConfigBasedInstanceInfoProvider#get 通过构建器 InstanceInfo.Builder 进行对象构建。

4、解析 eureka-client.properties,创建 EurekaClientConfig 对象

Eureka Server 在集群环境下,它需要同步其它 Eureka Server 的信息。而 Eureka Client提供了很方便的方法接口来调用 Eureka Server 提供的接口。

EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();

EurekaClientConfig 构建器当中它会通过 Archaius1Utils#initConfigeureka-client.properties 加载解析到 ConfigurationManager 当中,并返回一个 DynamicPropertyFactory 实例对象传入 DefaultEurekaTransportConfig 构造器中创建 EurekaTransportConfig 用于获取 Eureka 网络传输的配置信息。

5、创建 DiscoveryClient

EurekaClient eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);

上文已经说过了, Eureka Server在集群环境下会同步其它 Eureka Server的注册信息到当前 Eureka Server当中来。到这里就会通过上面构建的 ApplicationInfoManager(应用信息管理器) 与 EurekaClientConfig(Eureka Client 配置信息) 来创建一个 EurekaClientEurekaClient的构建过程有点复杂这里就不作过多分析,在下面的一篇文章当中会详细的分析它的初始化过程。
它最主要的功能就是同时发送心跳到 Eureka Server,定时同步最新的注册表到当前 Eureka Client(服务的自动上线与下线)。

6、创建 PeerAwareInstanceRegistry 实例对象

PeerAwareInstanceRegistry registry;
if (isAws(applicationInfoManager.getInfo())) {
    registry = new AwsInstanceRegistry(
        eurekaServerConfig,
        eurekaClient.getEurekaClientConfig(),
        serverCodecs,
        eurekaClient
    );
    awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
    awsBinder.start();
} else {
    //ps:非AWS的通常走这里!(默认走这里)
    registry = new PeerAwareInstanceRegistryImpl(
        eurekaServerConfig,
        eurekaClient.getEurekaClientConfig(),
        serverCodecs,
        eurekaClient
    );
}

根据 Eureka Server 配置Eureka Client 配置以及 EurekaClient 创建 PeerAwareInstanceRegistryImpl 类,这个类实现了 PeerAwareInstanceRegistryInstanceRegistry 这两个接口。主要具有服务注册以及从其它的 Eureka Server 集群拉取注册信息。

7、创建 PeerEurekaNodes 对象

 PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
         registry,
         eurekaServerConfig,
         eurekaClient.getEurekaClientConfig(),
         serverCodecs,
         applicationInfoManager
 );

创建 PeerEurekaNodes ,这个类主要是通过 PeerEurekaNodes#start 方法来更新 Eureka Server 集群的服务信息。

8、构建并初始化 EurekaServerContext 对象

EurekaServerContext serverContext = new DefaultEurekaServerContext(
        eurekaServerConfig,
        serverCodecs,
        registry,
        peerEurekaNodes,
        applicationInfoManager
);
EurekaServerContextHolder.initialize(serverContext);
serverContext.initialize();

把上面创建的所有组件信息用来创建 EurekaServerContext 信息,调用把 EurekaServerContext 保存到 EurekaServerContextHolder 当中,然后进行初始化。主要包括以下步骤:

  • 调用 PeerEurekaNodes#start方法定时更新 Eureka Server 服务信息列表
  • 调用 PeerAwareInstanceRegistry#init 方法初始化响应缓存(ResponseCache),这个响应缓存主要是缓存服务的注册信息;然后就是启动检测服务的心跳是否过期了(eureka client 会定期向服务器发送心跳,也就是续约[renewal])。

    9、从其它 Eureka Server 同步注册表

int registryCount = registry.syncUp();
registry.openForTraffic(applicationInfoManager, registryCount);

在这里它会通过 Eureka Client 请求远程相邻的 Eureka Server 获取注册的服务信息,初始化注册列表

10、注册监控信息

EurekaMonitors.registerAllStats()

在这里它会利用 JmxMonitorRegistryEurekaMonitors 所有的枚举类注册到 JMX 监控当中

3.Eureka Client 是如何注册的

在集群环境下 Eureka Server 相互之前需要同步注册表信息。所以不管是微服务当中还是 Eureka Server 当中都需要依赖 eureka-client 这个 Jar 包。它封装 Eureka Server 提供的 Restful 服务,依赖方可以以接口方法的方式方便的进行调用。下面是 Eureka 官网提供的系统架构图。
eureka_architecture.png
在这个图中有以下几个角色:

  • Eureka Server:Eureka 服务器,它是注册中心提供接口给应用把服务实例的信息注册上来
  • Eureka Client:Eureka 客户端,Eureka 包装好了微服务访问 Eureka 服务器的一系列接口。比如:注册,心跳,服务下线等。
  • Application Server:微服务应用服务器,一个单体服务可以按照不同的领域拆分为多个微服务。微服务可以依赖 Eureka 客户端这样就可以很方便的调用 Eureka 服务器上提供的接口进行服务注册与服务发现相关的功能

理解了 Eureka 的整个架构以及每个角色提供的功能,那么下面我们就来分析一下 Eureka Client 的启动流程。

3.1 Eureka Client 启动整体流程

不管 Eureka Server 还是微服务应用里面都会引用并创建 EurekaClient 来访问 Eureka Server,其中 Eureka Server 是配置的其它 Eureka Server 同步服务注册信息。而微服务应用服务器是从 Eureka Server 进行服务的注册与发现。并且微服务会定时向 Eureka Server 发送心跳请求,告诉 Eureka Server 当前应用实例还是存活状态,Eureka Server 在进行服务实例存活判断的时候就不会把当前服务实例剔除。
下面回到我们的主题,就是 Eureka Client 是如何启动的,其实就是如何创建 com.netflix.discovery.EurekaClient。因为 Eureka Client 和 Eureka Server 当中都需要创建 EurekaClient 这个对象实例。我们还是以 Eureka Server 服务中是如何创建 EurekaClient 的逻辑为例来讲解它的启动流程。

EurekaBootStrap#initEurekaServerContext

if (eurekaClient == null) {
    EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
        ? new CloudInstanceConfig()
        : new MyDataCenterInstanceConfig();

    applicationInfoManager = new ApplicationInfoManager(
        instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

    EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
    eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
} else {
    applicationInfoManager = eurekaClient.getApplicationInfoManager();
}

因为 Eureka Server 在启动的时候必然 eurekaClient 对象实例为空,所以就会进到上面的逻辑。其实在 Eureka Server 启动流程分析的时候我们已经大概的分析了这个创建过程。

  • 创建 EurekaInstanceConfig 对象实例,它主要是一个配置读取接口。在创建 EurekaInstanceConfig 对象实例的时候,因为我们不是 AWS,所以创建的对象是 MyDataCenterInstanceConfig 对象实例。在调用 MyDataCenterInstanceConfig 无参构建器进行初始化的时候首先会调用它的 PropertiesInstanceConfig 进行初始化,这里它会使用 ConfigurationManager 加载 eureka-client.properties 并创建一个 DynamicPropertyFactory 实例对于读取 ConfigurationManager 里面管理的配置文件。
  • 创建 ApplicationInfoManager 对象实例,它主要应用实例管理相关,提供服务注册相关的接口。 ApplicationInfoManager 对象实例创建本身很简单。主要是通过 new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get(),传入服务实例配置信息然后创建当前服务实例对象 InstanceInfo 。里面的代码其实也是挺简单的,就是通过传入的配置类通过 Builder 模式构建一个 InstanceInfo 对象
  • 创建 EurekaClientConfig 对象,它也是一个配置类,上面的 EurekaInstanceConfig 对象主要是服务实例相关的配置,比如 host、instanceId、port 相关的。而它主要是 EurekaClient 相关的配置,是否从其它 Eureka Server 拉取注册表,是否需要注册到其它 Eureka Server 等待。它也是在自己无参构建器中读取 eureka-client.properites 文件,创建 DynamicPropertyFactory 实例用于读取配置文件中的信息;创建 EurekaTransportConfig 用于 EurekaClient 进行服务的访问。
  • 创建 DiscoveryClient 对象,它其实是 com.netflix.discovery.EurekaClient 接口的实例对象。它主要是封装了微服务(或者 Eureka Server 集群环境下) 调用 Eureka Server 的接口,并且内部封装了服务自动注册与发现的功能。

我们首先来看一下 DiscoveryClient 接口的定义,提供了哪些功能:
EurekaClient.java

public interface EurekaClient extends LookupService {
    // 获取指定分区下的应用信息
    public Applications getApplicationsForARegion(@Nullable String region);
    // 获取指定 Eureka Service 中注册的应用信息
    public Applications getApplications(String serviceUrl);
    // 条件获取服务实例信息
    public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure);
    // 条件获取服务实例信息
    public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure, @Nullable String region);
    // 条件获取服务实例信息
    public List<InstanceInfo> getInstancesByVipAddressAndAppName(String vipAddress, String appName, boolean secure);
    // 获取所有的分区信息
    public Set<String> getAllKnownRegions();
    // 从 Eureka Server 获取当前自身实例状态
    public InstanceInfo.InstanceStatus getInstanceRemoteStatus();
    // 废弃方法,可以不关注
    public List<String> getDiscoveryServiceUrls(String zone);
    // 废弃方法,可以不关注
    public List<String> getServiceUrlsFromConfig(String instanceZone, boolean preferSameZone);
    // 废弃方法,可以不关注
    public List<String> getServiceUrlsFromDNS(String instanceZone, boolean preferSameZone);
    // 废弃方法,可以不关注
    public void registerHealthCheckCallback(HealthCheckCallback callback);
    // 注册心跳检测处理器
    public void registerHealthCheck(HealthCheckHandler healthCheckHandler);
    // 注册事件监听器
    public void registerEventListener(EurekaEventListener eventListener);
    // 解除事件监听器
    public boolean unregisterEventListener(EurekaEventListener eventListener);
    // 获取心跳检测处理类
    public HealthCheckHandler getHealthCheckHandler();
    // 服务下线
    public void shutdown();
    // 获取 Eureka Client 相关配置
    public EurekaClientConfig getEurekaClientConfig();
    // 获取服务应用管理器
    public ApplicationInfoManager getApplicationInfoManager();
}
public interface LookupService<T> {
    // 根据指定 appName 获取当前 Eureka Server 注册的应用信息
    Application getApplication(String appName);
    // 获取当前 Eureka Server 注册的应用信息
    Applications getApplications();
    // 根据 ID 获取服务实例信息列表
    List<InstanceInfo> getInstancesById(String id);
    // 从 Eureka Server 获取下一个可能的服务器来处理来自注册表的请求
    InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}

可以看到 EurekaClient 提供的功能偏向于微服务中依赖的 eureka-client 提供的功能,而 LookupService 比较偏向于 eureka-server 里面的接口,当微服务需要服务发现的时候 Eureka 基本就会调用LookupService 提供的接口。

3.2 Eureka Client 初始化流程

下面就是通过 ApplicationInfoManagerEurekaClientConfig 对象创建 DiscoveryClient 的代码。

EurekaBootStrap#initEurekaServerContext

EurekaClient eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);

上面的代码看着平平无奇,实际上内含乾坤。下面就是 DiscoveryClient 初始化的流程图:
DiscoveryClient初始化方法内流程.png
归纳重点:

  • 创建 schedulerheartbeatExecutorcacheRefreshExecutor 这三个线程池
  • 调用 scheduleServerEndpointTask 方法,初始化 EurekaTransport 对象
  • 如果需要抓取注册表 clientConfig.shouldFetchRegistry(),从远程中抓取注册表添加到 com.netflix.discovery.DiscoveryClient#localRegionApps 属性当中。
  • 如果需要注册到 Eureka Server【clientConfig.shouldRegisterWithEureka()】以及在初始化的时候就进行注册【clientConfig.shouldEnforceRegistrationAtInit()】,就把当前服务注册到 Eureka Server 服务器
  • 通过 initScheduledTasks 启动一些定时任务,后面具体分析

    3.2.1 Eureka Client 初始化中的定时任务

    Eureka Client 初始化中创建 schedulerheartbeatExecutorcacheRefreshExecutor 这三个线程池,然后 DiscoveryClient#initScheduledTasks 就会通过这些线程池启动一些定时任务。

  • 如果需要抓取定时任务【clientConfig.shouldFetchRegistry()】,通过 CacheRefreshThread 这个定时任务默认 30 秒去 Eureka Server 抓取最新的服务的注册表并且添加到 com.netflix.discovery.DiscoveryClient#localRegionApps 属性当中

  • 如果需要把服务注册到注册中心【clientConfig.shouldRegisterWithEureka()】,通过 HeartbeatThread 把心跳发送到注册中心。
  • 创建 InstanceInfoReplicator 任务类,它主要负责将自身的信息周期性的上报到 Eureka server,会调用它的 start 进行定时调用,同时如果配置了状态变更需要进行通知【clientConfig.shouldOnDemandUpdateStatusChange()】时,会把创建 StatusChangeListener 并把这个监听器注册到 ApplicationInfoManager 当中

3.2.2 Eureka client上报Eureka server的注册过程

这里是23.眼花缭乱的代码中找到eureka-client是如何进行服务注册的

eureka_architecture.png
对于 Eureka 而言,微服务的提供者和消费者都是它的客户端,其中服务提供者关注服务注册服务续约服务下线等功能,而服务消费者关注于 服务信息的获取。下面我们来看一下 Eureka Client 的服务注册流程。

  • Eureka Client 注册流程

在 DiscoveryClient 类中,服务注册操作由 register 方法完成。下面我们来看一下这个方法的定义:

DiscoveryClient#register

boolean register() throws Throwable {
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            throw e;
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }

上述 register 方法会在 InstanceInfoReplicator 类的 run 方法中进行执行。从操作流程上讲,上述代码的逻辑非常简单,即服务提供者先将自己注册到 Eureka 服务器中,然后根据返回的结果确定操作是否成功。显然,这里的重点代码是 eurekaTransport.registrationClient.register() ,DiscoveryClient 通过这行代码发起了远程请求。

  • Eureka Client Http

Eureka Client 在进行服务实例注册的时候是调用它的内部类 EurekaTransport 的属性 registrationClient 就是 EurekaHttpClient ,其真实的执行的对象是AbstractJersey2EurekaHttpClient

  • Eureka Server 服务注册

Eureka Client 在进行微服务注册的时候,其实是通过 EurekaHttpClient 实现类 AbstractJersey2EurekaHttpClient#register 进行服务注册的

     //这里就是eureka client注册信息
    @Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        Response response = null;
        try {
            //发送请求,http://localhost:8080/v2/apps/ServiceA
            //发送一个post请求,服务实例对象为一个json格式发送过去,包含主机、ip,端口号
            //Eureka server就会知道这个ServiceA服务,有一个实例,比如192.168.xx.xx,host-01,8761
            Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();
            addExtraProperties(resourceBuilder);
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .accept(MediaType.APPLICATION_JSON)
                    .acceptEncoding("gzip")
                    .post(Entity.json(info));
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey2 HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }

在这里它会构建一个类似后面请求地址: http://localhost:8080/v2/apps/APPLICATION0,并且发送一个 post 请求。

  1. http://localhost:8080 :Eureka Server 服务的域名与端口
  2. /v2/apps :Eureka Server 处理应用的 uri 地址
  3. APPLICATION0:微服务的应用名称,要求唯一值

我们在 Eureka Server 引用的 eureka-core 中可以找到 ApplicationsResource#getApplicationResource 来处理注册请求。

3.2.3 eureka server端是如何完成服务注册的

ApplicationsResource#getApplicationResource

 @Path("{appId}")
    public ApplicationResource getApplicationResource(
            @PathParam("version") String version,
            @PathParam("appId") String appId) {
        CurrentRequestVersion.set(Version.toEnum(version));
        try {
            return new ApplicationResource(appId, serverConfig, registry);
        } finally {
            CurrentRequestVersion.remove();
        }
    }

之前说过 Eureka Server 中使用 Jeysey Restful 来处理 http 请求,而 com.netflix.eureka.resources 包下面的 XXXResource 相当于 Spring MVC 中的 Controller。

上面的类中返回 ApplicationResource 说明真正的处理类是在 ApplicationResource中,并且在客户端发送的是 post 请求。那么我们就应该去ApplicationResource 中找 post 处理的方法。最终会找到 ApplicationResource#addInstance

而服务的最终注册会调用到 AbstractInstanceRegistry#register,在 Eureka Server 当中它是把微服务的注册信息以数据结构 ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> 保存到内存当中。所以后续我们会分析集群环境之后 Eureka Server 注册信息的同步以及 Eureka Client 定时同步 Eureka Server 的注册信息到 Eureka Client 的本地。而这种数据结构保存的数据格式如下所示:

{
    "APPLICATION0" : {
        "i-00000000" : InstanceInfo 对象1,
        "i-00000001" : InstanceInfo 对象2
    },
    "APPLICATION1" : {
        "i-00000000" : InstanceInfo 对象1
    }
}
  • APPLICATION0 :表示我们微服务的名称,比如:user-serviceorder-service
    -i-00000000 :一个微服务可以有多个实例,而i-00000000 就是服务实例的 ID,比如 user-service 用户服务发布了 8 台机器,i-00000000 就表示其中一台具体的机器 ID。
  • InstanceInfo 对象:微服务实例信息对象,包含服务名称、服务域名、服务端口、服务分组以及服务的元数据信息等,Eureka 调用者获取到服务实例信息就可以进行远程调用。

关于服务实例信息对象大家可以参看 com.netflix.appinfo.InstanceInfo 这个对象。这个类其实就是一个 POJO 对象,并不复杂。
下面我们来分析一下服务的注册过程,也就是 com.netflix.eureka.registry.AbstractInstanceRegistry#register

AbstractInstanceRegistry#register

 public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        read.lock();
        try {
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            //TODO-这里eureka-server两级缓存机制中对缓存的信息设置 -- 28
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }
  • read.lock():首先调用读锁,防止注册的时候其它线程调用这个方法出现并发问题。
  • 根据微服务名称从注册表 registry 中获取注册的 Map<String, Lease<InstanceInfo>>.
  • 如果该服务之前没有注册过创建一个 ConcurrentHashMap<String, Lease<InstanceInfo>>() 添加到registry 注册表当中
  • 根据服务实例的 ID 从该服务的注册列表中获取 Lease 对象
  • 调用 Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); 构建一个新的 Lease<InstanceInfo> 对象,并添加到该服务的注册 ConcurrentHashMap<String, Lease<InstanceInfo>>() 当中
  • 把服务注册的当时时间与服务名+服务应用 ID 添加到 recentRegisteredQueue 当中,把服务应用信息添加到 recentlyChangedQueue 当中(现在可以不考虑这个动作,后续会分析这个队列)
  • 清除缓存, Eureka Client 在查询 Eureka Server 的注册表会有一个缓存(现在可以不考虑这个动作,后续会分析这个缓存)

上面就是整个 Eureka Client 的服务注册过程了。

4 Eureka Client 全量拉取注册表

这里是26章的数据

4.1 eureka client 获取全量拉取注册表流程

在微服务中是嵌入了 Eureka Client 的,当服务启动的时候就会从 Eureka Server 中拉取注册的服务信息列表
image.png
在微服务中嵌入的 EurekaClient 的实现类 DiscoveryClient 在初始化的时候会通过调用 EurekaHttpClient 去 Eureka Server 拉取全量服务信息列表。
DiscoveryClient#initScheduledTasks 会启动定时任务 CacheRefreshThread 增量的拉取 Eureka Server 的注册信息(后续的博客会分析增量拉取注册表)。
在 DiscoveryClient 初始化拉取全量配置中心,然后通过定时任务每 30 秒去 Eureka Server 拉取增量的注册表信息。保证服务调用的是最新服务的注册中心。

4.2 eureka server 处理全量注册表处理分析

当 Eureka Client 初始化时调用 Eureka Client 提供的 http 接口其实是 ApplicationsResource#getContainers

ApplicationsResource#getContainers

    @GET
    public Response getContainers(@PathParam("version") String version,
                                  @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                  @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                  @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                  @Context UriInfo uriInfo,
                                  @Nullable @QueryParam("regions") String regionsStr) {

        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
        String[] regions = null;
        if (!isRemoteRegionRequested) {
            EurekaMonitors.GET_ALL.increment();
        } else {
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
        }

        if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
            return Response.status(Status.FORBIDDEN).build();
        }
        CurrentRequestVersion.set(Version.toEnum(version));
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }

        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS,
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );

        Response response;
        if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
            response = Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
            response = Response.ok(responseCache.get(cacheKey))
                    .build();
        }
        CurrentRequestVersion.remove();
        return response;
    }

上面的代码看着挺多,其实逻辑还是挺简单的。

  • 如果你没有指定 regions 信息的话,这个值默认会是空字符串,然后它的 EurekaMonitors.GET_ALL 监控指标会 +1
  • PeerAwareInstanceRegistry 注册中心会判断当前访问是否可以进行,它主要是通过当前系统时间与启动时间的关系或者判断远程配置中心的数据是否可读。具体可参见:PeerAwareInstanceRegistryImpl#shouldAllowAccess(boolean)
  • 配置中心中的服务注册信息支持 XML 与 JSON 格式并且可以使用 gzip 进行压缩
  • 读取的服务注册信息是从 ResponseCache 进行读取

    5 Eureka Server 多级缓存

    既然在 Eureka Server 中获取注册信息是从 ResponseCache 进行读取的,那么我们看一下这个缓存是从什么时候进行初始化的
    20210104130113628.png
    从上面的时序图当中我们可以看到,在 Eureka Server 进行初始化的时候会初始化注册中心 PeerAwareInstanceRegistryImpl 这个对象会调用它的 init 方法并最终调用 PeerAwareInstanceRegistryImpl#initializedResponseCache 初始化注册信息列表的缓存。

PeerAwareInstanceRegistryImpl#initializedResponseCache

@Override
    public synchronized void initializedResponseCache() {
        if (responseCache == null) {
            responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this);
        }
    }

下面我们来分析一下 ResponseCacheImpl 的初始化过程:

ResponseCacheImpl

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
        this.registry = registry;
        long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
        this.readWriteCacheMap =
                CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                        .removalListener(new RemovalListener<Key, Value>() {
                            @Override
                            public void onRemoval(RemovalNotification<Key, Value> notification) {
                                Key removedKey = notification.getKey();
                                if (removedKey.hasRegions()) {
                                    Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                    regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                                }
                            }
                        })
                        .build(new CacheLoader<Key, Value>() {
                            @Override
                            public Value load(Key key) throws Exception {
                                if (key.hasRegions()) {
                                    Key cloneWithNoRegions = key.cloneWithoutRegions();
                                    regionSpecificKeys.put(cloneWithNoRegions, key);
                                }
                                Value value = generatePayload(key);
                                return value;
                            }
                        });
        if (shouldUseReadOnlyResponseCache) {
            timer.schedule(getCacheUpdateTask(),
                    new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                            + responseCacheUpdateIntervalMs),
                    responseCacheUpdateIntervalMs);
        }
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
        }
    }
  • 在 ResponseCacheImpl 初始化的时候通过 ConcurrentHashMap 构建一级只读缓存 readOnlyCacheMap;通过 GuavaLoadingCache 构建二级读写缓存。构建 Eureka Server 注册表的多级缓存机制
  • ConcurrentHashMap 构建一级只读缓存 readOnlyCacheMap 会通过定时任务 TimerTaskLoadingCache 构建二级读写缓存进行比对更新(每 30 秒执行一次)
  • LoadingCache 构建二级读写缓存会在 180 秒钟后就会过期
  • ResponseCacheImpl 中还提供了 invalidate 方法进行手动过期,当 Eureka Server 发生了服务注册、下线、故障会自动过期该缓存

上面说了 Eureka Server 多级缓存提供了只读缓存、读写缓存以及这两个缓存的过期策略,下面我们来看一下缓存的获取。

ResponseCacheImpl#getValue

Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            if (useReadOnlyCache) {
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key : {}", key, t);
        }
        return payload;
    }
  • 当 Eureka Client 获取注册表的时候,首先会从只读缓存中获取数据
  • 如果只读缓存为空就会从读写缓存中获取数据,并把读取到的值添加到只读缓存当中

6.Eureka 之 Eureka Client 续约

6.1 Eureka Client 续约机制

在 Eureka Client 需要定时的向注册中心 Eureka Server 发送续约信息,告诉注册中心当前的微服务处于可用状态,这样注册中心在服务剔除的时候才不会把当前服务实例从注册表中删除。
在 DiscoveryClient 进行初始化的时候,会调用 DiscoveryClient#initScheduledTasks 方法。在这个方向当中不仅会启动定时任务调用 CacheRefreshThread 线程定时从注册中心拉取最新的注册信息;还会启动定时任务调用 HeartbeatThread 向注册中心发送续约信息。

DiscoveryClient#renew

boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            return false;
        }
    }

上面的请求最终会调用到 AbstractJersey2EurekaHttpClient#sendHeartBeat发送类似于:http://localhost:8080/v2/apps/APPLICATION0/i-00000000 这样的 PUT 请求路径到注册中心。

  • http://localhost:8080 :Eureka Server 的请求地址
  • /v2/apps:Restful 处理类 ApplicationsResource 上面的路径:@Path("/{version}/apps")
  • APPLICATION0 :微服务应用的名称,也可以是:user-service,order-service 等
  • i-00000000 :微服务应用的名称下具体的服务实例 ID,比如 user-service 下面有 8 台机制实例。i-00000000就是其中一台唯一的 ID 值

    6.2 Eureka Server 处理续约请求

    Eureka Server 处理 Eureka Client 时序图:
    20210105125731762.png
    首先 ApplicationsResource#getApplicationResource 会来接收 Eureka Client 发送过来的续约请求,这个接口里面会返回 ApplicationResource 对象。然后会调用ApplicationResource#getInstanceInfo。因为 Eureka Client 发送的是 PUT 请求,所以请求会转发到 InstanceResource 标注了 @PUT 的方法 renewLease 进行处理。

    InstanceResource#renewLease

    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {

        boolean isFromReplicaNode = "true".equals(isReplication);
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }

        Response response;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }
  • 判断是否来自集群节点(如果是集群节点需要同步到 Eureka Server 集群其它节点)
  • 调用 Eureka Server 的注册中心 PeerAwareInstanceRegistry 进行续约
  • 比较注册中心的服务信息中的注册时间是否与传入时间匹配
  • Eureka Server 响应 Eureka Client 续约结果

    6.3 注册中心续约服务

    PeerAwareInstanceRegistryImpl#renew 进行续约的时候,先会调用 AbstractInstanceRegistry#renew 在当前 Eureka Server 进行续约,如果是 Eureka Server 集群环境下,会把续约请求发送到其它的 Eureka Server 节点上去。

    PeerAwareInstanceRegistryImpl#renew

public boolean renew(final String appName, final String id, final boolean isReplication) {
        if (super.renew(appName, id, isReplication)) {
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }

下面我们来看一下调用 AbstractInstanceRegistry#renew 进行当前 Eureka Server 节点是如何进行续约的。

AbstractInstanceRegistry#renew

public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
            renewsLastMin.increment();
            leaseToRenew.renew();
            return true;
        }
    }

其实续约的逻辑非常简单。

  • 首先从注册列表中找到当前服务的注册信息列表:Map<String, Lease<InstanceInfo>> gMap.
  • 然后从注册信息列表中根据服务信息的 ID 找到服务信息:Lease<InstanceInfo> leaseToRenew.
  • 如果找不到服务信息,续约失败,返回 false。
  • 获取服务信息在 InstanceStatusOverrideRule 的中状态
  • 如果状态是 InstanceStatus.UNKNOWN),续约失败,返回 false。
  • 如果状态与服务实例中的状态不一致,调用 instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus) 方法
  • 最后调用 com.netflix.eureka.lease.Lease#renew 进行服务的续约,其实就是更新 Lease 对象的 long lastUpdateTimestamp 属性为系统当前时间 + duration 时间(默认 90 秒)。

其实 Eureka Client 进行服务续约就是把服务信息里面的 lastUpdateTimestamp 更新一下,防止 Eureka Server 在进行服务拆除的时候把当前服务摘除。

7 Eureka 之 Eureka Client 服务下线

7.1 Eureka Client 服务下线

当 Eureka Client 服务关闭之前会调用 DiscoveryClient#shutdown 方法。因为这个方法上面标注了 @PreDestroy 在对象销毁之前这个方法就会被调用。

    @PreDestroy
    @Override
    public synchronized void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            if (statusChangeListener != null && applicationInfoManager != null) {
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }
            cancelScheduledTasks();
            // If APPINFO was registered
            if (applicationInfoManager != null
                    && clientConfig.shouldRegisterWithEureka()
                    && clientConfig.shouldUnregisterOnShutdown()) {
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                unregister();
            }
            if (eurekaTransport != null) {
                eurekaTransport.shutdown();
            }
            heartbeatStalenessMonitor.shutdown();
            registryStalenessMonitor.shutdown();
            Monitors.unregisterObject(this);
        }
    }
  • 首先把 AtomicBoolean 类型的 isShutdown 设置成 true。
  • 接着会解除掉服务应用管理器(ApplicationInfoManager) 中注册的状态监听器(StatusChangeListener)
  • 然后会调用 cancelScheduledTasks 释放线程池资源,包括:Eureka Server 间的数据同步线程与线程池、心跳处理的线程池与线程以及注册表定时刷新的线程池与线程。
  • 设置当前服务应用的状态为下线,并且在DiscoveryClient#unregister 方法当中调用 EurekaHttpClient#cancel 进行服务下线
  • 释放 EurekaTransport 远程调用里面的 HTTP 连接资源
  • 最后释放一些监控资源

在调用服务下线的时候请求最终会调用到 AbstractJersey2EurekaHttpClient#cancel发送类似于:http://localhost:8080/v2/apps/APPLICATION0/i-00000000 这样的 PUT 请求路径到注册中心。

  • http://localhost:8080:Eureka Server 的请求地址
  • /v2/apps:Restful 处理类 ApplicationsResource 上面的路径:@Path("/{version}/apps")
  • APPLICATION0 :微服务应用的名称,也可以是:user-serviceorder-service
  • i-00000000 :微服务应用的名称下具体的服务实例 ID,比如 user-service 下面有 8 台机制实例。i-00000000 就是其中一台唯一的 ID 值

    7.2 Eureka Server 服务下线

    Eureka Server 处理 Eureka Client 服务下线时序图:
    Eureka Server 服务下线.png
    首先 ApplicationsResource#getApplicationResource 会来接收 Eureka Client 发送过来的服务下线请求,这个接口里面会返回 ApplicationResource 对象。然后会调用 ApplicationResource#getInstanceInfo。因为 Eureka Client 发送的是 DELETE 请求,所以请求会转发到 InstanceResource 标注了 @DELETE 的方法 cancelLease 进行处理。

InstanceResource#cancelLease

@DELETE
    public Response cancelLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        try {
            boolean isSuccess = registry.cancel(app.getName(), id,
                "true".equals(isReplication));
            if (isSuccess) {
                return Response.ok().build();
            } else {
                return Response.status(Status.NOT_FOUND).build();
            }
        } catch (Throwable e) {
            return Response.serverError().build();
        }
    }

这个方法里面的逻辑非常简单:

  • 调用注册中心的服务下线功能
  • 然后把服务下线的响应结果返回给 Eureka Client

    7.3 注册中心下线服务

    下面我们来看一下注册中心服务在服务下线做了哪些事情。下面是 Eureka Server 服务下线的时序图:
    1.Eureka 源码学习 - 图10

上面的时序图有两点比较核心:

  • 调用当前 Eureka Server 的 AbstractInstanceRegistry#internalCancel 方法进行服务下线
  • 如果是集群环境调用 PeerAwareInstanceRegistryImpl#replicateToPeers 把当前的服务实例的下线同步到其它 Eureka Servier 当中去(后续分析)。

那我们就来分析一下 AbstractInstanceRegistry#internalCancel 方法是如何进行服务下线的。

AbstractInstanceRegistry#internalCancel

protected boolean internalCancel(String appName, String id, boolean isReplication) {
        read.lock();
        try {
            CANCEL.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                leaseToCancel = gMap.remove(id);
            }
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            if (leaseToCancel == null) {
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            } else {
                leaseToCancel.cancel();
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    instanceInfo.setActionType(ActionType.DELETED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                invalidateCache(appName, vip, svip);
                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            }
        } finally {
            read.unlock();
        }
        synchronized (lock) {
            if (this.expectedNumberOfClientsSendingRenews > 0) {
                // Since the client wants to cancel it, reduce the number of clients to send renews.
                this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
                updateRenewsPerMinThreshold();
            }
        }
        return true;
    }
  • EurekaMonitors#CANCEL 服务取消监控进行 +1
  • 根据微服务的名称获取到当前服务的注册列表
  • 如果当前服务的注册列表不为空,根据传入的服务实例 ID 把它从注册列表当中移除
  • CircularQueue<Pair<Long, String>> recentCanceledQueue 列表当中添加当前服务实例信息(注:这个队列信息暂时没有用到)
  • 如果移除的服务实例为空,EurekaMonitors#CANCEL_NOT_FOUND +1 并返回 false.
  • 调用 Lease<InstanceInfo>#cancel 进行下线,其实就是修改 Lease 的 registrationTimestamp 属性为当前系统时间
  • 设置当前信息为删除状态,并添加到 ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue 当中。(注:这个队列会在 Eureka Client 增量拉取注册表时使用)

其实 Eureka Client 服务下线就是在注册服务中的 Map 当中把当前服务实例的信息删除就行了。

8 Eureka 之 Eureka Server 服务过期

正常情况下,应用实例下线时候会主动向 Eureka-Server 发起下线请求,但实际情况下,应用实例可能异常崩溃,又或者是网络异常等原因,导致下线请求无法被成功提交。

这种情况之后,需要 Eureka Client 定时向 Eureka Server 发送续约配合 Eureka Client 通过定时任务清理超时的租约解决上述异常。

8.1 EvictionTask

EvictionTask 是清理租约过期任务,下面是它的调用时序图:
1.Eureka 源码学习 - 图11
当 Eureka Server 启动的时候就会调用实现了 Java Servlet 规范 ServletContextListener 监听器的 EurekaBootStrap 到调用初始化注册服务 AbstractInstanceRegistry。然后会启用以下几个定时任务:

  • MeasuredRate,统计定时任务,在 AbstractInstanceRegistry 的构建器创建 MeasuredRate 对象的时候传入 1000 * 60 * 1,然后在这里调用它的 start 方法里面有一个定时任务,每隔 60 秒也就是每隔 1 分钟执行一次。这个定时任务里面有 2 个 AtomicLong 类型的参数。一个是 AtomicLong currentBucket 每进行一次续约的时候就会调用它 + 1,另一个是AtomicLong lastBucketMeasuredRate任务每分钟进行执行的时候就会把AtomicLong currentBucket 里面的值设置到AtomicLong lastBucket当中去,然后把AtomicLong currentBucket值清空。然后通过获取 AtomicLong lastBucket 的值就能够得到最近一分钟续约的次数(最近一分钟续约的次数这个在后面要讲的 Eureka Server 自我保护机制当中会使用到)
  • 另外一个就是定时调用 EvictionTask 任务,过期注册表中超时续约的应用。默认 60 秒执行一次

    8.2 服务过期

    下面我们来分析一下服务过期的逻辑:

    EvictionTask

class EvictionTask extends TimerTask {
        private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
        @Override
        public void run() {
            try {
                long compensationTimeMs = getCompensationTimeMs();
                logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
                evict(compensationTimeMs);
            } catch (Throwable e) {
                logger.error("Could not run the evict task", e);
            }
        }

        long getCompensationTimeMs() {
            long currNanos = getCurrentTimeNano();
            long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
            if (lastNanos == 0l) {
                return 0l;
            }
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
            long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
            return compensationTime <= 0l ? 0l : compensationTime;
        }
        long getCurrentTimeNano() {  // for testing
            return System.nanoTime();
        }
    }

compute a compensation time defined as the actual time this task was executed since the prev iteration, vs the configured amount of time for execution. This is useful for cases where changes in time (due to clock skew or gc for example) causes the actual eviction task to execute later than the desired time according to the configured cycle.

首先它会调用 getCompensationTimeMs 方法计算一个补偿时间,由于 JVM GC ,又或是时间偏移( clock skew ) 等原因,定时器执行实际比预期会略有延迟。
传入计算获取到的补偿时间,调用 evict 方法过期超时续约的应用。

AbstractInstanceRegistry#evict

public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);
                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
    }
  • 首先会判断 Eureka Server 是否开启的自我保护机制,如果开启了自我保护机制(下一篇博客会讲)
  • 首先遍历注册表注册的应用列表,找到所有续约过期的应用实例列表。过期判断条件是:当前时间 > 最后一次续约时间 + 90 秒 + 补偿时间。
  • 然后调用 getLocalRegistrySize 获取当前注册列表中应用实例的个数
  • 接着通过下面的公式获取一个期望过期的个数 evictionLimit。evictionLimit = 注册列表所有应用实例的个数 - 注册列表所有应用实例的个数 * 获得续订的最小百分比(默认 0.85)。也就是:期望过期的个数 = 应用实例 * 0.15。如果有20 个服务应用那么这个值就是 3.
  • 然后取第一步获取到续约过期的应用实例个数与期望过期的应用实例个数的最小值
  • 如果这个最小值大于 0,就随机过期续约过期的应用实例列表里面的应用实例并且调用内部取消逻辑(之前服务下线分析过)

如果 Eureka Client 默认会每 30 秒发送心跳到 Eureka Server。如果 Eureka Server 在进行服务过期判断的时候也就是:最后一次续约时间 + 90 秒 + 补偿时间小于系统当前时间就会以服务进行过期。
有以下两点需要注意一下:

  • Eureka Client 在进行续约的时候是把 lastUpdateTimestamp 设置成系统的当前时间 + 90 秒。然后 Eureka Server 在进行服务过期判断的时候是:最后一次续约时间 + 90 秒 + 补偿时间小于系统当前时间就会以服务进行过期。最后在 Eureka Client 拉取注册表是每 30 秒进行拉取。所以一个服务不可用时其它服务需要感知到这个服务不可用就需要:90 秒 + 90 秒 + 30 秒 = 210秒,也就是 3 分半 才能感知到
  • Eureka Server 在服务过期的时候会根据:应用实例 * 0.15 与 心跳过期应用个数随机顺序将它们剔除出去。因为对于需要大量剔除服务应用时,如果我们不这样做,我们可能会在 Eureka 自我保护开始之前清除整个应用程序。通过随机化,影响应该均匀分布在所有应用程序中。

    9 Eureka 之 自我保护机制

    9.1 什么是自我保护机制

    默认情况下,如果Eureka Server在一定时间内(默认 90 秒,其实不止 90 秒)没有接收到某个微服务实例的心跳,Eureka Server将会移除该实例。但是当网络分区故障发生时,微服务与Eureka Server之间无法正常通信,而微服务本身是正常运行的,此时不应该移除这个微服务,所以引入了自我保护机制。
    自我保护机制的工作机制是:如果在15分钟内超过 85% 的客户端节点都没有正常的心跳,那么Eureka就认为客户端与注册中心出现了网络故障,Eureka Server 自动进入自我保护机制,此时会出现以下几种情况:

  • Eureka Server不再从注册列表中移除因为长时间没收到心跳而应该过期的服务。

  • Eureka Server仍然能够接受新服务的注册和查询请求,但是不会被同步到其它节点上,保证当前节点依然可用。
  • 当网络稳定时,当前Eureka Server新的注册信息会被同步到其它节点中。

因此Eureka Server可以很好的应对因网络故障导致部分节点失联的情况,而不会像 ZK 那样如果有一半不可用的情况会导致整个集群不可用而变成瘫痪。
Eureka Server 自我保护机制,可以通过通过配置 eureka.server.enable-self-preservation 来 true 开启/ false 禁用 自我保护机制,默认打开状态,建议生产环境打开此配置。

9-2 Eureka Server 自我保护机制

上面【 8. Eureka 之 Eureka Server 服务过期 】,在进行服务过期的时候,首先会判断 Eureka Server 是否开启了自我保护机制。

    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        ......

}

如果开启了自我保护机制也就是 isLeaseExpirationEnabled() 方法返回了 false,就直接返回,不进行服务下线

@Override
public boolean isLeaseExpirationEnabled() {
    if (!isSelfPreservationModeEnabled()) {
        // The self preservation mode is disabled, hence allowing the instances to expire.
        return true;
    }
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
  • 首先判断 Eureka Server 是否开启了自我保护机制 eureka.enableSelfPreservation 为 true 开启反之不开启。如果没有开启直接返回 true,可以进行服务过期处理。
  • 然后判断每分钟期望的续约数(numberOfRenewsPerMinThreshold) 大于 0 并且实际每分钟的续约数(getNumOfRenewsInLastMin()) 大于每分钟期望的续约数(numberOfRenewsPerMinThreshold)

    9-3.每分钟应用的续约数

    在 Eureka Server 启动的时候,在注册服务(AbstractInstanceRegistry)中会启动一个定时任务 MeasuredRate 来计算每分钟应用续约的个数。时序图如下:image.png
    MeasuredRate ,它是一个统计定时任务,在 AbstractInstanceRegistry 的构建器创建 MeasuredRate 对象的时候传入 1000 * 60 * 1 ,然后在这里调用它的 start 方法里面有一个定时任务,每隔 60 秒也就是每隔 1 分钟执行一次。这个定时任务里面有 2 个AtomicLong 类型的参数。一个是 AtomicLong currentBucket 每进行一次续约的时候就会调用它 + 1,另一个是 AtomicLong lastBucket

MeasuredRate 任务每分钟进行执行的时候就会把 AtomicLong currentBucket 里面的值设置到 AtomicLong lastBucket 当中去,然后把 AtomicLong currentBucket 值清空再次计算。然后通过获取 AtomicLong lastBucket 的值就能够得到最近一分钟续约的次数。

9-4.每分钟期望的续约数

每分钟期望的续约数是 AbstractInstanceRegistry#numberOfRenewsPerMinThreshold , 这个值是动态变化的。它提供了 AbstractInstanceRegistry#updateRenewsPerMinThreshold 来动态的更新这个值

protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
                                                * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
                                                * serverConfig.getRenewalPercentThreshold());
}

每分钟期望的续约数是根据 expectedNumberOfClientsSendingRenews (期望 Eureka Client 发送的续约数,这个值会根据服务的动作进行更新:服务注册 + 1服务下线 - 1) 来进行判断的。上面的公式如下:

每分钟期望的续约数 = 期望Eureka Client发送的续约数 * (60 秒 / 预计客户端间隔秒数续约[默认 30 秒]) * 0.85

比如:现在注册中心有 20 个服务
那么:每分钟期望的续约数 = 20 * (60 / 30) * 0.85 = 17
  • AbstractInstanceRegistry#register 方法

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    
          ......
    
          synchronized (lock) {
              if (this.expectedNumberOfClientsSendingRenews > 0) {
                  // Since the client wants to register it, increase the number of clients sending renews
                  this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                  updateRenewsPerMinThreshold();
              }
          }
    
          ...... 
    }
    

    服务注册, **expectedNumberOfClientsSendingRenews**(期望Eureka Client发送的续约数) + 1,并且更新每分钟期望的续约数

  • AbstractInstanceRegistry#internalCancel方法 ```java protected boolean internalCancel(String appName, String id, boolean isReplication) {

    …….

    synchronized (lock) {

      if (this.expectedNumberOfClientsSendingRenews > 0) {
          // Since the client wants to cancel it, reduce the number of clients to send renews.
          this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
          updateRenewsPerMinThreshold();
      }
    

    }

    ….

}

**服务下线, **`**expectedNumberOfClientsSendingRenews**`**(期望Eureka Client发送的续约数) - 1**,并且更新每分钟期望的续约数
<a name="gilJ3"></a>
## 10.Eureka 之 Eureka Server 覆盖状态
<a name="xPOQw"></a>
### 10-1.Eureka Server 覆盖状态概述
在 `InstanceInfo` 服务应用信息对象里面不仅有状态`status` ,还有覆盖状态 `overriddenStatus`
```java
public class InstanceInfo {

    // 服务应用信息状态
    private volatile InstanceStatus status = InstanceStatus.UP;
    // 服务应用信息覆盖状态
    private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;

}

可以调用 Eureka-Server 暴露的 HTTP Restful 接口 apps/${APP_NAME}/${INSTANCE_ID}/status 对服务应用实例覆盖状态的变更。从而达到主动的、强制的变更应用实例状态。

这里不会修改 Eureka-Client 应用实例的状态,而是修改在 Eureka-Server 注册的应用实例的状态。

通过这样的方式,Eureka-Client 在获取到注册信息时,并且配置 eureka.shouldFilterOnlyUpInstances 为 true(默认为 true),过滤掉非 InstanceStatus.UP 的应用实例,从而避免调动该实例,以达到应用实例的暂停服务( InstanceStatus.OUT_OF_SERVICE ),而无需关闭应用实例。
因此,大多数情况下,调用该接口的目的,将应用实例状态在 (InstanceStatus.UP ) 和 (InstanceStatus.OUT_OF_SERVICE) 之间切换。

10-2.Eureka Server 覆盖状态操作

在 Eureka Server 中的服务应用处理类 InstanceResource 提供了 3 个接口来修改服务实例中的 overriddenStatus

  • renewLease: 心跳接口,可以传入 overriddenStatus 字段来修改服务信息(InstanceInfo) 的覆盖状态
  • statusUpdate:覆盖状态修改接口,可以传入 value字段来修改服务信息(InstanceInfo) 的覆盖状态
  • deleteStatusUpdate:删除覆盖状态接口,可以传入value字段来修改服务信息(InstanceInfo) 的覆盖状态

    续约修改覆盖状态

    可以调用 InstanceResource#renewLease 这个续约接口可以修改服务应用信息的覆盖状态值,它的时序图如下所示:
    image.png在续约成功之后,如果传递的 lastDirtyTimestamp 字段不为空并且根据时间戳验证服务应用信息状态为Response.Status.NOT_FOUND。就会从ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap 这个 Map 根据应用 ID 获取这个应用服务的实例之前是否进行了状态覆盖,如果没有,就会往ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap这个 Map 里面添加服务 ID 的以及对应的覆盖状态并且设置应用信息 InstanceInfo 的覆盖状态

    在进行续约的时候如果实例的状态与覆盖状态不相等就会把状态设置成覆盖状态

public boolean renew(String appName, String id, boolean isReplication) {

        ......

        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // touchASGCache(instanceInfo.getASGName());
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                overriddenInstanceStatus.name(),
                                instanceInfo.getId());
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

            }
        }

       ......    
}

覆盖状态修改接口

可以调用 InstanceResource#statusUpdate 这个接口来修改服务应用信息的覆盖状态值,它的时序图如下所示:
image.png

  • 首先它会根据服务找到对应服务的实例列表,然后根据服务实例的 ID 找到具体的服务实例
  • 接着会调用服务实例信息的续约接口Lease#renew修改最近一次修改时间戳lastUpdateTimestampSystem.currentTimeMillis() + duration(默认90秒);
  • 如果服务实例的状态与覆盖状态相等就直接返回,否则进入下一个步骤
  • ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap 添加服务实例的 ID 与覆盖状态的映射
  • 修改服务应用信息的覆盖状态为传入的状态
  • 往修改队列 recentlyChangedQueue 里面添加当前服务应用信息的操作状态为ActionType.MODIFIED
  • 清除注册服务中的响应缓存

    应用实例覆盖状态映射

    在之前的分析中我们可以看到覆盖状态操作其实包含以下两个步骤:

  • 操作注册服务 AbstractInstanceRegistry对象中的 ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap 服务实例 ID 与覆盖状态的映射

  • 把服务实例里面的覆盖状态overriddenStatus设置为传入的状态

上面讲过在在服务实例进行续约的时候就会从 overriddenInstanceStatusMap根据服务实例 ID 获取到覆盖状态把覆盖状态更新到当前服务实例当中。其实对于服务注册也有同样的逻辑:

AbstractInstanceRegistry#register

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {

        ......

       InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
       if (overriddenStatusFromMap != null) {
           logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
           registrant.setOverriddenStatus(overriddenStatusFromMap);
       }

       // Set the status based on the overridden status rules
       InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
       registrant.setStatusWithoutDirty(overriddenInstanceStatus);

       ......

}
  • ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap根据服务实例 ID 获取到覆盖状态
  • 如果覆盖状态不为空就调用registrant.setOverriddenStatus(overriddenStatusFromMap)把覆盖状态设置到实例信息 InstanceInfooverriddenStatus 覆盖状态当中
  • 接着调用 AbstractInstanceRegistry#getOverriddenInstanceStatus 这个方法根据InstanceStatusOverrideRule这个实例覆盖状态规则的实现类获取覆盖状态
  • 如果覆盖状态的值与实例信息 InstanceInfo registrant 里面状态(status)的值不一致就把覆盖状态的值设置为实例信息 InstanceInfo registrant的状态值

    AbstractInstanceRegistry#getOverriddenInstanceStatus

    protected InstanceInfo.InstanceStatus getOverriddenInstanceStatus(InstanceInfo r,
                                                                    Lease<InstanceInfo> existingLease,
                                                                    boolean isReplication) {
        InstanceStatusOverrideRule rule = getInstanceInfoOverrideRule();
        logger.debug("Processing override status using rule: {}", rule);
        return rule.apply(r, existingLease, isReplication).status();
    }

    protected abstract InstanceStatusOverrideRule getInstanceInfoOverrideRule();

在方法 AbstractInstanceRegistry#getOverriddenInstanceStatus 中会获取 InstanceStatusOverrideRule 应用覆盖状态的规则。这个规则接口的类结构如下图所示:
1.Eureka 源码学习 - 图15

  • StatusOverrideResult:是 InstanceStatusOverrideRule 定义的接口返回值里面有两个字段: match用于判断是否规则匹配,实例状态 InstanceStatus用于调用registrant.setStatusWithoutDirty(overriddenInstanceStatus) 设置实例的状态 (status)
  • AsgEnabledRule:是 AWS 环境才会使用这时我们不做讨论

    AlwaysMatchInstanceStatusRule.java

public class AlwaysMatchInstanceStatusRule implements InstanceStatusOverrideRule {
    private static final Logger logger = LoggerFactory.getLogger(AlwaysMatchInstanceStatusRule.class);

    @Override
    public StatusOverrideResult apply(InstanceInfo instanceInfo,
                                      Lease<InstanceInfo> existingLease,
                                      boolean isReplication) {
        logger.debug("Returning the default instance status {} for instance {}", instanceInfo.getStatus(),
                instanceInfo.getId());
        return StatusOverrideResult.matchingStatus(instanceInfo.getStatus());
    }

    @Override
    public String toString() {
        return AlwaysMatchInstanceStatusRule.class.getName();
    }
}

任何情况的匹配,直接返回 instanceInfo 的状态

DownOrStartingRule.java

public class DownOrStartingRule implements InstanceStatusOverrideRule {
    private static final Logger logger = LoggerFactory.getLogger(DownOrStartingRule.class);

    @Override
    public StatusOverrideResult apply(InstanceInfo instanceInfo,
                                      Lease<InstanceInfo> existingLease,
                                      boolean isReplication) {
        // ReplicationInstance is DOWN or STARTING - believe that, but when the instance says UP, question that
        // The client instance sends STARTING or DOWN (because of heartbeat failures), then we accept what
        // the client says. The same is the case with replica as well.
        // The OUT_OF_SERVICE from the client or replica needs to be confirmed as well since the service may be
        // currently in SERVICE
        if ((!InstanceInfo.InstanceStatus.UP.equals(instanceInfo.getStatus()))
                && (!InstanceInfo.InstanceStatus.OUT_OF_SERVICE.equals(instanceInfo.getStatus()))) {
            logger.debug("Trusting the instance status {} from replica or instance for instance {}",
                    instanceInfo.getStatus(), instanceInfo.getId());
            return StatusOverrideResult.matchingStatus(instanceInfo.getStatus());
        }
        return StatusOverrideResult.NO_MATCH;
    }

    @Override
    public String toString() {
        return DownOrStartingRule.class.getName();
    }
}

上面的逻辑是:服务实例信息的状态既不等于 InstanceStatus.UP 也不等于InstanceStatus.OUT_OF_SERVICE ,就直接返回服务实例信息里面的状态。
代码注释:ReplicationInstance is DOWN or STARTING -相信这个,但是当实例说UP的时候,质疑它客户端实例发送 STARTINGDOWN``(因为心跳失败),然后我们接受什么客户端说。replica也是如此。客户端或副本的OUT_OF_SERVICE也需要确认,因为服务可能是当前服务。

FirstMatchWinsCompositeRule.java

public class FirstMatchWinsCompositeRule implements InstanceStatusOverrideRule {

    private final InstanceStatusOverrideRule[] rules;
    private final InstanceStatusOverrideRule defaultRule;
    private final String compositeRuleName;

    public FirstMatchWinsCompositeRule(InstanceStatusOverrideRule... rules) {
        this.rules = rules;
        this.defaultRule = new AlwaysMatchInstanceStatusRule();
        // Let's build up and "cache" the rule name to be used by toString();
        List<String> ruleNames = new ArrayList<>(rules.length+1);
        for (int i = 0; i < rules.length; ++i) {
            ruleNames.add(rules[i].toString());
        }
        ruleNames.add(defaultRule.toString());
        compositeRuleName = ruleNames.toString();
    }

    @Override
    public StatusOverrideResult apply(InstanceInfo instanceInfo,
                                      Lease<InstanceInfo> existingLease,
                                      boolean isReplication) {
        for (int i = 0; i < this.rules.length; ++i) {
            StatusOverrideResult result = this.rules[i].apply(instanceInfo, existingLease, isReplication);
            if (result.matches()) {
                return result;
            }
        }
        return defaultRule.apply(instanceInfo, existingLease, isReplication);
    }

    @Override
    public String toString() {
        return this.compositeRuleName;
    }
}

它的逻辑是里面有规则列表 InstanceStatusOverrideRule[] rules 同时也有默认规则 InstanceStatusOverrideRule defaultRule 首先遍历规则列表,如果有一个匹配就直接返回。反之就使用默认的规则进行匹配

OverrideExistsRule.java

public class OverrideExistsRule implements InstanceStatusOverrideRule {

    private static final Logger logger = LoggerFactory.getLogger(OverrideExistsRule.class);

    private Map<String, InstanceInfo.InstanceStatus> statusOverrides;

    public OverrideExistsRule(Map<String, InstanceInfo.InstanceStatus> statusOverrides) {
        this.statusOverrides = statusOverrides;
    }

    @Override
    public StatusOverrideResult apply(InstanceInfo instanceInfo, Lease<InstanceInfo> existingLease, boolean isReplication) {
        InstanceInfo.InstanceStatus overridden = statusOverrides.get(instanceInfo.getId());
        // If there are instance specific overrides, then they win - otherwise the ASG status
        if (overridden != null) {
            logger.debug("The instance specific override for instance {} and the value is {}",
                    instanceInfo.getId(), overridden.name());
            return StatusOverrideResult.matchingStatus(overridden);
        }
        return StatusOverrideResult.NO_MATCH;
    }

    @Override
    public String toString() {
        return OverrideExistsRule.class.getName();
    }

}

它的逻辑是:从服务实例信息 ID 与覆盖状态的映射 Map<String, InstanceInfo.InstanceStatus> statusOverrides 中根据服务实例信息 ID 获取,如果能够获取就返回覆盖状态的状态值,反之就不匹配。

LeaseExistsRule.java

public class LeaseExistsRule implements InstanceStatusOverrideRule {

    private static final Logger logger = LoggerFactory.getLogger(LeaseExistsRule.class);

    @Override
    public StatusOverrideResult apply(InstanceInfo instanceInfo,
                                      Lease<InstanceInfo> existingLease,
                                      boolean isReplication) {
        // This is for backward compatibility until all applications have ASG
        // names, otherwise while starting up
        // the client status may override status replicated from other servers
        if (!isReplication) {
            InstanceInfo.InstanceStatus existingStatus = null;
            if (existingLease != null) {
                existingStatus = existingLease.getHolder().getStatus();
            }
            // Allow server to have its way when the status is UP or OUT_OF_SERVICE
            if ((existingStatus != null)
                    && (InstanceInfo.InstanceStatus.OUT_OF_SERVICE.equals(existingStatus)
                    || InstanceInfo.InstanceStatus.UP.equals(existingStatus))) {
                logger.debug("There is already an existing lease with status {}  for instance {}",
                        existingLease.getHolder().getStatus().name(),
                        existingLease.getHolder().getId());
                return StatusOverrideResult.matchingStatus(existingLease.getHolder().getStatus());
            }
        }
        return StatusOverrideResult.NO_MATCH;
    }

    @Override
    public String toString() {
        return LeaseExistsRule.class.getName();
    }
}
  • 如果是集群环境下直接返回 StatusOverrideResult.NO_MATCH 不匹配。
  • 如果实例状态 Lease<InstanceInfo> existingLease 中的实例信息是 InstanceStatus.OUT_OF_SERVICE 或者 InstanceStatus.UP 直接返回 Lease<InstanceInfo> existingLease 实例信息里面的状态。

我们将 PeerAwareInstanceRegistryImpl 的应用实例覆盖状态规则进行梳理一下:

状态 DownOrStartingRule OverrideExistsRule LeaseExistsRule AlwaysMatchInstanceStatusRule
instanceInfo STARTING * 全部状态
existingLease UP or OUT_OF_SERVER(非 Eureka Server 请求)
statusOverrides * 全部状态
  • 应用实例状态是最重要的属性,没有之一,因而在最终实例状态的计算,以可信赖为主。
  • DownOrStartingRule ,instanceInfo处于 STARTING或者 DOWN 状态,应用实例可能不适合提供服务( 被请求 ),考虑可信赖,返回 instanceInfo 的状态。
  • OverrideExistsRule ,当存在覆盖状态( statusoverrides ) ,使用该状态,比较好理解。
  • LeaseExistsRule ,来自 Eureka-Client 的请求( 非 Eureka-Server 集群请求),当 Eureka-Server 的实例状态存在,并且处于 UP 或则 OUT_OF_SERVICE ,保留当前状态。原因,禁止 Eureka-Client主动在这两个状态之间切换。如果要切换,使用应用实例覆盖状态变更与删除接口。
  • AlwaysMatchInstanceStatusRule ,使用 instanceInfo 的状态返回,以保证能匹配到状态。

从上面我们可以看到 Eureka Server 是从 AbstractInstanceRegistry#getOverriddenInstanceStatus 获取应用覆盖状态规则 InstanceStatusOverrideRule 对应的实例对象。而 Eureka Server 的注册服务实例是 PeerAwareInstanceRegistryImpl ,在它的初始化过程当中它的构建方法会初始化 InstanceStatusOverrideRule 对象。

public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    private final InstanceStatusOverrideRule instanceStatusOverrideRule;

    public PeerAwareInstanceRegistryImpl(
            EurekaServerConfig serverConfig,
            EurekaClientConfig clientConfig,
            ServerCodecs serverCodecs,
            EurekaClient eurekaClient
    ) {
        super(serverConfig, clientConfig, serverCodecs);
        this.eurekaClient = eurekaClient;
        this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
        // We first check if the instance is STARTING or DOWN, then we check explicit overrides,
        // then we check the status of a potentially existing lease.
        this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
                new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
    }

    ......

它使用的是 FirstMatchWinsCompositeRule 这个规则类:它的逻辑是里面有规则列表 InstanceStatusOverrideRule[] rules 同时也有默认规则 InstanceStatusOverrideRule defaultRule 首先遍历规则列表,如果有一个匹配就直接返回。反之就使用默认的规则进行匹配。然后传入的规则依次是:DownOrStartingRule 、 OverrideExistsRule 和 LeaseExistsRule。

11.Euraka 之 Euraka Client 增量拉取注册表

在【5.Eureka Server 多级缓存】中已经详细说明了在 Eureka Client 启动的时候会去 Eureka Server 里面全量的拉取一下注册到里面的服务信息列表。并且 Eureka Server 里面是使用了多级缓存来保存全量的注册信息列表。
Eureka Client 在启动时获取 Eureka Server 中的注册信息,但是作为分布式环境服务随时时可进行上线、下线操作。所以作为注册中心它需要有服务自动上下线发现的功能。
Eureka Client 在初始化获取了全量的注册信息之后,它会启动定时任务去增量的拉取注册中心当的新的服务列表信息。
Eureka Client 增量拉取注册表的时序图如下:
image.png
EurekaClient 在初始化的时候会初始化一序列定时任务,其中一个就是定时的从 Eureka Server 增量的获取服务注册信息。它调用的 http 请求的路径类似于:http://localhost:8080/v2/apps/delta 并且发送的是 get 请求。

  • http://localhost:8080:Eureka Server 的接收请求的地址
  • /v2/apps:Restful 处理类 ApplicationsResource 上面的路径:@Path(“/{version}/apps”)
  • deltaApplicationsResource 这个处理类中的 getContainerDifferential 方法用于处理 Eureka Client 的堷拉取注册列表信息。

我们先来分析一下 Eureka Server 中是如果来处理 Eureka Client 发送来的增量拉取注册列表的请求。先来分析一下 Eureka Server 的返回值。然后再来分析一下 Eureka Client 是如何处理从 Eureka Client 的增量服务注册列表的。

Eureka Server 增量拉取