入口:ApplicationResource#addInstance

    1. @POST // REST entry point: validates the posted InstanceInfo and hands it to the registry
    2. @Consumes({"application/json", "application/xml"})
    3. public Response addInstance(InstanceInfo info,
    4. @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    5. logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    6. // Validate the data sent by the client; any missing required field yields HTTP 400
    7. if (isBlank(info.getId())) {
    8. return Response.status(400).entity("Missing instanceId").build();
    9. } else if (isBlank(info.getHostName())) {
    10. return Response.status(400).entity("Missing hostname").build();
    11. } else if (isBlank(info.getIPAddr())) {
    12. return Response.status(400).entity("Missing ip address").build();
    13. } else if (isBlank(info.getAppName())) {
    14. return Response.status(400).entity("Missing appName").build();
    15. } else if (!appName.equals(info.getAppName())) { // payload app name must match this resource's appName
    16. return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    17. } else if (info.getDataCenterInfo() == null) {
    18. return Response.status(400).entity("Missing dataCenterInfo").build();
    19. } else if (info.getDataCenterInfo().getName() == null) {
    20. return Response.status(400).entity("Missing dataCenterInfo Name").build();
    21. }
    22. // Handle the case where the client did not supply a data center info id
    23. DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    24. if (dataCenterInfo instanceof UniqueIdentifier) {
    25. String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
    26. if (isBlank(dataCenterInfoId)) {
    27. boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
    28. if (experimental) { // strict mode: reject the registration outright
    29. String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
    30. return Response.status(400).entity(entity).build();
    31. } else if (dataCenterInfo instanceof AmazonInfo) {
    32. AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
    33. String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
    34. if (effectiveId == null) { // fall back to the instance id when the AmazonInfo metadata has no instanceId
    35. amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
    36. }
    37. } else {
    38. logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
    39. }
    40. }
    41. }
    42. // Register the instance
    43. // The isReplication flag tells whether this call is a replication between cluster peers, which prevents duplicate registration.
    44. registry.register(info, "true".equals(isReplication));
    45. return Response.status(204).build(); // 204 to be backwards compatible
    46. }

    上面参数中的InstanceInfo对象就是服务实例的具体信息,比如IP、集群微服务名、实例id、端口、最后操作时间、心跳url等,代码如下:

    1. public class InstanceInfo {
    2. private static final String VERSION_UNKNOWN = "unknown";
    3. public static final int DEFAULT_PORT = 7001;
    4. public static final int DEFAULT_SECURE_PORT = 7002;
    5. public static final int DEFAULT_COUNTRY_ID = 1; // US
    6. // The (fixed) instanceId for this instanceInfo. This should be unique within the scope of the appName.
    7. // Instance id
    8. private volatile String instanceId;
    9. // Microservice (application) name within the cluster
    10. private volatile String appName;
    11. @Auto
    12. private volatile String appGroupName;
    13. // IP address
    14. private volatile String ipAddr;
    15. private static final String SID_DEFAULT = "na";
    16. @Deprecated
    17. private volatile String sid = SID_DEFAULT;
    18. private volatile int port = DEFAULT_PORT;
    19. private volatile int securePort = DEFAULT_SECURE_PORT;
    20. @Auto
    21. private volatile String homePageUrl;
    22. @Auto
    23. private volatile String statusPageUrl;
    24. @Auto
    25. private volatile String healthCheckUrl;
    26. @Auto
    27. private volatile String secureHealthCheckUrl;
    28. @Auto
    29. private volatile String vipAddress;
    30. @Auto
    31. private volatile String secureVipAddress;
    32. @XStreamOmitField
    33. private String statusPageRelativeUrl;
    34. @XStreamOmitField
    35. private String statusPageExplicitUrl;
    36. @XStreamOmitField
    37. private String healthCheckRelativeUrl;
    38. @XStreamOmitField
    39. private String healthCheckSecureExplicitUrl;
    40. @XStreamOmitField
    41. private String vipAddressUnresolved;
    42. @XStreamOmitField
    43. private String secureVipAddressUnresolved;
    44. @XStreamOmitField
    45. private String healthCheckExplicitUrl;
    46. @Deprecated
    47. private volatile int countryId = DEFAULT_COUNTRY_ID; // Defaults to US
    48. private volatile boolean isSecurePortEnabled = false;
    49. private volatile boolean isUnsecurePortEnabled = true;
    50. private volatile DataCenterInfo dataCenterInfo;
    51. private volatile String hostName;
    52. private volatile InstanceStatus status = InstanceStatus.UP;
    53. private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;
    54. @XStreamOmitField
    55. private volatile boolean isInstanceInfoDirty = false;
    56. private volatile LeaseInfo leaseInfo;
    57. @Auto
    58. private volatile Boolean isCoordinatingDiscoveryServer = Boolean.FALSE;
    59. @XStreamAlias("metadata")
    60. private volatile Map<String, String> metadata;
    61. @Auto
    62. // Time of the last operation
    63. private volatile Long lastUpdatedTimestamp;
    64. @Auto
    65. private volatile Long lastDirtyTimestamp;
    66. @Auto
    67. private volatile ActionType actionType;
    68. @Auto
    69. private volatile String asgName;
    70. private String version = VERSION_UNKNOWN;

    关键代码类图
    image.png

    通过代码跟踪,发现关键代码在PeerAwareInstanceRegistryImpl类中的register方法,这个类主要负责集群信息同步

    1. @Override
    2. public void register(final InstanceInfo info, final boolean isReplication) {
    3. // The default lease duration is 90 seconds
    4. int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    5. // If the instance carries its own positive lease duration (e.g. set in its configuration), use it instead of the default
    6. if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
    7. leaseDuration = info.getLeaseInfo().getDurationInSecs();
    8. }
    9. // Delegate to the superclass register method
    10. super.register(info, leaseDuration, isReplication);
    11. // Replicate the registration to all peer nodes in the cluster
    12. replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    13. }

    父类AbstractInstanceRegistry的register方法

    1. public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    2. try {
    3. read.lock(); // NOTE(review): lock() is inside the try, so the finally's unlock() would also run if lock() itself threw
    4. // Look up the group of instances registered under the given application name
    5. Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
    6. REGISTER.increment(isReplication);
    7. if (gMap == null) {
    8. final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
    9. gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); // non-null result means another thread inserted a map first
    10. if (gMap == null) {
    11. gMap = gNewMap;
    12. }
    13. }
    14. // Look up the lease by instance id; existingLease is the lease of an already-registered instance
    15. // getHolder() returns the actual instance object held by the lease
    16. Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
    17. // Retain the last dirty timestamp without overwriting it, if there is already a lease
    18. if (existingLease != null && (existingLease.getHolder() != null)) {
    19. Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
    20. Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
    21. logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
    22. // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
    23. // InstanceInfo instead of the server local copy.
    24. if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { // NOTE(review): boxed Long comparison; a null on either side would throw on unboxing - confirm both are always set
    25. logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
    26. " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
    27. logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
    28. registrant = existingLease.getHolder();
    29. }
    30. } else {
    31. // The lease does not exist and hence it is a new registration
    32. synchronized (lock) {
    33. if (this.expectedNumberOfClientsSendingRenews > 0) {
    34. // Since the client wants to register it, increase the number of clients sending renews
    35. this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
    36. updateRenewsPerMinThreshold();
    37. }
    38. }
    39. logger.debug("No previous lease information found; it is new registration");
    40. }
    41. // Wrap the (possibly substituted) registrant in a new Lease object
    42. // leaseDuration is the lease (renewal) duration
    43. Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
    44. if (existingLease != null) { // carry over the service-up timestamp from the previous lease
    45. lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
    46. }
    47. gMap.put(registrant.getId(), lease);
    48. recentRegisteredQueue.add(new Pair<Long, String>(
    49. System.currentTimeMillis(),
    50. registrant.getAppName() + "(" + registrant.getId() + ")"));
    51. // This is where the initial state transfer of overridden status happens
    52. if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
    53. logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
    54. + "overrides", registrant.getOverriddenStatus(), registrant.getId());
    55. if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
    56. logger.info("Not found overridden id {} and hence adding it", registrant.getId());
    57. overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
    58. }
    59. }
    60. InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
    61. if (overriddenStatusFromMap != null) {
    62. logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
    63. registrant.setOverriddenStatus(overriddenStatusFromMap);
    64. }
    65. // Set the status based on the overridden status rules
    66. InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
    67. registrant.setStatusWithoutDirty(overriddenInstanceStatus);
    68. // If the lease is registered with UP status, set lease service up timestamp
    69. if (InstanceStatus.UP.equals(registrant.getStatus())) {
    70. lease.serviceUp();
    71. }
    72. registrant.setActionType(ActionType.ADDED);
    73. recentlyChangedQueue.add(new RecentlyChangedItem(lease));
    74. registrant.setLastUpdatedTimestamp();
    75. invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
    76. logger.info("Registered instance {}/{} with status {} (replication={})",
    77. registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    78. } finally {
    79. read.unlock();
    80. }
    81. }

    服务注册主要逻辑是

    • 通过服务集群名拿到一个微服务组,如果微服务组不存在则创建一个;
    • 根据实例id判断微服务组中是否已存在该实例对象(主要出现在并发情况下的多次注册);如果存在,则以时间戳最新的那个为准;
    • 根据传入的参数初始化一个租约(Lease)对象,以实例id为key、租约对象为value放入微服务组中。