入口:ApplicationResource#addInstance
@POST@Consumes({"application/json", "application/xml"})public Response addInstance(InstanceInfo info,@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);// 校验客户端数据if (isBlank(info.getId())) {return Response.status(400).entity("Missing instanceId").build();} else if (isBlank(info.getHostName())) {return Response.status(400).entity("Missing hostname").build();} else if (isBlank(info.getIPAddr())) {return Response.status(400).entity("Missing ip address").build();} else if (isBlank(info.getAppName())) {return Response.status(400).entity("Missing appName").build();} else if (!appName.equals(info.getAppName())) {return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();} else if (info.getDataCenterInfo() == null) {return Response.status(400).entity("Missing dataCenterInfo").build();} else if (info.getDataCenterInfo().getName() == null) {return Response.status(400).entity("Missing dataCenterInfo Name").build();}// 处理客户端数据丢失情况DataCenterInfo dataCenterInfo = info.getDataCenterInfo();if (dataCenterInfo instanceof UniqueIdentifier) {String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();if (isBlank(dataCenterInfoId)) {boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));if (experimental) {String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";return Response.status(400).entity(entity).build();} else if (dataCenterInfo instanceof AmazonInfo) {AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);if (effectiveId == null) {amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());}} else {logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());}}}// 注册// isReplication参数作用则是判断这是否是集群之间的复制。防止重复注册。registry.register(info, "true".equals(isReplication));return Response.status(204).build(); // 204 to be backwards compatible}
上面参数的InstanceInfo对象就是服务实例的具体信息,比如,IP,集群微服务名,实例id,端口,最后操作时间,心跳url等信息,代码如下
public class InstanceInfo {private static final String VERSION_UNKNOWN = "unknown";public static final int DEFAULT_PORT = 7001;public static final int DEFAULT_SECURE_PORT = 7002;public static final int DEFAULT_COUNTRY_ID = 1; // US// The (fixed) instanceId for this instanceInfo. This should be unique within the scope of the appName.// 实例idprivate volatile String instanceId;// 集群微服务名private volatile String appName;@Autoprivate volatile String appGroupName;// IP地址private volatile String ipAddr;private static final String SID_DEFAULT = "na";@Deprecatedprivate volatile String sid = SID_DEFAULT;private volatile int port = DEFAULT_PORT;private volatile int securePort = DEFAULT_SECURE_PORT;@Autoprivate volatile String homePageUrl;@Autoprivate volatile String statusPageUrl;@Autoprivate volatile String healthCheckUrl;@Autoprivate volatile String secureHealthCheckUrl;@Autoprivate volatile String vipAddress;@Autoprivate volatile String secureVipAddress;@XStreamOmitFieldprivate String statusPageRelativeUrl;@XStreamOmitFieldprivate String statusPageExplicitUrl;@XStreamOmitFieldprivate String healthCheckRelativeUrl;@XStreamOmitFieldprivate String healthCheckSecureExplicitUrl;@XStreamOmitFieldprivate String vipAddressUnresolved;@XStreamOmitFieldprivate String secureVipAddressUnresolved;@XStreamOmitFieldprivate String healthCheckExplicitUrl;@Deprecatedprivate volatile int countryId = DEFAULT_COUNTRY_ID; // Defaults to USprivate volatile boolean isSecurePortEnabled = false;private volatile boolean isUnsecurePortEnabled = true;private volatile DataCenterInfo dataCenterInfo;private volatile String hostName;private volatile InstanceStatus status = InstanceStatus.UP;private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;@XStreamOmitFieldprivate volatile boolean isInstanceInfoDirty = false;private volatile LeaseInfo leaseInfo;@Autoprivate volatile Boolean isCoordinatingDiscoveryServer = Boolean.FALSE;@XStreamAlias("metadata")private volatile Map<String, String> metadata;@Auto// 最后操作时间private volatile Long lastUpdatedTimestamp;@Autoprivate volatile Long lastDirtyTimestamp;@Autoprivate volatile ActionType actionType;@Autoprivate volatile String asgName;private String version = VERSION_UNKNOWN;
关键代码类图
通过代码跟踪,发现关键代码在PeerAwareInstanceRegistryImpl类中的register方法,这个类主要负责集群信息同步
@Overridepublic void register(final InstanceInfo info, final boolean isReplication) {// 默认的续约时间为90秒int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;// 如果在配置文件中配置了续约时间则重置续约时间if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {leaseDuration = info.getLeaseInfo().getDurationInSecs();}// 调用父类的注册方法super.register(info, leaseDuration, isReplication);// 同步集群中所有的节点replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);}
父类AbstractInstanceRegistry的register方法
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {try {read.lock();// 通过传进来的集群微服务名拿到一个微服务组Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());REGISTER.increment(isReplication);if (gMap == null) {final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);if (gMap == null) {gMap = gNewMap;}}// 通过实例id拿到租债器对象,existingLease 表示已经存在的微服务实例对象// getHolder() 具体的微服务实例对象Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());// Retain the last dirty timestamp without overwriting it, if there is already a leaseif (existingLease != null && (existingLease.getHolder() != null)) {Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted// InstanceInfo instead of the server local copy.if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");registrant = existingLease.getHolder();}} else {// The lease does not exist and hence it is a new registrationsynchronized (lock) {if (this.expectedNumberOfClientsSendingRenews > 0) {// Since the client wants to register it, increase the number of clients sending renewsthis.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;updateRenewsPerMinThreshold();}}logger.debug("No previous lease information found; it is new registration");}// 将传入的服务实例组装成一个lease租债器对象// leaseDuration 续约时间Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);if (existingLease != null) {lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());}gMap.put(registrant.getId(), lease);recentRegisteredQueue.add(new Pair<Long, String>(System.currentTimeMillis(),registrant.getAppName() + "(" + registrant.getId() + ")"));// This is where the initial state transfer of overridden status happensif (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "+ "overrides", registrant.getOverriddenStatus(), registrant.getId());if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {logger.info("Not found overridden id {} and hence adding it", registrant.getId());overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());}}InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());if (overriddenStatusFromMap != null) {logger.info("Storing overridden status {} from map", overriddenStatusFromMap);registrant.setOverriddenStatus(overriddenStatusFromMap);}// Set the status based on the overridden status rulesInstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);registrant.setStatusWithoutDirty(overriddenInstanceStatus);// If the lease is registered with UP status, set lease service up timestampif (InstanceStatus.UP.equals(registrant.getStatus())) {lease.serviceUp();}registrant.setActionType(ActionType.ADDED);recentlyChangedQueue.add(new RecentlyChangedItem(lease));registrant.setLastUpdatedTimestamp();invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());logger.info("Registered instance {}/{} with status {} (replication={})",registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);} finally {read.unlock();}}
服务注册主要逻辑是
- 通过服务集群名拿到一个一个微服务组,如果微服务组不存在则创建一个;
- 根据实例id判断判断微服务组中是否存在实例对象,主要是在并发情况下发生多次注册,如果存在则哪个时间戳最新就用哪个;
- 根据传入的参数初始化一个租债器对象,将实例的id为key,value为租债器对象放入微服务组中。
