在本节中,我们将设计 IotHub 的设备在线状态管理功能。
如何得知一个设备是在线或离线,是大家经常问到的问题,也是在实际生产中非常必要的一个功能。

Poor man’s Solution

MQTT 协议并没有在协议级别约定如何对 Client 的在线状态进行管理,在《MQTT 协议快速入门》里我介绍过一个解决思路:

  • Client 在连接成功时向 TopicA 发送一个消息,指明 Client 已经上线;
  • Client 在连接时指定 LWT,Client 在离线时向 TopicA 发送一个 Retained 消息,表示已经离线;
  • 只要订阅 TopicA 就可以获取 Client 上线和离线的状态了。

这个解决方案在实际上是可行的,但是有一个问题就是,你始终需要保持一个接入 Broker 的 Client 来订阅 TopicA,如果说设备的数量往十万甚至几十万上去了,这个订阅 TopicA 的 Client 就很容易成为单点故障点,所以说这种解决方案的可扩展性比较差。

使用 EMQ X 的解决方案

EMQ X 提供了丰富的管理功能和接口,所以我们会使用 EMQ X 提供的功能来实现 IotHub 的设备连接状态管理功能。

系统主题

EMQ X 使用许多系统主题发布 Broker 内部的状态和事件,你可以在这里看到系统主题的列表。

其中,订阅 $SYS/brokers/${node}/clients/${clientid}/connected可以获取 Client 上线的事件;订阅$SYS/brokers/${node}/clients/${clientid}/disconnected可以获取 Client 离线的事件。

其中${node}是指 EMQ X 的节点名,你可以在< EMQ X 安装目录>/etc/emqx.conf 里找的 node.name 配置项,默认为 emqx@127.0.0.1。

那么,我们只需要订阅$SYS/brokers/+/clients/+/connected$SYS/brokers/+/clients/+/disconnected就可以获取到所有的上线和离线事件了。接下来写一点代码实验一下:

  1. // IotHub_Device/samples/sys_topics.js
  2. var mqtt = require('mqtt')
  3. var jwt = require('jsonwebtoken')
  4. require('dotenv').config()
  5. var password = jwt.sign({
  6. username: "jwt_user",
  7. exp: Math.floor(Date.now() / 1000) + 10
  8. }, process.env.JWT_SECRET)
  9. var client = mqtt.connect('mqtt://127.0.0.1:1883', {
  10. username: "jwt_user",
  11. password: password
  12. })
  13. client.on('connect', function () {
  14. console.log("connected")
  15. client.subscribe("$SYS/brokers/+/clients/+/connected")
  16. client.subscribe("$SYS/brokers/+/clients/+/disconnected")
  17. })
  18. client.on("message", function (_, message) {
  19. console.log(message.toString())
  20. })

然后先运行sys_topics.js,随后运行connect_to_server.js,接着关闭connect_to_server.js,我们会看到以下输出:

  1. {"clean_start":false,"clientid":"IotApp/V5MyuncRK","connack":0,"ipaddress":"127.0.0.1","keepalive":60,"proto_name":"MQTT","proto_ver":4,"ts":1558335733,"username":"IotApp/V5MyuncRK"}
  2. {"clientid":"IotApp/V5MyuncRK","username":"IotApp/V5MyuncRK","reason":"closed","ts":1558335752}

第一行是 Client connected 事件的信息,第二行是 Client disconnected 的信息,这些信息包含 ClientID、IP 地址、连接时间等,非常详细。
不过这种解决方案的缺点也很明显,和上面提到的一样,订阅这 2 个主题的 Client 很容易成为单点故障点。

基于 Hook 的解决方案

我比较喜欢 EMQ X 的一点就是,它设计了一套 Hook 系统,你可以通过这个 Hook 来捕获 Broker 内部的事件并进展处理。EMQ X 中 Hook 的定义如下图所示。
7.设备在线状态管理(一) - 图1
通常你需要编写一个插件来捕获并处理这些事件,不过 EMQ X 自带了一个 WebHook 插件,它的原理很简单,当像 Client 上线或下线之类的事件发生时,EMQ X 为把事件的信息 Post 到一个事先指定好的 URL 上,我们就可以进行处理了。
如何避免一个 Web 服务成为单点故障点,我想大家都应该很熟悉了,所以在这里我们使用基于 WebHook 的方式来实现设备的在线状态管理。

开启 WebHook

首先需要编辑 WebHook 的配置文件,将回调的 URL 指向本地运行 Express 应用:

  1. #< EMQ X 安装目录>/etc/plugins/emqx_web_hook.conf
  2. web.hook.api.url = http://127.0.0.1:3000/emqx_web_hook

运行 < EMQ X 安装目录>/bin/emqx_ctl plugins load emqx_web_hook
然后我们简单地实现一下这个 Hook, 把请求的参数打印出来:

  1. // IotHub_Server/routes/emqx_web_hook.js
  2. var express = require('express');
  3. var router = express.Router();
  4. router.post("/", function (req, res) {
  5. console.log(req.body)
  6. res.status(200).send("ok")
  7. })
  8. module.exports = router
  9. 复制
  10. // IotHub_Server/app.js
  11. var webHookeRouter = require('./routes/emqx_web_hook')
  12. app.use('/emqx_web_hook', webHookeRouter)

运行samples/connect_to_server,接着关闭,我们可以发现:
当 Client 连接时,EMQ X 会把以下的 JSON Post 到指定的 URL 上:

  1. {
  2. action: 'client_connected',
  3. client_id: 'IotApp/V5MyuncRK',
  4. username: 'IotApp/V5MyuncRK',
  5. keepalive: 60,
  6. ipaddress: '127.0.0.1',
  7. proto_ver: 4,
  8. connected_at: 1558338318,
  9. conn_ack: 0
  10. }

当 Client 断开连接时,EMQ X 会把以下的 JSON Post 到指定URL:

  1. {
  2. action: 'client_disconnected',
  3. client_id: 'IotApp/V5MyuncRK',
  4. username: 'IotApp/V5MyuncRK',
  5. reason: 'closed'
  6. }

connected_at 是指连接的时刻的 unix 时间戳。 这里要说明的是,/emq_web_hook 这个接口是在 Maque IotHub 内部使用的,不应该暴露给业务系统,本课程中为了让内容更紧凑,尽量跳过了这些属于 Web 编程的内容,但在实际项目中,是需要考虑的。

在 Client connect 和 disconnect 事件里面,包含了 Client 连接时使用 username, 而 username 里面包含了(ProductName, DeviceName),所以我们可以通过这些信息定位到是具体哪一个设备 connect 或者 disconnect,从而更新设备的连接状态。

在这一节里,我们讨论了设备状态管理的几种实现方式,并选择了目前最优的一种方式,下一节,我们来实现具体的功能。