在这一节我们来实现设备分组的设备端实现,设备在连接到 IotHub 时,需要主动请求标签数据,在收到来自服务端的标签数据时,需要对比本地存储的标签数据,然后 subscribe 或者 unsubscribe 对应的主题。最后我们会把代码连在一起进行测试。

设备端的持久性存储

由于需要和服务端的标签进行对比,设备需要在本地使用持久性的存储来保存已订阅的标签。一般来说,DeviceSDK 需要根据自身平台的特点来提供存储的接口,这里为了演示起见,我们使用存储 in-flight 消息的 Message Store 所使用的 levelDB 作为 DeviceSDK 的本地存储。这里我们把标签数据的存取进行封装:

  1. //IotHub_Device/sdk/persistent_store.js
  2. var level = require('level')
  3. class PersistentStore {
  4. constructor(dbPath) {
  5. this.db = level(`${dbPath}/device_db/`)
  6. }
  7. getTags(callback) {
  8. this.db.get("tags", function (error, value) {
  9. if (error != null) {
  10. callback({tags: [], tags_version: 0})
  11. } else {
  12. callback(JSON.parse(value))
  13. }
  14. })
  15. }
  16. saveTags(tags) {
  17. this.db.put("tags", Buffer.from(JSON.stringify(tags)))
  18. }
  19. close() {
  20. this.db.close()
  21. }
  22. }
  23. module.exports = PersistentStore;

然后在初始化的时候加载持久性存储:

  1. //IotHub_Device/sdk/iot_device.js
  2. const PersistentStore = require("./persistent_storage")
  3. constructor({serverAddress = "127.0.0.1:8883", productName, deviceName, secret, clientID, storePath} = {}) {
  4. ....
  5. this.persistent_store = new PersistentStore(storePath)
  6. }

标签数据处理

当收到 IotHub 下发的$set_tags指令时,DeviceSDK 需要进行以下操作:

  1. 将指令数据里的 tags_version 和本地存储的 tags_version 进行比较,如果指令的 tags_version 不大于本地的 tags_version,忽略该指令,否则进入下一步;
  2. 比较本地保存的 tags 和指令数据里的 tags,对本地有而指令里没有的 tag,unsubscribe 相应的主题;
  3. 比较本地保存的 tags 和指令数据里的 tags,对本地没有而指令里有的 tag,subscribe相应的主题;
  4. 将指令里的 tags 和 tags_version 存入本地存储。
    1. //IotHub_Device/sdk/iot_device.js
    2. setTags(serverTags) {
    3. var self = this
    4. var subscribe = []
    5. var unsubscribe = []
    6. this.persistent_store.getTags(function (localTags) {
    7. if (localTags.tags_version < serverTags.tags_version) {
    8. serverTags.tags.forEach(function (tag) {
    9. if (localTags.tags.indexOf(tag) == -1) {
    10. subscribe.push(`tags/${self.productName}/${tag}/cmd/+/+/+/#`)
    11. }
    12. })
    13. localTags.tags.forEach(function (tag) {
    14. if (serverTags.tags.indexOf(tag) == -1) {
    15. unsubscribe.push(`tags/${self.productName}/${tag}/cmd/+/+/+/#`)
    16. }
    17. })
    18. if(subscribe.length > 0) {
    19. self.client.subscribe(subscribe, {qos: 1}, function (err, granted) {
    20. console.log(granted)
    21. })
    22. }
    23. if(unsubscribe.length > 0) {
    24. self.client.unsubscribe(unsubscribe)
    25. }
    26. self.persistent_store.saveTags(serverTags)
    27. }
    28. })
    29. }
    然后在接收$set_tags命令时调用这个方法:
    1. //IotHub_Device/sdk/iot_device.js
    2. handleCommand({commandName, requestID, encoding, payload, expiresAt, commandType = "cmd"}) {
    3. ...
    4. if (commandName.startsWith("$")) {
    5. payload = JSON.parse(data.toString())
    6. if (commandName == "$set_ntp") {
    7. this.handleNTP(payload)
    8. } else if (commandName == "$set_tags") {
    9. this.setTags(payload)
    10. }
    11. } else {
    12. ...
    13. }
    14. }
    15. }

    标签数据请求

    在设备连接到 IotHub 时,应该发起标签数据的请求:
    1. sendTagsRequest(){
    2. this.sendDataRequest("$tags")
    3. }
    4. connect() {
    5. ...
    6. this.client.on("connect", function () {
    7. self.sendTagsRequest()
    8. self.emit("online")
    9. })
    10. ...
    11. }

    处理批量下发指令

    设备在处理批量下发指令时,其流程和普通的指令下发没有区别,只是需要匹配批量指令下发的主题即可:
    1. dispatchMessage(topic, payload) {
    2. var cmdTopicRule = "(cmd|rpc)/:productName/:deviceName/:commandName/:encoding/:requestID/:expiresAt?"
    3. var tagTopicRule = "tags/:productName/:tag/cmd/:commandName/:encoding/:requestID/:expiresAt?"
    4. var result
    5. if ((result = pathToRegexp(cmdTopicRule).exec(topic)) != null) {
    6. ...
    7. }else if ((result = pathToRegexp(tagTopicRule).exec(topic)) != null) {
    8. if (this.checkRequestDuplication(result[5])) {
    9. this.handleCommand({
    10. commandName: result[3],
    11. encoding: result[4],
    12. requestID: result[5],
    13. expiresAt: result[6] != null ? parseInt(result[6]) : null,
    14. payload: payload,
    15. })
    16. }
    17. }
    18. }
    设备分组的的设备端实现就完成了。

    代码联调

    设备获取标签信息

    我们写一段简单的设备端代码,收到指令时,将指令的名称打印出来:
    //IotHub_Device/samples/print_cmd.js
    ...
    device.on("online", function () {
     console.log("device is online")
    })
    device.on("command", function (command) {
     console.log(`received cmd: ${command}`)
    })
    device.connect()
    
    首先修改设备的标签为 test1、test2:
    curl -d "tags=test1,test2" http://localhost:3000/devices/IotApp/K-zHGEEmT/tags -X PUT
    
    然后运行 print_command.js:
    通过 EMQ X Web Management Console 检查 Client 的订阅情况,可以看到:
    29.设备分组——设备端实现 - 图1
    然后将设备的标签设为 “test1”:
    curl -d "tags=test1" http://localhost:3000/devices/IotApp/K-zHGEEmT/tags -X PUT
    
    通过EMQ X Web Management Console检查Client的订阅情况,可以看到:
    29.设备分组——设备端实现 - 图2
    设备不再订阅到标签 test2 的主题了。

    指令批量下发

    调用 Server API 向标签为 test1 的设备发送指令:
    curl -d "command=echo" http://localhost:3000/tags/IotApp/test1/command -X POST
    
    可以看到print_command.js的输出为:
    device is online
    received cmd: echo
    
    到这里,IotHub 的设备分组功能就完成了。

这一节我们完成 IotHub 的设备分组功能,并用代码进行了测试和验证。下一节我们来实现设备间通信功能。