这一课,我们来实现 IotHub 设备分组功能的服务端实现。
服务端需要在设备标签发生变化时,将标签信息重新下发到设备;在设备发起标签数据请求时,服务端又要响应这个请求,将设备的标签信息下发到设备。
同时,服务端需要提供接口,供业务系统修改设备的标签,并通过标签批量下发指令。
添加 tags 字段
在 Device 模型中,我们添加字段保存 tags 和 tags_version。
//IotHub_Server/models/device.jsconst deviceSchema = new Schema({...tags: {type: Array,default: []},tags_version: {type: Number,default: 1}})
在查询设备信息的时候将返回设备的 tags:
//IotHub_Server/models/device.jsdeviceSchema.methods.toJSONObject = function () {return {product_name: this.product_name,device_name: this.device_name,secret: this.secret,device_status: JSON.parse(this.device_status),tags: this.tags}}
添加 ACL 列表
我们需要把设备订阅的标签主题加入设备的 ACL 列表中。
//IotHub_Server/models/device.jsdeviceSchema.methods.getACLRule = function () {...const subscribe = [`tags/${this.product_name}/+/cmd/+/+/+/#`]...}
细心的读者可能会发现,这个主题名在 tag 这一层级也用了通配符,这样会允许 Client 订阅到不属于他的标签主题,但是在 Publish 的时候 ACL 是做了严格控制的,所以安全性还是可以得到保证的。这样的话每次修改设备标签的时候不用去修改设备的 ACL 列表,这是一个 trade off。
你需要重新注册一个设备或者手动更新已注册设备存储在 MongoDB 的 ACL 列表。
发送 tags 指令
这里把向指定设备发送 tags 指令做一个封装:
//IotHub_Server/models/device.jsdeviceSchema.methods.sendTags = function () {this.sendCommand({commandName: "$set_tags",data: JSON.stringify({tags: this.tags || [], tags_version: tags_version || 1}),qos: 0})}
由于设备在连接到 IotHub 的时候会主动请求标签信息,离线的标签指令对设备来说没有意义,所以这里使用 QoS0 发送指令。
相应地,发送指令的方法需要加上 QoS 参数:
//IotHub_Server/models/device.jsdeviceSchema.methods.sendCommand = function ({commandName, data, encoding = "plain", ttl = undefined, commandType = "cmd", qos = 1}) {return Device.sendCommand({productName: this.product_name,deviceName: this.device_name,commandName: commandName,data: data,encoding: encoding,ttl: ttl,commandType: commandType,qos: qos})}deviceSchema.statics.sendCommand = function ({productName, deviceName, commandName, data, encoding = "plain", ttl = undefined, commandType = "cmd", qos = 1}) {var requestId = new ObjectId().toHexString()var topic = `${commandType}/${productName}/${deviceName}/${commandName}/${encoding}/${requestId}`if (ttl != null) {topic = `${topic}/${Math.floor(Date.now() / 1000) + ttl}`}emqxService.publishTo({topic: topic, payload: data, qos: qos})return requestId}
处理设备标签数据请求
在处理设备的标签数据请求的时候,我们做一个小小的优化,在设备的标签数据请求中也带上设备本地的 tags_version,当服务端的 tags_version 大于设备端的 tags_version 时才下发标签指令:
//IotHub_Server/services/message_service.jsstatic handleDataRequest({productName, deviceName, resource, payload, ts}) {if (resource.startsWith("$")) {if (resource == "$ntp") {...} else if (resource == "$tags") {Device.findOne({product_name: productName, device_name: deviceName}, function (err, device) {if (device != null) {var data = JSON.parse(payload.toString())if (data.tags_version < device.tags_version) {device.sendTags()}}})}} else {...}}
修改设备标签接口
Server API 提供一个接口供业务系统修改设备的标签,标签名用逗号分隔:
//IotHub_Server/route/devices.jsrouter.put("/:productName/:deviceName/tags", function (req, res) {var productName = req.params.productNamevar deviceName = req.params.deviceNamevar tags = req.body.tags.split(",")Device.findOne({"product_name": productName, "device_name": deviceName}, function (err, device) {if (err != null) {res.send(err)} else if (device != null) {device.tags = tagsdevice.tags_version += 1device.save()device.sendTags()res.status(200).send("ok")} else {res.status(404).send("device not found")}})}
批量指令下发接口
最后 Server API 需要提供接口供业务系统按照标签批量下发指令:
//IotHub_Server/routes/tags.jsvar express = require('express');var router = express.Router();const emqxService = require("../services/emqx_service")const ObjectId = require('bson').ObjectID;router.post("/:productName/:tag/command", function (req, res) {var productName = req.params.productNamevar ttl = req.body.ttl != null ? parseInt(req.body.ttl) : nullvar commandName = req.body.commandvar encoding = req.body.encoding || "plain"var data = req.body.datavar requestId = new ObjectId().toHexString()var topic = `tags/${productName}/${req.params.tag}/cmd/${commandName}/${encoding}/${requestId}`if (ttl != null) {topic = `${topic}/${Math.floor(Date.now() / 1000) + ttl}`}emqxService.publishTo({topic: topic, payload: data})res.status(200).json({request_id: requestId})})module.exports = router
设备在回复批量下发的指令时,其流程和普通指令下发的流程是一样的,IotHub 也会用同样的方式将设备对指令的回复传递给业务系统,不同的是,在批量下发指令时,同一个 RequestID,业务系统会收到多个回复。
由于涉及多个设备的指令回复处理,批量指令下发无法提供 RPC 式的调用。
这一节我们完成了设备分组的服务端实现,下一节我们开始实现设备分组的设备端实现。
