从这一部分开始,我们设计和实现一些 MaqueIotHub 更高维度抽象的功能,这一节我们来设计和实现 IotHub 的 RPC 式调用的服务端功能。
什么是 RPC 式调用?
在第三部分的课程里,我们实现了 IotHub 的指令下发功能。我们曾经把指令下发比作是一次函数调用 y = f(x),在目前的实现里,f(x) 的返回结果 y 是通过异步的方式告知调用者(业务系统)的,即业务系统调用下发指令接口,获得的是一个 RequestID,然后设备对指令进行回复以后,业务系统再从队列中,使用 RequestID 去获取对应的指令执行结果。
RPC 式调用是指当业务系统调用 IotHub 的发送指令接口后,IotHub 会把设备对指令的回复内容直接返回给业务系统,而不是再通过异步的方式(RabbitMQ)通知业务系统,程序执行的流程如下图所示。
- 业务系统对 IotHub Server API 的下发指令接口发起 HTTP Post 请求。
- IotHub Server 调用 EMQ X 的 Pubish API。
- EMQ X 将指令 Publish 到设备。
- 设备执行完指令,将指令执行结果 Publish 到 EMQ X Broker。
- EMQ X Broker 将指令执行结果发送到 IotHub Server。
- IotHub Server API 将指令结果放入 HTTP Response Boby 中,完成对 HTTP Post 的响应。
这样业务系统的一次 HTTP 请求就完成了,检查 Response Body 就可以获取指令的执行结果了。
在 RPC 式调用里面,如果设备在一定时间内没有对指令进行回复,比如 5 秒,那么 IotHub Server API 不会一直等待下去,而是在 HTTP Response Boby 中放入错误信息(比如设备无响应)并返回给业务系统,所以指令一定是有有效期的,比如 5 秒。
通过这样的流程,一次 RPC 式的调用就完成了。我们可以用这样的操作来执行一些简单的、时效性要求又比较高的指令。
功能设计
我们会使用一个特定的主题来发布 RPC 式调用的指令,接着我们会使用 Redis 来实现对指令回复的等待功能,这个等待是带超时的。最后修改指令下发接口,使业务系统可以指定是否使用 RPC 式调用的方式来下发指令。
主题规划
这里我们使用下面的主题来发送 RPC 式调用的指令:rpc/:ProductName/:DeviceName/:CommandName/:Encoding/:RequestID/:ExpiresAt
。
可以看到,这个主题和之前用于下发指令的主题相比,除了第一个层级从”cmd”变成了”rpc”之外,其他层级都是一模一样的。因为 RPC 式调用其实也是一种下发指令的操作。
所以我们可以把下发指令的主题统一定义为::CommandType/:ProductName/:DeviceName/:CommandName/:Encoding/:RequestID/:ExpiresAt
。
CommandType 目前有两个值,”cmd”和”rpc”。
设备会把对 RPC 指令的回复发布到主题:rpc_resp/:ProductName/:DeviceName/:CommandName/:RequestID/:MessageID
。
同样地,这个和之前回复指令的主题相比,除了第一个层级从”cmd_resp”变成了”rpc_resp”以外,其他层级都是一模一样的。
所以我们可以把指令回复的主题统一定义为:RespType/:ProductName/:DeviceName/:CommandName/:RequestID/:MessageID
。
RespType 目前有两个值:”cmd_resp”和”rpc_resp”。
等待指令回复
最后使用 Redis 来帮助 IotHub Server 等待指令的回复,步骤如下。
- Server API 的代码调用了 EMQ X 的 Publish 功能之后,调用 Redis 的 GET 指令来获取 Redis 中的 key: “cmd_resp/:RequestID”的value,如果 value 不为空,则将 value 作为指令的回复,返回给业务系统;如果 value 为空,则需要等待一小段时间,比如 10 毫秒以后,重复上述操作。
- IotHub Server 在收到设备对 RPC 指令的回复以后,调用 Redis 的 SET 指令将回复的 payload 保存到 Redis 的 key 中:”cmd_resp/:RequestID”。
- 如果 Server API 在指定的时间内仍然无法获取到 key:”cmd_resp/:RequestID”的 value 的话,返回”错误”给业务系统。
我们可以把检查 key:”cmd_resp/:RequestID”的过程封装起来:
//IotHub_Server/services/utils_service.js
const redisClient = require("../models/redis")
class UtilsService {
static waitKey(key, ttl, callback) {
var end = Date.now() + ttl * 1000
function checkKey() {
if (Date.now() < end) {
redisClient.get(key, function (err, val) {
if (val != null) {
callback(val)
} else {
setTimeout(checkKey, 10)
}
})
} else {
callback(null)
}
}
checkKey()
}
}
module.exports = UtilsService
waitKey 方法接收 ttl 参数作为等待超时时间,单位为秒。每隔 10 毫秒检查一次。
然后修改一下 Device 类的 sendCommand 方法,使它可以发送 RPC 式的指令:
//IotHub_Server/model/device.js
deviceSchema.methods.sendCommand = function ({commandName, data, encoding, ttl = undefined, commandType="cmd"}) {
var requestId = new ObjectId().toHexString()
var topic = `${commandType}/${this.product_name}/${this.device_name}/${commandName}/${encoding}/${requestId}`
if (ttl != null) {
topic = `${topic}/${Math.floor(Date.now() / 1000) + ttl}`
}
emqxService.publishTo({topic: topic, payload: data})
return requestId
}
接下来实现 RPC 式调用的 Server API 接口,我们在原有的下发指令接口上添加一个参数,来表明是否使用 RPC 式调用,如果使用RPC式调用,那么最多等待设备 5 秒,同时将指令的有效期也设为 5 秒:
//IotHub_Server/routes/devices.js
router.post("/:productName/:deviceName/command", function (req, res) {
var productName = req.params.productName
var deviceName = req.params.deviceName
var useRpc = (req.body.use_rpc == "true")
Device.findOne({"product_name": productName, "device_name": deviceName}, function (err, device) {
if (err) {
res.send(err)
} else if (device != null) {
var ttl = req.body.ttl != null ? parseInt(req.body.ttl) : null
if(useRpc){
ttl = 5
}
var requestId = device.sendCommand({
commandName: req.body.command,
data: req.body.data,
encoding: req.body.encoding || "plain",
ttl: ttl,
commandType: useRpc ? "rpc" : "cmd"
})
if (useRpc) {
UtilsService.waitKey(`cmd_resp/${requestId}`, ttl, function (val) {
if(val == null){
res.status(200).json({error: "device timeout"})
}else{
res.status(200).json({response: val.toString("base64")})
}
})
} else {
res.status(200).json({request_id: requestId})
}
} else {
res.status(404).send("device not found")
}
})
})
由于 IotHub 允许设备回复二进制数据,所以这里把设备的回复进行 Base64 编码以后再返回给调用者。
最后在 Hook 里面处理 RPC 式的指令回复,如果是 RPC 式的调用,那么将 payload 放入 Redis 对应的 key 中:
//IotHub_Server/service/message_service
static dispatchMessage({topic, payload, ts} = {}) {
...
var cmdRespRule = "(cmd_resp|rpc_resp)/:productName/:deviceName/:commandName/:requestId/:messageId"
const cmdRespRegx = pathToRegexp(cmdRespRule)
var result = null;
...
else if ((result = cmdRespRegx.exec(topic)) != null) {
this.checkMessageDuplication(result[6], function (isDup) {
if (!isDup) {
var payloadBuffer = new Buffer(payload, 'base64');
if (result[1] == "rpc_resp") {
var key = `cmd_resp/${result[5]}`;
redisClient.set(key, payloadBuffer)
redisClient.expire(key, 5)
} else {
MessageService.handleCommandResp({
productName: result[2],
deviceName: result[3],
ts: ts,
command: result[4],
requestId: result[5],
payload: payloadBuffer
})
}
}
})
}
}
这里指令回复的主题的规则变成了:
(cmd_resp|rpc_resp)/:productName/:deviceName/:commandName/:requestId/:messageId
,多了一个变量(第一个层级,指令类型),所以之前变量在 result 数组中的 index 要依次 +1。
这一节我们定义了 RPC 式调用,并完成了 PRC 式调用的服务端功能,下一节我们将完成 RPC 式调用的设备端实现。