这一节我们来实现 RPC 式调用的设备端代码,首先会使用 EMQ X 的服务端订阅功能自动订阅对应的主题,然后在 DeviceSDK 中修改用于匹配主题名的正则表达式,从 RPC 式调用的主题中提取指令的元数据,最后将服务端和设备端连在一起进行测试。

添加ACL列表

由于设备端需要将回复发布到:
rpc_resp/:productName/:deviceName/:commandName/:requestId/:messageId,所以需要把这个新的主题加入到设备的 ACL 列表里:

  1. //IotHub_Server/models/devices
  2. deviceSchema.methods.getACLRule = function () {
  3. const publish = [
  4. `upload_data/${this.product_name}/${this.device_name}/+/+`,
  5. `update_status/${this.product_name}/${this.device_name}/+`,
  6. `cmd_resp/${this.product_name}/${this.device_name}/+/+/+`,
  7. `rpc_resp/${this.product_name}/${this.device_name}/+/+/+`,
  8. ]
  9. ...
  10. }

你需要重新注册一个设备或者手动更新已注册设备存储在 MongoDB 的 ACL 列表。

新增订阅主题

IotHub 会将 RPC 式的指令 Publish 到:
rpc/:ProductName/:DeviceName/:CommandName/:Encoding/:RequestID/:ExpiresAt,所以我们需要在 EMQ X 的服务器订阅列表里面添加这个主题:

  1. ## <EMQ X 安装目录>/etc/emqx.conf
  2. module.subscription.1.topic = cmd/%u/+/+/+/#
  3. module.subscription.1.qos = 1
  4. module.subscription.2.topic = rpc/%u/+/+/+/#
  5. module.subscription.2.qos = 1

然后重启 EMQ X Broker:<EMQ X 安装目录>/bin/emqx restart

注意这里不能用”+/%u/+/+/+/#”来代替”rpc/%u/+/+/+/#”和”cmd/%u/+/+/+/#”,因为这样设备会订阅到其他不应该订阅到的主题。

DeviceSDK

DeviceSDK 的实现非常简单,只需要保证可以匹配到相应的RPC指令的主题名,并将回复发布到正确的主题上就可以了。

//IotHub_Device/sdk/iot_device.js
dispatchMessage(topic, payload) {
        var cmdTopicRule = "(cmd|rpc)/:productName/:deviceName/:commandName/:encoding/:requestID/:expiresAt?"
        var result
        if ((result = pathToRegexp(cmdTopicRule).exec(topic)) != null) {
            this.checkRequestDuplication(result[6], function (isDup) {
                if (!isDup) {
                    self.handleCommand({
                        commandName: result[4],
                        encoding: result[5],
                        requestID: result[6],
                        expiresAt: result[7] != null ? parseInt(result[7]) : null,
                        payload: payload,
                        commandType: result[1]
                    })
                }
            })
        }
    }

这里指令回复的主题的规则变成: (cmd|rpc)/:productName/:deviceName/:commandName/:encoding/:requestID/:expiresAt?",多了一个变量(第一个层级,指令类型),所以之前变量在 result 数组中的 index 要依次 +1。

然后在指令处理的代码中,将 RPC 式指令回复到相应的主题上:

//IotHub_Device/sdk/iot_device.js
handleCommand({commandName, requestID, encoding, payload, expiresAt, commandType = "cmd"}) {
        if (expiresAt == null || expiresAt > Math.floor(Date.now() / 1000)) {
            var data = payload;
            if (encoding == "base64") {
                data = Buffer.from(payload.toString(), "base64")
            }
            var self = this
            var respondCommand = function (respData) {
                var topic = `${commandType}_resp/${self.productName}/${self.deviceName}/${commandName}/${requestID}/${new ObjectId().toHexString()}`
                self.client.publish(topic, respData, {
                    qos: 1
                })
            }
            this.emit("command", commandName, data, respondCommand)
        }
    }

对设备应用代码来说,它并不知道指令是否是 RPC式调用,不管是RPC式调用,还是普通的指令下发,设备应用代码的处理都是一样的,执行指令,然后回复结果,这是我们想要的效果。
设备端的实现就完成了,接下来我们写一些代码来验证这个功能。

代码联调

这里我们仍然用之前的 ping/pong 的例子来演示,不过这次我们实现的是一个 RPC 式调用:

//IotHub_Server/samples/rpc_ping.js
require('dotenv').config({path: "../.env"})
const request = require("request")
const buf = Buffer.alloc(4);
buf.writeUInt32BE(Math.floor(Date.now())/1000, 0);
var formData = {
    command: "ping",
    data: buf.toString("base64"),
    encoding: "base64",
    use_rpc: true
}
request.post(`http://127.0.0.1:3000/devices/${process.env.TARGET_PRODUCT_NAME}/${process.env.TARGET_DEVICE_NAME}/command`, {
    form: formData
}, function (error, response, body) {
    if (error) {
        console.log(error)
    } else {
        console.log('statusCode:', response && response.statusCode);
        var result = JSON.parse(body)
        if(result.error != null){
            console.log(result.error)
        }else{
            console.log('response:', Buffer.from(result.response, "base64").readUInt32BE(0));
        }
    }
})

首先运行 IotHub_Device/samples/pong.js,然后再运行 IotHub_Server/samples/rpc_ping.js,会得到以下输出:

statusCode: 200
response: 1559532366

说明调用 RPC 接口已经正确获得了设备对指令的回复。
然后关闭 IotHub_Device/samples/pong.js,再运行IotHub_Server/samples/rpc_ping.js,大概 5 秒后,会得到以下输出:

statusCode: 200
device timeout

说明 RPC 式调用可以正确处理设备执行指令超时的情况。
那么我们就完成了 IotHub 的 RPC 式调用功能。大家可以看到,使用 RPC 式调用,业务系统的代码会更少,逻辑更简单。不过 RPC 式调用的缺点是,它不能用于执行时间比较长的指令。
RPC 式调用和我们之前实现的指令下发流程相比,在本质上不存在谁比谁更好、更优越,就好像一个功能的同步接口和异步接口一样,需要按照你的实际情况去使用就可以了。

到这节为止,我们完成了 IotHub 的 RPC 式调用功能。接下来,我们实现设备主动数据请求功能。