前面分析了Eureka Server的功能,下面看下Eureka Client的功能实现

  • 注册到Eureka Server
  • 从Eureka Server中获取服务列表

1.注册到Eureka Server

在使用client时,需要通过@EnableDiscoveryClient注解声明当前应用是一个client。追踪这个注解类的源码可以发现,它和DiscoveryClient绑定在一起。

  1. Annotation to enable a DiscoveryClient implementation.

查看DiscoveryClient类有一个register方法

  1. /**
  2. * Register with the eureka service by making the appropriate REST call.
  3. * 通过REST调用向eureka服务注册
  4. */
  5. boolean register() throws Throwable {
  6. logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
  7. EurekaHttpResponse<Void> httpResponse;
  8. try {
  9. httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
  10. } catch (Exception e) {
  11. logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
  12. throw e;
  13. }
  14. if (logger.isInfoEnabled()) {
  15. logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
  16. }
  17. return httpResponse.getStatusCode() == 204;
  18. }

这个 registrationClient 有很多实现,默认的实现是 SessionedEurekaHttpClient。当返回 204 的时候,表示注册成功。

DiscoveryClient里面维护了两个定时任务:

  • 定时向注册中心拉取服务(包括全量拉取、增量拉取)(默认30秒)
  • 定时发送心跳连接(默认30秒)

    1. private void initScheduledTasks() {
    2. if (clientConfig.shouldFetchRegistry()) {
    3. // registry cache refresh timer
    4. int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
    5. int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
    6. scheduler.schedule(
    7. new TimedSupervisorTask(
    8. "cacheRefresh",
    9. scheduler,
    10. cacheRefreshExecutor,
    11. registryFetchIntervalSeconds,
    12. TimeUnit.SECONDS,
    13. expBackOffBound,
    14. // 向注册中心拉取服务
    15. new CacheRefreshThread()
    16. ),
    17. registryFetchIntervalSeconds, TimeUnit.SECONDS);
    18. }
    19. if (clientConfig.shouldRegisterWithEureka()) {
    20. int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
    21. int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
    22. logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
    23. // Heartbeat timer
    24. scheduler.schedule(
    25. new TimedSupervisorTask(
    26. "heartbeat",
    27. scheduler,
    28. heartbeatExecutor,
    29. renewalIntervalInSecs,
    30. TimeUnit.SECONDS,
    31. expBackOffBound,
    32. // 心跳连接
    33. new HeartbeatThread()
    34. ),
    35. renewalIntervalInSecs, TimeUnit.SECONDS);
    36. // InstanceInfo replicator
    37. instanceInfoReplicator = new InstanceInfoReplicator(
    38. this,
    39. instanceInfo,
    40. clientConfig.getInstanceInfoReplicationIntervalSeconds(),
    41. 2); // burstSize
    42. statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    43. @Override
    44. public String getId() {
    45. return "statusChangeListener";
    46. }
    47. @Override
    48. public void notify(StatusChangeEvent statusChangeEvent) {
    49. if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
    50. InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
    51. // log at warn level if DOWN was involved
    52. logger.warn("Saw local status change event {}", statusChangeEvent);
    53. } else {
    54. logger.info("Saw local status change event {}", statusChangeEvent);
    55. }
    56. instanceInfoReplicator.onDemandUpdate();
    57. }
    58. };
    59. if (clientConfig.shouldOnDemandUpdateStatusChange()) {
    60. applicationInfoManager.registerStatusChangeListener(statusChangeListener);
    61. }
    62. instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    63. } else {
    64. logger.info("Not registering with Eureka server per configuration");
    65. }
    66. }

    2.心跳连接

    下面看下心跳连接

    1. private class HeartbeatThread implements Runnable {
    2. public void run() {
    3. // 通过适当的REST调用续订eureka服务
    4. if (renew()) {
    5. lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
    6. }
    7. }
    8. }
    1. boolean renew() {
    2. EurekaHttpResponse<InstanceInfo> httpResponse;
    3. try {
    4. httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
    5. logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
    6. if (httpResponse.getStatusCode() == 404) {
    7. REREGISTER_COUNTER.increment();
    8. logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
    9. long timestamp = instanceInfo.setIsDirtyWithTime();
    10. // 向注册中心发送注册服务
    11. boolean success = register();
    12. if (success) {
    13. instanceInfo.unsetIsDirty(timestamp);
    14. }
    15. return success;
    16. }
    17. return httpResponse.getStatusCode() == 200;
    18. } catch (Throwable e) {
    19. logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
    20. return false;
    21. }
    22. }

    3.向注册中心拉取服务

    客户端向注册中心拉取服务分为全量拉取和增量拉取
    下面看下拉取服务代码

    1. // Fetch the registry from the Eureka server
    2. boolean success = fetchRegistry(remoteRegionsModified);
    1. private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    2. Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    3. try {
    4. // If the delta is disabled or if it is the first time, get all
    5. // applications
    6. // 拿到本地Eureka注册信息缓存
    7. Applications applications = getApplications();
    8. if (clientConfig.shouldDisableDelta()
    9. || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
    10. || forceFullRegistryFetch
    11. || (applications == null)
    12. || (applications.getRegisteredApplications().size() == 0)
    13. || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
    14. {
    15. logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
    16. logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
    17. logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
    18. logger.info("Application is null : {}", (applications == null));
    19. logger.info("Registered Applications size is zero : {}",
    20. (applications.getRegisteredApplications().size() == 0));
    21. logger.info("Application version is -1: {}", (applications.getVersion() == -1));
    22. // 全量拉取
    23. getAndStoreFullRegistry();
    24. } else {
    25. // 增量拉取
    26. getAndUpdateDelta(applications);
    27. }
    28. applications.setAppsHashCode(applications.getReconcileHashCode());
    29. logTotalInstances();
    30. } catch (Throwable e) {
    31. logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
    32. return false;
    33. } finally {
    34. if (tracer != null) {
    35. tracer.stop();
    36. }
    37. }
    38. // Notify about cache refresh before updating the instance remote status
    39. onCacheRefreshed();
    40. // Update remote status based on refreshed data held in the cache
    41. updateInstanceRemoteStatus();
    42. // registry was fetched successfully, so return true
    43. return true;
    44. }

    3.1全量拉取

    当客户端第一次启动时,会向注册中心全量拉取所有的服务

    1. private void getAndStoreFullRegistry() throws Throwable {
    2. long currentUpdateGeneration = fetchRegistryGeneration.get();
    3. logger.info("Getting all instance registry info from the eureka server");
    4. Applications apps = null;
    5. EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
    6. ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
    7. : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    8. if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
    9. apps = httpResponse.getEntity();
    10. }
    11. logger.info("The response status is {}", httpResponse.getStatusCode());
    12. if (apps == null) {
    13. logger.error("The application is null for some reason. Not storing this information");
    14. } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
    15. localRegionApps.set(this.filterAndShuffle(apps));
    16. logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    17. } else {
    18. logger.warn("Not updating applications as another thread is updating it already");
    19. }
    20. }

3.2增量拉取

把Eureka Server注册中心最近(3分钟内)更新的注册信息拉取下来

  1. private void getAndUpdateDelta(Applications applications) throws Throwable {
  2. long currentUpdateGeneration = fetchRegistryGeneration.get();
  3. Applications delta = null;
  4. EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
  5. if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
  6. delta = httpResponse.getEntity();
  7. }
  8. if (delta == null) {
  9. logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
  10. + "Hence got the full registry.");
  11. getAndStoreFullRegistry();
  12. } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
  13. logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
  14. String reconcileHashCode = "";
  15. if (fetchRegistryUpdateLock.tryLock()) {
  16. try {
  17. updateDelta(delta);
  18. reconcileHashCode = getReconcileHashCode(applications);
  19. } finally {
  20. fetchRegistryUpdateLock.unlock();
  21. }
  22. } else {
  23. logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
  24. }
  25. // There is a diff in number of instances for some reason
  26. if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
  27. reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
  28. }
  29. } else {
  30. logger.warn("Not updating application delta as another thread is updating it already");
  31. logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
  32. }
  33. }