MQTT概述

MQTT是一种轻量级的发布-订阅消息传递协议,它可能最适合各种物联网设备。你可以在此处找到有关MQTT的更多信息。
ThingsBoard服务器节点充当支持QoS级别0(最多一次)和QoS级别1(至少一次)以及一组预定义主题的MQTT主题。
ThingsBoard基于MQTT协议提供给设备的API是非常”灵活”的。
例如,目前提供了四种API:

  • 遥测数据上传API
  • 属性API
  • RPC API
  • 声明设备所有权API

用上述四种的API,用户可以动态调整设备监控属性,例如原先采集设备的温度,湿度及材料大小调整为温度,湿度和材料是否合格;并可以获取设备历史遥测数据和最新遥测数据;通过RPC API, 设备和服务器可以实时获取对方相应属性的变化和通过API用户可以对设备进行转让,租赁和回收操作。

关联模块一览

和MQTT设备传输协议关联的模块有Thingsboard MQTT Transport ServiceThingsboard MQTT Transport CommonThingsboard Server Queue components。前面这些名称大家可以看IDEA maven模块名称。

MQTT Transport Service MQTT Transport Common Server Queue Components
Thingsboard源码分析-MQTT协议处理上 - 图1 Thingsboard源码分析-MQTT协议处理上 - 图2 Thingsboard源码分析-MQTT协议处理上 - 图3

MQTT Transport Service

org.thingsboard.server.mqtt.ThingsboardMqttTransportApplication,MQTT服务启动类,使用SpringBoot启动类,通过加载模块和配置文件,对服务进行配置并运行。

  1. @SpringBootConfiguration
  2. @EnableAsync
  3. @EnableScheduling
  4. @ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common", "org.thingsboard.server.transport.mqtt", "org.thingsboard.server.kafka"})
  5. public class ThingsboardMqttTransportApplication {
  6. private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
  7. private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=" + "tb-mqtt-transport";
  8. public static void main(String[] args) {
  9. SpringApplication.run(ThingsboardMqttTransportApplication.class, updateArguments(args));
  10. }
  11. private static String[] updateArguments(String[] args) {
  12. if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) {
  13. String[] modifiedArgs = new String[args.length + 1];
  14. System.arraycopy(args, 0, modifiedArgs, 0, args.length);
  15. modifiedArgs[args.length] = DEFAULT_SPRING_CONFIG_PARAM;
  16. return modifiedArgs;
  17. }
  18. return args;
  19. }
  20. }
  21. Copy to clipboardErrorCopied

第2-3行代码中,@EnableAsync注解使用来开启异步线程,@EnableScheduling注解使用来开启定时任务。
第4行代码@ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common", "org.thingsboard.server.transport.mqtt", "org.thingsboard.server.kafka"}): 扫描这些包下的所有使用@Component 的类,不管自动导入还是导出。
第7-8行代码和updateArguments的作用是:启动时,使用 —spring.config.name = tb-mqtt-transport, 指定配置名,包括但不仅限于tb-mqtt-transport.conf等文件。

MQTT Transport Common

Netty框架

Thingsboard的Mqtt协议逻辑实现是通过Netty实现的,Netty是一个NIO客户端、服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和简化了网络编程,例如TCP和UDP套接字服务器。

  • Netty 官网 : https://netty.io/
  • Netty GitHub仓库 : https://github.com/netty/netty
    引入依赖
    MQTT Transport common通过引入Netty 4.x版本的jar包对Mqtt进行协议逻辑实现,Netty4.x和3.x的区别还是挺大的。
    1. <dependency>
    2. <groupId>io.netty</groupId>
    3. <artifactId>netty-all</artifactId>
    4. </dependency>Copy to clipboardErrorCopied
    参数配置
    1. transport:
    2. # 本地MQTT传输参数
    3. mqtt:
    4. # 开启/关闭mqtt传输协议.
    5. enabled: "${MQTT_ENABLED:true}"
    6. bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}"
    7. bind_port: "${MQTT_BIND_PORT:1883}"
    8. timeout: "${MQTT_TIMEOUT:10000}"
    9. netty:
    10. leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"
    11. boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"
    12. worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"
    13. max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}"
    14. so_keep_alive: "${NETTY_SO_KEEPALIVE:false}"
    15. # MQTT SSL配置
    16. ssl:
    17. # 开启/关闭 SSL支持
    18. enabled: "${MQTT_SSL_ENABLED:false}"
    19. # SSL协议: 参阅http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#SSLContext
    20. protocol: "${MQTT_SSL_PROTOCOL:TLSv1.2}"
    21. # Path to the key store that holds the SSL certificate
    22. key_store: "${MQTT_SSL_KEY_STORE:mqttserver.jks}"
    23. # Password used to access the key store
    24. key_store_password: "${MQTT_SSL_KEY_STORE_PASSWORD:server_ks_password}"
    25. # Password used to access the key
    26. key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}"
    27. # Type of the key store
    28. key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"Copy to clipboardErrorCopied

    模块目录结构

    首先我们看该模块下的目录结构:
    1. └── main
    2. └── java
    3. └── org
    4. └── thingsboard
    5. └── server
    6. └── transport
    7. └── mqtt
    8. ├── MqttSslHandlerProvider.java Mqtt Ssl逻辑处理提供类
    9. ├── MqttTopics.java Mqtt预定义主题
    10. ├── MqttTransportContext.java Mqtt传输协议上下文
    11. ├── MqttTransportHandler.java Mqttt传输协议逻辑处理类
    12. ├── MqttTransportServerInitializer.java Mqtt传输协议初始化类
    13. ├── MqttTransportService.java Mqtt传输协议启动类
    14. ├── adaptors
    15. ├── JsonMqttAdaptor.java Mqtt传输内容Json适配器
    16. └── MqttTransportAdaptor.java Mqtt协议传输适配器
    17. ├── session
    18. ├── DeviceSessionCtx.java 设备会话上下文
    19. ├── GatewayDeviceSessionCtx.java 网关设备会话上下文
    20. ├── GatewaySessionHandler.java 网关会话处理类
    21. ├── MqttDeviceAwareSessionContext.java Mqtt设备会话上下文
    22. └── MqttTopicMatcher.java Mqtt主题匹配器
    23. └── util
    24. └── SslUtil.java Ssl工具类Copy to clipboardErrorCopied

    Mqtt传输协议逻辑实现

    MqttTransportService
    1. @Service("MqttTransportService")
    2. @ConditionalOnExpression("'${transport.type:null}'=='null' || ('${transport.type}'=='local' && '${transport.mqtt.enabled}'=='true')")
    3. @Slf4j
    4. public class MqttTransportService {
    5. @Value("${transport.mqtt.bind_address}")
    6. private String host;
    7. @Value("${transport.mqtt.bind_port}")
    8. private Integer port;
    9. @Value("${transport.mqtt.netty.leak_detector_level}")
    10. private String leakDetectorLevel;
    11. @Value("${transport.mqtt.netty.boss_group_thread_count}")
    12. private Integer bossGroupThreadCount;
    13. @Value("${transport.mqtt.netty.worker_group_thread_count}")
    14. private Integer workerGroupThreadCount;
    15. @Value("${transport.mqtt.netty.so_keep_alive}")
    16. private boolean keepAlive;
    17. @Autowired
    18. private MqttTransportContext context;
    19. private Channel serverChannel;
    20. private EventLoopGroup bossGroup;
    21. private EventLoopGroup workerGroup;
    22. @PostConstruct
    23. public void init() throws Exception {
    24. log.info("Setting resource leak detector level to {}", leakDetectorLevel);
    25. ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
    26. log.info("Starting MQTT transport...");
    27. bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
    28. workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
    29. ServerBootstrap b = new ServerBootstrap();
    30. b.group(bossGroup, workerGroup)
    31. .channel(NioServerSocketChannel.class)
    32. .childHandler(new MqttTransportServerInitializer(context))
    33. .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
    34. serverChannel = b.bind(host, port).sync().channel();
    35. log.info("Mqtt transport started!");
    36. }
    37. @PreDestroy
    38. public void shutdown() throws InterruptedException {
    39. log.info("Stopping MQTT transport!");
    40. try {
    41. serverChannel.close().sync();
    42. } finally {
    43. workerGroup.shutdownGracefully();
    44. bossGroup.shutdownGracefully();
    45. }
    46. log.info("MQTT transport stopped!");
    47. }
    48. }Copy to clipboardErrorCopied
    第6行到18行, 通过@value来注入对应的值,直接在字段上添加@value获取application.yml文件中的值。
    MQTT服务端参数:
参数 参数名称 参数值
transport.mqtt.bind_address 绑定地址 0.0.0.0
transport.mqtt.bind_port 绑定端口 1883
transport.mqtt.netty.leak_detector_level 内存检测级别 DISABLED
transport.mqtt.netty.boss_group_thread_count boss线程组线程数 1
transport.mqtt.netty.worker_group_thread_count work线程组线程数 12
transport.mqtt.netty.so_keep_alive 心跳检测 false

其中第三个内存检测级别是源于: netty中大量使用了池化技术来减缓IO buffer的创建销毁开销。对于这些内存池管理的对象,从netty 4之后使用了引用计数来对它们进行管理。但是JVM GC和netty的内存回收机制是不同的,netty就提供了一个内存泄漏检查机制。

  1. DISABLED: 不进行内存泄露的检测;
  2. SIMPLE: 抽样检测,且只对部分方法调用进行记录,消耗较小,有泄漏时可能会延迟报告,默认级别;
  3. ADVANCED: 抽样检测,记录对象最近几次的调用记录,有泄漏时可能会延迟报告;
  4. PARANOID: 每次创建一个对象时都进行泄露检测,且会记录对象最近的详细调用记录。是比较激进的内存泄露检测级别,消耗最大,建议只在测试时使用。

第四个和第五个参数,boss线程组 用于服务端接受客户端的连接,worker线程组 用于进行SocketChannel的数据读写。
第35行到第41行,通过创建ServerBootstrap对象,设置使用的EventLoopGroup;设置要被实例化的为NioServerSocketChannel类;设置连入服务端的Client的SocketChannel的处理器,绑定端口,并同步等待成功,即启动服务器。
第46行到56行,监听服务端关闭,并阻塞等待,并优雅关闭俩个EventLoopGroup对象。

MqttTransportServerInitializer
  1. public class MqttTransportServerInitializer extends ChannelInitializer<SocketChannel> {
  2. private final MqttTransportContext context;
  3. public MqttTransportServerInitializer(MqttTransportContext context) {
  4. this.context = context;
  5. }
  6. @Override
  7. public void initChannel(SocketChannel ch) {
  8. ChannelPipeline pipeline = ch.pipeline();
  9. if (context.getSslHandlerProvider() != null) {
  10. SslHandler sslHandler = context.getSslHandlerProvider().getSslHandler();
  11. pipeline.addLast(sslHandler);
  12. context.setSslHandler(sslHandler);
  13. }
  14. pipeline.addLast("decoder", new MqttDecoder(context.getMaxPayloadSize()));
  15. pipeline.addLast("encoder", MqttEncoder.INSTANCE);
  16. MqttTransportHandler handler = new MqttTransportHandler(context);
  17. pipeline.addLast(handler);
  18. ch.closeFuture().addListener(handler);
  19. }
  20. }Copy to clipboardErrorCopied

第11-16行设置ChannelPipeLine,判断SSL处理器处理类是否为空,如果不为空,将SSL处理器加入到ChannelPipeLine。
第17-23行,添加负载内容的解编码器,Mqtt协议逻辑处理器和异步操作完成时回调。

MqttTransportHandler

Thingsboard源码分析-MQTT协议处理上 - 图4