在这一节中,我们会按照上一节的设计,编写代码来实现 IotHub 的设备在线状态管理功能。
首先来设计一下设备状态管理功能。

  1. 设备不保存在线与否这个 boolean 值,而是保存一个 connection 的列表,包含了所有用这个设备的三元组接入的 connection,connection 的信息由 WebHook 捕获的 client_connected 事件提供。
  2. 当 client_connected 时,通过 username 里面 ProductName 和 DeviceName 查找到 Device 记录,然后用 ClientID 查找 Device 的 connection 列表,如果不存在该 ClientID 的 connection 记录,就新增一条 connection 记录;如果存在,则更新这条 connection 记录,状态为 connected。
  3. 当 client_disconnected时,通过 username 里面 ProductName 和 DeviceName 查找到 Device 记录,然后用 ClientID 查找 Device 的 connection 列表,如果存在该 ClientID 的 connection 记录,则更新这条connection 记录,状态为 disconnected。
  4. 业务系统可以通过查询设备详情来获取设备的连接状态。

通过这样的设计,我们不仅可以知道一个设备是否在线,还能知道其连接的具体信息。

Connection模型

我们定义一个 Connection 模型来存储连接信息:

  1. // IotHub_Server/models/connection.js
  2. const connectionSchema = new Schema({
  3. connected: Boolean,
  4. client_id: String,
  5. keepalive: Number,
  6. ipaddress: String,
  7. proto_ver: Number,
  8. connected_at: Number,
  9. disconnect_at: Number,
  10. conn_ack: Number,
  11. device: {type: Schema.Types.ObjectId, ref: 'Device'}
  12. })
  13. const Connection = mongoose.model("Connection", connectionSchema);

实现 Hook

我们在 Device 类里用两个方法来实现处理 connect 和 disconnect 事件:

  1. // IotHub_Server/models/device.js
  2. //connected
  3. deviceSchema.statics.addConnection = function (event) {
  4. var username_arr = event.username.split("/")
  5. this.findOne({product_name: username_arr[0], device_name: username_arr[1]}, function (err, device) {
  6. if (err == null && device != null) {
  7. Connection.findOneAndUpdate({
  8. client_id: event.client_id,
  9. device: device._id
  10. }, {
  11. connected: true,
  12. client_id: event.client_id,
  13. keepalive: event.keepalive,
  14. ipaddress: event.ipaddress,
  15. proto_ver: event.proto_ver,
  16. connected_at: event.connected_at,
  17. conn_ack: event.conn_ack,
  18. device: device._id
  19. }, {upsert: true, useFindAndModify: false, new: true}).exec()
  20. }
  21. })
  22. }
  23. //disconnect
  24. deviceSchema.statics.removeConnection = function (event) {
  25. var username_arr = event.username.split("/")
  26. this.findOne({product_name: username_arr[0], device_name: username_arr[1]}, function (err, device) {
  27. if (err == null && device != null) {
  28. Connection.findOneAndUpdate({client_id: event.client_id, device: device._id},
  29. {
  30. connected: false,
  31. disconnect_at: Math.floor(Date.now() / 1000)
  32. }, {useFindAndModify: false}).exec()
  33. }
  34. })
  35. }

接着在对应事件的时候调用上面现实的方法:

  1. //IotHub_Server/routes/emqx_web_hook
  2. router.post("/", function (req, res) {
  3. if (req.body.action == "client_connected") {
  4. Device.addConnection(req.body)
  5. }else if(req.body.action == "client_disconnected"){
  6. Device.removeConnection(req.body)
  7. }
  8. res.status(200).send("ok")
  9. })

然后我们修改一下设备详情接口,返回 connection 信息:

  1. // IotHub_Server/models/connection.js
  2. connectionSchema.methods.toJSONObject = function () {
  3. return {
  4. connected: this.connected,
  5. client_id: this.client_id,
  6. ipaddress: this.ipaddress,
  7. connected_at: this.connected_at,
  8. disconnect_at: this.disconnect_at
  9. }
  10. }
  1. // IotHub_Server/routes/devices.js
  2. router.get("/:productName/:deviceName", function (req, res) {
  3. var productName = req.params.productName
  4. var deviceName = req.params.deviceName
  5. Device.findOne({"product_name": productName, "device_name": deviceName}).exec(function (err, device) {
  6. if (err) {
  7. res.send(err)
  8. } else {
  9. if (device != null) {
  10. Connection.find({device: device._id}, function (_, connections) {
  11. res.json(Object.assign(device.toJSONObject(), {
  12. connections: connections.map(function (conn) {
  13. return conn.toJSONObject()
  14. })
  15. }))
  16. })
  17. } else {
  18. res.status(404).json({error: "Not Found"})
  19. }
  20. }
  21. })
  22. })

这个时候运行IotHub_Device/samples/connect_to_server.js,然后调用设备详情接口curl http://localhost:3000/devices/IotApp/c-jOc-2qq,会得到以下输出:

  1. {"product_name":"IotApp","device_name":"c-jOc-2qq","secret":"m0PfE0DcNC","connections":[{"connected":true,"client_id":"IotApp/c-jOc-2qq","ipaddress":"127.0.0.1","connected_at":1558354603}]}

然后关闭connect_to_server.js,再一次调用设备详情接口curl http://localhost:3000/devices/IotApp/c-jOc-2qq,会得到以下输出:

  1. {"product_name":"IotApp","device_name":"c-jOc-2qq","secret":"m0PfE0DcNC","connections":[{"connected":false,"client_id":"IotApp/c-jOc-2qq","ipaddress":"127.0.0.1","connected_at":1558354603,"disconnect_at":1558355260}]}

由于 DeviceName 是随机生成的,在使用 curl 时,应该用电脑上生成的 DeviceName 替代相应参数或者路径。

这样我们就完成了设备状态管理,业务系统通过查询设备详情,就可以知道设备是否在线了(有 connected==true 的 connection 记录),以及设备连接的一些附加信息。
细心的读者也许已经发现了,我们这样的解决方案是有一点瑕疵的:

  • 性能问题,每次设备上下线,包括 Publish/Subscribe 等,EMQ X 都发起一个 HTTP POST,这肯定是有损耗的,至于多大损耗以及能不能接受这损耗,取决于你的业务和数据量;
  • 由于 Web 服务是并发的,所以说有可能出现在很短时间内发生的一对 connect/disconnect 事件, disconnect 会比 connect 先处理,导致设备的连接状态不正确;
  • 设备下线时间我们取的是处理这个事件的时间,不准确。

在本课程的后面,我们会实现一个基于 RabbitMQ 的插件来替换 WebHook, 在那个时候,我们再来尝试解决这些问题,在这之前,我们都将用 WebHook 来完成功能验证。

这一节我们完成设备在线状态管理的全部功能,接下来我们完成设备的禁用和删除。