Eureka is a service registry open-sourced by Netflix. It follows a client/server (C/S) architecture with two sides: EurekaServer and EurekaClient.
Spring Cloud integration
@EnableEurekaServer
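Adding the @EnableEurekaServer annotation to the main class of a Spring Boot project turns that project into a Eureka Server, i.e. a registry.
@Import(EurekaServerMarkerConfiguration.class) registers a Marker bean in the Spring container. The presence of this bean is what marks the current project as a Eureka Server.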
```java
@Configuration
public class EurekaServerMarkerConfiguration {

    @Bean
    public Marker eurekaServerMarkerBean() {
        return new Marker();
    }

    class Marker {
    }
}
```
EurekaServerAutoConfiguration.class
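EurekaServerAutoConfiguration is registered through \spring-cloud-netflix-eureka-server-2.1.4.RELEASE.jar!\META-INF\spring.factories, so it is loaded into the Spring container automatically. Eureka itself relies on **Jersey** to provide its MVC (REST) layer.
The auto-configuration only takes effect when the Marker bean is present (@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)); this is how it decides whether the project is a Eureka Server.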
```java
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    // ...
    @Bean
    public FilterRegistrationBean jerseyFilterRegistration(
            javax.ws.rs.core.Application eurekaJerseyApp) {
        FilterRegistrationBean bean = new FilterRegistrationBean();
        bean.setFilter(new ServletContainer(eurekaJerseyApp));
        bean.setOrder(Ordered.LOWEST_PRECEDENCE);
        bean.setUrlPatterns(
                Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
        return bean;
    }

    @Bean
    public javax.ws.rs.core.Application jerseyApplication(Environment environment,
            ResourceLoader resourceLoader) {
        ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
                false, environment);
        // Filter to include only classes that have a particular annotation.
        provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
        provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
        // Find classes in Eureka packages (or subpackages)
        Set<Class<?>> classes = new HashSet<>();
        for (String basePackage : EUREKA_PACKAGES) {
            Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
            for (BeanDefinition bd : beans) {
                Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
                        resourceLoader.getClassLoader());
                classes.add(cls);
            }
        }
        // Construct the Jersey ResourceConfig
        Map<String, Object> propsAndFeatures = new HashMap<>();
        propsAndFeatures.put(
                // Skip static content used by the webapp
                ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
                EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
        DefaultResourceConfig rc = new DefaultResourceConfig(classes);
        rc.setPropertiesAndFeatures(propsAndFeatures);
        return rc;
    }
    // ...
}
```
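Service registration
Flow
1. A controller (a Jersey resource) calls the EurekaServer's registration method.
2. InstanceRegistry.register is called next. The calls are organized like a chain of responsibility: each class in the registry hierarchy does its own part and then calls its parent class's register method.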

Eureka Core Controller
As noted above, Eureka uses Jersey for its MVC layer, so a registration request ultimately lands in the addInstance method of Eureka Core's ApplicationResource class:
```java
@Produces({"application/xml", "application/json"})
public class ApplicationResource {
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        // Hand the instance over to the registry
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }
}
```
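The first half of addInstance is a chain of guard clauses: the first failed check returns HTTP 400 with a fixed message. A minimal plain-Java sketch of that ordering follows. `Registrant` and `RegistrationValidator` are hypothetical names, the two dataCenterInfo checks are collapsed into one, and the AWS-specific handling is left out.

```java
import java.util.Optional;

// Hypothetical, simplified stand-in for InstanceInfo: only the fields addInstance checks first.
final class Registrant {
    final String id;
    final String hostName;
    final String ipAddr;
    final String appName;
    final String dataCenterName;

    Registrant(String id, String hostName, String ipAddr, String appName, String dataCenterName) {
        this.id = id;
        this.hostName = hostName;
        this.ipAddr = ipAddr;
        this.appName = appName;
        this.dataCenterName = dataCenterName;
    }
}

final class RegistrationValidator {
    private RegistrationValidator() {
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    /** Returns the message addInstance would send with HTTP 400, or empty if all checks pass. */
    static Optional<String> validate(String pathAppName, Registrant info) {
        if (isBlank(info.id)) return Optional.of("Missing instanceId");
        if (isBlank(info.hostName)) return Optional.of("Missing hostname");
        if (isBlank(info.ipAddr)) return Optional.of("Missing ip address");
        if (isBlank(info.appName)) return Optional.of("Missing appName");
        if (!pathAppName.equals(info.appName)) {
            return Optional.of("Mismatched appName, expecting " + pathAppName + " but was " + info.appName);
        }
        // Collapses the "Missing dataCenterInfo" and "Missing dataCenterInfo Name" checks into one.
        if (info.dataCenterName == null) return Optional.of("Missing dataCenterInfo Name");
        return Optional.empty();
    }
}
```

Because the checks run in a fixed order, a client missing several fields only learns about the first one per request.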
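`registry` is declared with the PeerAwareInstanceRegistry interface type; at runtime it is Spring Cloud's subclass InstanceRegistry:
```java
public class InstanceRegistry extends PeerAwareInstanceRegistryImpl
        implements ApplicationContextAware {

    @Override
    public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
        // Spring Cloud hook (e.g. publishing a registration event), then delegate to the parent
        handleRegistration(info, leaseDuration, isReplication);
        super.register(info, leaseDuration, isReplication);
    }
}
```
3. AbstractInstanceRegistry.register stores the instance in a two-level map: `registry` is a `ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>` keyed by application name, and each value (`gMap`, created on the application's first registration) maps instance IDs to the leases of that application's instances
```java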
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
// First fetch the gMap for this application
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
// A null gMap means this is the application's first instance: create the map
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// Check whether a lease with the same instance ID already exists
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
// Keep whichever registration is newer
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// Complete the registration: write the lease into gMap
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
}
```
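To see the two-level layout and the dirty-timestamp rule in isolation, here is a simplified sketch. `SimpleLease` and `TwoLevelRegistry` are hypothetical stand-ins that only track lastDirtyTimestamp; the locking, metrics, overridden-status handling and cache invalidation of the real register method are omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical lease that only remembers its instance ID and last dirty timestamp.
final class SimpleLease {
    final String instanceId;
    final long lastDirtyTimestamp;

    SimpleLease(String instanceId, long lastDirtyTimestamp) {
        this.instanceId = instanceId;
        this.lastDirtyTimestamp = lastDirtyTimestamp;
    }
}

// appName -> (instanceId -> lease), the same shape as AbstractInstanceRegistry.registry.
final class TwoLevelRegistry {
    private final ConcurrentMap<String, Map<String, SimpleLease>> registry = new ConcurrentHashMap<>();

    void register(String appName, SimpleLease incoming) {
        Map<String, SimpleLease> gMap = registry.get(appName);
        if (gMap == null) {
            // putIfAbsent settles the race when two threads register an app's first instance at once.
            Map<String, SimpleLease> fresh = new ConcurrentHashMap<>();
            gMap = registry.putIfAbsent(appName, fresh);
            if (gMap == null) {
                gMap = fresh;
            }
        }
        SimpleLease existing = gMap.get(incoming.instanceId);
        // Keep the existing entry only if its dirty timestamp is strictly newer (">" not ">=").
        SimpleLease winner = (existing != null && existing.lastDirtyTimestamp > incoming.lastDirtyTimestamp)
                ? existing
                : incoming;
        gMap.put(incoming.instanceId, winner);
    }

    SimpleLease get(String appName, String instanceId) {
        Map<String, SimpleLease> gMap = registry.get(appName);
        return gMap == null ? null : gMap.get(instanceId);
    }
}
```

With equal timestamps the incoming registration wins, matching the comment in the original that a remote copy is preferred over the local one.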
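Heartbeat renewal
Heartbeat renewal means each service reports its health at a fixed interval. The server records the time of the last renewal; if no renewal arrives within the maximum tolerated time, the server treats the service as failed and evicts it.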
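```yaml
eureka:
  instance:
    instance-id: user                        # unique ID of this instance in Eureka
    prefer-ip-address: true                  # register with the IP address instead of the hostname
    lease-renewal-interval-in-seconds: 10    # how often the client sends a heartbeat to the Eureka server
    lease-expiration-duration-in-seconds: 30 # how long the server waits after the last heartbeat before treating the instance as gone
```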
This is implemented by the renewLease method of Eureka Core's InstanceResource class:
@Produces({"application/xml", "application/json"})
public class InstanceResource {
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
// Perform the heartbeat renewal
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// Not found in the registry, immediately ask for a register
if (!isSuccess) {
logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
// Check if we need to sync based on dirty time stamp, the client
// instance might have changed some value
Response response;
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// Store the overridden status since the validation found out the node that replicates wins
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
return response;
}
}
The renewal itself is `boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);`. Here too, registry is backed by InstanceRegistry, whose renew method calls up the class hierarchy in the same chain-of-responsibility style:
public class InstanceRegistry extends PeerAwareInstanceRegistryImpl
implements ApplicationContextAware {
@Override
public boolean renew(final String appName, final String serverId,
boolean isReplication) {
log("renew " + appName + " serverId " + serverId + ", isReplication {}"
+ isReplication);
List<Application> applications = getSortedApplications();
for (Application input : applications) {
if (input.getName().equals(appName)) {
InstanceInfo instance = null;
for (InstanceInfo info : input.getInstances()) {
if (info.getId().equals(serverId)) {
instance = info;
break;
}
}
publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
instance, isReplication));
break;
}
}
// Chain of responsibility: delegate the actual renewal to the parent class
return super.renew(appName, serverId, isReplication);
}
}
This in turn calls PeerAwareInstanceRegistryImpl.renew:
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
public boolean renew(final String appName, final String id, final boolean isReplication) {
// Continue down the chain; replicate to peers only if the local renewal succeeded
if (super.renew(appName, id, isReplication)) {
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
}
which then calls AbstractInstanceRegistry.renew, where the renewal actually happens:
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
// Look up the application's instance map by app name
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
// Look up the specific instance's lease by ID
leaseToRenew = gMap.get(id);
}
// The instance is not registered: the client has to register again
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
// Get the concrete service instance
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
// Update the renewal timestamp through the Lease object
leaseToRenew.renew();
return true;
}
}
}
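The renew path above is a chain of overrides, each doing its own step before or after calling super. The following is a hypothetical three-class sketch (not the real Eureka classes; the names only echo them) that records the order in which the steps run:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for AbstractInstanceRegistry: the actual lease update at the bottom of the chain.
abstract class BaseRegistry {
    final List<String> calls = new ArrayList<>();
    boolean leaseExists = true; // hypothetical switch to simulate a missing lease

    boolean renew(String appName, String id, boolean isReplication) {
        calls.add("renew lease");
        return leaseExists;
    }
}

// Stand-in for PeerAwareInstanceRegistryImpl: replicate only after a successful local renew.
class PeerAwareRegistry extends BaseRegistry {
    @Override
    boolean renew(String appName, String id, boolean isReplication) {
        if (super.renew(appName, id, isReplication)) {
            calls.add("replicate to peers");
            return true;
        }
        return false;
    }
}

// Stand-in for Spring Cloud's InstanceRegistry: publish an event, then delegate.
class SpringInstanceRegistry extends PeerAwareRegistry {
    @Override
    boolean renew(String appName, String id, boolean isReplication) {
        calls.add("publish EurekaInstanceRenewedEvent");
        return super.renew(appName, id, isReplication);
    }
}
```

Because replication sits after the super call, a renewal for an unknown lease returns false (which InstanceResource turns into a 404) without anything being sent to the peers.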
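The Lease object updates the renewal time with the formula: lastUpdateTimestamp = current time + duration. Here duration is the lease duration (lease-expiration-duration-in-seconds, sent by the client when it registers), not the heartbeat interval.
For example, if the lease duration is 30s,
then the instance's last update (renewal) time becomes: current time + 30s.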
public class Lease<T> {
private volatile long lastUpdateTimestamp;
public void renew() {
// Update the last-operation timestamp
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
}
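The renew formula interacts with the expiry check. In the Eureka Lease source I have seen, isExpired tests `now > lastUpdateTimestamp + duration + additionalLeaseMs`, and since renew() has already added duration once, an instance that stops sending heartbeats is only treated as expired after roughly twice the duration (Eureka's own source comments describe this as a known minor bug). The sketch below assumes that behaviour; `LeaseSketch` is a hypothetical class that takes the current time as a parameter so the timing can be tested.

```java
// Minimal sketch of Lease timing with an explicit clock (nowMs parameters are an assumption for testability).
final class LeaseSketch {
    private final long durationMs;
    private volatile long lastUpdateTimestamp;
    private volatile long evictionTimestamp; // 0 means "not cancelled"

    LeaseSketch(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = nowMs; // at registration no extra duration is added
    }

    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs; // same formula as Lease.renew()
    }

    void cancel(long nowMs) {
        if (evictionTimestamp <= 0) {
            evictionTimestamp = nowMs;
        }
    }

    boolean isExpired(long nowMs, long additionalLeaseMs) {
        // duration is added again here, so a renewed lease lives about 2 * duration
        return evictionTimestamp > 0 || nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }
}
```

With the 30s setting from the configuration above and a last heartbeat at t = 0, the lease is still alive at t = 59s and only counts as expired after t = 60s.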
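Cluster synchronization
Cluster synchronization means that whenever a client sends a register, heartbeat, update or similar request to one server, that server forwards the same request to the other servers in the cluster to keep their data consistent.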
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) {
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
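Every operation is replicated to the cluster once it has completed locally. The implementation is the replicateToPeers method of PeerAwareInstanceRegistryImpl, where:
isReplication: whether the current request itself came from cluster replication; it is also the key to avoiding replication loops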
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry
implements PeerAwareInstanceRegistry {
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
// isReplication is what breaks the replication loop
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
// Replicate to each peer in turn
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
// Skip this node itself
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
// Replicate the action to this peer
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
}
The actual replication logic:
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
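`node.register(info)` lands in PeerEurekaNode.register, which wraps the call in a replication task and queues it on a batching dispatcher, so replication to peers is asynchronous:
public void register(final InstanceInfo info) throws Exception {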
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
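// replicateInstanceInfo = true: send the full InstanceInfo with this task; the peer applies it with isReplication = true, so it is not forwarded again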
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
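The isReplication flag is what stops a request from bouncing between servers forever. Below is a synchronous, in-memory sketch with a hypothetical `PeerNode` class; the real code replicates asynchronously through batchingDispatcher over HTTP, so this only illustrates the rule: forward only requests that did not come from a peer, and skip your own URL.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for one Eureka server and its peer list.
final class PeerNode {
    final String serviceUrl;
    final List<PeerNode> peers = new ArrayList<>(); // like peerEurekaNodes: may include this node itself
    final List<String> registered = new ArrayList<>();
    int forwarded = 0; // number of replication calls this node has sent

    PeerNode(String serviceUrl) {
        this.serviceUrl = serviceUrl;
    }

    void register(String instanceId, boolean isReplication) {
        registered.add(instanceId); // apply locally first
        replicateToPeers(instanceId, isReplication);
    }

    private void replicateToPeers(String instanceId, boolean isReplication) {
        if (isReplication) {
            return; // came from a peer: forwarding it again would loop A -> B -> A -> ...
        }
        for (PeerNode node : peers) {
            if (node.serviceUrl.equals(serviceUrl)) {
                continue; // do not replicate to yourself
            }
            forwarded++;
            node.register(instanceId, true); // peers receive the call with isReplication = true
        }
    }
}
```

In a three-node cluster, one client registration on node A produces exactly two replication calls, and each node ends up with the instance once.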
Startup replication
When a new Eureka Server starts, it actively pulls the registrations held by the other servers so that it starts out in sync.
In the Spring Cloud Eureka Server jar, META-INF/spring.factories registers the EurekaServerAutoConfiguration class:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
}
@Import(EurekaServerInitializerConfiguration.class) imports EurekaServerInitializerConfiguration, which initializes the Eureka environment configuration and the Eureka server context (including copying registrations from peers, self-preservation, and so on).
@Configuration
public class EurekaServerInitializerConfiguration
implements ServletContextAware, SmartLifecycle, Ordered {
@Override
public void start() {
new Thread(() -> {
try {
// TODO: is this class even needed now?
// Bootstrap the Eureka server context
eurekaServerBootstrap.contextInitialized(
EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}).start();
}
}
public class EurekaServerBootstrap {
public void contextInitialized(ServletContext context) {
try {
// Initialize the Eureka environment (reads the configuration)
initEurekaEnvironment();
// Initialize the Eureka server context
initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node
// Copy registrations from peer nodes
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
}
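The peer copy in `int registryCount = this.registry.syncUp();` is implemented by PeerAwareInstanceRegistryImpl.syncUp. Put simply, it reads all registrations from another server and registers them one by one on this server, retrying with a wait in between until something has been copied or the retries run out: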
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
// Register the instance locally (isReplication = true)
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
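The retry structure of syncUp can be sketched on its own. Assumptions: `SyncUpSketch` is a hypothetical class, the `Supplier` stands in for eurekaClient.getApplications(), instances are plain strings, and adding to a list stands in for register(instance, duration, true). Unlike the excerpt, the sketch restores the thread's interrupt flag before giving up.

```java
import java.util.List;
import java.util.function.Supplier;

final class SyncUpSketch {
    private SyncUpSketch() {
    }

    /** Copies peer instances into localRegistry, retrying while nothing has been copied yet. */
    static int syncUp(Supplier<List<String>> fetchPeerInstances, int retries, long waitMs,
                      List<String> localRegistry) {
        int count = 0;
        for (int i = 0; i < retries && count == 0; i++) {
            if (i > 0) {
                try {
                    Thread.sleep(waitMs); // like getRegistrySyncRetryWaitMs()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            for (String instance : fetchPeerInstances.get()) {
                localRegistry.add(instance);
                count++;
            }
        }
        return count;
    }
}
```

The loop condition `count == 0` is the important part: a single successful fetch ends the retries, while an empty peer registry (for example, the first server in a cluster) costs the full retry budget before startup continues.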
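Service eviction (delisting)
Eviction means Eureka runs a timer that periodically checks whether each lease has expired, i.e. whether the current time > lastUpdateTimestamp + duration (plus a compensation time for late timer runs). With a 30s lease duration, that is: current time > lastUpdateTimestamp + 30s.
The eviction task is started during the startup replication described above, in `this.registry.openForTraffic(this.applicationInfoManager, registryCount);`: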
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
// Start the eviction task
super.postInit();
}
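super.postInit() creates a new EvictionTask and schedules it on evictionTimer, repeating every getEvictionIntervalTimerInMs() milliseconds. Eviction is therefore a timer that periodically removes services that have stopped sending heartbeats: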
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
class EvictionTask extends TimerTask {
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
@Override
public void run() {
try {
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
// Run eviction
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}
}
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
// Collect all expired leases
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
// Use the shuffle to pick a random expired instance
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
// Evict (cancel) the instance
internalCancel(appName, id, false);
}
}
}
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
read.lock();
CANCEL.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
// Remove the instance from the application's map
leaseToCancel = gMap.remove(id);
}
synchronized (recentCanceledQueue) {
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
}
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
return true;
}
} finally {
read.unlock();
}
}
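evict never removes more than registrySize - (int)(registrySize × renewalPercentThreshold) leases per run, and chooses which ones with a partial Knuth (Fisher-Yates) shuffle. A sketch of just that selection step follows; `EvictionSketch` is a hypothetical class and instance IDs are plain strings. With the 0.85 default and 10 registered instances, at most 10 - 8 = 2 instances are evicted per run.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

final class EvictionSketch {
    private EvictionSketch() {
    }

    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    /** Picks at most evictionLimit expired IDs, uniformly at random, without modifying the input. */
    static List<String> pickToEvict(List<String> expired, int registrySize,
                                    double renewalPercentThreshold, Random random) {
        List<String> candidates = new ArrayList<>(expired);
        int toEvict = Math.min(candidates.size(), evictionLimit(registrySize, renewalPercentThreshold));
        if (toEvict <= 0) {
            return new ArrayList<>();
        }
        for (int i = 0; i < toEvict; i++) {
            // Swap a random element from the not-yet-chosen tail into position i.
            int next = i + random.nextInt(candidates.size() - i);
            Collections.swap(candidates, i, next);
        }
        return new ArrayList<>(candidates.subList(0, toEvict));
    }
}
```

Randomizing the order spreads the evictions across applications, so one run cannot wipe out a whole app before self-preservation has a chance to kick in.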
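Self-preservation
Self-preservation is triggered by a threshold: when the number of renewals received in the last minute drops to 85% or less of the expected number, self-preservation kicks in.
Formula: threshold = expected number of clients sending renewals (all registered instances) × renewals per client per minute (60s / expected client renewal interval, default 30s) × self-preservation percentage (default 85%)
This threshold is the numberOfRenewsPerMinThreshold field.
The threshold is recalculated in four cases: server initialization, service registration, service cancellation, and an automatic refresh every 15 minutes.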
protected void updateRenewsPerMinThreshold() {
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
* serverConfig.getRenewalPercentThreshold());
}
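Once self-preservation is triggered, Eureka stops evicting services.
numberOfRenewsPerMinThreshold: the self-preservation threshold
getNumOfRenewsInLastMin() returns renewsLastMin, which is incremented on every heartbeat renewal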
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// ... eviction code ...
}
@Override
public boolean isLeaseExpirationEnabled() {
if (!isSelfPreservationModeEnabled()) {
// The self preservation mode is disabled, hence allowing the instances to expire.
return true;
}
return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
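A worked version of the threshold formula and the isLeaseExpirationEnabled check, written as hypothetical static helpers (`SelfPreservationSketch` is not a Eureka class). With 10 registered instances, a 30s expected renewal interval and 0.85, the threshold is (int)(10 × 2 × 0.85) = 17, so eviction stays enabled only while more than 17 renewals arrived in the last minute.

```java
final class SelfPreservationSketch {
    private SelfPreservationSketch() {
    }

    // Same arithmetic as updateRenewsPerMinThreshold().
    static int renewsPerMinThreshold(int expectedClients, int renewalIntervalSeconds, double percentThreshold) {
        return (int) (expectedClients * (60.0 / renewalIntervalSeconds) * percentThreshold);
    }

    // Same decision as isLeaseExpirationEnabled().
    static boolean leaseExpirationEnabled(boolean selfPreservationEnabled, int threshold, long renewsInLastMin) {
        if (!selfPreservationEnabled) {
            return true; // self-preservation switched off: eviction is always allowed
        }
        return threshold > 0 && renewsInLastMin > threshold;
    }
}
```

Note that a threshold of 0 (for example, a server with no registered instances) keeps eviction disabled while self-preservation is on.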
Service discovery & caching
Eureka Core's ApplicationsResource class defines the discovery endpoint getContainers, which returns all registered service instances:
@Path("/{version}/apps")
@Produces({"application/xml", "application/json"})
public class ApplicationsResource {
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}
// Check if the server allows the access to the registry. The server can
// restrict access if it is not
// ready to serve traffic depending on various reasons.
if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
}
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
Response response;
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
response = Response.ok(responseCache.get(cacheKey))
.build();
}
return response;
}
}
The part worth a closer look is Eureka's caching, i.e. the responseCache.getGZIP(cacheKey) call:
public byte[] getGZIP(Key key) {
Value payload = getValue(key, shouldUseReadOnlyResponseCache);
if (payload == null) {
return null;
}
return payload.getGzipped();
}
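Eureka's response cache has three levels:
- Read-only cache: readOnlyCacheMap, a ConcurrentHashMap, refreshed automatically every 30s
- Read-write cache: readWriteCacheMap, a Google Guava cache whose entries expire after 180s
  - Removal listener: notified when entries are removed
  - Loader: invoked when a lookup misses, to load the value
- Real data: the registry itself, also a ConcurrentHashMap

Lookup order:
- Read from the read-only cache first
- If the read-only cache misses, read the read-write cache
- If the read-write cache misses too, its registered loader fetches the data from the real registry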
```java
@VisibleForTesting
Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (useReadOnlyCache) {
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key : {}", key, t);
    }
    return payload;
}
```
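- The read-only cache is populated only from the read-write cache and offers no API for active updates. It changes only through a timer task, which by default copies data from the read-write cache every 30s.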
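A simplified model of that lookup path, under stated assumptions: `ThreeLevelCache` is a hypothetical class, a ConcurrentHashMap with computeIfAbsent stands in for the Guava LoadingCache (so there is no 180s expiry and no removal listener), and a `Function` stands in for reading the real registry. It shows why clients can keep seeing stale data for up to one refresh interval after the read-write layer has been invalidated.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class ThreeLevelCache<K, V> {
    private final Map<K, V> readOnlyCacheMap = new ConcurrentHashMap<>();
    private final Map<K, V> readWriteCacheMap = new ConcurrentHashMap<>();
    private final Function<K, V> loadFromRegistry; // must not return null

    ThreeLevelCache(Function<K, V> loadFromRegistry) {
        this.loadFromRegistry = loadFromRegistry;
    }

    V get(K key) {
        V payload = readOnlyCacheMap.get(key); // 1. read-only cache
        if (payload == null) {
            // 2. read-write cache; 3. on a miss, load from the real registry
            payload = readWriteCacheMap.computeIfAbsent(key, loadFromRegistry);
            readOnlyCacheMap.put(key, payload);
        }
        return payload;
    }

    void invalidate(K key) {
        readWriteCacheMap.remove(key); // like invalidateCache(): only the read-write layer is cleared
    }

    void refreshReadOnly() {
        // What the periodic (default 30s) timer does: copy changed values from the read-write layer.
        for (Map.Entry<K, V> e : readOnlyCacheMap.entrySet()) {
            V current = readWriteCacheMap.computeIfAbsent(e.getKey(), loadFromRegistry);
            if (!current.equals(e.getValue())) {
                readOnlyCacheMap.put(e.getKey(), current);
            }
        }
    }
}
```

After a registration invalidates the read-write entry, get still returns the old value from the read-only layer until the next refresh copies the new one across.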
