如何订阅一个主题,并接收消息?
订阅主题的流程如下:
6. 订阅与发布(二) - 图1

  1. Client 向 Broker 发送一个 SUBSCRIBE 数据包,其中包含了 Client 想要订阅的主题以及其他一些参数;
  2. Broker 收到 SUBSCRIBE 数据包后,向 Client 发送一个 SUBACK 数据包作为应答。

看数据包的具体内容:

1. SUBSCRIBE

1.1 可变头(Variable header)

  • 数据包标识(Packet Identifier):两个字节,用来唯一标识一个数据包,数据包标识只需要保证在从Sender 到Receiver 的一次消息交互中保持唯一。

    1.2 消息体(Payload)

  • 订阅列表(List of Subscriptions):SUBSCRIBE 的消息体中包含 Client 想要订阅的主题列表,列表中的每一项由订阅主题名和对应的 QoS 组成。主题名中可以包含通配符(参考后面通配符),单层通配符“+”和多层通配符“#”。使用包含通配符的主题名可以订阅满足匹配条件的所有主题。为了和 PUBLISH 中的主题区分,我们叫SUBSCRIBE 中的主题名为主题过滤器(Topic Filter)。

SUBSCRIBE 数据包中 QoS 代表针对某一个或者一组主题,Client 希望 Broker 在发送来自这些主题的消息给它时,消息使用的 QoS 级别。

2. SUBACK

为了确认每一次的订阅,Broker 收到 SUBSCRIBE 之后会回复一个 SUBACK 数据包作为应答。

2.1 可变头(Variable header)

  • 数据包标识(Packet Identifier):两个字节,用来唯一标识一个数据包,数据包标识只需要保证在从Sender 到 Receiver 的一次消息交互中保持唯一。

    2.2 消息体(Payload)

  • 返回码(return codes):SUBBACK 数据包包含了一组返回码,返回码的数量和顺序和 SUBSCRIBE 数据包的订阅列表对应,用于标识订阅类别中的每一个订阅项的订阅结果。 | 返回码 | 含义 | | —- | —- | | 0 | 订阅成功,最大可用QoS为0 | | 1 | 订阅成功,最大可用QoS为1 | | 2 | 订阅成功,最大可用QoS为2 | | 128 | 订阅失败 |

返回码 0~2 代表订阅成功,同时 Broker 授予 Subscriber 不同的 QoS 等级,这个等级可能会和 Subscriber在 SUBSCRIBE 数据包中要求的不一样。
返回码 128 代表订阅失败,比如 Client 没有权限订阅某个主题,或者要求订阅的主题格式不正确等。

3. 代码实践:订阅一个主题

接下来我们来写订阅并处理消息的代码,我们订阅在之前的 publisher.js 中的主题,并通过捕获“message”事件获取接收的消息并打印出来。

通常我们在建立和 Broker 的连接之后就可以开始订阅了,但是这里有一个小小的优化,如果你建立的是持久会话的连接,那么有可能 Broker 已经保存你在之前的连接时订阅的主题,你就没有必要再发起 SUBSCRIBE 请求了,这个小优化在网络带宽或者设备处理能力较差的情况尤为重要。

  1. // subscriber.js
  2. var mqtt = require('mqtt')
  3. var client = mqtt.connect('mqtt://10.10.20.200:1883', {
  4. clientId: "mqtt_sample_subscriber_id_1",
  5. clean: false
  6. })
  7. client.on('connect', function (connack) {
  8. if(connack.returnCode == 0) {
  9. if (connack.sessionPresent == false) {
  10. console.log("subscribing")
  11. client.subscribe("home/2ndfloor/201/temperature", {
  12. qos: 1
  13. }, function (err, granted) {
  14. if (err != undefined) {
  15. console.log("subscribe failed")
  16. } else {
  17. console.log(`subscribe succeeded with ${granted[0].topic}, qos: ${granted[0].qos}`)
  18. }
  19. })
  20. }
  21. }else {
  22. console.log(`Connection failed: ${connack.returnCode}`)
  23. }
  24. })
  25. client.on("message", function (_, message, _) {
  26. var jsonPayload = JSON.parse(message.toString())
  27. console.log(`current temperature is ${jsonPayload.current}`)
  28. })

在终端上运行node subscriber.js 我们会得到以下输出:

  1. subscribing
  2. subscribe succeeded with home/2ndfloor/201/temperature, qos: 1

此时进程处理等待状态,关没有终止。

第一次运行这个代码的时候,Broker 上面没有保存这个 Client 的会话,所以需要进行订阅,现在 CTRL+C 终止这段代码的运行,然后重新运行,因为 Broker 上面已经保存了这个 Client 的会话,所以就不需要再订阅了,你就不会看到订阅相关的输出了。

因为之前运行过 node publisher.js,向“home/2ndfloor/201/temperature”这个主题发布过一个消息,但是这发生在 subscriber.js 订阅该主题之前,所以现在 Subscriber 不会收到任何消息,我们需要再运行一次 node publisher.js,然后在运行 subscriber.js 的终端上会输出:

  1. current temperature is 25

现在通过 MQTT 协议完成了一次点到点的消息传递,同时我们也验证了,建立持久性会话连接之后,Broker 会保存 Client 的订阅信息。

3. 取消订阅

Subcriber 也可以取消对某些主题的订阅,取消订阅的流程如下:
6. 订阅与发布(二) - 图2

  1. Client 向 Broker 发送一个 UNSUBSCRIBE 数据包,其中包含了 Client 想要取消订阅的主题;
  2. Broker 收到 UNSUBSCRIBE 数据包后,向 Client 发送一个 UNSUBACK 数据包作为应答。

看数据包的具体内容:

3.1 UNSUBSCRIBE

3.1.1 可变头(Variable header)

  • 数据包标识(Packet Identifier):两个字节,用来唯一标识一个数据包,数据包标识只需要保证在从Sender 到 Receiver 的一次消息交互中保持唯一。

    3.1.2 消息体(Payload)

  • 主题列表(List of Topics):UNSUBSCRIBE 的消息体中包含 Client 想要取消订阅的主题过滤器列表,这些主题过滤器和 SUBSCRIBE 数据包中一样,可以包含通配符。UNSUBSCRIBE 消息体里面不再包含主题过滤器对应的 QoS 了。

    3.2 UNSUBACK

    Broker 收到 UNSUBSCRIBE 之后会回复一个 UNSUBACK 数据包作为应答:

    3.2.1 可变头(Variable header)

  • 数据包标识(Packet Identifier):两个字节,用来唯一标识一个数据包,数据包标识只需要保证在从Sender 到 Receiver 的一次消息交互中保持唯一。

    3.2.1 消息体(Payload)

    UNSUBACK 数据包没有消息体。

4. 代码实践:取消订阅

在建立连接之后取消对之前订阅的主题

  1. // unsubscribe.js
  2. var mqtt = require('mqtt')
  3. var client = mqtt.connect('mqtt://10.10.20.200:1883', {
  4. clientId: "mqtt_sample_subscriber_id_1",
  5. clean: false
  6. })
  7. client.on('connect', function (connack) {
  8. if (connack.returnCode == 0) {
  9. console.log("unsubscribing")
  10. client.unsubscribe("home/2ndfloor/201/temperature", function (err) {
  11. if (err != undefined) {
  12. console.log("unsubscribe failed")
  13. } else {
  14. console.log("unsubscribe succeeded")
  15. }
  16. client.end()
  17. })
  18. } else {
  19. console.log(`Connection failed: ${connack.returnCode}`)
  20. }
  21. })

在终端上运行node unsubscribe.js,会得到以下输出:

  1. unsubscribing
  2. unsubscribe succeeded

在这里取消了对“home/2ndfloor/201/temperature”的订阅,所以再运行 subscriber.js 和 publisher.js,再运行 subscribe.js 的终端不会再有消息的打印信息了。如何要使 subscriber.js 重新订阅这个主题,可以动下脑筋然后自己动手实现一下。

通配符

MQTT 的主题是具有层级概念的,不同的层级之间用“/”分割。
一个订阅可能包含特殊字符,允许你一次定义多个主题。主题层次分隔符被用来在主题中引入层次。多层的通配符和单层通配符可以被使用,但他们不能被使用来做发布者的消息。

主题层级分隔符 /

/被用来分割主题树的每一层,并给主题空间提供分等级的结构。当两个通配符在一个主题中出现的时候,主题层次分隔符的使用是很重要的。

多层通配符

是一个匹配主题中任意层次数的通配符。比如说,如果你订阅了finance/stock/ibm/#,你就可以接收到以下这些主题的消息:

  1. finance/stock/ibm
  2. finance/stock/ibm/closingprice
  3. finance/stock/ibm/currentprice
  4. finance/stock/ibm/a/b
  5. finance/stock/ibm/hi/ok/d

多层通配符有可以表示大于等于0的层次。因此,finance/# 也可以匹配到单独的 finance,在这种情况下#代表0层。在这种语境下主题层次分隔符/就没有意义了。因为没有可以分的层次。
多层通配符只可以确定当前层或者下一层。因此,#和finance/#都是有效的,但是finance#不是有效的。多层通配符一定要是主题树的最后一个字符。比如说,finance/#是有效的,但是finance/#/closingprice是无效的。

单层通配符 +

  • 只匹配主题的一层。比如说,finance/stock/+ 匹配 finance/stock/ibm和finance/stock/xyz,但是不匹配finance/stock/ibm/closingprice。另外,因为单层通配符只匹配1层,finance/+不匹配finance。
    单层通配符可以被用于主题树的任意层级,连带多层通配符。它必须被用在主题层级分隔符/的右边,除非它是指定自己。因此,+和finance/+都是有效的,但是finance+无效。单层通配符可以用在主题树的末端,也可以用在中间。比如说,finance/+和finance/+/ibm都是有效的。

    主题语法和用法

    当你建立一个应用,设计主题树的时候应该考虑以下的主题名字的语法和语义:
  • 主题至少有一个字符长。
  • 主题名字是大小写敏感的。比如说,ACCOUNTS和Accounts是两个不同的主题。
  • 主题名字可以包含空格。比如,Accounts payable是一个有效的主题。
  • 以/开头会产生一个不同的主题。比如说,/finnace与finance不同。/finance匹配”+/+”和/+,但不匹配+
  • 不要在任何主题中包含null(Unicode \x0000)字符。

以下的原则应用于主题树的建造和内容

  • 在主题树中,长度被限制于64k内但是在这以内没有限制层级的数目 。
  • 可以有任意数目的根节点;也就是说,可以有任意数目的主题树。