Eureka 核心步骤源码解析
一、Eureka Client
Eureka Client做的事情主要包括:
- 服务注册(Register)Eureka Client会向Eureka Server进行服务注册,Client在我们的实际应用中可以是服务提供者或者是服务消费者
- 服务续约(Renew)Eureka Client需要定期(默认30秒)向Eureka Server发送一次心跳来续订租约,以便让Eureka Server知道自已仍在运行,如果Eureka Server在90秒内未收到续订心跳,则会将Client实例从其注册表中删除
- 服务注册列表获取(Fetch)Eureka Client需要从Eueka Server获取服务注册列表来查找其他服务信息,获取服务列表后会在本地进行缓存,可以定期(默认30秒)更新服务注册列表信息
- 服务下线(Cancel)Eureka Client在shutdown时会向Eureka Server发送服务下线请求,以便Server将实例从注册表中删除
下面我们跟着源码梳理以上的相关功能代码流程,只做流程大概分析梳理,不具体到细节,说明部分在代码中通过注释的方式呈现。
在容器启动时,会加载com.netflix.discovery.DiscoveryClient并调用其构造方法,如下:
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) {// 省略部分代码// 如果配置不用注册到Eureka && 配置不用从注册中心获取配置,则不用初始化相关组件if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {logger.info("Client configured to neither register nor query for data.");scheduler = null;heartbeatExecutor = null;cacheRefreshExecutor = null;eurekaTransport = null;instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()// to work with DI'd DiscoveryClientDiscoveryManager.getInstance().setDiscoveryClient(this);DiscoveryManager.getInstance().setEurekaClientConfig(config);initTimestampMs = System.currentTimeMillis();logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",initTimestampMs, this.getApplications().size());return; // no need to setup up an network tasks and we are done}try {// default size of 2 - 1 each for heartbeat and cacheRefreshscheduler = Executors.newScheduledThreadPool(2,new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-%d").setDaemon(true).build());// 发送心跳续约的线程池heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build()); // use direct handoff// 注册信息缓存刷新的线程池cacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build()); // use direct handoff} catch (Throwable e) {throw new RuntimeException("Failed to initialize DiscoveryClient!", e);} // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch// 初始化定时任务,服务心跳renew、服务注册、服务列表获取等功能在此处完成initScheduledTasks();}
接着看initScheduledTasks方法
private void initScheduledTasks() {
// 1.如果配置fetchRegistry=true,则定期执行获取服务列表的定时任务,默认30秒一次,通过registryFetchIntervalSeconds可以配置获取频率
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
// 具体执行逻辑在CacheRefreshThread.run()方法,后面再看
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
} // 2.如果配置registerWithEureka=true,则定期向Eureka Server报送心跳,通过leaseRenewalIntervalInSeconds可以配置报送频率
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
// 具体执行逻辑在HeartbeatThread的run()方法,后面再看
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// 3.启动instanceInfoReplicator,服务注册就在这里完成,后面再看
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
1)获取服务列表
跟踪CacheRefreshThread.run()方法
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
@VisibleForTesting void refreshRegistry() {
try {
// 获取服务列表信息
boolean success = fetchRegistry(remoteRegionsModified);
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{ // 从Eureka Server全量获取服务列表缓存在本地
getAndStoreFullRegistry();
} else {
// 更新服务列表
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
return true;
}
2)服务心跳renew
跟踪HeartbeatThread.run()方法
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
3)服务注册
跟踪InstanceInfoReplicator.start()方法
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
// 这里设置标识,便于启动时完成服务注册
instanceInfo.setIsDirty();
// for initial register
// 实际上执行的是run()方法,看下面
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
// 在start()方法中设置了标识位,所以此处能满足条件调用register()方法进行服务注册,看下面
if (dirtyTimestamp != null) {
// 服务注册
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
} }
服务注册最终是调用register()方法
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
4)服务下线
在应用shutdown时,会调用com.netflix.discovery.DiscoveryClient的shutdown()方法,其内部完成了服务下线的功能。
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
// 设置应用状态为DOWN
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
// 服务下线
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
void unregister() {
// It can be null if shouldRegisterWithEureka == false
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception e) {
logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
}
}
}
在服务注册、心跳renew、服务下线的方法调用中,通过debug发现实际上是调用AbstractJerseyEurekaHttpClient的相关方法实现。
// 服务注册register
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
// 心跳renew
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
// 服务下线cancel
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
以AbstractJerseyEurekaHttpClient类中服务注册的register()方法为例:
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
} if (response != null) {
response.close();
}
}
}
serviceUrl就是配置文件eureka.client.serviceUrl.defaultZone = http://127.0.0.1:8761/eureka/,向Eureka Server请求实际上是通过Jersey框架完成。(Jersey是一个REST框架)
AbstractJerseyEurekaHttpClient类实现了EurekaHttpClient接口,接口中定义了服务注册与发现的相关方法。
public interface EurekaHttpClient {
EurekaHttpResponse<Void> register(InstanceInfo info);
EurekaHttpResponse<Void> cancel(String appName, String id);
EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus);
EurekaHttpResponse<Void> statusUpdate(String appName, String id, InstanceStatus newStatus, InstanceInfo info); EurekaHttpResponse<Void> deleteStatusOverride(String appName, String id, InstanceInfo info);
EurekaHttpResponse<Applications> getApplications(String... regions);
EurekaHttpResponse<Applications> getDelta(String... regions);
EurekaHttpResponse<Applications> getVip(String vipAddress, String... regions);
EurekaHttpResponse<Applications> getSecureVip(String secureVipAddress, String... regions);
EurekaHttpResponse<Application> getApplication(String appName);
EurekaHttpResponse<InstanceInfo> getInstance(String appName, String id);
EurekaHttpResponse<InstanceInfo> getInstance(String id);
void shutdown();
}
二、Eureka Server
Eureka Server服务端有提供接口给client端调用,分别在ApplicationResource,InstanceResource这两个类中。
Eureka Server做的事情主要包括:
- 维护服务注册信息列表
- 接收来自Eureka Client的register、renew、cancel请求
- Eureka Server多节点之间的数据复制同步
查看spring-cloud-netflix-eureka-server-x.x.x.RELEASE,在META-INF/spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
在项目启动时EurekaServerAutoConfiguration会加载至spring容器
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
// Eureka Server的相关配置类
@Configuration
protected static class EurekaServerConfigBeanConfiguration {
@Bean
@ConditionalOnMissingBean
public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
EurekaServerConfigBean server = new EurekaServerConfigBean();
if (clientConfig.shouldRegisterWithEureka()) {
// Set a sensible default if we are supposed to replicate
server.setRegistrySyncRetries(5);
}
return server;
}
}
// dashboard页面控制器
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
public EurekaController eurekaController() {
return new EurekaController(this.applicationInfoManager);
}
// 处理Eureka Client的register、renew、cancel等请求
//而InstanceRegistry 继承了PeerAwareInstanceRegistryImpl
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications();
// force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
// 处理Eureka Server多节点同步
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
ServerCodecs serverCodecs) {
return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
}
}
com.netflix.eureka.resources.ApplicationResource和com.netflix.eureka.resources.InstanceResource处理来自Eureka Client的register、renew、cancel等请求,
注册信息列表
以ApplicationResource中的服务注册接口为例。
@Produces({"application/xml", "application/json"})
public class ApplicationResource {
实际上调用的是PeerAwareInstanceRegistryImpl中的register方法
private final PeerAwareInstanceRegistry registry;
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
// 省略相关代码
registry.register(info, "true".equals(isReplication));
return Response.status(204).build();
// 204 to be backwards compatible
}
实际上调用的是PeerAwareInstanceRegistryImpl中的register方法
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
//在自己内存中注册一份后还会同步注册到eureka从节点上
// 调用AbstractInstanceRegistry父类的方法
super.register(info, leaseDuration, isReplication);
// 向其他Eureka Server节点同步注册信息
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
}
其他renew、cancel逻辑也在AbstractInstanceRegistry里面。
/**
* Registers a new instance with a given duration.
* @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
*/
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
其实可以看到最终信息是保存在一个Map容器中,key为applicaiton.name,值为Map,其中值Map的key为instanceId,因为可以有多个,所以用Map存储。
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
同步注册信息
注册后向其他Eureka Server节点同步注册信息
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
/
获取服务列表获取服务列表
@GET
public Response getApplication(@PathParam("version") String version,
@HeaderParam("Accept") final String acceptHeader,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) {
if (!registry.shouldAllowAccess(false)) {
return Response.status(Status.FORBIDDEN).build();
}
EurekaMonitors.GET_APPLICATION.increment();
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
if (acceptHeader == null || !acceptHeader.contains("json")) {
keyType = Key.KeyType.XML;
}
Key cacheKey = new Key(
Key.EntityType.Application,
appName,
keyType,
CurrentRequestVersion.get(),
EurekaAccept.fromString(eurekaAccept)
);
String payLoad = responseCache.get(cacheKey);
CurrentRequestVersion.remove();
if (payLoad != null) {
logger.debug("Found: {}", appName);
return Response.ok(payLoad).build();
} else {
logger.debug("Not Found: {}", appName);
return Response.status(Status.NOT_FOUND).build();
}
}
responseCache.get(cacheKey)
public String get(final Key key) {
return get(key, shouldUseReadOnlyResponseCache);
}
@VisibleForTesting
String get(final Key key, boolean useReadOnlyCache) {
Value payload = getValue(key, useReadOnlyCache);
if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
return null;
} else {
return payload.getPayload();
}
}
@VisibleForTesting
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
//当然,这里还需要将缓存同步到只读缓存,因为服务下线的话读缓存中还是存在地址的。这里还有个同步任务。
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
}
}
private final ConcurrentMap
可以看到这里会先从只读缓存中获取,如果获取不到再从读写缓存中获取,如果读写缓存获取不到就会从内容获取服务列表地址,这里可以看作就是用了读写分离的思想。
当然,这里还需要将缓存同步到只读缓存,因为服务下线的话读缓存中还是存在地址的。这里还有个同步任务。
//当然,这里还需要将缓存同步到只读缓存,因为服务下线的话读缓存中还是存在地址的。这里还有个同步任务。
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType());
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
} finally {
CurrentRequestVersion.remove();
}
}
}
};
}
服务剔除
eureka在自动装配EurekaServerInitializerConfiguration
@Override
public void start() {
new Thread(() -> {
try {
// TODO: is this class even needed now?
eurekaServerBootstrap.contextInitialized(
EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}).start();
}
public void contextInitialized(ServletContext context) {
try {
initEurekaEnvironment();
initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
PeerAwareInstanceRegistryImpl中
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
super.postInit();
}
AbstractInstanceRegistry中
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
/* visible for testing */ class EvictionTask extends TimerTask {
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
@Override
public void run() {
try {
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}
<br />可以看到这里会每隔1分钟剔除心跳间隔超过心跳剔除阈值的服务,默认阈值90s。<br />到这里,我们知道eureka-server有读写缓存,eureka-client也有缓存,ribbon也有缓存**<br />如果一个服务非正常下线,客户端更新需要<br />eureka.instance.leaseExpirationDurationInSeconds_2+readOnlyCacheMap+client fetch interval+ribbon=180+30+30+30=270s,leaseExpirationDurationInSeconds_2的原因是因为renew()方法的bug,在lastUpdateTimestamp的时候多加了个leaseExpirationDurationInSeconds的时间。<br />一个服务正常上线感知也需要readOnlyCacheMap+client fetch interval+ribbon=30+30+30=90s<br />以上的时间间隔配置都是可以配置的<br />server端 ## 禁用readOnlyCacheMap 中小公司可以,因为本来走的内存,量不大不会慢 eureka.server.useReadOnlyResponseCache=false ## 自我保护模式坑比好处多,所以关闭它 eureka.server.enableSelfPreservation=false ## 剔除无效节点 默认60s执行一次 eureka.server.evictionIntervalTimerInMs=5000 client端 ## 心跳间隔,默认30s eureka.instance.leaseRenewalIntervalInSeconds=5 ## 没有心跳的淘汰时间,默认90s eureka.instance.leaseExpirationDurationInSeconds=10 # 定时刷新本地缓存时间 默认30s eureka.client.registryFetchIntervalSeconds=5 # ribbon缓存时间 默认30s ribbon.ServerListRefreshInterval=2000
