拉取增量注册表并合并
就是拉取一下增量注册表,然后根据最近增量注册表中不同的操作,做不同的事情
/**
* 增量拉取注册表信息
*/
private class FetchDeltaRegistryWorker extends Thread {
@Override
public void run() {
//当客户端存活的时候继续循环,没有存活时就不能继续循环了
while (registerClient.isRunning()) {
try {
//通过通信组件获取服务注册表
LinkedList<RecentlyChangedServiceInstance> deltaRegistry = httpSender.fetchDeltaServiceRegistry();
//分以下几种情况,一个是注册的操作,一种是删除的
//如果是注册的话,就判断一下本地缓存的注册表中有没有这个实例,不在的话就要在本地缓存中放一份
//如果是删除,看一下缓存中实例是否存在,存在就删掉
//由于要操作同一份的缓存变量,这里要加锁的
synchronized (registry) {
mergeDeltaRegistry(deltaRegistry);
}
Thread.sleep(SERVICE_REGISTRY_FETCH_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 合并增量注册表到本地缓存注册表
* @param deltaRegistry
*/
private void mergeDeltaRegistry(LinkedList<RecentlyChangedServiceInstance> deltaRegistry) {
for (RecentlyChangedServiceInstance recentlyChangedItem : deltaRegistry) {
//如果是注册操作
if (ServiceInstanceOperation.REGISTER.equals(recentlyChangedItem.changedOperation)) {
Map<String, ServiceInstance> serviceInstanceMap =
registry.get(recentlyChangedItem.serviceInstance.getServiceName());
//如果缓存没有该实例列表就要搞一份进去缓存中
if (serviceInstanceMap == null) {
serviceInstanceMap = new HashMap<String, ServiceInstance>();
registry.put(recentlyChangedItem.serviceInstance.getServiceName(),serviceInstanceMap);
}
//如果已经有实例的列表了,看下有没有具体的那个实例,没有那个具体的实例的话就要把实例放进去
ServiceInstance serviceInstance =
serviceInstanceMap.get(recentlyChangedItem.serviceInstance.getServiceInstanceId());
if (serviceInstance == null) {
serviceInstanceMap.put(recentlyChangedItem.serviceInstance.getServiceInstanceId(),
recentlyChangedItem.serviceInstance);
}
} else if (ServiceInstanceOperation.REMOVE.equals(recentlyChangedItem.changedOperation)) {
//如果是删除操作,就把服务实例删除就行
Map<String, ServiceInstance> serviceInstanceMap =
registry.get(recentlyChangedItem.serviceInstance.getServiceName());
if (serviceInstanceMap != null) {
serviceInstanceMap.remove(recentlyChangedItem.serviceInstance.getServiceInstanceId());
}
}
}
}
合并注册表后对客户端注册表的校验
在操作的时候,可能因为某些原因导致最后客户端缓存的注册表的数据和服务端的数据是不一样的,因此就需要做一下这个校验的操作
/**
* 增量拉取注册表信息
*/
private class FetchDeltaRegistryWorker extends Thread {
@Override
public void run() {
//当客户端存活的时候继续循环,没有存活时就不能继续循环了
while (registerClient.isRunning()) {
try {
//通过通信组件获取服务注册表
DeltaRegistry deltaRegistry = httpSender.fetchDeltaServiceRegistry();
//分以下几种情况,一个是注册的操作,一种是删除的
//如果是注册的话,就判断一下本地缓存的注册表中有没有这个实例,不在的话就要在本地缓存中放一份
//如果是删除,看一下缓存中实例是否存在,存在就删掉
//由于要操作同一份的缓存变量,这里要加锁的
synchronized (registry) {
mergeDeltaRegistry(deltaRegistry.getRecentlyChangedQueue());
}
//需要检查一下,服务端注册表的服务实例数和客户端的是否一样
//封装一下增量注册表的对象,拉取增量注册表的时候,一方面返回增量的实例,
// 另一方面要返回当前这个实例在服务端的注册表中有多少个
Long serviceSideTotalCount = deltaRegistry.getServiceInstanceTotalCount();
Long clientSideTotalCount = 0L;
for (Map<String, ServiceInstance> serviceInstanceMap : registry.values()) {
clientSideTotalCount += serviceInstanceMap.size();
}
if (serviceSideTotalCount != clientSideTotalCount) {
//重新拉取全量注册表进行纠正
registry = httpSender.fetchFullServiceRegistry();
}
Thread.sleep(SERVICE_REGISTRY_FETCH_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}