以下的分析环境基于内存消息队列和默认配置

1. DefaultTransportService

ThingsBoard源码分析之启动分析 2 - 图1
分析初始化方法:

  1. @PostConstruct
  2. public void init() {
  3. //根据配置判断是否创建限流
  4. if (rateLimitEnabled) {
  5. //Just checking the configuration parameters
  6. new TbRateLimits(perTenantLimitsConf);
  7. new TbRateLimits(perDevicesLimitsConf);
  8. }
  9. this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler"));
  10. this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
  11. this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
  12. //transportApiRequestTemplate的创建见下分析①,transportApiRequestTemplate中包含了
  13. //一个生产者producerTemplate(requestTemplate) topic:tb_transport.api.responses ②
  14. //和一个消费者consumerTemplate (responseTemplate) topic:tb_transport.api.responses.localHostName ③
  15. transportApiRequestTemplate = queueProvider.createTransportApiRequestTemplate();
  16. //此处的producerProvider bean的创建是按照配置文件的值创建的,TbQueueProducerProvider有三个实现类,使用ConditionalOnExpression注解,读取service.type的值(默认monolith),所以该Bean的实现类是TbCoreQueueProducerProvider,此类的@PostConstruct标记的init()方法初始化的,该类TbCoreQueueProducerProvider初始化了一下变量:
  17. // 1.toTbCore topic:tb_core
  18. // 2.toTransport topic:tb_transport.notifications
  19. // 3.toRuleEngine topic:tb_rule_engine
  20. // 4.toRuleEngineNotifications topic:tb_rule_engine
  21. // 5.toTbCoreNotifications topic:tb_core
  22. ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer();
  23. tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
  24. transportNotificationsConsumer = queueProvider.createTransportNotificationsConsumer();
  25. //fullTopic = topic:tb_transport.notifications.localHostName
  26. TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceInfoProvider.getServiceId());
  27. transportNotificationsConsumer.subscribe(Collections.singleton(tpi));
  28. //见④分析
  29. transportApiRequestTemplate.init();
  30. mainConsumerExecutor.execute(() -> {
  31. while (!stopped) {
  32. try {
  33. List<TbProtoQueueMsg<ToTransportMsg>> records = transportNotificationsConsumer.poll(notificationsPollDuration);
  34. if (records.size() == 0) {
  35. continue;
  36. }
  37. records.forEach(record -> {
  38. try {
  39. processToTransportMsg(record.getValue());
  40. } catch (Throwable e) {
  41. log.warn("Failed to process the notification.", e);
  42. }
  43. });
  44. transportNotificationsConsumer.commit();
  45. } catch (Exception e) {
  46. if (!stopped) {
  47. log.warn("Failed to obtain messages from queue.", e);
  48. try {
  49. Thread.sleep(notificationsPollDuration);
  50. } catch (InterruptedException e2) {
  51. log.trace("Failed to wait until the server has capacity to handle new requests", e2);
  52. }
  53. }
  54. }
  55. }
  56. });
  57. }

createTransportApiRequestTemplate In InMemoryTbTransportQueueFactory,因为我们没有启用相应的消息队列中间件,我们分析InMemoryTbTransportQueueFactory:

  1. public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() {
  2. //根据配置文件值queue.transport_api.requests_topic获取到的topic是tb_transport.api.requests创建了生产者
  3. InMemoryTbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
  4. new InMemoryTbQueueProducer<>(transportApiSettings.getRequestsTopic());
  5. //根据配置文件值queue.transport_api.responses_topic获取到的topic是tb_transport.api.responses
  6. //加上serviceId(我们在第二篇分析中提到,本机的HostName作为serviceId,其topic就是tb_transport.api.responses.localHostName
  7. InMemoryTbQueueConsumer<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
  8. new InMemoryTbQueueConsumer<>(transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId());
  9. //使用建造者模式返回了TbQueueRequestTemplate实例,其中包含了一个消费者和一个生产者
  10. DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
  11. <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
  12. templateBuilder.queueAdmin(new TbQueueAdmin() {
  13. @Override
  14. public void createTopicIfNotExists(String topic) {}
  15. @Override
  16. public void destroy() {}
  17. });
  18. templateBuilder.requestTemplate(producerTemplate);
  19. templateBuilder.responseTemplate(consumerTemplate);
  20. templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests());
  21. templateBuilder.maxRequestTimeout(transportApiSettings.getMaxRequestsTimeout());
  22. templateBuilder.pollInterval(transportApiSettings.getResponsePollInterval());
  23. return templateBuilder.build();
  24. }

init() in DefaultTbQueueRequestTemplate:

  1. public void init() {
  2. queueAdmin.createTopicIfNotExists(responseTemplate.getTopic());
  3. //按照是使用的中间件,实现不同的初始化方法,Inmemory该方法体为空
  4. this.requestTemplate.init();
  5. tickTs = System.currentTimeMillis();
  6. //见③,订阅主题为 tb_transport.api.responses.localHostName
  7. responseTemplate.subscribe();
  8. executor.submit(() -> {
  9. long nextCleanupMs = 0L;
  10. while (!stopped) {
  11. try {
  12. //从消息队列里面获取消息
  13. List<Response> responses = responseTemplate.poll(pollInterval);
  14. ...........

2.TbCoreTransportApiService

ThingsBoard源码分析之启动分析 2 - 图2

  • PostConstruct注解方法:

    1. @PostConstruct
    2. public void init() {
    3. this.transportCallbackExecutor = Executors.newWorkStealingPool(maxCallbackThreads);
    4. //topic是配置文件queue.transport_api.responses_topic的值默认为:tb_transport.api.responses ⑤
    5. TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> producer = tbCoreQueueFactory.createTransportApiResponseProducer();
    6. //topic是配置文件queue.transport_api.requests_topic的值,默认为:tb_transport.api.requests ⑥
    7. TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> consumer = tbCoreQueueFactory.createTransportApiRequestConsumer();
    8. DefaultTbQueueResponseTemplate.DefaultTbQueueResponseTemplateBuilder
    9. <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> builder = DefaultTbQueueResponseTemplate.builder();
    10. builder.requestTemplate(consumer);
    11. builder.responseTemplate(producer);
    12. builder.maxPendingRequests(maxPendingRequests);
    13. builder.requestTimeout(requestTimeout);
    14. builder.pollInterval(responsePollDuration);
    15. builder.executor(transportCallbackExecutor);
    16. builder.handler(transportApiService);
    17. transportApiTemplate = builder.build();
  • @EventListener(ApplicationReadyEvent.class)注解方法,调用了transportApiTemplate.init(transportApiService);``transportApiTemplate即上一步创建的DefaultTbQueueResponseTemplate对象init()方法为:

    1. @Override
    2. public void init(TbQueueHandler<Request, Response> handler) {
    3. //按照是使用的中间件,实现不同的初始化方法,Inmemory该方法体为空
    4. this.responseTemplate.init();
    5. //见⑥,订阅主题为tb_transport.api.requests
    6. requestTemplate.subscribe();
    7. loopExecutor.submit(() -> {
    8. while (!stopped) {
    9. try {
    10. while (pendingRequestCount.get() >= maxPendingRequests) {
    11. try {
    12. Thread.sleep(pollInterval);
    13. } catch (InterruptedException e) {
    14. log.trace("Failed to wait until the server has capacity to handle new requests", e);
    15. }
    16. }
    17. List<Request> requests = requestTemplate.poll(pollInterval);
    18. ...........

    总结

    DefaultTransportServiceTbCoreTransportApiService方法的启动并不是很复杂,我们需要将主要的关注点放在两个 Bean 初始化消费者和生产者的 topic 上面,thingsboard 将使用中间件将消息解耦,如果按照传统的调试方法很容易找不到消息的流向,此时我们将 topic 作为关键的切入点,方便后面整个数据流的分析。