这一节我们开始实现 IotHub 指令下发的 DeviceSDK 端的功能。首先进行消息去重, 接着使用正则表达式提取出元数据, 然后通过事件的方式将指令的数据传递给设备应用代码, 最后提供一个接口供设备对指令进行回复。
消息去重
在本课程里面我们会使用 node-persist 来存储已收到指令的 RequestID。
首先初始化存储:
//IotHub_Device/sdk/iot_device.js
const storage = require('node-persist');
class IotDevice extends EventEmitter {
constructor({serverAddress = "127.0.0.1:8883", productName, deviceName, secret, clientID, storePath} = {}) {
...
storage.init({dir: `${storePath}/message_cache`})
...
}
然后实现检查指令是否重复的函数:
//IotHub_Device/sdk/iot_device.js
class IotDevice extends EventEmitter {
...
checkRequestDuplication(requestID, callback) {
var key = `requests/${requestID}`
storage.getItem(key, function (err, value) {
if (value == null) {
storage.setItem(key, 1, {ttl: 1000 * 3600 * 6})
callback(false)
} else {
callback(false)
}
})
}
...
}
当 DeviceSDK 收到消息的时候,使用正则表达式匹配消息主题,并去重:
//IotHub_Device/sdk/iot_device.js
class IotDevice extends EventEmitter {
connect() {
...
this.client.on("message", function (topic, message) {
self.dispatchMessage(topic, message)
})
}
dispatchMessage(topic, payload){
var cmdTopicRule = "cmd/:productName/:deviceName/:commandName/:encoding/:requestID/:expiresAt?"
var result
if((result = pathToRegexp(cmdTopicRule).exec(topic)) != null){
this.checkRequestDuplication(result[6], function (isDup) {
if (!isDup) {
self.handleCommand({
commandName: result[3],
encoding: result[4],
requestID: result[5],
expiresAt: result[6] != null ? parseInt(result[6]) : null,
payload: payload
})
}
})
}
}
...
}
因为 expiredAt 层级是可选的,所以用
:expiresAt?
来表示。
处理指令
DeviceSDK 对指令的处理流程如下:
- 检查指令是否过期;
- 根据 Encoding 对指令数据进行解码;
- 通过 Emit Event 的方式将指令传递给设备应用代码。
设备应用代码可以通过下面的方式来获取指令的内容://IotHub_Device/sdk/iot_device.js
class IotDevice extends EventEmitter {
...
handleCommand({commandName, requestID, encoding, payload, expiresAt}){
if(expiresAt == null || expiresAt > Math.floor(Date.now() / 1000)){
var data = payload;
if(encoding == "base64"){
data = Buffer.from(payload.toString(), "base64")
}
this.emit("command", commandName, data)
}
}
...
}
device.on("command", function(commandName, data){
//处理指令
})
指令回复
是否回复指令,以及什么时候回复指令是由设备的应用代码来决定的,DeviceSDK 没法强制约定,但是可以提供帮助函数来屏蔽掉回复指令所需要的的细节。
这里可以通过闭包的方式来达成这个目的:
在”command”事件里面我们额外传递出去一个闭包,包含了回复这个指令的具体代码,设备应用代码可以通过下面的方式来回复指令://IotHub_Device/sdk/iot_device.js
class IotDevice extends EventEmitter {
...
handleCommand({commandName, requestID, encoding, payload, expiresAt}) {
if (expiresAt == null || expiresAt < Math.floor(Date.now() / 1000)) {
var data = payload;
if (encoding == "base64") {
data = Buffer.from(payload, "base64")
}
var respondCommand = function (respData) {
var topic = `cmd_resp/${this.productName}/${this.deviceName}/${commandName}/${requestID}/${new ObjectId().toHexString()}`
this.client.publish(topic, respData, {
qos: 1
})
}
this.emit("command", commandName, data, respondCommand)
}
}
...
}
这样 IotHub 内部的指令下发、回复的流程和细节对设备应用代码是完全透明的,符合我们对 DeviceSDK 的期望。device.on("command", function(commandName, data, respondCommand){
//处理指令
...
respondCommand("ok") //处理完毕后回复,可以带任何格式的数据,字符串或者二进制数据
})
在非 Node.js 的语言环境下,你也可以使用类似的编程技巧来完成,比如方法对象(Method Object)、匿名函数、内部类、Lambda、block 等。
总结一下,在 DeviceSDK 端,我们用 node-persist 来存储 RequestID,通过正则表达式对主题名进行模式匹配的方式提取出元数据,对指令的过期时间进行检查后,通过事件的方式将指令传递给设备应用代码,并使用闭包的方式提供接口供设备应用代码回复指令。
这一节我们基本完成了指令下发 DeviceSDK 端的功能代码,接下来我们开始实现 IotHub Server 端的功能。