这一课,我们来实现 IotHub 设备分组功能的服务端实现。
服务端需要在设备标签发生变化时,将标签信息重新下发到设备;在设备发起标签数据请求时,服务端又要响应这个请求,将设备的标签信息下发到设备。
同时,服务端需要提供接口,供业务系统修改设备的标签,并通过标签批量下发指令。
添加 tags 字段
在 Device 模型中,我们添加字段保存 tags 和 tags_version。
//IotHub_Server/models/device.js
const deviceSchema = new Schema({
...
tags: {
type: Array,
default: []
},
tags_version: {
type: Number,
default: 1
}
})
在查询设备信息的时候将返回设备的 tags:
//IotHub_Server/models/device.js
deviceSchema.methods.toJSONObject = function () {
return {
product_name: this.product_name,
device_name: this.device_name,
secret: this.secret,
device_status: JSON.parse(this.device_status),
tags: this.tags
}
}
添加 ACL 列表
我们需要把设备订阅的标签主题加入设备的 ACL 列表中。
//IotHub_Server/models/device.js
deviceSchema.methods.getACLRule = function () {
...
const subscribe = [`tags/${this.product_name}/+/cmd/+/+/+/#`]
...
}
细心的读者可能会发现,这个主题名在 tag 这一层级也用了通配符,这样会允许 Client 订阅到不属于他的标签主题,但是在 Publish 的时候 ACL 是做了严格控制的,所以安全性还是可以得到保证的。这样的话每次修改设备标签的时候不用去修改设备的 ACL 列表,这是一个 trade off。
你需要重新注册一个设备或者手动更新已注册设备存储在 MongoDB 的 ACL 列表。
发送 tags 指令
这里把向指定设备发送 tags 指令做一个封装:
//IotHub_Server/models/device.js
deviceSchema.methods.sendTags = function () {
this.sendCommand({
commandName: "$set_tags",
data: JSON.stringify({tags: this.tags || [], tags_version: tags_version || 1}),
qos: 0
})
}
由于设备在连接到 IotHub 的时候会主动请求标签信息,离线的标签指令对设备来说没有意义,所以这里使用 QoS0 发送指令。
相应地,发送指令的方法需要加上 QoS 参数:
//IotHub_Server/models/device.js
deviceSchema.methods.sendCommand = function ({commandName, data, encoding = "plain", ttl = undefined, commandType = "cmd", qos = 1}) {
return Device.sendCommand({
productName: this.product_name,
deviceName: this.device_name,
commandName: commandName,
data: data,
encoding: encoding,
ttl: ttl,
commandType: commandType,
qos: qos
})
}
deviceSchema.statics.sendCommand = function ({productName, deviceName, commandName, data, encoding = "plain", ttl = undefined, commandType = "cmd", qos = 1}) {
var requestId = new ObjectId().toHexString()
var topic = `${commandType}/${productName}/${deviceName}/${commandName}/${encoding}/${requestId}`
if (ttl != null) {
topic = `${topic}/${Math.floor(Date.now() / 1000) + ttl}`
}
emqxService.publishTo({topic: topic, payload: data, qos: qos})
return requestId
}
处理设备标签数据请求
在处理设备的标签数据请求的时候,我们做一个小小的优化,在设备的标签数据请求中也带上设备本地的 tags_version,当服务端的 tags_version 大于设备端的 tags_version 时才下发标签指令:
//IotHub_Server/services/message_service.js
static handleDataRequest({productName, deviceName, resource, payload, ts}) {
if (resource.startsWith("$")) {
if (resource == "$ntp") {
...
} else if (resource == "$tags") {
Device.findOne({product_name: productName, device_name: deviceName}, function (err, device) {
if (device != null) {
var data = JSON.parse(payload.toString())
if (data.tags_version < device.tags_version) {
device.sendTags()
}
}
})
}
} else {
...
}
}
修改设备标签接口
Server API 提供一个接口供业务系统修改设备的标签,标签名用逗号分隔:
//IotHub_Server/route/devices.js
router.put("/:productName/:deviceName/tags", function (req, res) {
var productName = req.params.productName
var deviceName = req.params.deviceName
var tags = req.body.tags.split(",")
Device.findOne({"product_name": productName, "device_name": deviceName}, function (err, device) {
if (err != null) {
res.send(err)
} else if (device != null) {
device.tags = tags
device.tags_version += 1
device.save()
device.sendTags()
res.status(200).send("ok")
} else {
res.status(404).send("device not found")
}
})
}
批量指令下发接口
最后 Server API 需要提供接口供业务系统按照标签批量下发指令:
//IotHub_Server/routes/tags.js
var express = require('express');
var router = express.Router();
const emqxService = require("../services/emqx_service")
const ObjectId = require('bson').ObjectID;
router.post("/:productName/:tag/command", function (req, res) {
var productName = req.params.productName
var ttl = req.body.ttl != null ? parseInt(req.body.ttl) : null
var commandName = req.body.command
var encoding = req.body.encoding || "plain"
var data = req.body.data
var requestId = new ObjectId().toHexString()
var topic = `tags/${productName}/${req.params.tag}/cmd/${commandName}/${encoding}/${requestId}`
if (ttl != null) {
topic = `${topic}/${Math.floor(Date.now() / 1000) + ttl}`
}
emqxService.publishTo({topic: topic, payload: data})
res.status(200).json({request_id: requestId})
})
module.exports = router
设备在回复批量下发的指令时,其流程和普通指令下发的流程是一样的,IotHub 也会用同样的方式将设备对指令的回复传递给业务系统,不同的是,在批量下发指令时,同一个 RequestID,业务系统会收到多个回复。
由于涉及多个设备的指令回复处理,批量指令下发无法提供 RPC 式的调用。
这一节我们完成了设备分组的服务端实现,下一节我们开始实现设备分组的设备端实现。