在这一节我们来实现设备分组的设备端实现,设备在连接到 IotHub 时,需要主动请求标签数据,在收到来自服务端的标签数据时,需要对比本地存储的标签数据,然后 subscribe 或者 unsubscribe 对应的主题。最后我们会把代码连在一起进行测试。
设备端的持久性存储
由于需要和服务端的标签进行对比,设备需要在本地使用持久性的存储来保存已订阅的标签。一般来说,DeviceSDK 需要根据自身平台的特点来提供存储的接口,这里为了演示起见,我们使用存储 in-flight 消息的 Message Store 所使用的 levelDB 作为 DeviceSDK 的本地存储。这里我们把标签数据的存取进行封装:
//IotHub_Device/sdk/persistent_store.js
var level = require('level')
class PersistentStore {
constructor(dbPath) {
this.db = level(`${dbPath}/device_db/`)
}
getTags(callback) {
this.db.get("tags", function (error, value) {
if (error != null) {
callback({tags: [], tags_version: 0})
} else {
callback(JSON.parse(value))
}
})
}
saveTags(tags) {
this.db.put("tags", Buffer.from(JSON.stringify(tags)))
}
close() {
this.db.close()
}
}
module.exports = PersistentStore;
然后在初始化的时候加载持久性存储:
//IotHub_Device/sdk/iot_device.js
const PersistentStore = require("./persistent_storage")
constructor({serverAddress = "127.0.0.1:8883", productName, deviceName, secret, clientID, storePath} = {}) {
....
this.persistent_store = new PersistentStore(storePath)
}
标签数据处理
当收到 IotHub 下发的$set_tags
指令时,DeviceSDK 需要进行以下操作:
- 将指令数据里的 tags_version 和本地存储的 tags_version 进行比较,如果指令的 tags_version 不大于本地的 tags_version,忽略该指令,否则进入下一步;
- 比较本地保存的 tags 和指令数据里的 tags,对本地有而指令里没有的 tag,unsubscribe 相应的主题;
- 比较本地保存的 tags 和指令数据里的 tags,对本地没有而指令里有的 tag,subscribe相应的主题;
- 将指令里的 tags 和 tags_version 存入本地存储。
然后在接收//IotHub_Device/sdk/iot_device.js
setTags(serverTags) {
var self = this
var subscribe = []
var unsubscribe = []
this.persistent_store.getTags(function (localTags) {
if (localTags.tags_version < serverTags.tags_version) {
serverTags.tags.forEach(function (tag) {
if (localTags.tags.indexOf(tag) == -1) {
subscribe.push(`tags/${self.productName}/${tag}/cmd/+/+/+/#`)
}
})
localTags.tags.forEach(function (tag) {
if (serverTags.tags.indexOf(tag) == -1) {
unsubscribe.push(`tags/${self.productName}/${tag}/cmd/+/+/+/#`)
}
})
if(subscribe.length > 0) {
self.client.subscribe(subscribe, {qos: 1}, function (err, granted) {
console.log(granted)
})
}
if(unsubscribe.length > 0) {
self.client.unsubscribe(unsubscribe)
}
self.persistent_store.saveTags(serverTags)
}
})
}
$set_tags
命令时调用这个方法://IotHub_Device/sdk/iot_device.js
handleCommand({commandName, requestID, encoding, payload, expiresAt, commandType = "cmd"}) {
...
if (commandName.startsWith("$")) {
payload = JSON.parse(data.toString())
if (commandName == "$set_ntp") {
this.handleNTP(payload)
} else if (commandName == "$set_tags") {
this.setTags(payload)
}
} else {
...
}
}
}
标签数据请求
在设备连接到 IotHub 时,应该发起标签数据的请求:sendTagsRequest(){
this.sendDataRequest("$tags")
}
connect() {
...
this.client.on("connect", function () {
self.sendTagsRequest()
self.emit("online")
})
...
}
处理批量下发指令
设备在处理批量下发指令时,其流程和普通的指令下发没有区别,只是需要匹配批量指令下发的主题即可:
设备分组的的设备端实现就完成了。dispatchMessage(topic, payload) {
var cmdTopicRule = "(cmd|rpc)/:productName/:deviceName/:commandName/:encoding/:requestID/:expiresAt?"
var tagTopicRule = "tags/:productName/:tag/cmd/:commandName/:encoding/:requestID/:expiresAt?"
var result
if ((result = pathToRegexp(cmdTopicRule).exec(topic)) != null) {
...
}else if ((result = pathToRegexp(tagTopicRule).exec(topic)) != null) {
if (this.checkRequestDuplication(result[5])) {
this.handleCommand({
commandName: result[3],
encoding: result[4],
requestID: result[5],
expiresAt: result[6] != null ? parseInt(result[6]) : null,
payload: payload,
})
}
}
}
代码联调
设备获取标签信息
我们写一段简单的设备端代码,收到指令时,将指令的名称打印出来:
首先修改设备的标签为 test1、test2://IotHub_Device/samples/print_cmd.js ... device.on("online", function () { console.log("device is online") }) device.on("command", function (command) { console.log(`received cmd: ${command}`) }) device.connect()
然后运行curl -d "tags=test1,test2" http://localhost:3000/devices/IotApp/K-zHGEEmT/tags -X PUT
print_command.js
:
通过 EMQ X Web Management Console 检查 Client 的订阅情况,可以看到:
然后将设备的标签设为 “test1”:
通过EMQ X Web Management Console检查Client的订阅情况,可以看到:curl -d "tags=test1" http://localhost:3000/devices/IotApp/K-zHGEEmT/tags -X PUT
设备不再订阅到标签 test2 的主题了。指令批量下发
调用 Server API 向标签为 test1 的设备发送指令:
可以看到curl -d "command=echo" http://localhost:3000/tags/IotApp/test1/command -X POST
print_command.js
的输出为:
到这里,IotHub 的设备分组功能就完成了。device is online received cmd: echo
这一节我们完成 IotHub 的设备分组功能,并用代码进行了测试和验证。下一节我们来实现设备间通信功能。