这一课,我们来实现 IotHub 设备分组功能的服务端实现。
服务端需要在设备标签发生变化时,将标签信息重新下发到设备;在设备发起标签数据请求时,服务端又要响应这个请求,将设备的标签信息下发到设备。
同时,服务端需要提供接口,供业务系统修改设备的标签,并通过标签批量下发指令。

添加 tags 字段

在 Device 模型中,我们添加字段保存 tags 和 tags_version。

  1. //IotHub_Server/models/device.js
  2. const deviceSchema = new Schema({
  3. ...
  4. tags: {
  5. type: Array,
  6. default: []
  7. },
  8. tags_version: {
  9. type: Number,
  10. default: 1
  11. }
  12. })

在查询设备信息的时候将返回设备的 tags:

  1. //IotHub_Server/models/device.js
  2. deviceSchema.methods.toJSONObject = function () {
  3. return {
  4. product_name: this.product_name,
  5. device_name: this.device_name,
  6. secret: this.secret,
  7. device_status: JSON.parse(this.device_status),
  8. tags: this.tags
  9. }
  10. }

添加 ACL 列表

我们需要把设备订阅的标签主题加入设备的 ACL 列表中。

  1. //IotHub_Server/models/device.js
  2. deviceSchema.methods.getACLRule = function () {
  3. ...
  4. const subscribe = [`tags/${this.product_name}/+/cmd/+/+/+/#`]
  5. ...
  6. }

细心的读者可能会发现,这个主题名在 tag 这一层级也用了通配符,这样会允许 Client 订阅到不属于他的标签主题,但是在 Publish 的时候 ACL 是做了严格控制的,所以安全性还是可以得到保证的。这样的话每次修改设备标签的时候不用去修改设备的 ACL 列表,这是一个 trade off。

你需要重新注册一个设备或者手动更新已注册设备存储在 MongoDB 的 ACL 列表。

发送 tags 指令

这里把向指定设备发送 tags 指令做一个封装:

  1. //IotHub_Server/models/device.js
  2. deviceSchema.methods.sendTags = function () {
  3. this.sendCommand({
  4. commandName: "$set_tags",
  5. data: JSON.stringify({tags: this.tags || [], tags_version: tags_version || 1}),
  6. qos: 0
  7. })
  8. }

由于设备在连接到 IotHub 的时候会主动请求标签信息,离线的标签指令对设备来说没有意义,所以这里使用 QoS0 发送指令。
相应地,发送指令的方法需要加上 QoS 参数:

  1. //IotHub_Server/models/device.js
  2. deviceSchema.methods.sendCommand = function ({commandName, data, encoding = "plain", ttl = undefined, commandType = "cmd", qos = 1}) {
  3. return Device.sendCommand({
  4. productName: this.product_name,
  5. deviceName: this.device_name,
  6. commandName: commandName,
  7. data: data,
  8. encoding: encoding,
  9. ttl: ttl,
  10. commandType: commandType,
  11. qos: qos
  12. })
  13. }
  14. deviceSchema.statics.sendCommand = function ({productName, deviceName, commandName, data, encoding = "plain", ttl = undefined, commandType = "cmd", qos = 1}) {
  15. var requestId = new ObjectId().toHexString()
  16. var topic = `${commandType}/${productName}/${deviceName}/${commandName}/${encoding}/${requestId}`
  17. if (ttl != null) {
  18. topic = `${topic}/${Math.floor(Date.now() / 1000) + ttl}`
  19. }
  20. emqxService.publishTo({topic: topic, payload: data, qos: qos})
  21. return requestId
  22. }

处理设备标签数据请求

在处理设备的标签数据请求的时候,我们做一个小小的优化,在设备的标签数据请求中也带上设备本地的 tags_version,当服务端的 tags_version 大于设备端的 tags_version 时才下发标签指令:

  1. //IotHub_Server/services/message_service.js
  2. static handleDataRequest({productName, deviceName, resource, payload, ts}) {
  3. if (resource.startsWith("$")) {
  4. if (resource == "$ntp") {
  5. ...
  6. } else if (resource == "$tags") {
  7. Device.findOne({product_name: productName, device_name: deviceName}, function (err, device) {
  8. if (device != null) {
  9. var data = JSON.parse(payload.toString())
  10. if (data.tags_version < device.tags_version) {
  11. device.sendTags()
  12. }
  13. }
  14. })
  15. }
  16. } else {
  17. ...
  18. }
  19. }

修改设备标签接口

Server API 提供一个接口供业务系统修改设备的标签,标签名用逗号分隔:

  1. //IotHub_Server/route/devices.js
  2. router.put("/:productName/:deviceName/tags", function (req, res) {
  3. var productName = req.params.productName
  4. var deviceName = req.params.deviceName
  5. var tags = req.body.tags.split(",")
  6. Device.findOne({"product_name": productName, "device_name": deviceName}, function (err, device) {
  7. if (err != null) {
  8. res.send(err)
  9. } else if (device != null) {
  10. device.tags = tags
  11. device.tags_version += 1
  12. device.save()
  13. device.sendTags()
  14. res.status(200).send("ok")
  15. } else {
  16. res.status(404).send("device not found")
  17. }
  18. })
  19. }

批量指令下发接口

最后 Server API 需要提供接口供业务系统按照标签批量下发指令:

  1. //IotHub_Server/routes/tags.js
  2. var express = require('express');
  3. var router = express.Router();
  4. const emqxService = require("../services/emqx_service")
  5. const ObjectId = require('bson').ObjectID;
  6. router.post("/:productName/:tag/command", function (req, res) {
  7. var productName = req.params.productName
  8. var ttl = req.body.ttl != null ? parseInt(req.body.ttl) : null
  9. var commandName = req.body.command
  10. var encoding = req.body.encoding || "plain"
  11. var data = req.body.data
  12. var requestId = new ObjectId().toHexString()
  13. var topic = `tags/${productName}/${req.params.tag}/cmd/${commandName}/${encoding}/${requestId}`
  14. if (ttl != null) {
  15. topic = `${topic}/${Math.floor(Date.now() / 1000) + ttl}`
  16. }
  17. emqxService.publishTo({topic: topic, payload: data})
  18. res.status(200).json({request_id: requestId})
  19. })
  20. module.exports = router

设备在回复批量下发的指令时,其流程和普通指令下发的流程是一样的,IotHub 也会用同样的方式将设备对指令的回复传递给业务系统,不同的是,在批量下发指令时,同一个 RequestID,业务系统会收到多个回复。
由于涉及多个设备的指令回复处理,批量指令下发无法提供 RPC 式的调用。

这一节我们完成了设备分组的服务端实现,下一节我们开始实现设备分组的设备端实现。