1. MQTT server

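To accept MQTT connections from devices, ThingsBoard must run an MQTT server. That server is created by the MqttTransportService class.
It is a Netty-based MQTT server. It installs MqttTransportServerInitializer as its channel initializer, which adds Netty's MqttDecoder and MqttEncoder to the ChannelPipeline so that we do not have to deal with encoding and decoding MQTT messages ourselves. Most importantly, it also adds MqttTransportHandler.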

2. How MqttTransportHandler handles a connection

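In this example we first create a tenant and a tenant administrator, then add a device. MQTT Box simulates the hardware device: copy the device's ACCESS TOKEN into the Username field of MQTT Box and connect to the ThingsBoard backend.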

[Figure 1: ThingsBoard source code analysis, how MQTT connections are received]

Because SSL is not used, the handler calls processAuthTokenConnect as soon as the connection request arrives:

    private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
        String userName = msg.payload().userName();
        log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
        if (StringUtils.isEmpty(userName)) {
            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
            ctx.close();
        } else {
            // Take the userName, build the protobuf message (easy to transmit and parse), and hand it to transportService.
            // Understanding process relies on the analysis of DefaultTransportService in Part 3 of this series.
            // See step 1 below for details.
            transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
                    new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
                        @Override
                        public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
                            onValidateDeviceResponse(msg, ctx);
                        }
                        @Override
                        public void onError(Throwable e) {
                            log.trace("[{}] Failed to process credentials: {}", address, userName, e);
                            ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
                            ctx.close();
                        }
                    });
        }
    }
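The three outcomes of this method (empty user name, lookup result, lookup failure) can be sketched without Netty or ThingsBoard on the classpath. Everything below is a hypothetical stand-in: ConnectFlowSketch, ConnAck, and the validator function are invented for illustration, and the JDK's CompletableFuture replaces the TransportServiceCallback used in the real code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for the connect flow; none of these names are ThingsBoard APIs.
class ConnectFlowSketch {
    enum ConnAck { ACCEPTED, BAD_USER_NAME_OR_PASSWORD, NOT_AUTHORIZED, SERVER_UNAVAILABLE }

    // validator plays the role of transportService.process(...): it resolves a token
    // to a device name asynchronously, or to null when the token is unknown.
    static CompletableFuture<ConnAck> connect(String userName,
                                              Function<String, CompletableFuture<String>> validator) {
        if (userName == null || userName.isEmpty()) {
            // No token at all: refuse immediately, no lookup is needed.
            return CompletableFuture.completedFuture(ConnAck.BAD_USER_NAME_OR_PASSWORD);
        }
        return validator.apply(userName)
                // Lookup finished: a missing device means the token is not authorized.
                .thenApply(device -> device == null ? ConnAck.NOT_AUTHORIZED : ConnAck.ACCEPTED)
                // The lookup itself failed: report the server as unavailable.
                .exceptionally(e -> ConnAck.SERVER_UNAVAILABLE);
    }
}
```

The NOT_AUTHORIZED branch corresponds to the check that the real handler performs later, in onValidateDeviceResponse.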
  1. The process method of DefaultTransportService builds an asynchronous task: on success it calls the onSuccess consumer, on failure the onFailure consumer.
  2. The credential validation task is handed to transportApiRequestTemplate.send:

    public ListenableFuture<Response> send(Request request) {
        if (tickSize > maxPendingRequests) {
            return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
        }
        UUID requestId = UUID.randomUUID();
        request.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
        // As analyzed in Part 3, this topic is tb_transport.api.responses.localHostName
        request.getHeaders().put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()));
        request.getHeaders().put(REQUEST_TIME, longToBytes(System.currentTimeMillis()));
        // SettableFuture comes from Google's Guava library (see Part 1, basics): a future whose result can be set explicitly
        SettableFuture<Response> future = SettableFuture.create();
        ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
        // Put the future into pendingRequests (②)
        pendingRequests.putIfAbsent(requestId, responseMetaData);
        log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);
        // Send the message to the queue; the topic is tb_transport.api.requests
        requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
            @Override
            public void onSuccess(TbQueueMsgMetadata metadata) {
                log.trace("[{}] Request sent: {}", requestId, metadata);
            }
            @Override
            public void onFailure(Throwable t) {
                pendingRequests.remove(requestId);
                future.setException(t);
            }
        });
        return future;
    }
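The send method stores the request id, response topic, and timestamp as byte-array headers through helpers such as uuidToBytes and longToBytes, whose bodies the excerpt does not show. A minimal sketch of what such helpers could look like follows. The byte layout (big-endian, most significant half first, UTF-8 strings) is an assumption, not something confirmed by the source.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Assumed implementations of the header helpers; the real layout may differ.
class HeaderCodecSketch {
    // A UUID is two longs, so it fits into exactly 16 bytes.
    static byte[] uuidToBytes(UUID uuid) {
        return ByteBuffer.allocate(16)
                .putLong(uuid.getMostSignificantBits())
                .putLong(uuid.getLeastSignificantBits())
                .array();
    }

    static UUID bytesToUuid(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long msb = buf.getLong();
        long lsb = buf.getLong();
        return new UUID(msb, lsb);
    }

    static byte[] longToBytes(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long bytesToLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    static byte[] stringToBytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String bytesToString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Whatever the actual layout is, the important property is the round trip: the responder must be able to decode the same request id and response topic that the sender encoded.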
  3. From the analysis of TbCoreTransportApiService in Part 3, we know that the requestTemplate consumer, a member of DefaultTbQueueResponseTemplate, subscribes to exactly the tb_transport.api.requests topic:

    ......
    requests.forEach(request -> {
        long currentTime = System.currentTimeMillis();
        long requestTime = bytesToLong(request.getHeaders().get(REQUEST_TIME));
        if (requestTime + requestTimeout >= currentTime) {
            byte[] requestIdHeader = request.getHeaders().get(REQUEST_ID_HEADER);
            if (requestIdHeader == null) {
                log.error("[{}] Missing requestId in header", request);
                return;
            }
            // Read the response topic so that the reply goes back to where the request came from;
            // here it is tb_transport.api.responses.localHostName
            byte[] responseTopicHeader = request.getHeaders().get(RESPONSE_TOPIC_HEADER);
            if (responseTopicHeader == null) {
                log.error("[{}] Missing response topic in header", request);
                return;
            }
            UUID requestId = bytesToUuid(requestIdHeader);
            String responseTopic = bytesToString(responseTopicHeader);
            try {
                pendingRequestCount.getAndIncrement();
                // Let the handler process the message
                AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request),
                        response -> {
                            pendingRequestCount.decrementAndGet();
                            response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
                            // Send the result of handler.handle back to the sender on topic tb_transport.api.responses.localHostName
                            responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);
                        },
                        e -> {
                            pendingRequestCount.decrementAndGet();
                            if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
                                log.warn("[{}] Timeout to process the request: {}", requestId, request, e);
                            } else {
                                log.trace("[{}] Failed to process the request: {}", requestId, request, e);
                            }
                        },
                        requestTimeout,
                        timeoutExecutor,
                        callbackExecutor);
    .......
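The responder side boils down to three rules: ignore requests that are too old, require both the request id and the response topic headers, and echo the request id back on the topic the sender asked for. The sketch below models just that with string headers. ResponderSketch, Reply, and the header names are hypothetical, and silently skipping stale requests is an assumption, since the excerpt elides that branch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the responder loop body; headers are plain strings here.
class ResponderSketch {
    static final String REQUEST_ID = "requestId";
    static final String RESPONSE_TOPIC = "responseTopic";
    static final String REQUEST_TIME = "requestTime";

    static final class Reply {
        final String topic;
        final Map<String, String> headers;
        final String body;

        Reply(String topic, Map<String, String> headers, String body) {
            this.topic = topic;
            this.headers = headers;
            this.body = body;
        }
    }

    // Returns the reply to publish, or empty when the request must be skipped.
    static Optional<Reply> handle(Map<String, String> headers, String body, long now, long timeoutMs) {
        String requestId = headers.get(REQUEST_ID);
        String topic = headers.get(RESPONSE_TOPIC);
        String time = headers.get(REQUEST_TIME);
        if (requestId == null || topic == null || time == null) {
            return Optional.empty(); // a required header is missing
        }
        if (Long.parseLong(time) + timeoutMs < now) {
            return Optional.empty(); // stale: the sender has given up by now
        }
        Map<String, String> replyHeaders = new HashMap<>();
        replyHeaders.put(REQUEST_ID, requestId); // lets the sender match reply to request
        return Optional.of(new Reply(topic, replyHeaders, "handled:" + body));
    }
}
```

Carrying the response topic inside the request is what lets several transport nodes share one request topic while each receives only its own replies.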
  4. The actual validation logic:

    @Override
    public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProtoQueueMsg<TransportApiRequestMsg> tbProtoQueueMsg) {
        TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue();
        // Check whether the protobuf message carries a token validation request
        if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
            ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
            // Call validateCredentials, which looks up the deviceInfo; the Function passed as the second argument post-processes the result
            return Futures.transform(validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor());
        }
        ......
  5. Once the deviceInfo has been found through the device's access token, it is sent out through the message queue on topic tb_transport.api.responses.localHostName. As shown in Part 3, the transportApiRequestTemplate of DefaultTransportService subscribes to this topic:

    List<Response> responses = responseTemplate.poll(pollInterval);
    if (responses.size() > 0) {
        log.trace("Polling responses completed, consumer records count [{}]", responses.size());
    } else {
        continue;
    }
    responses.forEach(response -> {
        byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER);
        UUID requestId;
        if (requestIdHeader == null) {
            log.error("[{}] Missing requestId in header and body", response);
        } else {
            requestId = bytesToUuid(requestIdHeader);
            log.trace("[{}] Response received: {}", requestId, response);
            // See ② above: the future for this validation was put into pendingRequests; remove it here by its requestId
            ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
            if (expectedResponse == null) {
                log.trace("[{}] Invalid or stale request", requestId);
            } else {
                // Set the result of the SettableFuture
                expectedResponse.future.set(response);
            }
        }
    ......
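Steps 2 and 5 together form a request/response correlation pattern: the sender parks a future under a request id, and the poller completes it when a response with the same id comes back. A minimal sketch of that pattern follows, using the JDK's CompletableFuture in place of Guava's SettableFuture. PendingRequestsSketch and Sent are hypothetical names, and the expiry handling implied by expTime is left out.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the pendingRequests map shared by the sender and the response poller.
class PendingRequestsSketch {
    static final class Sent {
        final UUID requestId;
        final CompletableFuture<String> future;

        Sent(UUID requestId, CompletableFuture<String> future) {
            this.requestId = requestId;
            this.future = future;
        }
    }

    private final Map<UUID, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Sender side: register the future before the request leaves the node.
    Sent send() {
        UUID requestId = UUID.randomUUID();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return new Sent(requestId, future);
    }

    // Poller side: returns false for an unknown or already answered id ("invalid or stale").
    boolean onResponse(UUID requestId, String response) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future == null) {
            return false;
        }
        return future.complete(response);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Removing the entry and completing the future in one step means a duplicate response for the same id is simply reported as stale.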
  6. The asynchronous request issued by DefaultTransportService.process now has its result, so the onSuccess callback runs, which calls onValidateDeviceResponse in MqttTransportHandler:

    private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx) {
        if (!msg.hasDeviceInfo()) {
            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
            ctx.close();
        } else {
            deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
            sessionInfo = SessionInfoProto.newBuilder()
                    .setNodeId(context.getNodeId())
                    .setSessionIdMSB(sessionId.getMostSignificantBits())
                    .setSessionIdLSB(sessionId.getLeastSignificantBits())
                    .setDeviceIdMSB(msg.getDeviceInfo().getDeviceIdMSB())
                    .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB())
                    .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
                    .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
                    .setDeviceName(msg.getDeviceInfo().getDeviceName())
                    .setDeviceType(msg.getDeviceInfo().getDeviceType())
                    .build();
            // Build a SessionEvent.OPEN message carrying sessionInfo; process ends up calling sendToDeviceActor
            transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() {
    .......
  7. The implementation of sendToDeviceActor:

    protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
        // Build the tpi. A fixed partition id is chosen; the topic is tb_core and fullTopicName is tb_core.(int), for example tb_core.1
        TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, getTenantId(sessionInfo), getDeviceId(sessionInfo));
        ......
        // Send to the message queue through tbCoreMsgProducer, with toDeviceActorMsg set
        tbCoreMsgProducer.send(tpi,
                new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
                        ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), callback != null ?
                        new TransportTbQueueCallback(callback) : null);
    }
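The point of partitionService.resolve is that the same device always maps to the same partition, so all of its messages are consumed in order by one core node. A rough sketch of that idea follows. The hash used here (UUID.hashCode) and the names PartitionSketch, resolvePartition, and fullTopicName are assumptions for illustration; the hash function ThingsBoard actually uses is not shown in the excerpt.

```java
import java.util.UUID;

// Hypothetical partition resolver: a stable hash of the entity id picks the partition.
class PartitionSketch {
    // floorMod keeps the result in [0, partitions) even for negative hash codes.
    static int resolvePartition(UUID entityId, int partitions) {
        return Math.floorMod(entityId.hashCode(), partitions);
    }

    // Mirrors the naming described above: topic plus partition index, e.g. tb_core.1
    static String fullTopicName(String topic, int partition) {
        return topic + "." + partition;
    }
}
```

For example, `PartitionSketch.fullTopicName("tb_core", PartitionSketch.resolvePartition(deviceId, 10))` yields one of tb_core.0 through tb_core.9, and always the same one for a given deviceId.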
  8. From the analysis of DefaultTbCoreConsumerService in Part 2, we know that its consumer subscribes to this topic:

    try {
        ToCoreMsg toCoreMsg = msg.getValue();
        if (toCoreMsg.hasToSubscriptionMgrMsg()) {
            log.trace("[{}] Forwarding message to subscription manager service {}", id, toCoreMsg.getToSubscriptionMgrMsg());
            forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
        } else if (toCoreMsg.hasToDeviceActorMsg()) {
            log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());
            // Handled by this method
            forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
        }
  9. How forwardToDeviceActor handles the message:

    private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbCallback callback) {
        if (statsEnabled) {
            stats.log(toDeviceActorMsg);
        }
        // Wrap it in a message of type TRANSPORT_TO_DEVICE_ACTOR_MSG and hand it to AppActor
        actorContext.tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback));
    }
  10. Following summary point 3 of Part 4, we can go straight to the doProcess method of AppActor to see how this message type is handled. Tracing it shows that AppActor forwards the message to TenantActor, which creates the DeviceActor and forwards the message to it.

  11. When DeviceActor receives a message of this type, it handles it as follows:

    protected boolean doProcess(TbActorMsg msg) {
        switch (msg.getMsgType()) {
            case TRANSPORT_TO_DEVICE_ACTOR_MSG:
                // The message, already wrapped as TransportToDeviceActorMsgWrapper, is handed to the processor,
                // which goes on to call processSessionStateMsgs
                processor.process(ctx, (TransportToDeviceActorMsgWrapper) msg);
                break;
            case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
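The hand-off from AppActor to TenantActor to DeviceActor in steps 10 and 11 can be pictured as a small tree of handlers in which each level creates its child on first use and forwards the message. The sketch below is a deliberately simplified, synchronous model: real actors have mailboxes and run asynchronously, and every class and field name here is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, synchronous model of the App -> Tenant -> Device routing.
class ActorRoutingSketch {
    enum MsgType { TRANSPORT_TO_DEVICE_ACTOR_MSG, OTHER }

    static final class Msg {
        final MsgType type;
        final String tenantId;
        final String deviceId;
        final String payload;

        Msg(MsgType type, String tenantId, String deviceId, String payload) {
            this.type = type;
            this.tenantId = tenantId;
            this.deviceId = deviceId;
            this.payload = payload;
        }
    }

    static final class DeviceActor {
        final List<String> handled = new ArrayList<>();

        // Returns true when the message type is one this actor knows how to process.
        boolean doProcess(Msg msg) {
            switch (msg.type) {
                case TRANSPORT_TO_DEVICE_ACTOR_MSG:
                    handled.add(msg.payload);
                    return true;
                default:
                    return false;
            }
        }
    }

    static final class TenantActor {
        final Map<String, DeviceActor> devices = new HashMap<>();

        // The device actor is created lazily, the first time a message for it arrives.
        boolean doProcess(Msg msg) {
            return devices.computeIfAbsent(msg.deviceId, id -> new DeviceActor()).doProcess(msg);
        }
    }

    static final class AppActor {
        final Map<String, TenantActor> tenants = new HashMap<>();

        boolean doProcess(Msg msg) {
            return tenants.computeIfAbsent(msg.tenantId, id -> new TenantActor()).doProcess(msg);
        }
    }
}
```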
  12. How processSessionStateMsgs handles it:

    private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
        UUID sessionId = getSessionId(sessionInfo);
        if (msg.getEvent() == SessionEvent.OPEN) {
            .....
            sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfo.getNodeId())));
            if (sessions.size() == 1) {
                // Ends up calling pushRuleEngineMessage(stateData, CONNECT_EVENT);
                reportSessionOpen();
            }
            // Ends up calling pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
            systemContext.getDeviceStateService().onDeviceActivity(deviceId, System.currentTimeMillis());
            dumpSessions();
        }
        ....
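The rule in this method is that a device counts as connected when its first session opens, while every session open counts as activity. A minimal sketch of that bookkeeping follows. SessionTrackerSketch is a hypothetical class, events are recorded as plain strings instead of being pushed to the rule engine, and whatever the elided part of the original does before sessions.put is not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of the per-device session map and the events it triggers.
class SessionTrackerSketch {
    private final Map<UUID, String> sessions = new HashMap<>();
    final List<String> events = new ArrayList<>();

    void onSessionOpen(UUID sessionId, String nodeId) {
        sessions.put(sessionId, nodeId);
        if (sessions.size() == 1) {
            // First open session for this device: report the connect once.
            events.add("CONNECT_EVENT");
        }
        // Every open counts as device activity.
        events.add("ACTIVITY_EVENT");
    }
}
```

Opening two sessions in a row therefore records CONNECT_EVENT, ACTIVITY_EVENT, ACTIVITY_EVENT.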
  13. Because CONNECT_EVENT and ACTIVITY_EVENT differ only in their type, only CONNECT_EVENT is analyzed below:

    public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
        if (tenantId.isNullUid()) {
            if (entityId.getEntityType().equals(EntityType.TENANT)) {
                tenantId = new TenantId(entityId.getId());
            } else {
                log.warn("[{}][{}] Received invalid message: {}", tenantId, entityId, tbMsg);
                return;
            }
        }
        // Similar to step 7; an example fullTopicName for this tpi is tb_rule_engine.main.1
        TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
        log.trace("PUSHING msg: {} to:{}", tbMsg, tpi);
        ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
                .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
                .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
                .setTbMsg(TbMsg.toByteString(tbMsg)).build();
        producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
        toRuleEngineMsgs.incrementAndGet();
    }
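  14. As analyzed in Part 2, DefaultTbRuleEngineConsumerService subscribes to this topic (tb_rule_engine.main.1). When a message arrives, it calls forwardToRuleEngineActor, which wraps it in a message of type QUEUE_TO_RULE_ENGINE_MSG and hands it to AppActor for dispatch.

  15. AppActor hands it to TenantActor, TenantActor hands it to the root rule chain (RootRuleChain), and the RuleChainActor hands it to the firstRuleNode, that is, to one particular RuleNodeActor.
  16. Open the RULE CHAINS page in the web UI and you will see that MESSAGE TYPE SWITCH is the first node that receives the input. In the database, the first_rule_node_id configured in the rule_chain table is in fact the TbMsgTypeSwitchNode.
  17. Step into the onMsg method of TbMsgTypeSwitchNode (every rule node handles messages in its onMsg method). It derives a relationType from the messageType (CONNECT_EVENT here) and calls ctx.tellNext(msg, relationType).
  18. DefaultTbContext then creates a RuleNodeToRuleChainTellNextMsg of type RULE_TO_RULE_CHAIN_TELL_NEXT_MSG and hands it to RuleChainActor.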
  19. Next we arrive at the onTellNext method of RuleChainActorMessageProcessor:

    private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {
        try {
            checkActive(msg);
            // Originator of the message
            EntityId entityId = msg.getOriginator();
            // Build a tpi; it may be needed below
            TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
            // Look up the related rule nodes, i.e. the entries of the relation table whose source is this node and whose
            // relation_type matches the relationType set in TbMsgTypeSwitchNode. For Connect Event no related RuleNodeId is found
            List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
                    .filter(r -> contains(relationTypes, r.getType()))
                    .collect(Collectors.toList());
            int relationsCount = relations.size();
            // No relation was found for Connect Event, so the message has finished its pass through the rule engine
            if (relationsCount == 0) {
                log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
                if (relationTypes.contains(TbRelationTypes.FAILURE)) {
                    RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
                    if (ruleNodeCtx != null) {
                        msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
                    } else {
                        log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
                        msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
                    }
                } else {
                    msg.getCallback().onSuccess();
                }
            // Example: for the Post telemetry type a matching rule node exists, implemented by TbMsgTimeseriesNode,
            // so the message is handed to TbMsgTimeseriesNode
            } else if (relationsCount == 1) {
                for (RuleNodeRelation relation : relations) {
                    log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
                    pushToTarget(tpi, msg, relation.getOut(), relation.getType());
                }
            } else {
                MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());
                log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relations);
                for (RuleNodeRelation relation : relations) {
                    EntityId target = relation.getOut();
                    putToQueue(tpi, msg, callbackWrapper, target);
                }
            }
        } catch (RuleNodeException rne) {
            msg.getCallback().onFailure(rne);
        } catch (Exception e) {
            msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
        }
    }
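Stripped of logging and queue details, onTellNext makes a three-way decision on the number of outbound relations that match the relation type. The sketch below models only that decision. TellNextSketch, Relation, the outcome strings, and the node names in the usage note are hypothetical; the relation type is matched by exact string comparison, which may differ from the contains helper in the real code; and the literal "Failure" stands in for TbRelationTypes.FAILURE as an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of the routing decision in onTellNext.
class TellNextSketch {
    static final class Relation {
        final String type;
        final String target;

        Relation(String type, String target) {
            this.type = type;
            this.target = target;
        }
    }

    // Appends the chosen targets to 'delivered' and returns a label for the branch taken.
    static String route(List<Relation> outbound, Set<String> relationTypes, List<String> delivered) {
        List<Relation> matches = new ArrayList<>();
        for (Relation r : outbound) {
            if (relationTypes.contains(r.type)) {
                matches.add(r);
            }
        }
        if (matches.isEmpty()) {
            // Nothing downstream: the message is finished, as a failure or as a success.
            return relationTypes.contains("Failure") ? "FAILED" : "ACKED";
        }
        for (Relation r : matches) {
            delivered.add(r.target);
        }
        // One target is pushed directly; several targets share one wrapped callback.
        return matches.size() == 1 ? "SINGLE" : "FAN_OUT";
    }
}
```

With a single outbound relation of type "Post telemetry" pointing at a node named "saveTimeseries", routing the type "Connect Event" returns ACKED and delivers nothing, while routing "Post telemetry" returns SINGLE and delivers to "saveTimeseries".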
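  20. What's more:
    As in the example above, for telemetry data (Post telemetry) the onMsg method of TbMsgTimeseriesNode does the further processing: it stores the data and, if a WebSocket session exists, pushes the update over WebSocket, or sends other notifications. We will not go into the details here.

    Summary:

  21. Handling an MQTT connection means walking through the entire rule engine flow. Other message types, such as telemetry, attribute updates, and sending and receiving RPC requests, follow broadly the same path.

  22. When tracing a message, always be clear about which topic is being subscribed to or published on; otherwise it is easy to lose your bearings.
  23. In the actor model, messages are dispatched step by step according to their type, starting from AppActor, until a suitable RuleNode handles them.
  24. Protobuf messages are easy to serialize, transmit, and parse, so ThingsBoard uses them heavily. The generated classes are not very readable, though; reading the queue.proto file directly gives a better feel for them.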