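The three elements of microservices: business modeling, technical components, and the development process

  1. Business modeling

Service modeling starts by identifying the categories of services and how each service relates to the business, so that domain boundaries are drawn as clearly as possible. The usual method is Domain-Driven Design (DDD): identify the subdomains within the domain, decide whether each subdomain is independent, and examine how the subdomains interact, which in turn fixes the boundaries between bounded contexts. The industry generally divides a domain into three types of subdomain: core, supporting, and generic. Among these, within a system the ...

  2. Technical components
  3. Development process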

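Service registry (Eureka)

Eureka server (the registration server side)

On the registry server, registering, renewing, cancelling, and evicting a service all follow essentially the same workflow: each operation modifies the stored service data and then replicates that change to the other Eureka nodes.

On a standalone registry, registration is handled by the class AbstractInstanceRegistry.class: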

    /**
     * Registers a new instance with a given duration.
     *
     * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
     */
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            // Look up the instances already stored in the registry for this application
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                // Create a ConcurrentHashMap<String, Lease<InstanceInfo>> and put it into the registry
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            // Find the existing lease for the ID being registered
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease.
            // If a lease exists, compare its last dirty timestamp with the registrant's; if the
            // existing one is later, the existing instance takes precedence.
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration:
                // update the expected number of renewals per minute and its threshold
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            // Create a new lease and put it into the map
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                        + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }
            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            // Mark the instance as ADDED and record the change in the recently-changed queue
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            // Update the instance's last-updated timestamp
            registrant.setLastUpdatedTimestamp();
            // Invalidate the response cache
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }
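
The storage part of the method above can be reduced to a small sketch. It keeps only the two-level map, the putIfAbsent race handling, and the "existing copy wins if its dirty timestamp is strictly newer" rule. SketchInstance and SketchRegistry are hypothetical names introduced for illustration; leases, locks, status overrides, and cache invalidation are deliberately left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified stand-in for Eureka's InstanceInfo.
final class SketchInstance {
    final String appName;
    final String id;
    final long lastDirtyTimestamp;

    SketchInstance(String appName, String id, long lastDirtyTimestamp) {
        this.appName = appName;
        this.id = id;
        this.lastDirtyTimestamp = lastDirtyTimestamp;
    }
}

// Hypothetical registry: application name -> (instance id -> instance).
final class SketchRegistry {
    private final ConcurrentMap<String, ConcurrentMap<String, SketchInstance>> registry =
            new ConcurrentHashMap<String, ConcurrentMap<String, SketchInstance>>();

    /** Registers an instance and returns the copy that ended up stored. */
    SketchInstance register(SketchInstance registrant) {
        ConcurrentMap<String, SketchInstance> instances = registry.get(registrant.appName);
        if (instances == null) {
            ConcurrentMap<String, SketchInstance> fresh =
                    new ConcurrentHashMap<String, SketchInstance>();
            // putIfAbsent returns the map another thread stored first, or null if ours won.
            instances = registry.putIfAbsent(registrant.appName, fresh);
            if (instances == null) {
                instances = fresh;
            }
        }
        SketchInstance existing = instances.get(registrant.id);
        // Strictly greater: on equal timestamps the incoming copy replaces the stored one.
        if (existing != null && existing.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
            registrant = existing;
        }
        instances.put(registrant.id, registrant);
        return registrant;
    }

    int instanceCount(String appName) {
        ConcurrentMap<String, SketchInstance> instances = registry.get(appName);
        return instances == null ? 0 : instances.size();
    }
}
```

Registering an older copy of "host-1" after a newer one leaves the newer copy in place, while a copy with an equal timestamp replaces it, which mirrors the `>` (not `>=`) comparison in the original method.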

In a registry cluster, the registration class is PeerAwareInstanceRegistry.class:

    // Extends the general registry interface
    public interface PeerAwareInstanceRegistry extends InstanceRegistry {
        void init(PeerEurekaNodes peerEurekaNodes) throws Exception;

        /**
         * Populates the registry information from a peer eureka node. This
         * operation fails over to other nodes until the list is exhausted if the
         * communication fails.
         */
        int syncUp();

        /**
         * Checks to see if the registry access is allowed or the server is in a
         * situation where it does not allow getting registry information. The server
         * does not return registry information for a period specified in
         * {@link com.netflix.eureka.EurekaServerConfig#getWaitTimeInMsWhenSyncEmpty()}, if it cannot
         * get the registry information from the peer eureka nodes at start up.
         *
         * @return false - if the instances count from a replica transfer returned
         *         zero and if the wait time has not elapsed, otherwise returns true
         */
        boolean shouldAllowAccess(boolean remoteRegionRequired);

        // Registration method used in a registry cluster
        void register(InstanceInfo info, boolean isReplication);

        void statusUpdate(final String asgName, final ASGResource.ASGStatus newStatus, final boolean isReplication);
    }

The implementing class, PeerAwareInstanceRegistryImpl, registers the instance locally and then replicates the registration to its peers:

    /**
     * Registers the information about the {@link InstanceInfo} and replicates
     * this information to all peer eureka nodes. If this is replication event
     * from other replica nodes then it is not replicated.
     *
     * @param info
     *            the {@link InstanceInfo} to be registered and replicated.
     * @param isReplication
     *            true if this is a replication event from other replica nodes,
     *            false otherwise.
     */
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        // Synchronize the registration to the other nodes of the registry cluster
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

    /**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

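One notable technique in Eureka's design and implementation is the split between a high-level API and a low-level API, as shown in the figure below:
Spring Cloud - Figure 1
The high-level API wraps the client in a series of decorators (the decorator pattern) to build the target EurekaHttpClient. The low-level API is the implementation of the remote HTTP call itself: Netflix provides a Jersey-based version, while Spring Cloud provides a RestTemplate-based one.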
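
The decorator layering described above can be illustrated with a minimal sketch. The interface and class names (SketchHttpClient, FlakyTransportClient, RetryingClient, CountingClient) are hypothetical and far smaller than the real EurekaHttpClient hierarchy. The point is only that each wrapper implements the same interface as the transport it wraps and adds one concern.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, much-reduced client interface (not the real EurekaHttpClient).
interface SketchHttpClient {
    int sendHeartBeat(String instanceId); // returns an HTTP status code
}

// "Low-level" side: stands in for the real HTTP call.
// Fails a fixed number of times, then succeeds.
final class FlakyTransportClient implements SketchHttpClient {
    private final AtomicInteger failuresLeft;

    FlakyTransportClient(int failures) {
        this.failuresLeft = new AtomicInteger(failures);
    }

    public int sendHeartBeat(String instanceId) {
        if (failuresLeft.getAndDecrement() > 0) {
            throw new IllegalStateException("simulated network failure");
        }
        return 200;
    }
}

// Decorator that counts how many calls reach the wrapped client.
final class CountingClient implements SketchHttpClient {
    final AtomicInteger calls = new AtomicInteger();
    private final SketchHttpClient delegate;

    CountingClient(SketchHttpClient delegate) {
        this.delegate = delegate;
    }

    public int sendHeartBeat(String instanceId) {
        calls.incrementAndGet();
        return delegate.sendHeartBeat(instanceId);
    }
}

// Decorator that retries a failed call up to maxAttempts times.
final class RetryingClient implements SketchHttpClient {
    private final SketchHttpClient delegate;
    private final int maxAttempts;

    RetryingClient(SketchHttpClient delegate, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.delegate = delegate;
        this.maxAttempts = maxAttempts;
    }

    public int sendHeartBeat(String instanceId) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return delegate.sendHeartBeat(instanceId);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }
}
```

Composing `new RetryingClient(new CountingClient(new FlakyTransportClient(2)), 3)` gives a client that hides two transient failures from the caller. Swapping the innermost transport for a different HTTP implementation would not change the wrappers, which is the benefit of separating the two API levels.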

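Source walkthrough: service consumer operations

In Eureka, the client-side component DiscoveryClient also keeps a local cache of the registry.
The Eureka client refreshes this cache with a scheduled task. The initScheduledTasks method in DiscoveryClient initializes the various scheduled tasks; for the cache refresh, the scheduler is set up as follows: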

    /**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

This schedules a task whose actual work is carried out by CacheRefreshThread, which is defined as follows:

    /**
     * The task that fetches the registry information at specified intervals.
     */
    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }
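
The expBackOffBound argument passed to TimedSupervisorTask above suggests that the delay between runs grows when a run fails and is capped at a multiple of the base interval. The sketch below shows one way such a policy can be computed. The doubling step, the cap of base interval times bound, and the reset on success are assumptions made for illustration, and BackoffDelay is a hypothetical helper, not Eureka code.

```java
// Hypothetical helper illustrating a bounded exponential back-off policy.
final class BackoffDelay {
    private final long baseDelayMs;
    private final long maxDelayMs;
    private long currentDelayMs;

    BackoffDelay(long baseDelayMs, int expBackOffBound) {
        this.baseDelayMs = baseDelayMs;
        // Assumption: the delay never exceeds base interval * bound.
        this.maxDelayMs = baseDelayMs * expBackOffBound;
        this.currentDelayMs = baseDelayMs;
    }

    /** A successful run resets the delay to the base interval. */
    long onSuccess() {
        currentDelayMs = baseDelayMs;
        return currentDelayMs;
    }

    /** A timed-out run doubles the delay, up to the cap. */
    long onTimeout() {
        currentDelayMs = Math.min(maxDelayMs, currentDelayMs * 2);
        return currentDelayMs;
    }
}
```

With a 30-second base interval and a bound of 10, repeated timeouts would push the delay to 60 s, 120 s, 240 s, and then hold it at 300 s until a run succeeds.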

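For a service consumer, the most important operation is fetching the registry information. The refreshRegistry method called here performs a series of checks and then calls fetchRegistry to update the registry information. The code of that method is: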

    /**
     * Fetches the registry information.
     *
     * <p>
     * This method tries to get only deltas after the first fetch unless there
     * is an issue in reconciling eureka server and client registry information.
     * </p>
     *
     * @param forceFullRegistryFetch Forces a full registry fetch.
     *
     * @return true if the registry was fetched
     */
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
        try {
            // If the delta is disabled or if it is the first time, get all applications
            Applications applications = getApplications();
            // If any one of these conditions holds, fetch the full set of instance data
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                getAndStoreFullRegistry();
            } else {
                // Fetch only the delta of instance data
                getAndUpdateDelta(applications);
            }
            // Recompute and store the consistency hashcode
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();
        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();
        // registry was fetched successfully, so return true
        return true;
    }

The full-fetch method is as follows:

    /**
     * Gets the full registry information from the eureka server and stores it locally.
     * When applying the full registry, the following flow is observed:
     *
     * if (update generation have not advanced (due to another thread))
     *   atomically set the registry to the new registry
     * fi
     *
     * @return the full registry information.
     * @throws Throwable
     *             on error.
     */
    private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();
        logger.info("Getting all instance registry info from the eureka server");
        Applications apps = null;
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());
        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }
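
The "update generation" check in getAndStoreFullRegistry is a compare-and-set guard: a thread records the generation before its slow remote call and only stores the result if no other thread has advanced the generation in the meantime. A minimal sketch of that idea follows; GenerationGuardedCache is a hypothetical class, not part of Eureka.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical cache that accepts a fetched value only if nobody else updated it first.
final class GenerationGuardedCache<T> {
    private final AtomicLong generation = new AtomicLong();
    private final AtomicReference<T> value = new AtomicReference<T>();

    /** Read this before starting the (slow) fetch. */
    long currentGeneration() {
        return generation.get();
    }

    /**
     * Stores the fetched value only if the generation is still the one observed
     * before the fetch. Returns false when another thread has updated the cache.
     */
    boolean storeIfUnchanged(long observedGeneration, T fetched) {
        if (generation.compareAndSet(observedGeneration, observedGeneration + 1)) {
            value.set(fetched);
            return true;
        }
        return false;
    }

    T get() {
        return value.get();
    }
}
```

If two threads both observe generation 0 and both finish their fetch, only the first call to storeIfUnchanged succeeds. The second result is dropped instead of overwriting newer data, which corresponds to the "another thread is updating it already" branch above.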

Next we focus on the getAndUpdateDelta() method, which shows how Eureka implements incremental (delta) updates. Its code is:

    /**
     * Get the delta registry information from the eureka server and update it locally.
     * When applying the delta, the following flow is observed:
     *
     * if (update generation have not advanced (due to another thread))
     *   atomically try to: update application with the delta and get reconcileHashCode
     *   abort entire processing otherwise
     *   do reconciliation if reconcileHashCode clash
     * fi
     *
     * @return the client response
     * @throws Throwable on error
     */
    private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();
        Applications delta = null;
        // Fetch the delta through eurekaTransport.queryClient
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }
        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            // No delta available: fall back to a full fetch
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    // Compare the delta returned by the server with the local data and merge the differences
                    updateDelta(delta);
                    // Compute the consistency hashcode from the local data after the merge
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // There is a diff in number of instances for some reason
            // Compare the local hashcode with the one sent by the server
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                // On a mismatch, make a remote call to fetch the full registry
                reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }

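Recall from the basics of the Eureka server that the server keeps a cached list of service registrations. According to the Eureka documentation this data is retained for three minutes, while the Eureka client's scheduler refreshes the local cache every 30 seconds. In principle, as long as the client keeps pulling updates from the server, its data stays consistent with the server's. But if the client fails to fetch updates within three minutes, its data diverges from the server's. Any update mechanism of this kind has to account for that case, and it is a point to keep in mind when designing similar systems.
To handle this problem, Eureka uses a consistency hashcode. Every delta the server returns carries a consistency hashcode, which is compared with the hashcode computed from the client's local service list. If the two differ, the delta update has gone wrong and a full fetch is needed.
In Eureka, the consistency hashcode is computed as shown below. The method encodes the registered instance information and returns the result as a String: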

    private String getReconcileHashCode(Applications applications) {
        TreeMap<String, AtomicInteger> instanceCountMap = new TreeMap<String, AtomicInteger>();
        if (isFetchingRemoteRegionRegistries()) {
            for (Applications remoteApp : remoteRegionVsApps.values()) {
                remoteApp.populateInstanceCountMap(instanceCountMap);
            }
        }
        applications.populateInstanceCountMap(instanceCountMap);
        return Applications.getReconcileHashCode(instanceCountMap);
    }
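
To make the comparison concrete, the sketch below counts instances per status in a sorted map and joins the counts into a string. The exact encoding used here (status, underscore, count, underscore) is an assumption modelled on the method above, and ReconcileHashSketch is a hypothetical class rather than Eureka's implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of a count-per-status consistency string.
final class ReconcileHashSketch {

    /** Builds a string such as "DOWN_1_UP_2_" from a list of instance statuses. */
    static String hashOf(List<String> instanceStatuses) {
        // TreeMap keeps the keys sorted, so the result does not depend on input order.
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (String status : instanceStatuses) {
            Integer current = counts.get(status);
            counts.put(status, current == null ? 1 : current + 1);
        }
        StringBuilder hash = new StringBuilder();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            hash.append(entry.getKey()).append('_').append(entry.getValue()).append('_');
        }
        return hash.toString();
    }

    /** The client needs a full fetch when its own string differs from the server's. */
    static boolean needsFullFetch(List<String> localStatuses, String serverHash) {
        return !hashOf(localStatuses).equals(serverHash);
    }
}
```

A string of this kind only captures how many instances are in each status, so it is cheap to compute and compare. It signals that something is out of sync without saying what, which is why a mismatch is resolved with a full fetch.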

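Summary

The figure below shows the Eureka client's scheduled cache refresh flow. It is essentially the same as the service registration flow: in Eureka, service providers and service consumers are both clients of the Eureka server and use the same machinery to interact with it.
Spring Cloud - Figure 2
Sequence diagram of the Eureka cache refresh flow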

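Load balancing (Ribbon)

Client-side load balancing with Ribbon

Ribbon is a client-side load balancer. It is embedded in the service consumer and automatically chooses which service instance to connect to, using its built-in load-balancing algorithms.

There are two main ways to load-balance with Ribbon:

  1. Using the @LoadBalanced annotation

This annotation is placed on the RestTemplate (org.springframework.web.client.RestTemplate) that issues HTTP requests and automatically adds client-side load balancing to it. Developers need no special code or configuration for load balancing. An example of using @LoadBalanced: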

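    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.client.loadbalancer.LoadBalanced;
    import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
    import org.springframework.context.annotation.Bean;
    import org.springframework.web.client.RestTemplate;

    @SpringBootApplication
    @EnableEurekaClient
    public class InterventionApplication {

        // Calls made through this RestTemplate are load-balanced on the client side
        @LoadBalanced
        @Bean
        public RestTemplate getRestTemplate() {
            return new RestTemplate();
        }

        public static void main(String[] args) {
            SpringApplication.run(InterventionApplication.class, args);
        }
    }

  2. Using the @RibbonClient annotation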

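With this annotation developers can customize load balancing, including which algorithm is used. With @LoadBalanced alone, requests are balanced with the round-robin strategy that Ribbon provides by default.
To use @RibbonClient, first create a separate configuration class that defines the load-balancing rule.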

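    import com.netflix.client.config.IClientConfig;
    import com.netflix.loadbalancer.IRule;
    import com.netflix.loadbalancer.RandomRule;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class SpringHealthLoadBalanceConfig {

        @Autowired
        IClientConfig config;

        // Replace the default rule with random selection
        @Bean
        @ConditionalOnMissingBean
        public IRule springHealthRule(IClientConfig config) {
            return new RandomRule();
        }
    }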

Then apply the @RibbonClient annotation to the application class to select the custom load-balancing rule for the named service.

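    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.client.loadbalancer.LoadBalanced;
    import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
    import org.springframework.cloud.netflix.ribbon.RibbonClient;
    import org.springframework.context.annotation.Bean;
    import org.springframework.web.client.RestTemplate;

    @SpringBootApplication
    @EnableEurekaClient
    // Calls to "userservice" use the rule defined in SpringHealthLoadBalanceConfig
    @RibbonClient(name = "userservice", configuration = SpringHealthLoadBalanceConfig.class)
    public class InterventionApplication {

        @Bean
        @LoadBalanced
        public RestTemplate restTemplate() {
            return new RestTemplate();
        }

        public static void main(String[] args) {
            SpringApplication.run(InterventionApplication.class, args);
        }
    }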

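Load-balancing strategies in Ribbon

Ribbon's load-balancing strategies fall into two broad groups: static load-balancing algorithms and dynamic load-balancing algorithms.

Static load-balancing algorithms

These mainly comprise the Random, Round Robin, and Weighted Round Robin algorithms.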
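
The three static algorithms can be sketched in a few lines each. The classes below are hypothetical illustrations and are not Ribbon's RandomRule or RoundRobinRule. The weighted variant simply repeats each server in a slot list according to its weight, which is the most basic way to implement weighting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// Random: every server is equally likely on every call.
final class RandomChooser {
    private final Random random = new Random();

    String choose(List<String> servers) {
        return servers.isEmpty() ? null : servers.get(random.nextInt(servers.size()));
    }
}

// Round robin: walk through the list in order and wrap around.
final class RoundRobinChooser {
    private final AtomicInteger next = new AtomicInteger();

    String choose(List<String> servers) {
        if (servers.isEmpty()) {
            return null;
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), servers.size());
        return servers.get(index);
    }
}

// Weighted round robin: a server with weight w occupies w consecutive slots.
final class WeightedRoundRobinChooser {
    private final List<String> slots = new ArrayList<String>();
    private final RoundRobinChooser inner = new RoundRobinChooser();

    void add(String server, int weight) {
        for (int i = 0; i < weight; i++) {
            slots.add(server);
        }
    }

    String choose() {
        return inner.choose(slots);
    }
}
```

None of these looks at the runtime state of the servers, which is what makes them static: the same configuration always produces the same distribution of requests.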

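Dynamic load-balancing algorithms

Giving a static algorithm weights that are derived from runtime data turns it into a dynamic algorithm. Typical dynamic algorithms include IP hash, least connections, and service-call latency.

BestAvailableRule

Chooses the server with the fewest concurrent requests: it examines the servers one by one and picks the one with the lowest number of active requests.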
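
The selection step described above amounts to a linear scan for the minimum. The sketch below assumes the active request count of each server is already available in a map. LeastActiveChooser is a hypothetical class, and any additional filtering the real rule may apply is left out.

```java
import java.util.Map;

// Hypothetical illustration of "pick the server with the fewest active requests".
final class LeastActiveChooser {

    /** Returns the server with the lowest active request count, or null if there is none. */
    static String choose(Map<String, Integer> activeRequestsByServer) {
        String best = null;
        int lowest = Integer.MAX_VALUE;
        for (Map.Entry<String, Integer> entry : activeRequestsByServer.entrySet()) {
            // Strictly smaller: on a tie the server seen first is kept.
            if (entry.getValue() < lowest) {
                lowest = entry.getValue();
                best = entry.getKey();
            }
        }
        return best;
    }
}
```

With counts of 5, 2, and 7 for three servers, the second server is chosen. Because the counts change with every request, repeated calls naturally spread load toward idle servers.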

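WeightedResponseTimeRule

Weights servers by response time, with the weight inversely related to the response time: the longer a server takes to respond, the smaller its weight, and the shorter it takes, the larger its weight. Response times come from the LoadBalancerStats held by the ILoadBalancer interface. WeightedResponseTimeRule periodically reads the average response times from LoadBalancerStats and updates the weight of each server. A server's weight is the sum of the average response times of all servers minus that server's own average response time.
The core code of the algorithm is: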

    class ServerWeight {

        public void maintainWeights() {
            ILoadBalancer lb = getLoadBalancer();
            if (lb != null) {
                if (serverWeightAssignmentInProgress.compareAndSet(false, true)) {
                    try {
                        logger.info("Weight adjusting job started");
                        AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
                        // Obtain the LoadBalancerStats from the ILoadBalancer
                        LoadBalancerStats stats = nlb.getLoadBalancerStats();
                        if (stats != null) {
                            // Sum the average response times of all servers
                            double totalResponseTime = 0.0D;
                            for (Server server : nlb.getAllServers()) {
                                ServerStats ss = stats.getSingleServerStat(server);
                                totalResponseTime += ss.getResponseTimeAvg();
                            }
                            // Store cumulative weights: entry i is the sum of the weights of servers 0..i
                            Double weightSoFar = 0.0D;
                            List<Double> finalWeights = new ArrayList<Double>();
                            for (Server server : nlb.getAllServers()) {
                                ServerStats ss = stats.getSingleServerStat(server);
                                // Weight = total response time - this server's average response time
                                double weight = totalResponseTime - ss.getResponseTimeAvg();
                                weightSoFar = weightSoFar + weight;
                                finalWeights.add(weightSoFar);
                            }
                            setWeights(finalWeights);
                        }
                    } catch (Exception e) {
                        logger.error("Error calculating server weights", e);
                    } finally {
                        serverWeightAssignmentInProgress.set(false);
                    }
                }
            }
        }
    }
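
A small worked example shows how the cumulative weights computed above can be used to pick a server. The selection step (draw a random number below the last cumulative weight and take the first entry that is at least that large) is an assumption about how such a list is typically consumed. ResponseTimeWeights is a hypothetical class.

```java
// Hypothetical worked example of response-time-based weighting.
final class ResponseTimeWeights {

    /** Entry i is the sum of (total - avg[j]) for all j <= i, as in maintainWeights. */
    static double[] cumulativeWeights(double[] avgResponseTimes) {
        double total = 0.0;
        for (double avg : avgResponseTimes) {
            total += avg;
        }
        double[] cumulative = new double[avgResponseTimes.length];
        double soFar = 0.0;
        for (int i = 0; i < avgResponseTimes.length; i++) {
            soFar += total - avgResponseTimes[i];
            cumulative[i] = soFar;
        }
        return cumulative;
    }

    /** Returns the index of the first server whose cumulative weight covers the drawn value. */
    static int pick(double[] cumulative, double randomWeight) {
        for (int i = 0; i < cumulative.length; i++) {
            if (cumulative[i] >= randomWeight) {
                return i;
            }
        }
        return -1; // drawn value is larger than the total weight
    }
}
```

For average response times of 10, 30, and 60 ms the total is 100, so the individual weights are 90, 70, and 40 and the cumulative list is 90, 160, 200. A random draw in the range 0 to 200 lands on the first server 45% of the time and on the slowest server only 20% of the time.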

AvailabilityFilteringRule

Checks the running state of each server recorded in LoadBalancerStats and filters out servers that keep failing to connect or that are under high concurrency.

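API gateway

The gateway is a core Spring Cloud component. In a microservice architecture, the API gateway isolates clients from the microservices. Spring Cloud offers two API gateway solutions: one integrates Netflix's Zuul gateway, and the other is Spring's own Spring Cloud Gateway.

Zuul

Building a gateway with Zuul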

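  1. Add the Maven dependency

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-zuul</artifactId>
    </dependency>

  2. Create the bootstrap class

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.zuul.EnableZuulProxy;

    @SpringBootApplication
    @EnableZuulProxy
    public class ZuulServerApplication {
        public static void main(String[] args) {
            SpringApplication.run(ZuulServerApplication.class, args);
        }
    }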

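Implementing service routing with Zuul

The most important job of an API gateway is service routing: a request that passes through the Zuul gateway is routed and forwarded to the corresponding backend service.
The URL has the following form, where zuulservice is the address of the Zuul service and service is the corresponding backend service:

    http://zuulservice:5555/service

Mapping routes through service discovery

Zuul can build its routes automatically from the service discovery mechanism of the service registry. The most common and most recommended way to route with Zuul is therefore to rely on this automatic route mapping. This approach depends on the registry, where the registered service information is transparent and visible.

Mapping routes through dynamic configuration

This approach is used when developers or operators need custom mappings, and it covers what discovery-based route mapping cannot do.
Custom route names are set in the application.yml configuration file, as follows: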

    zuul:
      # Prefix prepended to every route
      prefix: /springhealth
      # Do not create a discovery-based route for this service
      ignored-services: 'userservice'
      routes:
        userservice: /user/**
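
The effect of this configuration on an incoming request can be sketched as a small path-matching function. The sketch assumes that both the global prefix and the route's own path prefix are removed before forwarding, and it only understands patterns that end in `/**`. ZuulRouteSketch is a hypothetical class and does not reflect how Zuul is implemented internally.

```java
import java.util.Map;

// Hypothetical illustration of prefix + route-pattern matching.
final class ZuulRouteSketch {

    /**
     * Returns {serviceId, forwardedPath} for the first matching route,
     * or null when the request does not match the prefix or any route.
     */
    static String[] resolve(String globalPrefix, Map<String, String> routes, String requestPath) {
        if (!requestPath.startsWith(globalPrefix + "/")) {
            return null;
        }
        // Drop the global prefix, e.g. "/springhealth/user/accounts/1" -> "/user/accounts/1"
        String rest = requestPath.substring(globalPrefix.length());
        for (Map.Entry<String, String> route : routes.entrySet()) {
            String pattern = route.getValue();
            if (!pattern.endsWith("/**")) {
                continue; // this sketch only handles "/something/**"
            }
            String base = pattern.substring(0, pattern.length() - 3); // "/user/**" -> "/user"
            if (rest.equals(base) || rest.startsWith(base + "/")) {
                String forwarded = rest.substring(base.length());
                return new String[] {route.getKey(), forwarded.isEmpty() ? "/" : forwarded};
            }
        }
        return null;
    }
}
```

Under these assumptions a request for /springhealth/user/accounts/1 resolves to the service userservice with the forwarded path /accounts/1, while a request without the /springhealth prefix matches nothing.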

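Mapping routes through static configuration

ApiGateway

  1. Zuul compared with Spring Cloud Gateway
  • I/O model: Zuul uses blocking I/O; Gateway uses reactive, non-blocking I/O.
  • Features: Gateway is richer than Zuul. Beyond general service routing, it also supports fault-tolerance features such as request rate limiting, and it integrates well with Hystrix.
  • Implementation: Zuul is a further layer of encapsulation on top of Servlet; Gateway is built on Spring 5 and Spring Boot 2.

Spring Cloud Gateway basic architecture

Spring Cloud Gateway has two core concepts: filters (Filter) and predicates (Predicate).
Spring Cloud - Figure 3
Basic architecture of Spring Cloud Gateway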
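
The relationship between the two concepts can be sketched without any framework: a predicate decides whether a route applies to a request, and the route's filters then transform the request before it is forwarded. GatewayRouteSketch and its helper methods are hypothetical. The sketch works on plain path strings, whereas the real gateway operates on a reactive exchange object and filters can also act on the response.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical route: one predicate, an ordered list of filters, one target.
final class GatewayRouteSketch {
    private final String targetUri;
    private final Predicate<String> predicate;
    private final List<UnaryOperator<String>> filters;

    GatewayRouteSketch(String targetUri, Predicate<String> predicate,
                       List<UnaryOperator<String>> filters) {
        this.targetUri = targetUri;
        this.predicate = predicate;
        this.filters = filters;
    }

    /** Returns the URL the request would be forwarded to, or null if the route does not match. */
    String handle(String path) {
        if (!predicate.test(path)) {
            return null;
        }
        String rewritten = path;
        for (UnaryOperator<String> filter : filters) {
            rewritten = filter.apply(rewritten); // filters run in order
        }
        return targetUri + rewritten;
    }

    /** Predicate: the request path starts with the given prefix. */
    static Predicate<String> pathStartsWith(final String prefix) {
        return path -> path.startsWith(prefix);
    }

    /** Filter: drop the first path segment, e.g. "/user/accounts/1" -> "/accounts/1". */
    static UnaryOperator<String> stripFirstSegment() {
        return path -> {
            int secondSlash = path.indexOf('/', 1);
            return secondSlash < 0 ? "/" : path.substring(secondSlash);
        };
    }
}
```

A route built with `pathStartsWith("/user/")` and `stripFirstSegment()` and the assumed target http://userservice would forward /user/accounts/1 to http://userservice/accounts/1 and ignore /orders/1. Keeping matching and transformation as separate, composable pieces is what lets routes be assembled purely from configuration.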