1 服务注册和发现组件Eureka
Spring Cloud版本
Hoxton.SR9
1.1 简介
在软件领域,Eureka是Netflix在线影片公司开源的一个服务注册与发现的组件,和其他Netflix公司的服务组件(例如负载均衡、熔断器、网关等)一起,被Spring Cloud社区整合为Spring Cloud Netflix模块。
什么是Eureka?
Eureka和Consul、Zookeeper类似,Eureka是一个用于服务注册和发现的组件,最开始主要应用于亚马逊公司旗下的云计算服务平台AWS。Eureka分为Eureka Server和Eureka Client,Eureka Server为Eureka服务注册中心,Eureka Client为Eureka客户端。
为什么选择Eureka?在Spring Cloud中,可选择Consul、Zookeeper和Eureka作为服务注册和发现的组件,那为什么要选择Eureka呢?
首先,Eureka完全开源,是Netflix公司的开源产品,经历了Netflix公司的生产环境的考验,以及3年时间的不断迭代,在功能和性能上都非常稳定,可以放心使用。
其次,Eureka是Spring Cloud首选推荐的服务注册与发现组件,与Spring Cloud其他组件可以无缝对接。
最后,Eureka和其他组件,比如负载均衡组件Ribbon、熔断器组件Hystrix、熔断器监控组件 Hystrix Dashboard组件、熔断器聚合监控Turbine组件,以及网关Zuul组件相互配合,能够很容易实现服务注册、负载均衡、熔断和智能路由等功能。这些组件都是由Netflix公司开源的,一起被称为Netflix OSS组件。Netflix OSS组件由Spring Cloud整合为Spring CloudNetflix组件,它是Spring Cloud构架微服务的核心组件,也是基础组件。
1.2 Eureka基本架构
Eureka的基本架构如第2章的图2-1所示,其中主要包括以下3种角色。
Register Service:服务注册中心,它是一个Eureka Server,提供服务注册和发现的功能。
Provider Service:服务提供者,它是一个Eureka Client,提供服务。
Consumer Service:服务消费者,它是一个Eureka Client,消费服务。
1.3 Eureka源码简析
1.3.1 Eureka的一些概念
(1)Register——服务注册
当Eureka Client向Eureka Server注册时,Eureka Client提供自身的元数据,比如IP地址、端口、运行状况指标的Url、主页地址等信息。
(2)Renew——服务续约
Eureka Client在默认的情况下会每隔30秒发送一次心跳来进行服务续约。通过服务续约来告知Eureka Server该Eureka Client仍然可用,没有出现故障。在正常情况下,如果Eureka Server在90秒内没有收到EurekaClient的心跳,Eureka Server会将Eureka Client实例从注册列表中删除。注意:官网建议不要更改服务续约的间隔时间。
(3)Fetch Registries——获取服务注册列表信息
Eureka Client从Eureka Server获取服务注册表信息,并将其缓存在本地。Eureka Client会使用服务注册列表信息查找其他服务的信息,从而进行远程调用。该注册列表信息定时(每30秒)更新一次,每次返回注册列表信息可能与Eureka Client的缓存信息不同,Eureka Client会自己处理这些信息。如果某种原因导致注册列表信息不能及时匹配,Eureka Client会重新获取整个注册表信息。Eureka Server缓存了所有的服务注册列表信息,并将整个注册列表以及每个应用程序的信息压缩,压缩内容和没有压缩的内容完全相同。Eureka Client和Eureka Server可以使用JSON 和XML数据格式进行通信。在默认的情况下,Eureka Client使用JSON格式的方式来获取服务注册列表的信息。
(4)Cancel——服务下线
Eureka Client在程序关闭时可以向Eureka Server发送下线请求。发送请求后,该客户端的实例信息将从Eureka Server的服务注册列表中删除。该下线请求不会自动完成,需要在程序关闭时调用以下代码:
DiscoveryManager.getInstance().shutdownComponent();
(5)Eviction——服务剔除
在默认情况下,当Eureka Client连续90秒没有向Eureka Server发送服务续约(即心跳)时,Eureka Server会将该服务实例从服务注册列表删除,即服务剔除。
1.3.2 Eureka的高可用架构
图5-3为Eureka的高可用架构,该图片来自GitHub中Eureka开源代码的文档。
从图5-3中可知,在这个架构中有两个角色,即Eureka Server和EurekaClient。而Eureka Client又分为Applicaton Service和Application Client,即服务提供者和服务消费者。每个区域有一个Eureka集群,并且每个区域至少有一个Eureka Server可以处理区域故障,以防服务器瘫痪。Eureka Client向Eureka Server注册,将自己的客户端信息提交给Eure构从图5-3中可知,在这个架构中有两个角色,即Eureka Server和EurekaClient。而Eureka Client又分为Applicaton Service和Application Client,即服务提供者和服务消费者。每个区域有一个Eureka集群,并且每个区域至少有一个Eureka Server可以处理区域故障,以防服务器瘫痪。
Eureka Client向Eureka Server注册,将自己的客户端信息提交给Eureka Server。然后,Eureka Client通过向Eureka Server发送心跳(每30秒一次)来续约服务。如果某个客户端不能持续续约,那么Eureka Server断定该客户端不可用,该不可用的客户端将在大约90秒后从Eureka Server服务注册列表中删除。服务注册列表信息和服务续约信息会被复制到集群中的每个Eureka Server节点。来自任何区域的Eureka Client都可以获取整个系统的服务注册列表信息。根据这些注册列表信息,Application Client可以远程调用Applicaton Service来消费服务。
1.3.3 Register服务注册
服务注册,即Eureka Client向Eureka Server提交自己的服务信息,包括IP地址、端口、ServiceId等信息。如果Eureka Client在配置文件中没有配置ServiceId,则默认为配置文件中配置的服务名,即${spring.application.name}的值。
当Eureka Client启动时,会将自身的服务信息发送到Eureka Server。这个过程其实非常简单,现在来从源码的角度分析服务注册的过程。在工程的Maven的依赖包下,找到eureka-client-1.9.8.jar包。在com.netflix.discovery包下有一个DiscoveryClient类,该类包含了Eureka Client向EurekaServer注册的相关方法。其中,DiscoveryClient实现了EurekaClient接口,并且它是一个单例模式,而EurekaClient继承了LookupService接口。它们之间的关系如图5-4所示。
在DiscoveryClient类中有一个服务注册的方法register(),该方法通过Http请求向Eureka Server注册,其代码如下:
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
在DiscoveryClient类下继续追踪register()方法,这个方法被InstanceInfoReplicator 类的run()方法调用,其中InstanceInfoReplicator实现了Runnable接口,run()方法代码如下:
class InstanceInfoReplicator implements Runnable {
...
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
}
而InstanceInfoReplicator类是在DiscoveryClient初始化过程中使用的,其中有一个initScheduledTasks()方法,该方法主要开启了获取服务注册列表的信息。如果需要向Eureka Server注册,则开启注册,同时开启了定时任务向Eureka Server服务续约,具体代码如下:
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer 心跳定时任务
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator 服务实例信息复制器
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) {
logger.error("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
再来跟踪Eureka Server 的代码,在Maven的eureka-core:1.6.2的jar包下。打开com.netflix. eureka包,会发现有一个EurekaBootStrap的类,BootStrapContext类在程序启动时具有最先初始化的权限,代码如下:
/**
* init hook for server context. Override for custom logic.
*/
protected void initEurekaServerContext() throws Exception {
EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
logger.info("Initializing the eureka client...");
logger.info(eurekaServerConfig.getJsonCodecName());
ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);
ApplicationInfoManager applicationInfoManager = null;
if (eurekaClient == null) {
EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
? new CloudInstanceConfig()
: new MyDataCenterInstanceConfig();
applicationInfoManager = new ApplicationInfoManager(
instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
} else {
applicationInfoManager = eurekaClient.getApplicationInfoManager();
}
// PeerAwareInstanceRegistry,PeerAwareInstanceRegistryImpl和服务注册有关
PeerAwareInstanceRegistry registry;
if (isAws(applicationInfoManager.getInfo())) {
registry = new AwsInstanceRegistry(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
awsBinder.start();
} else {
registry = new PeerAwareInstanceRegistryImpl(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
}
// PeerEurekaNodes和服务高可用有关
PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
registry,
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
applicationInfoManager
);
serverContext = new DefaultEurekaServerContext(
eurekaServerConfig,
serverCodecs,
registry,
peerEurekaNodes,
applicationInfoManager
);
EurekaServerContextHolder.initialize(serverContext);
serverContext.initialize();
logger.info("Initialized server context");
// Copy registry from neighboring eureka node
int registryCount = registry.syncUp();
registry.openForTraffic(applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
其中,PeerAwareInstanceRegistryImpl和PeerEurekaNodes两个类从其命名上看,应该和服务注册以及Eureka Server高可用有关。先追踪PeerAwareInstanceRegistryImpl类,在该类中有一个register()方法,该方法提供了服务注册,并且将服务注册后的信息同步到其他的Eureka Server服务中。代码如下:
/**
* Registers the information about the {@link InstanceInfo} and replicates
* this information to all peer eureka nodes. If this is replication event
* from other replica nodes then it is not replicated.
*
* @param info
* the {@link InstanceInfo} to be registered and replicated.
* @param isReplication
* true if this is a replication event from other replica nodes,
* false otherwise.
*/
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// 服务注册
super.register(info, leaseDuration, isReplication);
// 将服务注册后的信息同步到其他的Eureka Server
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
单击其中的super.register(info, leaseDuration, isReplication)方法,进入其父类AbstractInstance- Registry可以发现更多细节,**注册列表的信息被保存在一个Map中。**PeerAwareInstanceRegistryImpl类的replicateToPeers()方法用于**将注册列表信息同步到其他Eureka Server的其他Peers节点**,追踪代码,发现该方法会循环遍历向所有的Peers节点注册,执行replicateInstanceActionsToPeers()方法:
/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
*
*/
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
点击replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node),该方法通过判断action为Register,最终执行类PeerEurekaNodes的register()方法,该方法通过执行一个任务向其他节点同步该注册信息,代码如下:
/**
* Replicates all instance changes to peer eureka nodes except for
* replication traffic to this node.
*
*/
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
} finally {
CurrentRequestVersion.remove();
}
}
/**
* Sends the registration information of {@link InstanceInfo} receiving by
* this node to the peer node represented by this class.
*
* @param info
* the instance information {@link InstanceInfo} of any instance
* that is send to this instance.
* @throws Exception
*/
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
经过一系列的源码追踪,可以发现PeerAwareInstanceRegistryImpl类的register()方法实现了服务的注册,并且向其他Eureka Server的Peer节点同步了该注册信息,那么register()方法被谁调用了呢?在前文中有关Eureka Client的分析中可以知道,Eureka Client是通过 Http来向Eureka Server注册的,那么Eureka Server肯定会提供一个服务注册的API接口给EurekaClient调用,PeerAwareInstanceRegistryImpl的register()方法最终肯定会被暴露的Http接口所调用。在IDEA开发工具中,同时按住“Alt”键和鼠标左键(查看某个类被谁调用的快捷键),可以很快定位到ApplicationResource类的addInstance ()方法,即服务注册的接口,其代码如下:
/**
* Registers information about a particular instance for an
* {@link com.netflix.discovery.shared.Application}.
*
* @param info
* {@link InstanceInfo} information of the instance.
* @param isReplication
* a header parameter containing information whether this is
* replicated from other nodes.
*/
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
1.3.4 Renew服务续约
服务续约和服务注册非常相似,通过前文中的分析可以知道,服务注册在Eureka Client程序启动之后开启,并同时开启服务续约的定时任务。在eureka-client-1.9.8.jar的DiscoveryClient的类下有renew()方法,其代码如下:
/**
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
继续追踪,还是在DiscoveryClient中HeartbeatThread类调用了renew()方法:
/**
* The heartbeat task that renews the lease in the given intervals.
*/
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
继续追踪,还是在DiscoveryClient中initScheduledTasks()调用了HeartbeatThread类,initScheduledTasks()方法,该方法主要开启了获取服务注册列表的信息。如果需要向Eureka Server注册,则开启注册,同时开启了定时任务向Eureka Server服务续约。
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
...
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
...
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
另外,Eureka Server的续约接口在eureka-core:1.9.8.jar的com.netflix.eureka包下的InstanceResource类下,接口方法为renewLease(),它是一个RESTful API接口。为了减少本章的篇幅,省略了大部分代码的展示。其中有一个registry.renew()方法,即服务续约,代码如下,我们还可以根据registry.renew()的代码继续深入研究,和追踪服务注册的源码类似。:
/**
* A put request for renewing lease from a client instance.
*
* @param isReplication
* a header parameter containing information whether this is
* replicated from other nodes.
* @param overriddenStatus
* overridden status if any.
* @param status
* the {@link InstanceStatus} of the instance.
* @param lastDirtyTimestamp
* last timestamp when this instance information was updated.
* @return response indicating whether the operation was a success or
* failure.
*/
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// Not found in the registry, immediately ask for a register
if (!isSuccess) {
logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
// Check if we need to sync based on dirty time stamp, the client
// instance might have changed some value
Response response;
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// Store the overridden status since the validation found out the node that replicates wins
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
return response;
}
服务续约有两个参数是可以配置的,即Eureka Client发送续约心跳的时间参数和Eureka Server在多长时间内没有收到心跳将实例剔除的时间参数。在默认情况下,这两个参数分别为30秒和90秒,官方的建议是不要修改,如果有特殊需求还是可以调整的,只需要分别在Eureka Client和Eureka Server的配置文件application.yml中加以下的配置:
eureka.instance.leaseRenewalIntervalInSeconds: 30
eureka.instance.leaseExpirationDurationInSeconds: 90
有关服务注册列表的获取、服务下线和服务剔除的源码不在这里进行跟踪解读,因为与服务注册和续约类似。总的来说,通过阅读源码可以发现,整体架构与Eureka 的高可用架构图完全一致。
1.3.5 为什么Eureka Client获取服务实例这么慢
(1)Eureka Client的注册延迟
Eureka Client启动之后,不是立即向Eureka Server注册的,而是有一个延迟向服务端注册的时间。通过跟踪源码,可以发现默认的延迟时间为40 秒,源码在eureka-client-1.9.8.jar的DefaultEurekaClientConfig类中,代码如下:
public int getInitialInstanceInfoReplicationIntervalSeconds() {
return configInstance.getIntProperty(
namespace + INITIAL_REGISTRATION_REPLICATION_DELAY_KEY, 40).get();
}
(2)Eureka Server的响应缓存
Eureka Server维护每 30 秒更新一次响应缓存,可通过更改配置eureka.server.responseCache UpdateIntervalMs来修改。所以即使是刚刚注册的实例,也不会立即出现在服务注册列表中。
(3)Eureka Client的缓存
Eureka Client保留注册表信息的缓存。该缓存每30秒更新一次(如前所述)。因此,Eureka Client刷新本地缓存并发现其他新注册的实例可能需要30秒。
(4)LoadBalancer的缓存
Ribbon的负载平衡器从本地的Eureka Client获取服务注册列表信息。Ribbon本身还维护了缓存,以避免每个请求都需要从Eureka Client获取服务注册列表。此缓存每30秒刷新一次(可由ribbon.ServerListRefreshInterval配置),所以可能至少需要30秒的时间才能使用新注册的实例。
综上因素,一个新注册的实例,默认延迟 40 秒向服务注册中心注册,所以不能马上被Eureka Server发现。另外,刚注册的Eureka Client也不能立即被其他服务调用,原因是调用方由于各种缓存没有及时获取到最新的服务注册列表信息。
1.3.6 Eureka自我保护模式
当有一个新的Eureka Server出现时,它尝试从相邻Peer节点获取所有服务实例注册表信息。如果从相邻的Peer节点获取信息时出现了故障,Eureka Server会尝试其他的Peer节点。如果Eureka Server能够成功获取所有的服务实例信息,则根据配置信息设置服务续约的阈值。在任何时间,如果Eureka Server接收到的服务续约低于为该值配置的百分比(默认为15分钟内低于85%),则服务器开启自我保护模式,即不再剔除注册列表的信息。这样做的好处在于,如果是Eureka Server自身的网络问题而导致Eureka Client无法续约,Eureka Client的注册列表信息不再被删除,也就是Eureka Client还可以被其他服务消费。
在默认情况下,Eureka Server的自我保护模式是开启的,如果需要关闭,则在配置文件添加以下代码:
eureka.server.enable-self-preservation: false
1.4 构建高可用Eureka集群
在实际的项目中,可能有几十个或者几百个的微服务实例,这时EurekaServer 承担了非常高的负载。由于Eureka Server在微服务架构中有着举足重轻的作用,所以需要构建高可用性的Eureka Server集群。
2 Eureka三级缓存机制
详解Eureka 缓存机制
eureka(三)-注册中心之多级缓存机制
Eureka 缓存机制详细配置
eureka的多级缓存机制