Eureka Core Steps: A Source Code Walkthrough

I. Eureka Client

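The Eureka Client is mainly responsible for the following:

  • Register: the Eureka Client registers itself with the Eureka Server. In a real application the client can be either a service provider or a service consumer.
  • Renew: the Eureka Client sends a heartbeat to the Eureka Server at a fixed interval (30 seconds by default) to renew its lease, so the server knows it is still running. If the Eureka Server receives no heartbeat within 90 seconds, it removes the client instance from its registry.
  • Fetch: the Eureka Client fetches the service registry from the Eureka Server to look up other services. The registry is cached locally and refreshed at a fixed interval (30 seconds by default).
  • Cancel: on shutdown the Eureka Client sends a cancel request to the Eureka Server so that the server removes the instance from its registry.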
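
The renew and expiry timings in the list above can be sketched as a tiny lease model. This is only an illustration of the idea, not Eureka's own `Lease` class: `LeaseSketch` and its methods are hypothetical names, and the 90-second duration is simply the default quoted above.

```java
// Hypothetical, simplified lease model; not Eureka's actual Lease class.
class LeaseSketch {
    private final long durationMs; // how long the lease stays valid without a heartbeat
    private long lastRenewalMs;    // time of the most recent heartbeat

    LeaseSketch(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastRenewalMs = nowMs;
    }

    // Called whenever a heartbeat (renew) arrives.
    void renew(long nowMs) {
        lastRenewalMs = nowMs;
    }

    // The lease is expired once no heartbeat has arrived for longer than the duration.
    boolean isExpired(long nowMs) {
        return nowMs - lastRenewalMs > durationMs;
    }

    public static void main(String[] args) {
        LeaseSketch lease = new LeaseSketch(90_000, 0);
        lease.renew(30_000); // heartbeat after 30 s
        System.out.println("70 s after last heartbeat, expired? " + lease.isExpired(100_000));
        System.out.println("91 s after last heartbeat, expired? " + lease.isExpired(121_000));
    }
}
```

Passing the clock value in explicitly keeps the sketch deterministic; a real implementation would read the system clock.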

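Below we follow the source code through each of these functions. The goal is a rough walkthrough of the flow rather than every detail; explanations are given as comments in the code.
When the container starts, com.netflix.discovery.DiscoveryClient is loaded and its constructor is invoked: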

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
                AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) {
    // part of the code omitted
    // If the client is configured neither to register with Eureka nor to fetch the registry,
    // the related components do not need to be initialized
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);
        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
        return; // no need to setup up an network tasks and we are done
    }
    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build()
        );
        // Thread pool that sends the heartbeat renewals
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        ); // use direct handoff
        // Thread pool that refreshes the cached registry
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        ); // use direct handoff
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }
    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    // Initialize the scheduled tasks: heartbeat renewal, registration and registry fetch all start here
    initScheduledTasks();
}

Next, look at the initScheduledTasks method:

private void initScheduledTasks() {
    // 1. If fetchRegistry=true, schedule the task that periodically fetches the registry.
    //    It runs every 30 seconds by default; registryFetchIntervalSeconds configures the interval.
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        // The actual logic is in CacheRefreshThread.run(), covered later
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    // 2. If registerWithEureka=true, periodically send heartbeats to the Eureka Server;
    //    leaseRenewalIntervalInSeconds configures the interval.
    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        // The actual logic is in HeartbeatThread.run(), covered later
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);
        // 3. Start instanceInfoReplicator; service registration happens here, covered later
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
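
Both tasks above are wrapped in a TimedSupervisorTask and receive an expBackOffBound. A minimal sketch of the back-off idea follows, assuming the delay doubles after a timed-out run, is capped at the base interval multiplied by expBackOffBound, and resets after a successful run. `BackoffSketch` and its methods are hypothetical, and the bound of 10 in the demo is just an example value; check the TimedSupervisorTask source of your Eureka version for the exact behaviour.

```java
// Hypothetical helper illustrating exponential back-off; not the real TimedSupervisorTask.
class BackoffSketch {
    // After a timed-out run the delay doubles, capped at baseDelayMs * expBackOffBound.
    static long afterTimeout(long currentDelayMs, long baseDelayMs, int expBackOffBound) {
        long maxDelayMs = baseDelayMs * expBackOffBound;
        return Math.min(maxDelayMs, currentDelayMs * 2);
    }

    // After a successful run the delay falls back to the configured interval.
    static long afterSuccess(long baseDelayMs) {
        return baseDelayMs;
    }

    public static void main(String[] args) {
        long base = 30_000;
        long delay = base;
        for (int i = 1; i <= 5; i++) {
            delay = afterTimeout(delay, base, 10);
            System.out.println("delay after timeout " + i + ": " + delay + " ms");
        }
        System.out.println("delay after success: " + afterSuccess(base) + " ms");
    }
}
```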

Now look at the three steps above in more detail.

1) Fetching the service list

Trace the CacheRefreshThread.run() method:

class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    } 
}
@VisibleForTesting void refreshRegistry() {
    try {
        // Fetch the service registry
        boolean success = fetchRegistry(remoteRegionsModified);
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    } 
}
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    try {
        // If the delta is disabled or if it is the first time, get all
        // applications        
        Applications applications = getApplications(); 
        if (clientConfig.shouldDisableDelta() 
            || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
            || forceFullRegistryFetch 
            || (applications == null)
            || (applications.getRegisteredApplications().size() == 0)
            || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta 
        {
            // Fetch the full registry from the Eureka Server and cache it locally
            getAndStoreFullRegistry();
        } else {
            // Apply only the changes (delta) since the last fetch
            getAndUpdateDelta(applications);
        } 
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        // tracer is a Stopwatch started at the top of the method (omitted in this excerpt)
        if (tracer != null) {
            tracer.stop();
        }
    }
    return true;
}
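
The condition in fetchRegistry() that chooses between a full fetch and a delta update can be isolated as a pure function. The sketch below mirrors the checks in the excerpt; `FetchModeSketch` and its parameter names are hypothetical stand-ins for the values read from clientConfig and the local Applications object.

```java
// Hypothetical restatement of the full-vs-delta decision in fetchRegistry().
class FetchModeSketch {
    static boolean needsFullFetch(boolean deltaDisabled, String singleVipAddress, boolean forceFull,
                                  Integer localAppCount, long localVersion) {
        return deltaDisabled                                            // delta fetch switched off
                || (singleVipAddress != null && !singleVipAddress.isEmpty()) // single-VIP refresh configured
                || forceFull                                            // caller forces a full fetch
                || localAppCount == null                                // no local Applications object yet
                || localAppCount == 0                                   // local cache empty (first fetch)
                || localVersion == -1;                                  // client library without delta support
    }

    public static void main(String[] args) {
        System.out.println("first fetch -> full? " + needsFullFetch(false, null, false, 0, 1));
        System.out.println("warm cache  -> full? " + needsFullFetch(false, "", false, 3, 1));
    }
}
```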

2) Heartbeat renewal

Trace the HeartbeatThread.run() method:

private class HeartbeatThread implements Runnable {
    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        // 404 means the server does not know this instance, so register again
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}
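
The heartbeat flow in renew(), including the re-registration on a 404, can be exercised against a small in-memory stand-in for the server. Everything in this sketch is hypothetical (`HeartbeatSketch`, `FakeServer`); only the status codes 200, 404 and 204 are taken from the excerpts in this article.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of "heartbeat, and re-register on 404".
class HeartbeatSketch {
    // In-memory stand-in for the Eureka Server registry.
    static class FakeServer {
        private final Set<String> registered = new HashSet<>();

        int heartbeat(String instanceId) {
            return registered.contains(instanceId) ? 200 : 404;
        }

        int register(String instanceId) {
            registered.add(instanceId);
            return 204;
        }
    }

    // Returns true if the lease is alive after this call.
    static boolean renew(FakeServer server, String instanceId) {
        int status = server.heartbeat(instanceId);
        if (status == 404) {
            // The server does not know this instance: register again.
            return server.register(instanceId) == 204;
        }
        return status == 200;
    }

    public static void main(String[] args) {
        FakeServer server = new FakeServer();
        System.out.println("first renew (unknown instance): " + renew(server, "order-1"));
        System.out.println("heartbeat status afterwards: " + server.heartbeat("order-1"));
    }
}
```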

3) Service registration

Trace the InstanceInfoReplicator.start() method:

public void start(int initialDelayMs) {
    if (started.compareAndSet(false, true)) {
        // Set the dirty flag so that registration happens on startup
        instanceInfo.setIsDirty();
        // for initial register
        // What actually runs is the run() method below
        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        // start() set the dirty flag, so this condition holds and register() is called, see below
        if (dirtyTimestamp != null) {
            // Register the service
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
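
The replicator relies on a dirty flag that carries a timestamp, so a change made while a registration is in flight is not lost. A minimal sketch of that idea follows, assuming the flag is cleared only when no newer change has arrived. `DirtyFlagSketch` is hypothetical: its method names echo the calls in the excerpt, but they take an explicit clock value and are not InstanceInfo's actual signatures.

```java
// Hypothetical, simplified dirty flag with a timestamp.
class DirtyFlagSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;

    // Mark the instance info as changed and remember when.
    synchronized long setIsDirtyWithTime(long nowMs) {
        dirty = true;
        lastDirtyTimestamp = nowMs;
        return nowMs;
    }

    // Returns the timestamp of the last change, or null when nothing needs to be sent.
    synchronized Long isDirtyWithTime() {
        return dirty ? Long.valueOf(lastDirtyTimestamp) : null;
    }

    // Clear the flag only if no newer change arrived after the given timestamp.
    synchronized void unsetIsDirty(long unsetTimestamp) {
        if (lastDirtyTimestamp <= unsetTimestamp) {
            dirty = false;
        }
    }

    public static void main(String[] args) {
        DirtyFlagSketch flag = new DirtyFlagSketch();
        long seen = flag.setIsDirtyWithTime(100); // replicator observes the change made at t=100
        flag.setIsDirtyWithTime(200);             // a newer change arrives while registering
        flag.unsetIsDirty(seen);                  // stale timestamp, so the flag stays set
        System.out.println("still dirty after stale unset? " + (flag.isDirtyWithTime() != null));
        flag.unsetIsDirty(200);
        System.out.println("clean after current unset? " + (flag.isDirtyWithTime() == null));
    }
}
```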

Registration ultimately calls the register() method:

boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) { 
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
     }     
    return httpResponse.getStatusCode() == 204;
}

4) Service cancellation

When the application shuts down, the shutdown() method of com.netflix.discovery.DiscoveryClient is called, and it takes care of cancelling the registration.

public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");

        if (statusChangeListener != null && applicationInfoManager != null) {
            // Set the instance status to DOWN
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            // Cancel the registration
            unregister();
        }
        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }
        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();
        logger.info("Completed shut down of DiscoveryClient");
    }
}
void unregister() {
    // It can be null if shouldRegisterWithEureka == false 
    if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
        }
    }
}

Debugging shows that the register, renew and cancel calls are actually implemented by the corresponding methods of AbstractJerseyEurekaHttpClient.

// register
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);

// heartbeat renew
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);

// cancel
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());

Take the register() method of AbstractJerseyEurekaHttpClient as an example:

public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
            .header("Accept-Encoding", "gzip") 
            .type(MediaType.APPLICATION_JSON_TYPE)
            .accept(MediaType.APPLICATION_JSON)
            .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                         response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    } 
}

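serviceUrl is the value configured as eureka.client.serviceUrl.defaultZone = http://127.0.0.1:8761/eureka/. Requests to the Eureka Server are actually sent through Jersey, a REST framework.
AbstractJerseyEurekaHttpClient implements the EurekaHttpClient interface, which defines the methods for service registration and discovery.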

public interface EurekaHttpClient {
    EurekaHttpResponse<Void> register(InstanceInfo info);
    EurekaHttpResponse<Void> cancel(String appName, String id); 
    EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus);
    EurekaHttpResponse<Void> statusUpdate(String appName, String id, InstanceStatus newStatus, InstanceInfo info);
    EurekaHttpResponse<Void> deleteStatusOverride(String appName, String id, InstanceInfo info);
    EurekaHttpResponse<Applications> getApplications(String... regions);
    EurekaHttpResponse<Applications> getDelta(String... regions); 
    EurekaHttpResponse<Applications> getVip(String vipAddress, String... regions);
    EurekaHttpResponse<Applications> getSecureVip(String secureVipAddress, String... regions); 
    EurekaHttpResponse<Application> getApplication(String appName);
    EurekaHttpResponse<InstanceInfo> getInstance(String appName, String id);
    EurekaHttpResponse<InstanceInfo> getInstance(String id);
    void shutdown();
}
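
The register() excerpt above posts to the path "apps/" + appName. The sketch below lays out how the three write operations map onto REST paths, assuming that heartbeat and cancel target "apps/{appName}/{id}" with PUT and DELETE respectively. That mapping follows the publicly documented Eureka REST operations rather than code shown in this article, so treat it as an assumption; `EurekaPathsSketch` and the sample names are hypothetical.

```java
// Hypothetical helper that builds the request lines for register, heartbeat and cancel.
class EurekaPathsSketch {
    // Path used by register(): one collection per application.
    static String registerPath(String appName) {
        return "apps/" + appName;
    }

    // Path assumed for heartbeat and cancel: one resource per instance.
    static String instancePath(String appName, String instanceId) {
        return "apps/" + appName + "/" + instanceId;
    }

    public static void main(String[] args) {
        String serviceUrl = "http://127.0.0.1:8761/eureka/";
        System.out.println("POST   " + serviceUrl + registerPath("ORDER-SERVICE"));
        System.out.println("PUT    " + serviceUrl + instancePath("ORDER-SERVICE", "order-1"));
        System.out.println("DELETE " + serviceUrl + instancePath("ORDER-SERVICE", "order-1"));
    }
}
```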

II. Eureka Server

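The Eureka Server exposes endpoints for clients to call; they live in two classes, ApplicationResource and InstanceResource.
The Eureka Server is mainly responsible for the following:

  • Maintaining the service registry
  • Handling register, renew and cancel requests from Eureka Clients
  • Replicating data between multiple Eureka Server nodes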

In spring-cloud-netflix-eureka-server-x.x.x.RELEASE, the file META-INF/spring.factories contains:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

When the project starts, EurekaServerAutoConfiguration is loaded into the Spring container:

public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {

    // Configuration class for the Eureka Server
    @Configuration
    protected static class EurekaServerConfigBeanConfiguration {
        @Bean
        @ConditionalOnMissingBean
        public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
            EurekaServerConfigBean server = new EurekaServerConfigBean();
            if (clientConfig.shouldRegisterWithEureka()) { 
                // Set a sensible default if we are supposed to replicate
                server.setRegistrySyncRetries(5);
            } 
            return server;
        }
    }

    // Controller for the dashboard page
    @Bean
    @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
    public EurekaController eurekaController() {
        return new EurekaController(this.applicationInfoManager);
    }

    // Handles register, renew, cancel and other requests from Eureka Clients;
    // InstanceRegistry extends PeerAwareInstanceRegistryImpl
    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
        ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications();
        // force initialization
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                                    serverCodecs, this.eurekaClient,
                                    this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
                                    this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    } 

    // Handles synchronization between multiple Eureka Server nodes
    @Bean
    @ConditionalOnMissingBean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
                                           ServerCodecs serverCodecs) {
        return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
                this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
    }
}

com.netflix.eureka.resources.ApplicationResource and com.netflix.eureka.resources.InstanceResource handle the register, renew, cancel and other requests from Eureka Clients.

Registry list

Take the registration endpoint in ApplicationResource as an example.

@Produces({"application/xml", "application/json"})
public class ApplicationResource {

    // The concrete implementation behind this interface is PeerAwareInstanceRegistryImpl
    private final PeerAwareInstanceRegistry registry;

    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        // related code omitted
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build(); // 204 to be backwards compatible
    }
}

The call above actually lands in the register method of PeerAwareInstanceRegistryImpl:

public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // After registering the instance in its own memory, the server also replicates it to its peer nodes.
    // Call the method of the parent class AbstractInstanceRegistry
    super.register(info, leaseDuration, isReplication);
    // Replicate the registration to the other Eureka Server nodes
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

AbstractInstanceRegistry.register() is shown below; the renew and cancel logic lives in the same class.

    /**
     * Registers a new instance with a given duration.
     * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
     */
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }

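As you can see, the registration ends up in a Map: the key is the application name (registrant.getAppName()) and the value is another Map keyed by instance ID, because one application can have multiple instances.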

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
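
The two-level map and the putIfAbsent pattern from register() can be tried out in isolation. This is a minimal sketch in which plain strings stand in for Lease<InstanceInfo>; `RegistrySketch` and the sample names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the registry layout: appName -> (instanceId -> lease).
class RegistrySketch {
    private final ConcurrentHashMap<String, Map<String, String>> registry = new ConcurrentHashMap<>();

    void register(String appName, String instanceId, String lease) {
        Map<String, String> instances = registry.get(appName);
        if (instances == null) {
            // First instance of this application: create the inner map without
            // overwriting one that another thread may have added in the meantime.
            Map<String, String> fresh = new ConcurrentHashMap<>();
            instances = registry.putIfAbsent(appName, fresh);
            if (instances == null) {
                instances = fresh;
            }
        }
        instances.put(instanceId, lease);
    }

    int instanceCount(String appName) {
        Map<String, String> instances = registry.get(appName);
        return instances == null ? 0 : instances.size();
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.register("ORDER-SERVICE", "order-1", "lease-a");
        registry.register("ORDER-SERVICE", "order-2", "lease-b");
        System.out.println("ORDER-SERVICE instances: " + registry.instanceCount("ORDER-SERVICE"));
    }
}
```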

Replicating registration information

After registration, the information is replicated to the other Eureka Server nodes:
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);


    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
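
The two guards in replicateToPeers(), never forwarding a request that is itself a replication and never replicating to yourself, can be demonstrated with a few in-memory nodes. `ReplicationSketch`, `Node` and `cluster` are hypothetical names; the direct method call between nodes stands in for the HTTP call a real peer would receive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of peer replication with the isReplication guard.
class ReplicationSketch {
    static class Node {
        final String url;
        final List<Node> peers = new ArrayList<>();
        final List<String> registry = new ArrayList<>();
        int replicationsReceived;

        Node(String url) {
            this.url = url;
        }

        void register(String instanceId, boolean isReplication) {
            registry.add(instanceId);
            if (isReplication) {
                // Already a replicated request: record it and do not forward again.
                replicationsReceived++;
                return;
            }
            for (Node peer : peers) {
                if (peer.url.equals(url)) {
                    continue; // do not replicate to yourself
                }
                peer.register(instanceId, true);
            }
        }
    }

    // Builds nodes whose peer list contains every node, including the node itself.
    static List<Node> cluster(String... urls) {
        List<Node> nodes = new ArrayList<>();
        for (String url : urls) {
            nodes.add(new Node(url));
        }
        for (Node node : nodes) {
            node.peers.addAll(nodes);
        }
        return nodes;
    }

    public static void main(String[] args) {
        List<Node> nodes = cluster("http://a", "http://b", "http://c");
        nodes.get(0).register("order-1", false);
        for (Node node : nodes) {
            System.out.println(node.url + " holds " + node.registry.size() + " registration(s)");
        }
    }
}
```

Without the isReplication check, each peer would forward the request back to the others and the registration would bounce between nodes indefinitely.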




Fetching the service list

 @GET
 public Response getApplication(@PathParam("version") String version,
                                   @HeaderParam("Accept") final String acceptHeader,
                                   @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) {
        if (!registry.shouldAllowAccess(false)) {
            return Response.status(Status.FORBIDDEN).build();
        }

        EurekaMonitors.GET_APPLICATION.increment();

        CurrentRequestVersion.set(Version.toEnum(version));
        KeyType keyType = Key.KeyType.JSON;
        if (acceptHeader == null || !acceptHeader.contains("json")) {
            keyType = Key.KeyType.XML;
        }

        Key cacheKey = new Key(
                Key.EntityType.Application,
                appName,
                keyType,
                CurrentRequestVersion.get(),
                EurekaAccept.fromString(eurekaAccept)
        );

        String payLoad = responseCache.get(cacheKey);
        CurrentRequestVersion.remove();

        if (payLoad != null) {
            logger.debug("Found: {}", appName);
            return Response.ok(payLoad).build();
        } else {
            logger.debug("Not Found: {}", appName);
            return Response.status(Status.NOT_FOUND).build();
        }
    }

responseCache.get(cacheKey) is implemented as follows:

public String get(final Key key) {
        return get(key, shouldUseReadOnlyResponseCache);
    }

 @VisibleForTesting
 String get(final Key key, boolean useReadOnlyCache) {
        Value payload = getValue(key, useReadOnlyCache);
        if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
            return null;
        } else {
            return payload.getPayload();
        }
}

@VisibleForTesting
    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            if (useReadOnlyCache) {
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key : {}", key, t);
        }
        return payload;
    }


ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
        this.registry = registry;

        long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
        this.readWriteCacheMap =
                CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                        .removalListener(new RemovalListener<Key, Value>() {
                            @Override
                            public void onRemoval(RemovalNotification<Key, Value> notification) {
                                Key removedKey = notification.getKey();
                                if (removedKey.hasRegions()) {
                                    Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                    regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                                }
                            }
                        })
                        .build(new CacheLoader<Key, Value>() {
                            @Override
                            public Value load(Key key) throws Exception {
                                if (key.hasRegions()) {
                                    Key cloneWithNoRegions = key.cloneWithoutRegions();
                                    regionSpecificKeys.put(cloneWithNoRegions, key);
                                }
                                Value value = generatePayload(key);
                                return value;
                            }
                        });
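        // The read-write cache must also be synced into the read-only cache; otherwise an instance that has
        // gone offline would still be served from the read-only cache. The scheduled task below does this.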
        if (shouldUseReadOnlyResponseCache) {
            timer.schedule(getCacheUpdateTask(),
                    new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                            + responseCacheUpdateIntervalMs),
                    responseCacheUpdateIntervalMs);
        }

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
        }
    }

private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();

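As you can see, a lookup first goes to the read-only cache; on a miss it falls back to the read-write cache; and if the read-write cache also misses, its CacheLoader builds the payload from the in-memory registry (generatePayload). This is essentially read/write separation.
The read-write cache also has to be synchronized into the read-only cache, because otherwise the read-only cache would keep serving the address of a service that has gone offline. A scheduled task handles this synchronization: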

        // Schedule the task that syncs the read-write cache into the read-only cache
        if (shouldUseReadOnlyResponseCache) {
            timer.schedule(getCacheUpdateTask(),
                    new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                            + responseCacheUpdateIntervalMs),
                    responseCacheUpdateIntervalMs);
        }
private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                logger.debug("Updating the client cache from response cache");
                for (Key key : readOnlyCacheMap.keySet()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                    }
                    try {
                        CurrentRequestVersion.set(key.getVersion());
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        if (cacheValue != currentCacheValue) {
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (Throwable th) {
                        logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                    } finally {
                        CurrentRequestVersion.remove();
                    }
                }
            }
        };
    }

The task runs every 30 seconds by default and copies data from the read-write cache into the read-only cache.
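
The interplay of the two caches and the sync task can be reproduced with plain maps. This is a minimal sketch under simplifying assumptions: a ConcurrentHashMap replaces the Guava cache, the loader function stands in for generatePayload, the loader never returns null, and sync() is called by hand instead of by a timer. `TwoTierCacheSketch` and its methods are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of a read-only cache in front of a read-write cache.
class TwoTierCacheSketch {
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();
    private final Function<String, String> loader; // builds a value from the registry; must not return null

    TwoTierCacheSketch(Function<String, String> loader) {
        this.loader = loader;
    }

    // Read path: read-only cache first, then read-write cache, then the loader.
    String get(String key) {
        String value = readOnly.get(key);
        if (value == null) {
            value = readWrite.computeIfAbsent(key, loader);
            readOnly.put(key, value);
        }
        return value;
    }

    // Called when the registry changes: only the read-write cache is invalidated.
    void invalidate(String key) {
        readWrite.remove(key);
    }

    // The periodic task: refresh every key held by the read-only cache.
    void sync() {
        for (String key : readOnly.keySet()) {
            String fresh = readWrite.computeIfAbsent(key, loader);
            if (!fresh.equals(readOnly.get(key))) {
                readOnly.put(key, fresh);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> registry = new ConcurrentHashMap<>();
        registry.put("ORDER-SERVICE", "10.0.0.1");
        TwoTierCacheSketch cache = new TwoTierCacheSketch(registry::get);
        System.out.println("initial:      " + cache.get("ORDER-SERVICE"));
        registry.put("ORDER-SERVICE", "10.0.0.2");
        cache.invalidate("ORDER-SERVICE");
        System.out.println("before sync:  " + cache.get("ORDER-SERVICE"));
        cache.sync();
        System.out.println("after sync:   " + cache.get("ORDER-SERVICE"));
    }
}
```

The demo shows the trade-off of this design: reads stay cheap, but a client can see a stale value until the next sync.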

Service eviction

During auto-configuration Eureka also loads EurekaServerInitializerConfiguration, whose start() method bootstraps the server:

@Override
    public void start() {
        new Thread(() -> {
            try {
                // TODO: is this class even needed now?
                eurekaServerBootstrap.contextInitialized(
                        EurekaServerInitializerConfiguration.this.servletContext);
                log.info("Started Eureka Server");

                publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                EurekaServerInitializerConfiguration.this.running = true;
                publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
            }
            catch (Exception ex) {
                // Help!
                log.error("Could not initialize Eureka servlet context", ex);
            }
        }).start();
    }

In EurekaServerBootstrap:

    public void contextInitialized(ServletContext context) {
        try {
            initEurekaEnvironment();
            initEurekaServerContext();

            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        }
        catch (Throwable e) {
            log.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }

    protected void initEurekaServerContext() throws Exception {
        // For backward compatibility
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                XStream.PRIORITY_VERY_HIGH);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                XStream.PRIORITY_VERY_HIGH);

        if (isAws(this.applicationInfoManager.getInfo())) {
            this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                    this.eurekaClientConfig, this.registry, this.applicationInfoManager);
            this.awsBinder.start();
        }

        EurekaServerContextHolder.initialize(this.serverContext);

        log.info("Initialized server context");

        // Copy registry from neighboring eureka node
        int registryCount = this.registry.syncUp();
        this.registry.openForTraffic(this.applicationInfoManager, registryCount);

        // Register all monitoring statistics.
        EurekaMonitors.registerAllStats();
    }

In PeerAwareInstanceRegistryImpl:


    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        this.expectedNumberOfClientsSendingRenews = count;
        updateRenewsPerMinThreshold();
        logger.info("Got {} instances from neighboring DS node", count);
        logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        super.postInit();
    }
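
The renew threshold logged in openForTraffic() is what self-preservation compares incoming heartbeats against. The sketch below shows how such a threshold could be derived from the instance count. It assumes the formula used by recent Eureka versions (expected clients × renewals per minute × a percentage) together with the commonly cited defaults of a 30-second renewal interval and a 0.85 threshold factor; the class and method names are hypothetical.

```java
// Hypothetical helper illustrating the renew-threshold arithmetic; not Eureka code.
public class RenewThresholdSketch {

    // expectedClients: number of instances expected to send heartbeats
    // renewalIntervalSeconds: assumed heartbeat interval (30 by default)
    // percentThreshold: assumed fraction of expected renewals (0.85 by default)
    static int renewsPerMinThreshold(int expectedClients,
                                     int renewalIntervalSeconds,
                                     double percentThreshold) {
        double renewsPerClientPerMinute = 60.0 / renewalIntervalSeconds;
        return (int) (expectedClients * renewsPerClientPerMinute * percentThreshold);
    }

    public static void main(String[] args) {
        // With 10 instances heartbeating every 30 s, 20 renewals per minute are expected.
        System.out.println(renewsPerMinThreshold(10, 30, 0.85));
    }
}
```

This matches the comment in openForTraffic(): with a 30-second interval, each client contributes two renewals per minute.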

In AbstractInstanceRegistry:


    protected void postInit() {
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }

    /* visible for testing */ class EvictionTask extends TimerTask {

        private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);

        @Override
        public void run() {
            try {
                long compensationTimeMs = getCompensationTimeMs();
                logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
                evict(compensationTimeMs);
            } catch (Throwable e) {
                logger.error("Could not run the evict task", e);
            }
        }

        // remaining members (e.g. getCompensationTimeMs()) omitted
    }
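
As shown above, the eviction task runs once a minute by default and removes instances whose last heartbeat is older than the eviction threshold, which defaults to 90 s.

At this point we know there are caches at three levels: eureka-server has its response caches (readWriteCacheMap and readOnlyCacheMap), eureka-client caches the registry locally, and Ribbon keeps its own server list cache.

If a service goes offline abnormally, clients may need up to

eureka.instance.leaseExpirationDurationInSeconds * 2 + readOnlyCacheMap sync interval + client fetch interval + Ribbon refresh interval = 180 + 30 + 30 + 30 = 270 s

to see the change. The factor of 2 on leaseExpirationDurationInSeconds comes from a bug in the renew() method, which adds an extra leaseExpirationDurationInSeconds when it sets lastUpdateTimestamp.

A service that comes online normally also needs up to readOnlyCacheMap sync interval + client fetch interval + Ribbon refresh interval = 30 + 30 + 30 = 90 s before clients notice it.

All of the intervals above are configurable.

Server side:

## Disable readOnlyCacheMap. Fine for small and mid-sized companies: lookups are served from memory anyway, so with modest traffic it will not be slow
eureka.server.useReadOnlyResponseCache=false
## Self-preservation mode causes more trouble than it is worth, so turn it off
eureka.server.enableSelfPreservation=false
## Interval for evicting dead instances, default 60 s
eureka.server.evictionIntervalTimerInMs=5000

Client side:

## Heartbeat interval, default 30 s
eureka.instance.leaseRenewalIntervalInSeconds=5
## Expiration time without a heartbeat, default 90 s
eureka.instance.leaseExpirationDurationInSeconds=10
## Refresh interval of the local registry cache, default 30 s
eureka.client.registryFetchIntervalSeconds=5
## Ribbon cache refresh interval in milliseconds, default 30 s
ribbon.ServerListRefreshInterval=2000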
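
The delay arithmetic above can be checked with a small sketch. The `Lease` class here is a simplified, hypothetical model that follows the article's description of renew() (the duration is added once when the timestamp is set and again when expiry is checked); it is not Eureka's Lease class, and all names are assumptions.

```java
// Hypothetical model of the lease timing and the cache delays described in the text.
public class EvictionDelaySketch {

    static final class Lease {
        final long durationMs;
        long lastUpdateTimestamp;

        Lease(long durationMs, long nowMs) {
            this.durationMs = durationMs;
            this.lastUpdateTimestamp = nowMs;
        }

        // Models the behaviour the article calls a bug: the duration is added on renew...
        void renew(long nowMs) {
            lastUpdateTimestamp = nowMs + durationMs;
        }

        // ...and added again when checking expiry, so a renewed lease lives 2 x duration.
        boolean isExpired(long nowMs) {
            return nowMs > lastUpdateTimestamp + durationMs;
        }
    }

    // Abnormal shutdown: doubled lease expiration plus each cache refresh interval.
    static long offlineDelaySeconds(long leaseExpiration, long readOnlySync,
                                    long clientFetch, long ribbonRefresh) {
        return leaseExpiration * 2 + readOnlySync + clientFetch + ribbonRefresh;
    }

    // Normal startup: only the cache refresh intervals apply.
    static long onlineDelaySeconds(long readOnlySync, long clientFetch, long ribbonRefresh) {
        return readOnlySync + clientFetch + ribbonRefresh;
    }

    public static void main(String[] args) {
        Lease lease = new Lease(90_000L, 0L);
        lease.renew(0L);
        System.out.println(lease.isExpired(180_000L)); // false: exactly 2 x 90 s is not yet expired
        System.out.println(lease.isExpired(180_001L)); // true: expired just past 180 s
        System.out.println(offlineDelaySeconds(90, 30, 30, 30)); // 270
        System.out.println(onlineDelaySeconds(30, 30, 30));      // 90
    }
}
```

Plugging the tuned values from the configuration above into the same helpers shows how much the delay shrinks once the intervals are reduced and the read-only cache is disabled (pass 0 for its interval).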