上篇文章写到通过 @EnableEurekaServer 注解开启注册中心,然而eureka server有几个关键的功能

    • 服务注册
    • 心跳续约
    • 服务剔除
    • 集群原理
    • 自我保护机制

    下面分析eureka server的服务注册功能,首先看下注册代码

    1. @POST
    2. @Consumes({"application/json", "application/xml"})
    3. public Response addInstance(InstanceInfo info,
    4. @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    5. logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    6. // 校验客户端数据
    7. if (isBlank(info.getId())) {
    8. return Response.status(400).entity("Missing instanceId").build();
    9. } else if (isBlank(info.getHostName())) {
    10. return Response.status(400).entity("Missing hostname").build();
    11. } else if (isBlank(info.getIPAddr())) {
    12. return Response.status(400).entity("Missing ip address").build();
    13. } else if (isBlank(info.getAppName())) {
    14. return Response.status(400).entity("Missing appName").build();
    15. } else if (!appName.equals(info.getAppName())) {
    16. return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    17. } else if (info.getDataCenterInfo() == null) {
    18. return Response.status(400).entity("Missing dataCenterInfo").build();
    19. } else if (info.getDataCenterInfo().getName() == null) {
    20. return Response.status(400).entity("Missing dataCenterInfo Name").build();
    21. }
    22. // 处理客户端数据丢失情况
    23. DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    24. if (dataCenterInfo instanceof UniqueIdentifier) {
    25. String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
    26. if (isBlank(dataCenterInfoId)) {
    27. boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
    28. if (experimental) {
    29. String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
    30. return Response.status(400).entity(entity).build();
    31. } else if (dataCenterInfo instanceof AmazonInfo) {
    32. AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
    33. String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
    34. if (effectiveId == null) {
    35. amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
    36. }
    37. } else {
    38. logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
    39. }
    40. }
    41. }
    42. // 注册
    43. // isReplication参数作用则是判断这是否是集群之间的复制。防止重复注册。
    44. registry.register(info, "true".equals(isReplication));
    45. return Response.status(204).build(); // 204 to be backwards compatible
    46. }

    上面参数的InstanceInfo对象就是服务实例的具体信息,比如,IP,集群微服务名,实例id,端口,最后操作时间,心跳url等信息,代码如下

    public class InstanceInfo {
    
        private static final String VERSION_UNKNOWN = "unknown";
    
        public static final int DEFAULT_PORT = 7001;
        public static final int DEFAULT_SECURE_PORT = 7002;
        public static final int DEFAULT_COUNTRY_ID = 1; // US
    
        // The (fixed) instanceId for this instanceInfo. This should be unique within the scope of the appName.
        // 实例id
        private volatile String instanceId;
    
        // 集群微服务名
        private volatile String appName;
        @Auto
        private volatile String appGroupName;
    
        // IP地址
        private volatile String ipAddr;
    
        private static final String SID_DEFAULT = "na";
        @Deprecated
        private volatile String sid = SID_DEFAULT;
    
        private volatile int port = DEFAULT_PORT;
        private volatile int securePort = DEFAULT_SECURE_PORT;
    
        @Auto
        private volatile String homePageUrl;
        @Auto
        private volatile String statusPageUrl;
        @Auto
        private volatile String healthCheckUrl;
        @Auto
        private volatile String secureHealthCheckUrl;
        @Auto
        private volatile String vipAddress;
        @Auto
        private volatile String secureVipAddress;
        @XStreamOmitField
        private String statusPageRelativeUrl;
        @XStreamOmitField
        private String statusPageExplicitUrl;
        @XStreamOmitField
        private String healthCheckRelativeUrl;
        @XStreamOmitField
        private String healthCheckSecureExplicitUrl;
        @XStreamOmitField
        private String vipAddressUnresolved;
        @XStreamOmitField
        private String secureVipAddressUnresolved;
        @XStreamOmitField
        private String healthCheckExplicitUrl;
        @Deprecated
        private volatile int countryId = DEFAULT_COUNTRY_ID; // Defaults to US
        private volatile boolean isSecurePortEnabled = false;
        private volatile boolean isUnsecurePortEnabled = true;
        private volatile DataCenterInfo dataCenterInfo;
        private volatile String hostName;
        private volatile InstanceStatus status = InstanceStatus.UP;
        private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;
        @XStreamOmitField
        private volatile boolean isInstanceInfoDirty = false;
        private volatile LeaseInfo leaseInfo;
        @Auto
        private volatile Boolean isCoordinatingDiscoveryServer = Boolean.FALSE;
        @XStreamAlias("metadata")
        private volatile Map<String, String> metadata;
        @Auto
        // 最后操作时间
        private volatile Long lastUpdatedTimestamp;
        @Auto
        private volatile Long lastDirtyTimestamp;
        @Auto
        private volatile ActionType actionType;
        @Auto
        private volatile String asgName;
        private String version = VERSION_UNKNOWN;
    

    通过代码跟踪,发现关键代码在PeerAwareInstanceRegistryImpl类中的register方法

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        // 默认的续约时间为90秒
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        // 如果在配置文件中配置了续约时间则重置续约时间
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        // 调用父类的注册方法
        super.register(info, leaseDuration, isReplication);
        // 同步集群中所有的节点
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
    

    这里有两个重要步骤

    • 调用父类的注册方法
    • 同步集群中所有的节点

    下面分析下服务注册的核心方法register

    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
                = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
    

    eureka server定义了一个ConcurrentHashMap,key为集群的名字,value为一个Map,其中里面的这个Map的key为服务实例的实例id,value为租债器对象

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
            try {
                read.lock();
                // 通过传进来的集群微服务名拿到一个微服务组
                Map<String, Lease<InstanceInfo>> gMap=registry.get(registrant.getAppName());
                REGISTER.increment(isReplication);
                if (gMap == null) {
                    final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                    gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                    if (gMap == null) {
                        gMap = gNewMap;
                    }
                }
                // 通过实例id拿到租债器对象,existingLease 表示已经存在的微服务实例对象
                // getHolder() 具体的微服务实例对象
                Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
                if (existingLease != null && (existingLease.getHolder() != null)) {
                    // 主要是在并发情况下会出现
                    Long existingLastDirtyTimestamp =existingLease.getHolder().getLastDirtyTimestamp();
                    Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                    logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                        logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                                " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                        logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                        registrant = existingLease.getHolder();
                    }
                } else {
                    // The lease does not exist and hence it is a new registration
                    synchronized (lock) {
                        if (this.expectedNumberOfRenewsPerMin > 0) {
                            // Since the client wants to cancel it, reduce the threshold
                            // (1
                            // for 30 seconds, 2 for a minute)
                            this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                            this.numberOfRenewsPerMinThreshold =
                                    (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                        }
                    }
                    logger.debug("No previous lease information found; it is new registration");
                }
                // 将传入的服务实例组装成一个lease租债器对象
                // leaseDuration 续约时间
                Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
                if (existingLease != null) {
                    lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
                }
                // 将当前服务实例对象放入微服务组中
                gMap.put(registrant.getId(), lease);
                ...
        }
    

    服务注册主要逻辑是

    • 通过服务集群名拿到一个一个微服务组,如果微服务组不存在则创建一个;
    • 根据实例id判断判断微服务组中是否存在实例对象,主要是在并发情况下发生多次注册,如果存在则哪个时间戳最新就用哪个;
    • 根据传入的参数初始化一个租债器对象,将实例的id为key,value为租债器对象放入微服务组中。