前面分析了Eureka Server的功能,下面看下Eureka Client的功能实现
- 注册到Eureka Server
- 从Eureka Server中获取服务列表
1.注册到Eureka Server
使用client时,需要用@EnableDiscoveryClient注解表明自己是一个client。追踪这个注解类可以发现,它和DiscoveryClient绑定在一起。
Annotation to enable a DiscoveryClient implementation.
查看DiscoveryClient类有一个register方法
/**
 * Registers this instance with the Eureka service by making the appropriate REST call.
 *
 * @return {@code true} when the registration response carries HTTP status 204
 * @throws Throwable any exception raised by the registration call; it is logged at
 *         warn level and rethrown unchanged
 */
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);

    final EurekaHttpResponse<Void> response;
    try {
        response = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception ex) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, ex.getMessage(), ex);
        throw ex;
    }

    final int statusCode = response.getStatusCode();
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, statusCode);
    }
    // 204 (No Content) is the status this client treats as a successful registration.
    return statusCode == 204;
}
这个 registrationClient 有很多实现,默认的实现是 SessionedEurekaHttpClient。当返回 204 的时候,表示注册成功。
DiscoveryClient里面维护了两个定时任务:
- 定时向注册中心拉取服务(包括全量拉取、增量拉取)(默认30秒)
- 定时发送心跳连接(默认30秒)
private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();scheduler.schedule(new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,// 向注册中心拉取服务new CacheRefreshThread()),registryFetchIntervalSeconds, TimeUnit.SECONDS);}if (clientConfig.shouldRegisterWithEureka()) {int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);// Heartbeat timerscheduler.schedule(new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,// 心跳连接new HeartbeatThread()),renewalIntervalInSecs, TimeUnit.SECONDS);// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSizestatusChangeListener = new ApplicationInfoManager.StatusChangeListener() {@Overridepublic String getId() {return "statusChangeListener";}@Overridepublic void notify(StatusChangeEvent statusChangeEvent) {if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {// log at warn level if DOWN was involvedlogger.warn("Saw local status change event {}", statusChangeEvent);} else {logger.info("Saw local status change event {}", statusChangeEvent);}instanceInfoReplicator.onDemandUpdate();}};if (clientConfig.shouldOnDemandUpdateStatusChange()) {applicationInfoManager.registerStatusChangeListener(statusChangeListener);}instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());} else 
{logger.info("Not registering with Eureka server per configuration");}}
2.心跳连接
下面看下心跳连接
private class HeartbeatThread implements Runnable {public void run() {// 通过适当的REST调用续订eureka服务if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();}}}
boolean renew() {EurekaHttpResponse<InstanceInfo> httpResponse;try {httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());if (httpResponse.getStatusCode() == 404) {REREGISTER_COUNTER.increment();logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());long timestamp = instanceInfo.setIsDirtyWithTime();// 向注册中心发送注册服务boolean success = register();if (success) {instanceInfo.unsetIsDirty(timestamp);}return success;}return httpResponse.getStatusCode() == 200;} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);return false;}}
3.向注册中心拉取服务
客户端向注册中心拉取服务分为全量拉取和增量拉取
下面看下拉取服务代码
// 向注册中心拉取服务
boolean success = fetchRegistry(remoteRegionsModified);
private boolean fetchRegistry(boolean forceFullRegistryFetch) {Stopwatch tracer = FETCH_REGISTRY_TIMER.start();try {// If the delta is disabled or if it is the first time, get all// applications// 拿到本地Eureka注册信息缓存Applications applications = getApplications();if (clientConfig.shouldDisableDelta()|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))|| forceFullRegistryFetch|| (applications == null)|| (applications.getRegisteredApplications().size() == 0)|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta{logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());logger.info("Force full registry fetch : {}", forceFullRegistryFetch);logger.info("Application is null : {}", (applications == null));logger.info("Registered Applications size is zero : {}",(applications.getRegisteredApplications().size() == 0));logger.info("Application version is -1: {}", (applications.getVersion() == -1));// 全量拉取getAndStoreFullRegistry();} else {// 增量拉取getAndUpdateDelta(applications);}applications.setAppsHashCode(applications.getReconcileHashCode());logTotalInstances();} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);return false;} finally {if (tracer != null) {tracer.stop();}}// Notify about cache refresh before updating the instance remote statusonCacheRefreshed();// Update remote status based on refreshed data held in the cacheupdateInstanceRemoteStatus();// registry was fetched successfully, so return truereturn true;}
3.1全量拉取
当客户端第一次启动时,会向注册中心全量拉取所有的服务
/**
 * Fetches the full registry from the Eureka server and stores it in the local cache.
 *
 * <p>When no single-VIP refresh address is configured (the configured value is
 * {@code null}) all applications are requested; otherwise only that VIP is queried.
 * The result is stored only if the response status is OK and the fetch generation
 * has not been advanced by another thread since this method started.
 *
 * @throws Throwable on any error raised by the underlying calls
 */
private void getAndStoreFullRegistry() throws Throwable {
    final long generationAtStart = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    final EurekaHttpResponse<Applications> response;
    if (clientConfig.getRegistryRefreshSingleVipAddress() == null) {
        response = eurekaTransport.queryClient.getApplications(remoteRegionsRef.get());
    } else {
        response = eurekaTransport.queryClient.getVip(
                clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    }

    // Only an OK response contributes an entity; anything else leaves the result null.
    Applications fetched = null;
    if (response.getStatusCode() == Status.OK.getStatusCode()) {
        fetched = response.getEntity();
    }
    logger.info("The response status is {}", response.getStatusCode());

    if (fetched == null) {
        logger.error("The application is null for some reason. Not storing this information");
        return;
    }

    // The compare-and-set fails if another thread advanced the generation meanwhile.
    if (!fetchRegistryGeneration.compareAndSet(generationAtStart, generationAtStart + 1)) {
        logger.warn("Not updating applications as another thread is updating it already");
        return;
    }

    localRegionApps.set(this.filterAndShuffle(fetched));
    logger.debug("Got full registry with apps hashcode {}", fetched.getAppsHashCode());
}
3.2增量拉取
把Eureka Server注册中心最近(3分钟内)更新的注册信息拉取下来
private void getAndUpdateDelta(Applications applications) throws Throwable {long currentUpdateGeneration = fetchRegistryGeneration.get();Applications delta = null;EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {delta = httpResponse.getEntity();}if (delta == null) {logger.warn("The server does not allow the delta revision to be applied because it is not safe. "+ "Hence got the full registry.");getAndStoreFullRegistry();} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());String reconcileHashCode = "";if (fetchRegistryUpdateLock.tryLock()) {try {updateDelta(delta);reconcileHashCode = getReconcileHashCode(applications);} finally {fetchRegistryUpdateLock.unlock();}} else {logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");}// There is a diff in number of instances for some reasonif (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall}} else {logger.warn("Not updating application delta as another thread is updating it already");logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());}}
