1. 保留消息(Retained Messages)

来看一个场景,你有一个温度传感器,它每三个小时向一个主题发布当前的温度。那么问题来了,有一个新的订阅者在它刚刚发布了当前温度之后订阅了这个主题,那么这个订阅端什么时候能才能收到温度消息?对的,它必须等到三个小时以后,温度传感器再次发布消息的时候才能收到。在这之前,这个新的订阅者对传感器的温度数据一无所知。

MQTT中,无论是发布还是订阅都不会有任何触发事件。

rabbitmq 中有事件交换器插件 amq.rabbitmq.event 就可以很方便处理你订阅的事件。

上面的场景,保留消息(Retained)就可以解决。Retained消息是指在PUBLISH数据包中保留标识设为1的消息,Broker收到这样的PUBLISH包以后,将保存这个消息,当有一个新的订阅者订阅相应主题的时候,Broker会马上将这个消息发送给订阅者。
保留消息有以下一些特点:

  • 一个Topic只有唯一的retain消息,Broker会保存每个Topic的最后一条retain消息。发布新的保留消息将覆盖老的保留消息;
  • 如果订阅者使用通配符订阅主题,它会收到所有匹配的主题上的保留消息;
  • 每个Client订阅Topic后会立即读取到retain消息,不必要等待发送;
  • 只有新的订阅者才会收到保留消息,如果订阅者重复订阅一个主题,也会被当做新的订阅者,然后收到保留消息。发送到订阅者时,消息的保留标识仍然是1,订阅者可以判断这个消息是否是保留消息,以做相应的处理。

    Retained 消息和持久性会话没有任何关系,Retained 消息是 Broker 为每一个 Topic 单独存储的,而持久性会话是 Broker 为每一个 Client 单独存储的。

发布消息时把retain设置为true,即为保留信息。
如果需要删除retain消息,可以发布一个空的retain消息,因为每个新的retain消息都会覆盖最后一个retain消息。

1.1 代码实践:PHP发布Retained消息

发布者 **publisher.php**

  1. $client = new Mosquitto\Client();
  2. //连接broker
  3. $client->connect('10.10.20.200', 1883, 5);
  4. /**
  5. * 发布消息
  6. * publish($topic, $payload[, $qos = 0, $retain = false])
  7. * $topic (string) – 要发表的主题
  8. * $payload (string) – 消息体
  9. * $qos (int) – 服务质量,值0,``1或2
  10. * $retain (boolean) – 是否保留此消息,默认为false
  11. */
  12. $client->publish('chan', 'test', 2, true);
  13. //删除retain消息
  14. //$client->publish('chan', '', 2, true);
  15. $client->loopForever();

订阅者 **subscriber.php**

  1. //Mosquitto客户端类
  2. $client = new Mosquitto\Client();
  3. //连接broker
  4. $client->connect('10.10.20.200', 1883, 5);
  5. //订阅的topic为chan,服务质量为2
  6. $client->subscribe('chan', 2);
  7. //打印订阅消息
  8. $client->onMessage(function($message) {
  9. var_dump($message);
  10. //var_dump(json_decode($message->payload));
  11. });
  12. $client->loopForever();

执行 publisher.php,再执行 subscriber.php ,可以看到 subscriber.php 窗口有输出,说明发布之后再订阅主题也能收到 Retained 消息。
关闭两个窗口,再执行 **subscriber.php ** 同样能收到消息。可以尝试下如何删除保留消息。

1.2 代码实践:Node 发布和接收 Retained 消息

**publisher.js**

  1. var mqtt = require('mqtt')
  2. var client = mqtt.connect('mqtt://10.10.20.200:1883', {
  3. clientId: "mqtt_sample_publisher_1",
  4. clean: false
  5. })
  6. client.on('connect', function (connack) {
  7. if(connack.returnCode == 0){
  8. client.publish("home/2ndfloor/201/temperature", JSON.stringify({current: 25}), {qos: 0,
  9. retain: 1}, function (err) {
  10. if(err == undefined) {
  11. console.log("Publish finished")
  12. client.end()
  13. }else{
  14. console.log("Publish failed")
  15. }
  16. })
  17. }else{
  18. console.log(`Connection failed: ${connack.returnCode}`)
  19. }
  20. })

**subscriber.js**

  1. var mqtt = require('mqtt')
  2. var client = mqtt.connect('10.10.20.200:1883', {
  3. clientId: "mqtt_sample_subscriber_id_chapter_8",
  4. clean: false
  5. })
  6. client.on('connect', function (connack) {
  7. if(connack.returnCode == 0) {
  8. if (connack.sessionPresent == false) {
  9. console.log("subscribing")
  10. client.subscribe("home/2ndfloor/201/temperature", {
  11. qos: 0
  12. }, function (err, granted) {
  13. if (err != undefined) {
  14. console.log("subscribe failed")
  15. } else {
  16. console.log(`subscribe succeeded with ${granted[0].topic}, qos:
  17. ${granted[0].qos}`)
  18. }
  19. })
  20. }
  21. }else {
  22. console.log(`Connection failed: ${connack.returnCode}`)
  23. }
  24. })
  25. client.on("message", function (_, message, packet) {
  26. var jsonPayload = JSON.parse(message.toString())
  27. console.log(`retained: ${packet.retain}, temperature: ${jsonPayload.current}`)
  28. })

我们首先运行node publish_retained.js,再运行node subscribe_retained.js,会得到以下输出:retained: true, temperature: 25

可见我们在 Publisher 发布之后再订阅主题也能收到 Retained 消息。
然后我们再运行一次node publish_retained.js,在运行 subscribe_retained.js 的终端会有以下输出: retained: true, temperature: 25

Broker 收到 Retained 消息以后,只是保存这个消息,然后按照正常的转发逻辑转发给订阅者,所以对订阅者来说,这个是一个普通的 MQTT 消息,所以 Retain 标识为 0。

然后Ctrl+C 关闭掉 subscribe_retained.js,然后重新运行,终端不会有任何输出,可见 Retained 消息只对新订阅的订阅者有效。

2. 遗愿消息(LWT)

LWT 全称为 Last Will and Testament,也就是我们在连接到 Broker 时提到的遗愿/嘱,包括遗愿主题、遗愿QoS、遗愿消息等。

当 Broker 检测到 Client 非正常地断开连接的时候,就会向遗愿主题里面发布一条消息(跟人写遗嘱一样,只有死了之后,遗嘱才会通知)。遗愿相关的设置是在建立连接的时候,在 CONNECT 数据包里面指定的,如下特点:

  • Will Flag:是否使用 LWT
  • Will Topic:遗愿主题名,不可使用通配符
  • Will Qos:发布遗愿消息时使用的 QoS
  • Will Retain:遗愿消息的 Retain 标识
  • Will Message:遗愿消息内容

Broker 在以下情况下认为 Client 是非正常断开连接的:
1. Broker 检测到底层的 I/O 异常;
2. Client 未能在 Keep Alive 的间隔内和 Broker 之间有消息交互;
3. Client 在关闭底层 TCP 连接前没有发送 DISCONNECT 数据包;
4. Broker 因为协议错误关闭和 Client 的连接,比如 Client 发送了一个格式错误的 MQTT 数据包。

如果 Client 通过发布 DISCONNECT 数据包断开连接,这个属于正常断开连接,不会触发 LWT 的机制,同时,Broker 还会丢弃掉这个 Client 在连接时指定的 LWT 参数。

通常,如果我们关心一个设备,比如传感器的连接状态,可以使用 LWT。

2.1 Node代码实践:监控 Client 连接状态

实现 Client 连接状态监控的原理很简单:

  1. Client 在连接的时候指定 Will Topic 为“client/status”,遗愿消息为“offline”,Will Retain=1;
  2. Client 在连接成功以后向同一个主题“client/status”,发布一个内容为“online”的 Retained 消息。

那么订阅者在任何时候订阅“client/status”,都会获取 Client 当前的连接状态。

**client.js**

  1. var mqtt = require('mqtt')
  2. var client = mqtt.connect('10.10.20.200:1883', {
  3. clientId: "mqtt_sample_publisher_chapter_8",
  4. clean: false,
  5. will:{
  6. topic : 'client/status',
  7. qos: 1,
  8. retain: true,
  9. payload: JSON.stringify({status: 'offline'})
  10. }
  11. })
  12. client.on('connect', function (connack) {
  13. if(connack.returnCode == 0){
  14. client.publish("client/status", JSON.stringify({status: 'online'}), {qos: 1, retain:1})
  15. }else{
  16. console.log(`Connection failed: ${connack.returnCode}`)
  17. }
  18. })

**monitor.js**

  1. var mqtt = require('mqtt')
  2. var client = mqtt.connect('10.10.20.200:1883', {
  3. clientId: "mqtt_sample_subscriber_id_chapter_8_2",
  4. clean: false
  5. })
  6. client.on('connect', function () {
  7. client.subscribe("client/status", {qos: 1})
  8. })
  9. client.on("message", function (_, message) {
  10. var jsonPayload = JSON.parse(message.toString())
  11. console.log(`client is ${jsonPayload.status}`)
  12. })

在monitor.js中,我们每次连接的时候都重新订阅“client / status”,这样的话每次运行都能收到关于Client连接状态的保留消息。
首先运行node client.js,然后运行node monitor.js,会得到以下输出:client is online

在运行client.js的终端上,使用Ctrl+C终止client.js,之后在运行monitor.js的终端上会得到以下输出:client is offline

然后重新运行node client.js,在运行monitor.js的终端上会得到以下输出:client is online

Ctrl+C终止monitor.js,然后重新运行node monitor.js,会得到以下输出:client is online

这样我们就完美地监控了Client的连接状态。

PHP实践参考 《附2. 实战模拟设备上线/下线(php-mosquitto)》