Eureka is a service registry open-sourced by Netflix. It follows a client-server (C/S) architecture and consists of two parts: Eureka Server and Eureka Client.

Spring Cloud Integration

@EnableEurekaServer

Adding the @EnableEurekaServer annotation to the main class of a Spring Boot application turns that application into a Eureka Server, i.e. the registry.

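@Import(EurekaServerMarkerConfiguration.class) registers a Marker bean in the Spring container; the presence of this bean is what tells the rest of the configuration that the current application is a Eureka server.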

@Configuration
public class EurekaServerMarkerConfiguration {

    @Bean
    public Marker eurekaServerMarkerBean() {
        return new Marker();
    }

    class Marker {
    }
}

EurekaServerAutoConfiguration.class

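EurekaServerAutoConfiguration is registered through `\spring-cloud-netflix-eureka-server-2.1.4.RELEASE.jar!\META-INF\spring.factories`, so Spring Boot's auto-configuration loads it into the container automatically. Note also that Eureka uses **Jersey** to provide its MVC (REST) layer.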

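EurekaServerAutoConfiguration is annotated with `@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)`, so it only takes effect when the Marker bean exists, i.e. when the application is a Eureka Server: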

@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    // ...
    @Bean
    public FilterRegistrationBean jerseyFilterRegistration(
            javax.ws.rs.core.Application eurekaJerseyApp) {
        FilterRegistrationBean bean = new FilterRegistrationBean();
        bean.setFilter(new ServletContainer(eurekaJerseyApp));
        bean.setOrder(Ordered.LOWEST_PRECEDENCE);
        bean.setUrlPatterns(
                Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
        return bean;
    }

    @Bean
    public javax.ws.rs.core.Application jerseyApplication(Environment environment,
            ResourceLoader resourceLoader) {
        ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
                false, environment);

        // Filter to include only classes that have a particular annotation.
        //
        provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
        provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));

        // Find classes in Eureka packages (or subpackages)
        //
        Set<Class<?>> classes = new HashSet<>();
        for (String basePackage : EUREKA_PACKAGES) {
            Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
            for (BeanDefinition bd : beans) {
                Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
                        resourceLoader.getClassLoader());
                classes.add(cls);
            }
        }

        // Construct the Jersey ResourceConfig
        Map<String, Object> propsAndFeatures = new HashMap<>();
        propsAndFeatures.put(
                // Skip static content used by the webapp
                ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
                EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");

        DefaultResourceConfig rc = new DefaultResourceConfig(classes);
        rc.setPropertiesAndFeatures(propsAndFeatures);
        return rc;
    }
    // ...
}
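The marker-bean pattern itself is simple enough to model without Spring. The sketch below is a hypothetical, Spring-free simulation: `MarkerDemo`, its map-based "container" and both methods are illustrative names only, standing in for `@Import` registering the Marker bean and `@ConditionalOnBean` checking for it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Spring-free model of the marker-bean pattern (not Spring APIs).
class MarkerDemo {

    // Plays the role of EurekaServerMarkerConfiguration.Marker: an empty class
    // whose only purpose is to exist (or not) in the container.
    static class Marker {
    }

    // Simulates @EnableEurekaServer -> @Import(EurekaServerMarkerConfiguration.class):
    // enabling the server only registers a Marker instance.
    static Map<Class<?>, Object> buildContainer(boolean enableEurekaServer) {
        Map<Class<?>, Object> beans = new HashMap<>();
        if (enableEurekaServer) {
            beans.put(Marker.class, new Marker());
        }
        return beans;
    }

    // Simulates @ConditionalOnBean(Marker.class) on EurekaServerAutoConfiguration:
    // the server auto-configuration applies only if a Marker bean is present.
    static boolean serverAutoConfigurationApplies(Map<Class<?>, Object> beans) {
        return beans.containsKey(Marker.class);
    }
}
```

The design keeps the switch (the annotation) and the consumer (the auto-configuration) decoupled: the auto-configuration never looks at annotations, only at whether the marker exists.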

Service Registration

Flow

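  1. The controller calls the Eureka Server's registration method, `InstanceRegistry.register`. The call follows a chain-of-responsibility style: each class does its own part and then calls its parent class's `register` method.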


Eureka Core Controller

As mentioned above, Eureka relies on Jersey for its MVC layer, so a registration request coming through Spring Cloud ultimately reaches the `addInstance` method of Eureka Core's `ApplicationResource` class:

@Produces({"application/xml", "application/json"})
public class ApplicationResource {

    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        // Perform the registration
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }
}
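  2. `registry` is declared as the `PeerAwareInstanceRegistry` interface; at runtime it is Spring Cloud's subclass `InstanceRegistry`, which does its own handling (`handleRegistration`) and then calls its parent's `register`:

```java
public class InstanceRegistry extends PeerAwareInstanceRegistryImpl
        implements ApplicationContextAware {

    @Override
    public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
        handleRegistration(info, leaseDuration, isReplication);
        super.register(info, leaseDuration, isReplication);
    }
}
```

  3. `AbstractInstanceRegistry.register` finally stores the instance. Its `registry` field is a `ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>` keyed by application name; each value, `gMap`, maps instance ids to leases for all instances of that application (cluster):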
public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();

            // First look up the gMap of this application
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);

            // A null gMap means this is the app's first instance: create the map
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }

            // Check whether an instance with the same id is already registered
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                // Keep whichever InstanceInfo is newer
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // Complete the registration by writing the lease into gMap
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }
}
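The core of `register` is the two-level map plus the "keep the newer one" rule. The sketch below keeps only that structure; `RegInstance` and `SimpleRegistry` are hypothetical names, and the `Lease` wrapper is replaced by the instance objects themselves.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for InstanceInfo with only the fields the sketch needs.
class RegInstance {
    final String appName;
    final String id;
    final long lastDirtyTimestamp;

    RegInstance(String appName, String id, long lastDirtyTimestamp) {
        this.appName = appName;
        this.id = id;
        this.lastDirtyTimestamp = lastDirtyTimestamp;
    }
}

// Simplified model of AbstractInstanceRegistry's layout: appName -> (instanceId -> instance).
class SimpleRegistry {
    final ConcurrentHashMap<String, Map<String, RegInstance>> registry = new ConcurrentHashMap<>();

    void register(RegInstance registrant) {
        // Create the per-app map at most once, even under concurrent first registrations.
        Map<String, RegInstance> gMap = registry.get(registrant.appName);
        if (gMap == null) {
            ConcurrentHashMap<String, RegInstance> gNewMap = new ConcurrentHashMap<>();
            gMap = registry.putIfAbsent(registrant.appName, gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // Same strict ">" rule as Eureka: keep the existing entry only if it is newer.
        RegInstance existing = gMap.get(registrant.id);
        if (existing != null && existing.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
            registrant = existing;
        }
        gMap.put(registrant.id, registrant);
    }
}
```

`putIfAbsent` ensures that two concurrent first registrations for the same app end up sharing one `gMap`, and the strict `>` means that on equal timestamps the incoming registration wins, as the comment in the Eureka code explains.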

Heartbeat Renewal

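Heartbeat renewal means that a service reports its health at a fixed interval. The server records the time of the last renewal; if no renewal arrives within the maximum tolerated period, the instance is considered unhealthy and is removed from the registry.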

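eureka:
  instance:
    instance-id: user # unique ID this instance registers under in Eureka
    prefer-ip-address: true # register with the IP address instead of the hostname
    lease-renewal-interval-in-seconds: 10 # how often the Eureka client sends a heartbeat to the server
    lease-expiration-duration-in-seconds: 30 # how long the server waits after the last heartbeat before treating the instance as gone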

On the server side this is handled by the `renewLease` method of Eureka Core's `InstanceResource` class:

@Produces({"application/xml", "application/json"})
public class InstanceResource {

    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);

        // Perform the heartbeat renewal
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }
}
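From the client's side, a heartbeat is an HTTP `PUT` that ends up in `renewLease`. The sketch below only builds such a request with the JDK's `java.net.http` client and does not send it. The base URL `http://localhost:8761/eureka`, the `apps/{appName}/{instanceId}` path and the sample values are assumptions for illustration (8761 is Spring Cloud's conventional Eureka port); the `status` and `lastDirtyTimestamp` query parameters are the ones `renewLease` reads above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Builds (but does not send) a heartbeat request of the kind renewLease handles.
// The URL layout is an assumption; HeartbeatRequestSketch is a hypothetical name.
class HeartbeatRequestSketch {
    static HttpRequest heartbeat(String baseUrl, String appName, String instanceId,
                                 String status, long lastDirtyTimestamp) {
        URI uri = URI.create(baseUrl + "/apps/" + appName + "/" + instanceId
                + "?status=" + status + "&lastDirtyTimestamp=" + lastDirtyTimestamp);
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())   // renewLease takes no body
                .build();
    }
}
```

For `heartbeat("http://localhost:8761/eureka", "USER", "user", "UP", 123)` the request URI is `http://localhost:8761/eureka/apps/USER/user?status=UP&lastDirtyTimestamp=123`. A 404 reply corresponds to the `Status.NOT_FOUND` branch above, which tells the client to register again.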

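The renewal itself is triggered by `boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);`. As with registration, `registry` resolves to Spring Cloud's `InstanceRegistry`, whose `renew` method uses the same chain-of-responsibility style of calls: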

public class InstanceRegistry extends PeerAwareInstanceRegistryImpl
        implements ApplicationContextAware {

    @Override
    public boolean renew(final String appName, final String serverId,
            boolean isReplication) {
        log("renew " + appName + " serverId " + serverId + ", isReplication {}"
                + isReplication);
        List<Application> applications = getSortedApplications();
        for (Application input : applications) {
            if (input.getName().equals(appName)) {
                InstanceInfo instance = null;
                for (InstanceInfo info : input.getInstances()) {
                    if (info.getId().equals(serverId)) {
                        instance = info;
                        break;
                    }
                }
                publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
                        instance, isReplication));
                break;
            }
        }
        // Chain of responsibility: delegate the renewal to the parent class
        return super.renew(appName, serverId, isReplication);
    }

}

This in turn calls `PeerAwareInstanceRegistryImpl.renew`:

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    public boolean renew(final String appName, final String id, final boolean isReplication) {
        // Continue down the chain; replicate to peers only if the local renewal succeeded
        if (super.renew(appName, id, isReplication)) {
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }
}

Finally, `AbstractInstanceRegistry.renew` performs the actual renewal:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        // Look up the application's instance map (its cluster) by app name
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            // Look up the instance's lease by id
            leaseToRenew = gMap.get(id);
        }
        // The instance is not registered (yet)
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            // Get the concrete instance
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    instanceInfo.getOverriddenStatus().name(),
                                    instanceInfo.getId());
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

                }
            }
            renewsLastMin.increment();
            // Ask the Lease to update its renewal timestamp
            leaseToRenew.renew();
            return true;
        }
    }
}
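The three `renew` methods above form a delegation chain through the class hierarchy. The hypothetical classes below (`LocalRegistry`, `PeerAwareRegistry`, `SpringInstanceRegistry`) stand in for `AbstractInstanceRegistry`, `PeerAwareInstanceRegistryImpl` and Spring Cloud's `InstanceRegistry`, and simply record the order in which each layer does its work.

```java
import java.util.ArrayList;
import java.util.List;

// Plays the role of AbstractInstanceRegistry: does the actual local renewal.
class LocalRegistry {
    final List<String> trace = new ArrayList<>();

    boolean renew(String appName, String id, boolean isReplication) {
        trace.add("local renew " + appName + "/" + id);
        return true;   // pretend the lease was found
    }
}

// Plays the role of PeerAwareInstanceRegistryImpl: renews locally first, then replicates.
class PeerAwareRegistry extends LocalRegistry {
    @Override
    boolean renew(String appName, String id, boolean isReplication) {
        if (super.renew(appName, id, isReplication)) {
            if (!isReplication) {   // replicateToPeers returns early for replicated requests
                trace.add("replicate heartbeat to peers");
            }
            return true;
        }
        return false;
    }
}

// Plays the role of Spring Cloud's InstanceRegistry: publishes an event, then delegates.
class SpringInstanceRegistry extends PeerAwareRegistry {
    @Override
    boolean renew(String appName, String id, boolean isReplication) {
        trace.add("publish EurekaInstanceRenewedEvent");
        return super.renew(appName, id, isReplication);
    }
}
```

Each subclass wraps its parent: the Spring layer acts before delegating, while the peer-aware layer acts only after the local renewal has succeeded.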

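The `Lease` object updates the renewal time using the formula: lastUpdateTimestamp = current time + duration. Here `duration` is the lease duration the instance registered with (`lease-expiration-duration-in-seconds`, 90s by default), not the heartbeat interval.

With the configuration above (lease-expiration-duration-in-seconds: 30), the last update timestamp (renewal time) after each heartbeat is therefore: current time + 30s.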

public class Lease<T> {

    private volatile long lastUpdateTimestamp;

    public void renew() {
        // Update the last-operation timestamp (now + duration)
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }

}
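To make the expiry arithmetic concrete, here is a simplified sketch. `LeaseSketch` is hypothetical: it takes the current time as a parameter instead of calling `System.currentTimeMillis()` so it can be tested, and its `isExpired` follows the comparison in Eureka's `Lease.isExpired` (`now > lastUpdateTimestamp + duration + additionalLeaseMs`), leaving out the eviction-timestamp check. Because `renew()` has already added `duration`, a renewed lease only expires about 2 × duration after the last heartbeat; Eureka's own `Lease` source documents this as a known minor bug that was kept for compatibility.

```java
// Hypothetical simplification of Eureka's Lease expiry arithmetic.
// Time is passed in explicitly instead of System.currentTimeMillis() so it can be tested.
class LeaseSketch {
    final long durationMs;       // lease duration (lease-expiration-duration-in-seconds * 1000)
    long lastUpdateTimestamp;    // like Lease: set to "now" when the lease is created

    LeaseSketch(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = nowMs;
    }

    // Same formula as Lease.renew(): now + duration.
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs;
    }

    // Same comparison as Lease.isExpired (eviction-timestamp check omitted):
    // duration is added again here, on top of what renew() already added.
    boolean isExpired(long nowMs, long additionalLeaseMs) {
        return nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }
}
```

With a 30s duration, a freshly registered lease expires just after 30s, whereas a lease renewed at t = 0 is still valid at t = 60s and expires just after it.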

Cluster Replication

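Cluster replication means that whenever a client sends a registration, heartbeat, status update or similar operation to a server, that server forwards the same operation to the other servers in the cluster to keep their data consistent.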

public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) {
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

After each local operation completes, the server replicates it to the cluster. This is implemented by the `replicateToPeers` method of `PeerAwareInstanceRegistryImpl`, where:

`isReplication` indicates whether the request itself came from a peer replication. It is also the key to preventing replication loops.

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry 
    implements PeerAwareInstanceRegistry {

    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            // isReplication is what prevents replication loops
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }
            // Replicate to each peer in turn
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                // Skip this server itself
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                // Replicate the action to this peer
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

}

The actual replication logic:

private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

// In PeerEurekaNode:
public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    batchingDispatcher.process(
        taskId("register", info),
        // replicateInstanceInfo is set to true here, marking the message as a cluster replication
        new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
            public EurekaHttpResponse<Void> execute() {
                return replicationClient.register(info);
            }
        },
        expiryTime
    );
}
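The loop-prevention rule can be seen in a small in-memory model. `PeerNode` is a hypothetical class, not Eureka's `PeerEurekaNode`: a node that receives a client heartbeat applies it and forwards it to every other peer with `isReplication = true`, while a node that receives a replicated heartbeat applies it without forwarding it again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of replicateToPeers; not Eureka's PeerEurekaNode.
class PeerNode {
    final String name;
    final List<PeerNode> peers = new ArrayList<>();  // may include this node itself
    int received;                                    // heartbeats applied locally

    PeerNode(String name) {
        this.name = name;
    }

    void heartbeat(String appName, String id, boolean isReplication) {
        received++;                  // stands in for the local renew()
        if (isReplication) {
            return;                  // a replicated request is never replicated again
        }
        for (PeerNode peer : peers) {
            if (peer == this) {
                continue;            // like isThisMyUrl(): do not replicate to yourself
            }
            peer.heartbeat(appName, id, true);
        }
    }
}
```

In a fully connected three-node cluster, one client heartbeat to any node is applied exactly once on each node. Without the `isReplication` check, every peer would forward the heartbeat back and the calls would recurse indefinitely.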

Registry Sync on Startup

When a new Eureka Server starts, it actively copies the registry information from the other servers so that it is in sync with them.

In the Spring Cloud Eureka Server jar, `META-INF/spring.factories` registers `EurekaServerAutoConfiguration`:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
        InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
}

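`@Import(EurekaServerInitializerConfiguration.class)` brings in the `EurekaServerInitializerConfiguration` class, which initializes the Eureka environment configuration and the Eureka server context (including copying the registry from peers, self-preservation, and so on):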

@Configuration
public class EurekaServerInitializerConfiguration
        implements ServletContextAware, SmartLifecycle, Ordered {

    @Override
    public void start() {
        new Thread(() -> {
            try {
                // TODO: is this class even needed now?
                // Bootstrap the Eureka server context
                eurekaServerBootstrap.contextInitialized(
                        EurekaServerInitializerConfiguration.this.servletContext);
                log.info("Started Eureka Server");

                publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                EurekaServerInitializerConfiguration.this.running = true;
                publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
            }
            catch (Exception ex) {
                // Help!
                log.error("Could not initialize Eureka servlet context", ex);
            }
        }).start();
    }

}

public class EurekaServerBootstrap {
    public void contextInitialized(ServletContext context) {
        try {
            // Initialize the Eureka environment (reads the configuration)
            initEurekaEnvironment();
            // Initialize the Eureka server context
            initEurekaServerContext();

            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        }
        catch (Throwable e) {
            log.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }

    protected void initEurekaServerContext() throws Exception {
        // For backward compatibility
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                                                    XStream.PRIORITY_VERY_HIGH);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                                                   XStream.PRIORITY_VERY_HIGH);

        if (isAws(this.applicationInfoManager.getInfo())) {
            this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                                                   this.eurekaClientConfig, this.registry, this.applicationInfoManager);
            this.awsBinder.start();
        }

        EurekaServerContextHolder.initialize(this.serverContext);

        log.info("Initialized server context");

        // Copy registry from neighboring eureka node
        // Copy the registry from peer nodes
        int registryCount = this.registry.syncUp();
        this.registry.openForTraffic(this.applicationInfoManager, registryCount);

        // Register all monitoring statistics.
        EurekaMonitors.registerAllStats();
    }
}

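The copy step `int registryCount = this.registry.syncUp();` is implemented by the `syncUp` method of `PeerAwareInstanceRegistryImpl`. In short, it reads all registrations known to the other servers (through the server's embedded Eureka client) and registers each of them in its own registry, retrying up to `registrySyncRetries` times as long as nothing has been copied: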

@Override
public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;

    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        // Register the instance locally (isReplication = true)
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}
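The retry structure of `syncUp` is sketched below. `SyncUpSketch`, the `fetchFromPeers` supplier and the string-based local registry are hypothetical simplifications; the real method reads `Applications` through the embedded Eureka client, filters them with `isRegisterable`, and calls `register(instance, duration, true)`.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of syncUp's retry loop: keep fetching from peers until something
// was copied or the retries run out.
class SyncUpSketch {
    int attempts;   // how many fetches were made (for illustration)

    int syncUp(Supplier<List<String>> fetchFromPeers, List<String> localRegistry,
               int retries, long retryWaitMs) {
        int count = 0;
        for (int i = 0; i < retries && count == 0; i++) {
            if (i > 0) {
                try {
                    Thread.sleep(retryWaitMs);          // wait between attempts
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible
                    break;
                }
            }
            attempts++;
            for (String instanceId : fetchFromPeers.get()) {
                localRegistry.add(instanceId);          // register(..., true) in Eureka
                count++;
            }
        }
        return count;
    }
}
```

One small deviation: on interruption the sketch restores the thread's interrupt flag before breaking, whereas the Eureka code above only logs and breaks. The returned count is what `openForTraffic` uses as `expectedNumberOfClientsSendingRenews`.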

Eviction (Deregistration)

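Eviction means that Eureka runs a timer that periodically checks whether each lease has expired, i.e. whether the current time is later than `lastUpdateTimestamp + duration` plus a compensation time for delayed task execution. Since `renew()` has already added `duration` once, with the example configuration (30s) an instance that stops sending heartbeats is not treated as expired until roughly 60s after its last heartbeat.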

Right after the startup sync described above, the eviction task is started as well, in `this.registry.openForTraffic(this.applicationInfoManager, registryCount);`:

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    this.expectedNumberOfClientsSendingRenews = count;
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    // Start the eviction task
    super.postInit();
}

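`super.postInit()` schedules a new `EvictionTask`. Eviction is a timer task (run every `eviction-interval-timer-in-ms`, 60 seconds by default) that periodically removes instances that have not sent a heartbeat for too long: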

protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
                           serverConfig.getEvictionIntervalTimerInMs(),
                           serverConfig.getEvictionIntervalTimerInMs());
}

class EvictionTask extends TimerTask {

    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);

    @Override
    public void run() {
        try {
            long compensationTimeMs = getCompensationTimeMs();
            logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            // Run the eviction
            evict(compensationTimeMs);
        } catch (Throwable e) {
            logger.error("Could not run the evict task", e);
        }
    }
}

public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");

    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.

    // Collect all expired leases
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }

    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;

    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            // Use the shuffle to pick a random expired instance
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            // Evict the instance
            internalCancel(appName, id, false);
        }
    }
}

protected boolean internalCancel(String appName, String id, boolean isReplication) {
    try {
        read.lock();
        CANCEL.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            // Remove the instance from its application map
            leaseToCancel = gMap.remove(id);
        }
        synchronized (recentCanceledQueue) {
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        }
        InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            return true;
        }
    } finally {
        read.unlock();
    }
}
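At its core, internalCancel removes the lease from a two-level map (appName → instance id → lease) and records the change. A minimal sketch under simplifying assumptions: CancelSketch is hypothetical, a String stands in for the lease, and the metrics, the overridden-status map and the invalidateCache step are omitted. Like the original, it takes the read lock even though it modifies the map: the maps are concurrent, and as far as I know the write lock of the same ReentrantReadWriteLock is reserved in Eureka for building delta responses, so registrations and cancellations can run concurrently and are only blocked while a delta is computed.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class CancelSketch {
    // appName -> (instanceId -> instance data); a String stands in for Lease<InstanceInfo>.
    private final ConcurrentHashMap<String, Map<String, String>> registry = new ConcurrentHashMap<>();
    // Hypothetical stand-in for recentlyChangedQueue (consumed by delta fetches).
    private final Queue<String> recentlyChanged = new ConcurrentLinkedQueue<>();
    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Lock read = readWriteLock.readLock();

    void register(String appName, String id, String info) {
        read.lock();
        try {
            registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()).put(id, info);
        } finally {
            read.unlock();
        }
    }

    boolean cancel(String appName, String id) {
        read.lock();
        try {
            Map<String, String> gMap = registry.get(appName);
            String removed = (gMap != null) ? gMap.remove(id) : null;
            if (removed == null) {
                return false; // corresponds to the CANCEL_NOT_FOUND branch of internalCancel
            }
            // Record the change so incremental (delta) fetches can report the deleted instance.
            recentlyChanged.add("DELETED " + appName + "/" + id);
            return true;
        } finally {
            read.unlock();
        }
    }

    int recentChanges() {
        return recentlyChanged.size();
    }

    public static void main(String[] args) {
        CancelSketch registry = new CancelSketch();
        registry.register("ORDER-SERVICE", "order-1", "10.0.0.1:8080");
        System.out.println(registry.cancel("ORDER-SERVICE", "order-1")); // true
        System.out.println(registry.cancel("ORDER-SERVICE", "order-1")); // false, already removed
    }
}
```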

Self-preservation mechanism

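Self-preservation is controlled by a threshold: when the heartbeat renewals the server received in the last minute drop to 85% or less of the expected number, self-preservation is triggered (the threshold itself is recalculated every 15 minutes by default).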

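Formula: threshold = expected number of clients sending renewals (all registered instances) × renewals per client per minute (60 s / expected client renewal interval, default 30 s) × self-preservation percentage (default 85%), that is:

$$\text{numberOfRenewsPerMinThreshold} = \left\lfloor \text{expectedNumberOfClientsSendingRenews} \times \frac{60}{\text{expectedClientRenewalIntervalSeconds}} \times \text{renewalPercentThreshold} \right\rfloor$$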

This threshold is stored in the variable numberOfRenewsPerMinThreshold.
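As a worked example, assume 10 registered instances and the default settings (30 s renewal interval, 85%); the (int) cast in the code truncates, which for a positive value is the floor:

```latex
\text{numberOfRenewsPerMinThreshold}
  = \left\lfloor 10 \times \frac{60}{30} \times 0.85 \right\rfloor
  = \lfloor 17 \rfloor
  = 17
```

Because isLeaseExpirationEnabled() requires getNumOfRenewsInLastMin() to be strictly greater than the threshold, receiving 17 or fewer renewals in the last minute would switch eviction off for this registry.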

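The threshold is recalculated in four situations: when the server initializes, when an instance registers, when an instance is cancelled (taken offline), and by a scheduled task every 15 minutes.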

protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
                                                * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
                                                * serverConfig.getRenewalPercentThreshold());
}
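The update rule can be sketched as a small class that recomputes the threshold whenever the expected client count changes. This is a hypothetical simplification (RenewalThresholdSketch, onRegister and onCancel are not Eureka APIs): it covers the registration and cancellation triggers, while server initialization and the 15-minute scheduled refresh would simply call the same recomputation. As far as I know the real registry guards these fields with a lock; synchronized methods play that role here.

```java
public class RenewalThresholdSketch {
    private final double expectedClientRenewalIntervalSeconds; // 30 by default in Eureka
    private final double renewalPercentThreshold;              // 0.85 by default in Eureka
    private int expectedNumberOfClientsSendingRenews;
    private int numberOfRenewsPerMinThreshold;

    public RenewalThresholdSketch(double expectedClientRenewalIntervalSeconds, double renewalPercentThreshold) {
        this.expectedClientRenewalIntervalSeconds = expectedClientRenewalIntervalSeconds;
        this.renewalPercentThreshold = renewalPercentThreshold;
    }

    // Same formula as updateRenewsPerMinThreshold(); the (int) cast truncates.
    private void updateRenewsPerMinThreshold() {
        numberOfRenewsPerMinThreshold = (int) (expectedNumberOfClientsSendingRenews
                * (60.0 / expectedClientRenewalIntervalSeconds)
                * renewalPercentThreshold);
    }

    // Trigger: an instance registers, so one more client is expected to send renewals.
    public synchronized void onRegister() {
        expectedNumberOfClientsSendingRenews++;
        updateRenewsPerMinThreshold();
    }

    // Trigger: an instance is cancelled, so one fewer client is expected.
    public synchronized void onCancel() {
        if (expectedNumberOfClientsSendingRenews > 0) {
            expectedNumberOfClientsSendingRenews--;
        }
        updateRenewsPerMinThreshold();
    }

    public synchronized int threshold() {
        return numberOfRenewsPerMinThreshold;
    }

    public static void main(String[] args) {
        RenewalThresholdSketch t = new RenewalThresholdSketch(30, 0.85);
        for (int i = 0; i < 4; i++) {
            t.onRegister();
        }
        System.out.println("threshold with 4 clients = " + t.threshold()); // 6
        t.onCancel();
        System.out.println("threshold with 3 clients = " + t.threshold()); // 5
    }
}
```

With 4 clients, 4 × 2 × 0.85 = 6.8, truncated to 6; after one cancellation, 3 × 2 × 0.85 = 5.1, truncated to 5.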

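Once self-preservation is triggered, eviction stops: evict() returns immediately while isLeaseExpirationEnabled() is false.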

numberOfRenewsPerMinThreshold: the self-preservation threshold
getNumOfRenewsInLastMin(): returns the count kept in renewsLastMin, which is incremented by 1 on every heartbeat renewal

public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");

    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // ... service eviction code ...
}

@Override
public boolean isLeaseExpirationEnabled() {
    if (!isSelfPreservationModeEnabled()) {
        // The self preservation mode is disabled, hence allowing the instances to expire.
        return true;
    }
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
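The check itself compares the renewals counted in the last full minute with the threshold. A minimal sketch, assuming a simplified two-bucket counter in place of Eureka's renewsLastMin (rotate() would normally be called once a minute by a timer); SelfPreservationSketch and its methods are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

public class SelfPreservationSketch {

    // Simplified two-bucket stand-in for renewsLastMin: renewals go into the current
    // bucket, and rotate() publishes it as "last minute" and starts a fresh bucket.
    static final class PerMinuteCounter {
        private final AtomicLong currentBucket = new AtomicLong();
        private final AtomicLong lastBucket = new AtomicLong();

        void increment() { currentBucket.incrementAndGet(); }
        void rotate() { lastBucket.set(currentBucket.getAndSet(0)); }
        long getCount() { return lastBucket.get(); }
    }

    private final PerMinuteCounter renewsLastMin = new PerMinuteCounter();
    private final boolean selfPreservationModeEnabled;
    private final int numberOfRenewsPerMinThreshold;

    SelfPreservationSketch(boolean selfPreservationModeEnabled, int numberOfRenewsPerMinThreshold) {
        this.selfPreservationModeEnabled = selfPreservationModeEnabled;
        this.numberOfRenewsPerMinThreshold = numberOfRenewsPerMinThreshold;
    }

    void renew() { renewsLastMin.increment(); }     // called on every heartbeat
    void minutePassed() { renewsLastMin.rotate(); } // normally driven by a timer

    // Same decision as isLeaseExpirationEnabled() above.
    boolean isLeaseExpirationEnabled() {
        if (!selfPreservationModeEnabled) {
            return true; // self-preservation disabled: expired leases may always be evicted
        }
        return numberOfRenewsPerMinThreshold > 0
                && renewsLastMin.getCount() > numberOfRenewsPerMinThreshold;
    }

    public static void main(String[] args) {
        SelfPreservationSketch server = new SelfPreservationSketch(true, 17);
        for (int i = 0; i < 12; i++) {
            server.renew(); // many clients stopped sending heartbeats
        }
        server.minutePassed();
        System.out.println("eviction enabled: " + server.isLeaseExpirationEnabled()); // false
    }
}
```

When self-preservation is switched off, isSelfPreservationModeEnabled() is false and expired leases are always evicted; in Spring Cloud this is usually done with the eureka.server.enable-self-preservation=false property.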

Service discovery & caching

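Service discovery is handled by the getContainers method of ApplicationsResource in the Eureka Core package (served at GET /{version}/apps); it returns all registered microservice instances.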

@Path("/{version}/apps")
@Produces({"application/xml", "application/json"})
public class ApplicationsResource {

    @GET
    public Response getContainers(@PathParam("version") String version,
                                  @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                  @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                  @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                  @Context UriInfo uriInfo,
                                  @Nullable @QueryParam("regions") String regionsStr) {

        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
        String[] regions = null;
        if (!isRemoteRegionRequested) {
            EurekaMonitors.GET_ALL.increment();
        } else {
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
        }

        // Check if the server allows the access to the registry. The server can
        // restrict access if it is not
        // ready to serve traffic depending on various reasons.
        if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
            return Response.status(Status.FORBIDDEN).build();
        }
        CurrentRequestVersion.set(Version.toEnum(version));
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }

        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS,
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );

        Response response;
        if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
            response = Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
            response = Response.ok(responseCache.get(cacheKey))
                    .build();
        }
        return response;
    }

}
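Two details of getContainers decide which cache entry a request hits: the Accept header selects JSON or XML, and the requested regions are lower-cased and sorted so the same set of regions always maps to the same key (the comment next to Arrays.sort says as much). A hypothetical sketch of that normalization, with a String key standing in for Eureka's Key class and the assumption that HEADER_JSON_VALUE is the substring "json":

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

public class CacheKeySketch {

    // Assumption: like getContainers, treat the request as JSON only if the Accept
    // header contains "json"; everything else (including no header) gets XML.
    static String keyType(String acceptHeader) {
        return (acceptHeader != null && acceptHeader.contains("json")) ? "JSON" : "XML";
    }

    // Lower-case and sort the regions so "b,a" and "a,b" produce the same key.
    // Locale.ROOT avoids locale-dependent lower-casing in this sketch.
    static List<String> normalizeRegions(String regionsStr) {
        if (regionsStr == null || regionsStr.isEmpty()) {
            return Collections.emptyList();
        }
        String[] regions = regionsStr.toLowerCase(Locale.ROOT).split(",");
        Arrays.sort(regions);
        return Arrays.asList(regions);
    }

    // A String stands in for Eureka's Key(EntityType.Application, ALL_APPS, keyType, ...).
    static String cacheKey(String acceptHeader, String regionsStr) {
        return "ALL_APPS|" + keyType(acceptHeader) + "|" + normalizeRegions(regionsStr);
    }

    public static void main(String[] args) {
        System.out.println(cacheKey("application/json", "US-East,eu-west"));
        System.out.println(cacheKey("application/json", "eu-west,us-east"));
    }
}
```

Both lines print the same key, so the two requests share one cache entry instead of filling the cache with duplicates.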

Here we focus on Eureka's caching mechanism, that is, the call responseCache.getGZIP(cacheKey):

public byte[] getGZIP(Key key) {
    Value payload = getValue(key, shouldUseReadOnlyResponseCache);
    if (payload == null) {
        return null;
    }
    return payload.getGzipped();
}
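getGZIP simply returns payload.getGzipped(), so the compressed bytes must already be stored in the cached Value. A minimal sketch of that idea, assuming (as I understand the Value class in ResponseCacheImpl) that the payload is compressed once when the Value is created; GzipValueSketch and its nested Value are hypothetical names that use only java.util.zip:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipValueSketch {

    // Hypothetical stand-in for the cached Value: the payload is compressed once,
    // when the entry is built, and every gzip request reuses the same bytes.
    static final class Value {
        private final String payload;
        private final byte[] gzipped;

        Value(String payload) {
            this.payload = payload;
            this.gzipped = gzip(payload.getBytes(StandardCharsets.UTF_8));
        }

        String getPayload() { return payload; }
        byte[] getGzipped() { return gzipped; }
    }

    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the GZIPOutputStream writes the gzip trailer before the bytes are read.
        try (GZIPOutputStream out = new GZIPOutputStream(bos)) {
            out.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    static String gunzip(byte[] data) {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                bos.write(buf, 0, n);
            }
            return new String(bos.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Value value = new Value("{\"applications\":{\"application\":[]}}");
        System.out.println(gunzip(value.getGzipped()).equals(value.getPayload())); // true
    }
}
```

Compressing at creation time means every cache hit serves the same byte array without re-encoding; the cost is paid once per cache entry rather than once per request.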

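Eureka uses a three-level cache:

  • Read-only cache: readOnlyCacheMap, implemented with ConcurrentHashMap (refreshed automatically every 30 s)
  • Read-write cache: readWriteCacheMap, implemented with a Google Guava LoadingCache (entries expire 180 s after being written)
    • Listener: a removal listener, invoked when an entry is removed
    • Loader: a Guava CacheLoader; when a lookup misses, it builds the entry from the real data
  • Real data: the registry itself, also a ConcurrentHashMap
  1. Look up the read-only cache first.
  2. If the read-only cache has no entry, fall back to the read-write cache.
  3. If the read-write cache has no entry either, its CacheLoader fetches the data from the real registry data.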
@VisibleForTesting
Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (useReadOnlyCache) {
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key : {}", key, t);
    }
    return payload;
}
    
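  • The read-only cache is filled only from the read-write cache and offers no API for active updates: a missing key is copied in from the read-write cache by getValue, and existing entries are updated only by a scheduled task, which by default refreshes them from the read-write cache every 30 s.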
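The whole read path, including the scheduled refresh described in the last bullet, can be sketched with the standard library only. Assumptions: ThreeLevelCacheSketch and its methods are hypothetical; a ConcurrentHashMap with write timestamps stands in for the Guava LoadingCache (expire-after-write) and a Function stands in for its CacheLoader; callers pass the current time so the behavior is deterministic. getValue follows the control flow of the method above, and refreshReadOnlyCache follows what, as far as I can tell, Eureka's update task does: visit each read-only key, read it from the read-write cache (loading on a miss) and overwrite the read-only entry if the value changed (Eureka compares references; the sketch uses equals).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class ThreeLevelCacheSketch {

    // Read-write entry plus the time it was written (for expire-after-write).
    private static final class Entry {
        final String value;
        final long writtenAtMillis;

        Entry(String value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final Map<String, String> readOnlyCacheMap = new ConcurrentHashMap<>(); // level 1
    private final Map<String, Entry> readWriteCacheMap = new ConcurrentHashMap<>(); // level 2
    private final Function<String, String> loader; // reads level 3, the real data; must not return null
    private final long expireAfterWriteMillis;     // 180 s by default in Eureka
    private final AtomicInteger loads = new AtomicInteger();

    public ThreeLevelCacheSketch(Function<String, String> loader, long expireAfterWriteMillis) {
        this.loader = loader;
        this.expireAfterWriteMillis = expireAfterWriteMillis;
    }

    // Level 2: rebuild the value from the real data on a miss or after expiry.
    // Not atomic (two threads may both load); Guava's LoadingCache coordinates this.
    private String readWriteGet(String key, long nowMillis) {
        Entry e = readWriteCacheMap.get(key);
        if (e == null || nowMillis - e.writtenAtMillis >= expireAfterWriteMillis) {
            loads.incrementAndGet();
            e = new Entry(loader.apply(key), nowMillis);
            readWriteCacheMap.put(key, e);
        }
        return e.value;
    }

    // Same control flow as getValue(): level 1 first, then level 2, which fills level 1.
    public String getValue(String key, boolean useReadOnlyCache, long nowMillis) {
        if (useReadOnlyCache) {
            String current = readOnlyCacheMap.get(key);
            if (current != null) {
                return current;
            }
            String payload = readWriteGet(key, nowMillis);
            readOnlyCacheMap.put(key, payload);
            return payload;
        }
        return readWriteGet(key, nowMillis);
    }

    // A registry change drops the read-write entry (cf. invalidateCache); level 1 is untouched.
    public void invalidate(String key) {
        readWriteCacheMap.remove(key);
    }

    // One refresh pass: only keys already in level 1 are visited, and each is
    // overwritten only when level 2 now holds a different value.
    public void refreshReadOnlyCache(long nowMillis) {
        for (String key : readOnlyCacheMap.keySet()) {
            String cacheValue = readWriteGet(key, nowMillis);
            if (!cacheValue.equals(readOnlyCacheMap.get(key))) {
                readOnlyCacheMap.put(key, cacheValue);
            }
        }
    }

    // Run the refresh at a fixed rate (every 30 s by default in Eureka).
    public ScheduledExecutorService startRefresh(long intervalMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> refreshReadOnlyCache(System.currentTimeMillis()),
                intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public int loadCount() {
        return loads.get();
    }

    public static void main(String[] args) {
        Map<String, String> registry = new ConcurrentHashMap<>();
        registry.put("ALL_APPS", "v1");
        ThreeLevelCacheSketch cache = new ThreeLevelCacheSketch(registry::get, 180_000L);
        System.out.println(cache.getValue("ALL_APPS", true, 0L));      // v1 (loaded from the real data)
        registry.put("ALL_APPS", "v2");
        cache.invalidate("ALL_APPS");
        System.out.println(cache.getValue("ALL_APPS", true, 1_000L));  // v1 (stale, from level 1)
        cache.refreshReadOnlyCache(30_000L);
        System.out.println(cache.getValue("ALL_APPS", true, 31_000L)); // v2
    }
}
```

Running main prints v1, v1, v2: after the registry changes and the read-write entry is invalidated, clients reading through the read-only cache keep getting the old payload until the next refresh pass. That is the trade-off of the top layer: cheap, non-blocking map reads at the cost of up to one refresh interval of extra staleness. Call shutdown() on the executor returned by startRefresh when the cache is no longer needed.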