这一节我们开始实现 IotHub 指令下发的 DeviceSDK 端的功能。首先进行消息去重, 接着使用正则表达式提取出元数据, 然后通过事件的方式将指令的数据传递给设备应用代码, 最后提供一个接口供设备对指令进行回复。
消息去重
在本课程里面我们会使用 node-persist 来存储已收到指令的 RequestID。
首先初始化存储:
//IotHub_Device/sdk/iot_device.jsconst storage = require('node-persist');class IotDevice extends EventEmitter {constructor({serverAddress = "127.0.0.1:8883", productName, deviceName, secret, clientID, storePath} = {}) {...storage.init({dir: `${storePath}/message_cache`})...}
然后实现检查指令是否重复的函数:
//IotHub_Device/sdk/iot_device.jsclass IotDevice extends EventEmitter {...checkRequestDuplication(requestID, callback) {var key = `requests/${requestID}`storage.getItem(key, function (err, value) {if (value == null) {storage.setItem(key, 1, {ttl: 1000 * 3600 * 6})callback(false)} else {callback(false)}})}...}
当 DeviceSDK 收到消息的时候,使用正则表达式匹配消息主题,并去重:
//IotHub_Device/sdk/iot_device.jsclass IotDevice extends EventEmitter {connect() {...this.client.on("message", function (topic, message) {self.dispatchMessage(topic, message)})}dispatchMessage(topic, payload){var cmdTopicRule = "cmd/:productName/:deviceName/:commandName/:encoding/:requestID/:expiresAt?"var resultif((result = pathToRegexp(cmdTopicRule).exec(topic)) != null){this.checkRequestDuplication(result[6], function (isDup) {if (!isDup) {self.handleCommand({commandName: result[3],encoding: result[4],requestID: result[5],expiresAt: result[6] != null ? parseInt(result[6]) : null,payload: payload})}})}}...}
因为 expiredAt 层级是可选的,所以用
:expiresAt?来表示。
处理指令
DeviceSDK 对指令的处理流程如下:
- 检查指令是否过期;
- 根据 Encoding 对指令数据进行解码;
- 通过 Emit Event 的方式将指令传递给设备应用代码。
设备应用代码可以通过下面的方式来获取指令的内容://IotHub_Device/sdk/iot_device.jsclass IotDevice extends EventEmitter {...handleCommand({commandName, requestID, encoding, payload, expiresAt}){if(expiresAt == null || expiresAt > Math.floor(Date.now() / 1000)){var data = payload;if(encoding == "base64"){data = Buffer.from(payload.toString(), "base64")}this.emit("command", commandName, data)}}...}
device.on("command", function(commandName, data){//处理指令})
指令回复
是否回复指令,以及什么时候回复指令是由设备的应用代码来决定的,DeviceSDK 没法强制约定,但是可以提供帮助函数来屏蔽掉回复指令所需要的的细节。
这里可以通过闭包的方式来达成这个目的:
在”command”事件里面我们额外传递出去一个闭包,包含了回复这个指令的具体代码,设备应用代码可以通过下面的方式来回复指令://IotHub_Device/sdk/iot_device.jsclass IotDevice extends EventEmitter {...handleCommand({commandName, requestID, encoding, payload, expiresAt}) {if (expiresAt == null || expiresAt < Math.floor(Date.now() / 1000)) {var data = payload;if (encoding == "base64") {data = Buffer.from(payload, "base64")}var respondCommand = function (respData) {var topic = `cmd_resp/${this.productName}/${this.deviceName}/${commandName}/${requestID}/${new ObjectId().toHexString()}`this.client.publish(topic, respData, {qos: 1})}this.emit("command", commandName, data, respondCommand)}}...}
这样 IotHub 内部的指令下发、回复的流程和细节对设备应用代码是完全透明的,符合我们对 DeviceSDK 的期望。device.on("command", function(commandName, data, respondCommand){//处理指令...respondCommand("ok") //处理完毕后回复,可以带任何格式的数据,字符串或者二进制数据})
在非 Node.js 的语言环境下,你也可以使用类似的编程技巧来完成,比如方法对象(Method Object)、匿名函数、内部类、Lambda、block 等。
总结一下,在 DeviceSDK 端,我们用 node-persist 来存储 RequestID,通过正则表达式对主题名进行模式匹配的方式提取出元数据,对指令的过期时间进行检查后,通过事件的方式将指令传递给设备应用代码,并使用闭包的方式提供接口供设备应用代码回复指令。
这一节我们基本完成了指令下发 DeviceSDK 端的功能代码,接下来我们开始实现 IotHub Server 端的功能。
