以下的分析环境基于内存消息队列和无注册中心配置以及按照默认配置
1.Clustering mode
官网对集群模式有一部分介绍这可以帮助我们理解代码为什么会这么做:
ThingsBoard adopts consistent hashing to ensure scalability and availability. Message from Device A that is received on a particular node may be forwarded to the other node based on the hash of the device ID. Although this introduces certain networking overhead, it allows to process all messages from a particular device using corresponding device actor on a determined server, which introduces the following advantages: improve cache hit rate. Device attributes and other device related data are fetched by device actor on a specific server. avoid race conditions. All messages for a particular device are processed on a determined server. allows targeting server-side api calls based on the device id.
2. TbServiceInfoProvider
DefaultTbServiceInfoProvider
init()
方法由@PostConstruct
标记,spring 启动的时候会自动调用该方法:
public void init() {
if (StringUtils.isEmpty(serviceId)) {
try {
//获取本机的HostName作为serviceId
serviceId = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
serviceId = org.apache.commons.lang3.RandomStringUtils.randomAlphabetic(10);
}
}
log.info("Current Service ID: {}", serviceId);
//serviceType是配置文件下service.type的值,默认为monolith
//serviceTypes将会是一个List包含TB_CORE, TB_RULE_ENGINE, TB_TRANSPORT, JS_EXECUTOR ①
if (serviceType.equalsIgnoreCase("monolith")) {
serviceTypes = Collections.unmodifiableList(Arrays.asList(ServiceType.values()));
} else {
serviceTypes = Collections.singletonList(ServiceType.of(serviceType));
}
ServiceInfo.Builder builder = ServiceInfo.newBuilder()
.setServiceId(serviceId)
.addAllServiceTypes(serviceTypes.stream().map(ServiceType::name).collect(Collectors.toList()));
UUID tenantId;
//tenantIdStr是配置文件中service.tenant_id的值,默认情况下为空,isolatedTenant也就为空了
if (!StringUtils.isEmpty(tenantIdStr)) {
tenantId = UUID.fromString(tenantIdStr);
isolatedTenant = new TenantId(tenantId);
} else {
tenantId = TenantId.NULL_UUID;
}
//返回此 uuid 的 128 位值中的最高有效 64 位和最低64位
builder.setTenantIdMSB(tenantId.getMostSignificantBits());
builder.setTenantIdLSB(tenantId.getLeastSignificantBits());
//ruleEngineSettings是一个TbQueueRuleEngineSettings的一个实例,读取queue.rule-engine下的值
//ruleEngineSettings包含topic是tb_rule_engine,queue队列有三个分别是②:
// 1. name: Main topic: tb_rule_engine.main partition: 10
// 2. name: HighPriority topic: tb_rule_engine.hp partition: 10
// 3. name: SequentialByOriginator topic: tb_rule_engine.sq partition: 10
if (serviceTypes.contains(ServiceType.TB_RULE_ENGINE) && ruleEngineSettings != null) {
for (TbRuleEngineQueueConfiguration queue : ruleEngineSettings.getQueues()) {
TransportProtos.QueueInfo queueInfo = TransportProtos.QueueInfo.newBuilder()
.setName(queue.getName())
.setTopic(queue.getTopic())
.setPartitions(queue.getPartitions()).build();
builder.addRuleEngineQueues(queueInfo);
}
}
serviceInfo = builder.build();
}
3. PartitionService
PartitionService
的默认实现是HashPartitionService
:
@PostConstruct
public void init() {
//根据queue.partitions.hash_function_name的配置选择以后做partition的hash方法,默认值是murmur3_128
this.hashFunction = forName(hashFunctionName);
//ConcurrentMap<ServiceQueue, Integer> partitionSizes
//ServiceQueue 类成员ServiceType和字符串类型的queue name,构造函数如果不提供queue name或者queue name是null的话,ServiceQueue对象的的queue name是"Main"
//corePartitions 是queue.core.partitions默认值10
partitionSizes.put(new ServiceQueue(ServiceType.TB_CORE), corePartitions);
//coreTopic对应的配置文件键是queue.core.topic,默认值tb_core
partitionTopics.put(new ServiceQueue(ServiceType.TB_CORE), coreTopic);
//根据DefaultTbServiceInfoProvider②的分析可以得出partitionTopics,partitionSizes的具体内容
tbQueueRuleEngineSettings.getQueues().forEach(queueConfiguration -> {
partitionTopics.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), queueConfiguration.getTopic());
partitionSizes.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), queueConfiguration.getPartitions());
});
}
4. DiscoveryService
因为没有使用 Zookeeper 做注册中心,DiscoveryService
的实现由DummyDiscoveryService
实现,在收到 Spring 发送的ApplicationReadyEvent
事件后,调用partitionService.recalculatePartitions
方法:
public void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
//日志记录
logServiceInfo(currentService);
//dummy Discovery将不包含otherService,Zookeeper注册中心的实现将会有otherService
otherServices.forEach(this::logServiceInfo);
Map<ServiceQueueKey, List<ServiceInfo>> queueServicesMap = new HashMap<>();
//展开ServiceInfo的serviceTypes和RuleEngineQueue,并添加到queueServicesMap
addNode(queueServicesMap, currentService);
for (ServiceInfo other : otherServices) {
addNode(queueServicesMap, other);
}
queueServicesMap.values().forEach(list -> list.sort((a, b) -> a.getServiceId().compareTo(b.getServiceId())));
ConcurrentMap<ServiceQueueKey, List<Integer>> oldPartitions = myPartitions;
TenantId myIsolatedOrSystemTenantId = getSystemOrIsolatedTenantId(currentService);
myPartitions = new ConcurrentHashMap<>();
//创建了ServiceQueueKey和partitionIndex的list组合
partitionSizes.forEach((serviceQueue, size) -> {
ServiceQueueKey myServiceQueueKey = new ServiceQueueKey(serviceQueue, myIsolatedOrSystemTenantId);
for (int i = 0; i < size; i++) {
ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(myServiceQueueKey), i);
if (currentService.equals(serviceInfo)) {
ServiceQueueKey serviceQueueKey = new ServiceQueueKey(serviceQueue, getSystemOrIsolatedTenantId(serviceInfo));
myPartitions.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(i);
}
}
});
oldPartitions.forEach((serviceQueueKey, partitions) -> {
if (!myPartitions.containsKey(serviceQueueKey)) {
log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", serviceQueueKey);
applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceQueueKey, Collections.emptySet()));
}
});
//发送PartitionChangeEvent,创建的TopicPartitionInfo的topic是partitionTopics的topic,fullTopicName是topic+Index, DummyDiscovery每次tpiList包含了所有的partitionIndex, 此例0-9.例:tpiList其中的fullTopicName是tb_core.9,tb_rule_engine.main.3
myPartitions.forEach((serviceQueueKey, partitions) -> {
if (!partitions.equals(oldPartitions.get(serviceQueueKey))) {
log.info("[{}] NEW PARTITIONS: {}", serviceQueueKey, partitions);
Set<TopicPartitionInfo> tpiList = partitions.stream()
.map(partition -> buildTopicPartitionInfo(serviceQueueKey, partition))
.collect(Collectors.toSet());
applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceQueueKey, tpiList));
}
});
tpiCache.clear();
if (currentOtherServices == null) {
currentOtherServices = new ArrayList<>(otherServices);
} else {
Set<ServiceQueueKey> changes = new HashSet<>();
Map<ServiceQueueKey, List<ServiceInfo>> currentMap = getServiceKeyListMap(currentOtherServices);
Map<ServiceQueueKey, List<ServiceInfo>> newMap = getServiceKeyListMap(otherServices);
currentOtherServices = otherServices;
currentMap.forEach((key, list) -> {
if (!list.equals(newMap.get(key))) {
changes.add(key);
}
});
currentMap.keySet().forEach(newMap::remove);
changes.addAll(newMap.keySet());
if (!changes.isEmpty()) {
applicationEventPublisher.publishEvent(new ClusterTopologyChangeEvent(this, changes));
}
}
}
5. DefaultTbCoreConsumerService
DefaultTbCoreConsumerService
的父类是AbstractConsumerService
, 而父类接收到ApplicationReadyEvent
时,会调用子类的launchMainConsumers()
应用设计模式模板方法模式,启动了消费者线;其作用是:消费者线程按照固定的延时,无限循环去消息队列提取消息,由于此时并未 subscribed,也即未订阅,将暂时不从消息队列里取消息;前面讨论到
DiscoveryService
发布了PartitionChangeEvent
,DefaultTbCoreConsumerService
实现了ApplicationListener<PartitionChangeEvent>
接口,在接收到PartitionChangeEvent
时,会做出相应的反应,调用onApplicationEvent
,订阅了的 fullTopicName 是tb_core.0-9 每次从消息队列进行 poll 的时候,都会检查这些 fullTopicName 里面是否有消息准备就绪;6. DefaultTbRuleEngineConsumerService
@PostConstruct
注解初始化了 ruleEngine 的消费者,按照TbServiceInfoProvider
bean 的初始化②,可以知道,初始化了三个 consumer, 放到了consumers
的 map 变量里,key 是 queue 的 name;DefaultTbRuleEngineConsumerService
的父类也是AbstractConsumerService
, 跟DefaultTbCoreConsumerService
一样,父类接收到ApplicationReadyEvent
时,调用该类的launchMainConsumers()
方法,启动了三个消费者线程,由于还未订阅,所以暂时不从消息队列里面获取消息;- 同
DefaultTbCoreConsumerService
一样,收到PartitionChangeEvent
时,启动订阅主题tb_rule_engine.main.0-9,tb_rule_engine.sq.0-9,tb_rule_engine.hp.0-9:public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
//此时serviceType是ServiceType.TB_RULE_ENGINE,过滤了其他的type
if (partitionChangeEvent.getServiceType().equals(getServiceType())) {
ServiceQueue serviceQueue = partitionChangeEvent.getServiceQueueKey().getServiceQueue();
log.info("[{}] Subscribing to partitions: {}", serviceQueue.getQueue(), partitionChangeEvent.getPartitions());
//根绝queueName做区分,让三个consumer分别订阅了三个主题的列表tb_rule_engine.main.0-9,tb_rule_engine.sq.0-9,tb_rule_engine.hp.0-9
consumers.get(serviceQueue.getQueue()).subscribe(partitionChangeEvent.getPartitions());
}
}
总结
通过上面类的初始化一系列过程,我们有了大概的印象:
- 创建了
ServiceInfo
对象,并根据配置文件我们得知了 tb_core,和 tb_rule_engine 的一些配置信息; DiscoveryService
根据注册中心,和其他的服务提供者,会计算相应的 partition index,并发布PartitionChangeEvent
;AbstractConsumerService
的两个实现类在接收到PartitionChangeEvent
之前,都启动了一个或多个线程,在接收到此消息的时候, 都会使自己的 consumer 订阅相应的主题等待消息的到来。