这一节我们开始实现 IotHub 指令下发的 DeviceSDK 端的功能。首先进行消息去重, 接着使用正则表达式提取出元数据, 然后通过事件的方式将指令的数据传递给设备应用代码, 最后提供一个接口供设备对指令进行回复。

消息去重

在本课程里面我们会使用 node-persist 来存储已收到指令的 RequestID。
首先初始化存储:

  1. //IotHub_Device/sdk/iot_device.js
  2. const storage = require('node-persist');
  3. class IotDevice extends EventEmitter {
  4. constructor({serverAddress = "127.0.0.1:8883", productName, deviceName, secret, clientID, storePath} = {}) {
  5. ...
  6. storage.init({dir: `${storePath}/message_cache`})
  7. ...
  8. }

然后实现检查指令是否重复的函数:

  1. //IotHub_Device/sdk/iot_device.js
  2. class IotDevice extends EventEmitter {
  3. ...
  4. checkRequestDuplication(requestID, callback) {
  5. var key = `requests/${requestID}`
  6. storage.getItem(key, function (err, value) {
  7. if (value == null) {
  8. storage.setItem(key, 1, {ttl: 1000 * 3600 * 6})
  9. callback(false)
  10. } else {
  11. callback(false)
  12. }
  13. })
  14. }
  15. ...
  16. }

当 DeviceSDK 收到消息的时候,使用正则表达式匹配消息主题,并去重:

  1. //IotHub_Device/sdk/iot_device.js
  2. class IotDevice extends EventEmitter {
  3. connect() {
  4. ...
  5. this.client.on("message", function (topic, message) {
  6. self.dispatchMessage(topic, message)
  7. })
  8. }
  9. dispatchMessage(topic, payload){
  10. var cmdTopicRule = "cmd/:productName/:deviceName/:commandName/:encoding/:requestID/:expiresAt?"
  11. var result
  12. if((result = pathToRegexp(cmdTopicRule).exec(topic)) != null){
  13. this.checkRequestDuplication(result[6], function (isDup) {
  14. if (!isDup) {
  15. self.handleCommand({
  16. commandName: result[3],
  17. encoding: result[4],
  18. requestID: result[5],
  19. expiresAt: result[6] != null ? parseInt(result[6]) : null,
  20. payload: payload
  21. })
  22. }
  23. })
  24. }
  25. }
  26. ...
  27. }

因为 expiredAt 层级是可选的,所以用:expiresAt?来表示。

处理指令

DeviceSDK 对指令的处理流程如下:

  1. 检查指令是否过期;
  2. 根据 Encoding 对指令数据进行解码;
  3. 通过 Emit Event 的方式将指令传递给设备应用代码。
    1. //IotHub_Device/sdk/iot_device.js
    2. class IotDevice extends EventEmitter {
    3. ...
    4. handleCommand({commandName, requestID, encoding, payload, expiresAt}){
    5. if(expiresAt == null || expiresAt > Math.floor(Date.now() / 1000)){
    6. var data = payload;
    7. if(encoding == "base64"){
    8. data = Buffer.from(payload.toString(), "base64")
    9. }
    10. this.emit("command", commandName, data)
    11. }
    12. }
    13. ...
    14. }
    设备应用代码可以通过下面的方式来获取指令的内容:
    1. device.on("command", function(commandName, data){
    2. //处理指令
    3. })

    指令回复

    是否回复指令,以及什么时候回复指令是由设备的应用代码来决定的,DeviceSDK 没法强制约定,但是可以提供帮助函数来屏蔽掉回复指令所需要的的细节。
    这里可以通过闭包的方式来达成这个目的:
    1. //IotHub_Device/sdk/iot_device.js
    2. class IotDevice extends EventEmitter {
    3. ...
    4. handleCommand({commandName, requestID, encoding, payload, expiresAt}) {
    5. if (expiresAt == null || expiresAt < Math.floor(Date.now() / 1000)) {
    6. var data = payload;
    7. if (encoding == "base64") {
    8. data = Buffer.from(payload, "base64")
    9. }
    10. var respondCommand = function (respData) {
    11. var topic = `cmd_resp/${this.productName}/${this.deviceName}/${commandName}/${requestID}/${new ObjectId().toHexString()}`
    12. this.client.publish(topic, respData, {
    13. qos: 1
    14. })
    15. }
    16. this.emit("command", commandName, data, respondCommand)
    17. }
    18. }
    19. ...
    20. }
    在”command”事件里面我们额外传递出去一个闭包,包含了回复这个指令的具体代码,设备应用代码可以通过下面的方式来回复指令:
    1. device.on("command", function(commandName, data, respondCommand){
    2. //处理指令
    3. ...
    4. respondCommand("ok") //处理完毕后回复,可以带任何格式的数据,字符串或者二进制数据
    5. })
    这样 IotHub 内部的指令下发、回复的流程和细节对设备应用代码是完全透明的,符合我们对 DeviceSDK 的期望。

    在非 Node.js 的语言环境下,你也可以使用类似的编程技巧来完成,比如方法对象(Method Object)、匿名函数、内部类、Lambda、block 等。

总结一下,在 DeviceSDK 端,我们用 node-persist 来存储 RequestID,通过正则表达式对主题名进行模式匹配的方式提取出元数据,对指令的过期时间进行检查后,通过事件的方式将指令传递给设备应用代码,并使用闭包的方式提供接口供设备应用代码回复指令。

这一节我们基本完成了指令下发 DeviceSDK 端的功能代码,接下来我们开始实现 IotHub Server 端的功能。