上篇文章写到通过 @EnableEurekaServer 注解开启注册中心,然而eureka server有几个关键的功能
- 服务注册
- 心跳续约
- 服务剔除
- 集群原理
- 自我保护机制
下面分析eureka server的服务注册功能,首先看下注册代码
@POST@Consumes({"application/json", "application/xml"})public Response addInstance(InstanceInfo info,@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);// 校验客户端数据if (isBlank(info.getId())) {return Response.status(400).entity("Missing instanceId").build();} else if (isBlank(info.getHostName())) {return Response.status(400).entity("Missing hostname").build();} else if (isBlank(info.getIPAddr())) {return Response.status(400).entity("Missing ip address").build();} else if (isBlank(info.getAppName())) {return Response.status(400).entity("Missing appName").build();} else if (!appName.equals(info.getAppName())) {return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();} else if (info.getDataCenterInfo() == null) {return Response.status(400).entity("Missing dataCenterInfo").build();} else if (info.getDataCenterInfo().getName() == null) {return Response.status(400).entity("Missing dataCenterInfo Name").build();}// 处理客户端数据丢失情况DataCenterInfo dataCenterInfo = info.getDataCenterInfo();if (dataCenterInfo instanceof UniqueIdentifier) {String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();if (isBlank(dataCenterInfoId)) {boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));if (experimental) {String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";return Response.status(400).entity(entity).build();} else if (dataCenterInfo instanceof AmazonInfo) {AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);if (effectiveId == null) {amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());}} else {logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());}}}// 注册// isReplication参数作用则是判断这是否是集群之间的复制。防止重复注册。registry.register(info, "true".equals(isReplication));return Response.status(204).build(); // 204 to be backwards compatible}
上面参数的InstanceInfo对象就是服务实例的具体信息,比如,IP,集群微服务名,实例id,端口,最后操作时间,心跳url等信息,代码如下
public class InstanceInfo {
private static final String VERSION_UNKNOWN = "unknown";
public static final int DEFAULT_PORT = 7001;
public static final int DEFAULT_SECURE_PORT = 7002;
public static final int DEFAULT_COUNTRY_ID = 1; // US
// The (fixed) instanceId for this instanceInfo. This should be unique within the scope of the appName.
// 实例id
private volatile String instanceId;
// 集群微服务名
private volatile String appName;
@Auto
private volatile String appGroupName;
// IP地址
private volatile String ipAddr;
private static final String SID_DEFAULT = "na";
@Deprecated
private volatile String sid = SID_DEFAULT;
private volatile int port = DEFAULT_PORT;
private volatile int securePort = DEFAULT_SECURE_PORT;
@Auto
private volatile String homePageUrl;
@Auto
private volatile String statusPageUrl;
@Auto
private volatile String healthCheckUrl;
@Auto
private volatile String secureHealthCheckUrl;
@Auto
private volatile String vipAddress;
@Auto
private volatile String secureVipAddress;
@XStreamOmitField
private String statusPageRelativeUrl;
@XStreamOmitField
private String statusPageExplicitUrl;
@XStreamOmitField
private String healthCheckRelativeUrl;
@XStreamOmitField
private String healthCheckSecureExplicitUrl;
@XStreamOmitField
private String vipAddressUnresolved;
@XStreamOmitField
private String secureVipAddressUnresolved;
@XStreamOmitField
private String healthCheckExplicitUrl;
@Deprecated
private volatile int countryId = DEFAULT_COUNTRY_ID; // Defaults to US
private volatile boolean isSecurePortEnabled = false;
private volatile boolean isUnsecurePortEnabled = true;
private volatile DataCenterInfo dataCenterInfo;
private volatile String hostName;
private volatile InstanceStatus status = InstanceStatus.UP;
private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;
@XStreamOmitField
private volatile boolean isInstanceInfoDirty = false;
private volatile LeaseInfo leaseInfo;
@Auto
private volatile Boolean isCoordinatingDiscoveryServer = Boolean.FALSE;
@XStreamAlias("metadata")
private volatile Map<String, String> metadata;
@Auto
// 最后操作时间
private volatile Long lastUpdatedTimestamp;
@Auto
private volatile Long lastDirtyTimestamp;
@Auto
private volatile ActionType actionType;
@Auto
private volatile String asgName;
private String version = VERSION_UNKNOWN;
通过代码跟踪,发现关键代码在PeerAwareInstanceRegistryImpl类中的register方法
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
// 默认的续约时间为90秒
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
// 如果在配置文件中配置了续约时间则重置续约时间
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// 调用父类的注册方法
super.register(info, leaseDuration, isReplication);
// 同步集群中所有的节点
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
这里有两个重要步骤
- 调用父类的注册方法
- 同步集群中所有的节点
下面分析下服务注册的核心方法register
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
eureka server定义了一个ConcurrentHashMap,key为集群的名字,value为一个Map,其中里面的这个Map的key为服务实例的实例id,value为租债器对象
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
// 通过传进来的集群微服务名拿到一个微服务组
Map<String, Lease<InstanceInfo>> gMap=registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// 通过实例id拿到租债器对象,existingLease 表示已经存在的微服务实例对象
// getHolder() 具体的微服务实例对象
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
if (existingLease != null && (existingLease.getHolder() != null)) {
// 主要是在并发情况下会出现
Long existingLastDirtyTimestamp =existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
// 将传入的服务实例组装成一个lease租债器对象
// leaseDuration 续约时间
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// 将当前服务实例对象放入微服务组中
gMap.put(registrant.getId(), lease);
...
}
服务注册主要逻辑是
- 通过服务集群名拿到一个一个微服务组,如果微服务组不存在则创建一个;
- 根据实例id判断判断微服务组中是否存在实例对象,主要是在并发情况下发生多次注册,如果存在则哪个时间戳最新就用哪个;
- 根据传入的参数初始化一个租债器对象,将实例的id为key,value为租债器对象放入微服务组中。
