这一节我们来实现 RPC 式调用的设备端代码,首先会使用 EMQ X 的服务端订阅功能自动订阅对应的主题,然后在 DeviceSDK 中修改用于匹配主题名的正则表达式,从 RPC 式调用的主题中提取指令的元数据,最后将服务端和设备端连在一起进行测试。
添加ACL列表
由于设备端需要将回复发布到:rpc_resp/:productName/:deviceName/:commandName/:requestId/:messageId
,所以需要把这个新的主题加入到设备的 ACL 列表里:
//IotHub_Server/models/devices
deviceSchema.methods.getACLRule = function () {
const publish = [
`upload_data/${this.product_name}/${this.device_name}/+/+`,
`update_status/${this.product_name}/${this.device_name}/+`,
`cmd_resp/${this.product_name}/${this.device_name}/+/+/+`,
`rpc_resp/${this.product_name}/${this.device_name}/+/+/+`,
]
...
}
你需要重新注册一个设备或者手动更新已注册设备存储在 MongoDB 的 ACL 列表。
新增订阅主题
IotHub 会将 RPC 式的指令 Publish 到:rpc/:ProductName/:DeviceName/:CommandName/:Encoding/:RequestID/:ExpiresAt
,所以我们需要在 EMQ X 的服务器订阅列表里面添加这个主题:
## <EMQ X 安装目录>/etc/emqx.conf
module.subscription.1.topic = cmd/%u/+/+/+/#
module.subscription.1.qos = 1
module.subscription.2.topic = rpc/%u/+/+/+/#
module.subscription.2.qos = 1
然后重启 EMQ X Broker:<EMQ X 安装目录>/bin/emqx restart
注意这里不能用”+/%u/+/+/+/#”来代替”rpc/%u/+/+/+/#”和”cmd/%u/+/+/+/#”,因为这样设备会订阅到其他不应该订阅到的主题。
DeviceSDK
DeviceSDK 的实现非常简单,只需要保证可以匹配到相应的RPC指令的主题名,并将回复发布到正确的主题上就可以了。
//IotHub_Device/sdk/iot_device.js
dispatchMessage(topic, payload) {
var cmdTopicRule = "(cmd|rpc)/:productName/:deviceName/:commandName/:encoding/:requestID/:expiresAt?"
var result
if ((result = pathToRegexp(cmdTopicRule).exec(topic)) != null) {
this.checkRequestDuplication(result[6], function (isDup) {
if (!isDup) {
self.handleCommand({
commandName: result[4],
encoding: result[5],
requestID: result[6],
expiresAt: result[7] != null ? parseInt(result[7]) : null,
payload: payload,
commandType: result[1]
})
}
})
}
}
这里指令回复的主题的规则变成:
(cmd|rpc)/:productName/:deviceName/:commandName/:encoding/:requestID/:expiresAt?"
,多了一个变量(第一个层级,指令类型),所以之前变量在 result 数组中的 index 要依次 +1。
然后在指令处理的代码中,将 RPC 式指令回复到相应的主题上:
//IotHub_Device/sdk/iot_device.js
handleCommand({commandName, requestID, encoding, payload, expiresAt, commandType = "cmd"}) {
if (expiresAt == null || expiresAt > Math.floor(Date.now() / 1000)) {
var data = payload;
if (encoding == "base64") {
data = Buffer.from(payload.toString(), "base64")
}
var self = this
var respondCommand = function (respData) {
var topic = `${commandType}_resp/${self.productName}/${self.deviceName}/${commandName}/${requestID}/${new ObjectId().toHexString()}`
self.client.publish(topic, respData, {
qos: 1
})
}
this.emit("command", commandName, data, respondCommand)
}
}
对设备应用代码来说,它并不知道指令是否是 RPC式调用,不管是RPC式调用,还是普通的指令下发,设备应用代码的处理都是一样的,执行指令,然后回复结果,这是我们想要的效果。
设备端的实现就完成了,接下来我们写一些代码来验证这个功能。
代码联调
这里我们仍然用之前的 ping/pong 的例子来演示,不过这次我们实现的是一个 RPC 式调用:
//IotHub_Server/samples/rpc_ping.js
require('dotenv').config({path: "../.env"})
const request = require("request")
const buf = Buffer.alloc(4);
buf.writeUInt32BE(Math.floor(Date.now())/1000, 0);
var formData = {
command: "ping",
data: buf.toString("base64"),
encoding: "base64",
use_rpc: true
}
request.post(`http://127.0.0.1:3000/devices/${process.env.TARGET_PRODUCT_NAME}/${process.env.TARGET_DEVICE_NAME}/command`, {
form: formData
}, function (error, response, body) {
if (error) {
console.log(error)
} else {
console.log('statusCode:', response && response.statusCode);
var result = JSON.parse(body)
if(result.error != null){
console.log(result.error)
}else{
console.log('response:', Buffer.from(result.response, "base64").readUInt32BE(0));
}
}
})
首先运行 IotHub_Device/samples/pong.js
,然后再运行 IotHub_Server/samples/rpc_ping.js
,会得到以下输出:
statusCode: 200
response: 1559532366
说明调用 RPC 接口已经正确获得了设备对指令的回复。
然后关闭 IotHub_Device/samples/pong.js
,再运行IotHub_Server/samples/rpc_ping.js
,大概 5 秒后,会得到以下输出:
statusCode: 200
device timeout
说明 RPC 式调用可以正确处理设备执行指令超时的情况。
那么我们就完成了 IotHub 的 RPC 式调用功能。大家可以看到,使用 RPC 式调用,业务系统的代码会更少,逻辑更简单。不过 RPC 式调用的缺点是,它不能用于执行时间比较长的指令。
RPC 式调用和我们之前实现的指令下发流程相比,在本质上不存在谁比谁更好、更优越,就好像一个功能的同步接口和异步接口一样,需要按照你的实际情况去使用就可以了。
到这节为止,我们完成了 IotHub 的 RPC 式调用功能。接下来,我们实现设备主动数据请求功能。