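MQTT Overview
MQTT is a lightweight publish-subscribe messaging protocol, which probably makes it the best fit for a wide range of IoT devices.
ThingsBoard server nodes act as an MQTT broker that supports QoS level 0 (at most once) and QoS level 1 (at least once), along with a set of predefined topics.
The MQTT-based API that ThingsBoard provides to devices is quite "flexible".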
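To make the two QoS levels concrete, here is a minimal sketch of how an MQTT 3.1.1 PUBLISH packet carries the QoS value in its fixed header. It is hand-rolled plain Java for illustration only, not ThingsBoard code: the class name MqttPublishSketch and the method encodePublish are hypothetical, and the topic v1/devices/me/telemetry is assumed from ThingsBoard's public device API rather than taken from the text above.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only; real clients use an MQTT library.
class MqttPublishSketch {

    // Encodes an MQTT 3.1.1 PUBLISH packet for QoS 0 or QoS 1.
    // The packet identifier is only written (and only validated) for QoS 1.
    static byte[] encodePublish(String topic, byte[] payload, int qos, int packetId) {
        if (qos != 0 && qos != 1) {
            throw new IllegalArgumentException("This sketch only covers QoS 0 and QoS 1");
        }
        if (qos == 1 && (packetId < 1 || packetId > 0xFFFF)) {
            throw new IllegalArgumentException("QoS 1 needs a packet id in 1..65535");
        }
        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);

        // Variable header: topic name with a 2-byte length prefix, then the packet id for QoS 1.
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        body.write(topicBytes.length >> 8);
        body.write(topicBytes.length & 0xFF);
        body.write(topicBytes, 0, topicBytes.length);
        if (qos == 1) {
            body.write(packetId >> 8);
            body.write(packetId & 0xFF);
        }
        body.write(payload, 0, payload.length);

        ByteArrayOutputStream packet = new ByteArrayOutputStream();
        // Fixed header, byte 1: packet type 3 (PUBLISH) in the high nibble, QoS in bits 2..1.
        packet.write(0x30 | (qos << 1));
        // Fixed header, remaining length: 7 bits per byte, high bit set while more bytes follow.
        int remaining = body.size();
        do {
            int digit = remaining % 128;
            remaining /= 128;
            if (remaining > 0) {
                digit |= 0x80;
            }
            packet.write(digit);
        } while (remaining > 0);
        packet.write(body.toByteArray(), 0, body.size());
        return packet.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = "{\"temperature\":25}".getBytes(StandardCharsets.UTF_8);
        byte[] atMostOnce = encodePublish("v1/devices/me/telemetry", payload, 0, 0);
        byte[] atLeastOnce = encodePublish("v1/devices/me/telemetry", payload, 1, 10);
        System.out.println(String.format("0x%02X", atMostOnce[0] & 0xFF));  // prints 0x30
        System.out.println(String.format("0x%02X", atLeastOnce[0] & 0xFF)); // prints 0x32
    }
}
```

With QoS 0 the sender never expects an acknowledgement, so no packet identifier is needed. With QoS 1 the broker answers with a PUBACK that echoes the identifier, which is what lets the sender retry until delivery is confirmed.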
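Related Modules
How to find them: start from the application startup project, locate mqtt-transport-common in its pom file, then look up the modules that reference mqtt-transport-common.
1. mqtt-transport-service
2. mqtt-transport-common
3. Microservice version (ignored here)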
MQTT Transport Service
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.mqtt;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

import java.util.Arrays;

@SpringBootConfiguration
@EnableAsync
@EnableScheduling
@ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common", "org.thingsboard.server.transport.mqtt", "org.thingsboard.server.queue", "org.thingsboard.server.cache"})
public class ThingsboardMqttTransportApplication {

    private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
    private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=" + "tb-mqtt-transport";

    public static void main(String[] args) {
        SpringApplication.run(ThingsboardMqttTransportApplication.class, updateArguments(args));
    }

    // Appends --spring.config.name=tb-mqtt-transport unless the caller already passed a config name.
    private static String[] updateArguments(String[] args) {
        if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) {
            String[] modifiedArgs = new String[args.length + 1];
            System.arraycopy(args, 0, modifiedArgs, 0, args.length);
            modifiedArgs[args.length] = DEFAULT_SPRING_CONFIG_PARAM;
            return modifiedArgs;
        }
        return args;
    }
}
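The argument handling in updateArguments can be tried without Spring. The sketch below copies that logic into a standalone class so its effect is visible; the class name ConfigNameArgsDemo and the sample argument --example.flag=1 are hypothetical and only serve the demonstration.

```java
import java.util.Arrays;

// Standalone copy of the updateArguments logic, for experimentation only.
class ConfigNameArgsDemo {

    private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
    private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=" + "tb-mqtt-transport";

    // Returns args unchanged if a config name is already present,
    // otherwise returns a copy with the default config name appended.
    static String[] updateArguments(String[] args) {
        if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) {
            String[] modifiedArgs = Arrays.copyOf(args, args.length + 1);
            modifiedArgs[args.length] = DEFAULT_SPRING_CONFIG_PARAM;
            return modifiedArgs;
        }
        return args;
    }

    public static void main(String[] unused) {
        // No config name given: the default is appended.
        System.out.println(Arrays.toString(updateArguments(new String[] {"--example.flag=1"})));
        // prints [--example.flag=1, --spring.config.name=tb-mqtt-transport]

        // Config name given explicitly: the arguments are left as they are.
        System.out.println(Arrays.toString(updateArguments(new String[] {"--spring.config.name=custom"})));
        // prints [--spring.config.name=custom]
    }
}
```

The practical effect is that the service looks for a configuration file named tb-mqtt-transport by default, while an explicit --spring.config.name on the command line still wins.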
MQTT Transport Common
└── main
    └── java
        └── org.thingsboard.server.transport.mqtt
            ├── adaptors
            │   ├── BackwardCompatibilityAdaptor.java    Adaptor that keeps JSON and Protobuf payloads compatible
            │   ├── JsonMqttAdaptor.java                 Adaptor for JSON MQTT payloads
            │   ├── MqttTransportAdaptor.java            MQTT transport adaptor
            │   └── ProtoMqttAdaptor.java                Adaptor for Protobuf MQTT payloads
            ├── limits
            │   ├── IpFilter.java                        IP filter
            │   └── ProxyIpFilter.java                   Proxy IP filter
            ├── session
            │   ├── DeviceSessionCtx.java                Device session context
            │   ├── GatewayDeviceSessionCtx.java         Gateway device session context
            │   ├── GatewaySessionHandler.java           Gateway session handler
            │   ├── MqttDeviceAwareSessionContext.java   MQTT device-aware session context
            │   └── MqttTopicMatcher.java                MQTT topic matcher
            ├── util
            │   ├── AlwaysTrueTopicFilter.java           Filter that always matches
            │   ├── EqualsTopicFilter.java               Filter that matches by equality
            │   ├── MqttTopicFilter.java                 MQTT topic filter
            │   ├── MqttTopicFilterFactory.java          MQTT topic filter factory
            │   └── RegexTopicFilter.java                Filter that matches by regular expression
            ├── MqttSslHandlerProvider.java              MQTT SSL handler provider
            ├── MqttTransportContext.java                MQTT transport context
            ├── MqttTransportHandler.java                MQTT transport message handler
            ├── MqttTransportServerInitializer.java      MQTT transport server initializer
            ├── MqttTransportService.java                MQTT transport startup service
            └── TopicType.java                           MQTT topic type enum
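The util package lists three kinds of topic filter (always true, equals, regex) plus a factory. The sketch below shows one way such a factory could choose between them from an MQTT topic filter string. It is an assumption-laden illustration, not the ThingsBoard implementation: the names TopicFilterSketch, TopicFilter, and toFilter are hypothetical, and the topics in the usage example are only samples.

```java
import java.util.regex.Pattern;

// Hypothetical sketch of a topic filter factory; not the ThingsBoard code.
class TopicFilterSketch {

    interface TopicFilter {
        boolean matches(String topic);
    }

    static TopicFilter toFilter(String filter) {
        if (filter == null || filter.isEmpty()) {
            throw new IllegalArgumentException("Topic filter must not be empty");
        }
        // "#" alone matches every topic: the "always true" case.
        if (filter.equals("#")) {
            return topic -> true;
        }
        // No wildcards: plain string equality is enough.
        if (!filter.contains("+") && !filter.contains("#")) {
            return filter::equals;
        }
        // Wildcards present: translate the filter level by level into a regex.
        String[] levels = filter.split("/", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < levels.length; i++) {
            String level = levels[i];
            if (level.equals("#")) {
                if (i != levels.length - 1) {
                    throw new IllegalArgumentException("'#' must be the last level");
                }
                // Multi-level wildcard: matches the parent level and anything below it.
                regex.append("(/.*)?");
                break;
            }
            if (i > 0) {
                regex.append('/');
            }
            // "+" matches exactly one level; any other level is matched literally.
            regex.append(level.equals("+") ? "[^/]*" : Pattern.quote(level));
        }
        Pattern pattern = Pattern.compile(regex.toString());
        return topic -> pattern.matcher(topic).matches();
    }

    public static void main(String[] args) {
        TopicFilter oneLevel = toFilter("v1/devices/me/rpc/request/+");
        System.out.println(oneLevel.matches("v1/devices/me/rpc/request/42"));       // prints true
        System.out.println(oneLevel.matches("v1/devices/me/rpc/request/42/extra")); // prints false
        System.out.println(toFilter("sensors/#").matches("sensors/a/b"));           // prints true
    }
}
```

Splitting the cases this way keeps the common paths cheap: a catch-all or exact-match subscription never pays for regex compilation or matching.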