从这一部分开始,我们设计和实现一些 MaqueIotHub 更高维度抽象的功能,这一节我们来设计和实现 IotHub 的 RPC 式调用的服务端功能。

什么是 RPC 式调用?

在第三部分的课程里,我们实现了 IotHub 的指令下发功能。我们曾经把指令下发比作是一次函数调用 y = f(x),在目前的实现里,f(x) 的返回结果 y 是通过异步的方式告知调用者(业务系统)的,即业务系统调用下发指令接口,获得的是一个 RequestID,然后设备对指令进行回复以后,业务系统再从队列中,使用 RequestID 去获取对应的指令执行结果。
RPC 式调用是指当业务系统调用 IotHub 的发送指令接口后,IotHub 会把设备对指令的回复内容直接返回给业务系统,而不是再通过异步的方式(RabbitMQ)通知业务系统,程序执行的流程如下图所示。
23.RPC样式调用(一) - 图1

  1. 业务系统对 IotHub Server API 的下发指令接口发起 HTTP Post 请求。
  2. IotHub Server 调用 EMQ X 的 Pubish API。
  3. EMQ X 将指令 Publish 到设备。
  4. 设备执行完指令,将指令执行结果 Publish 到 EMQ X Broker。
  5. EMQ X Broker 将指令执行结果发送到 IotHub Server。
  6. IotHub Server API 将指令结果放入 HTTP Response Boby 中,完成对 HTTP Post 的响应。

这样业务系统的一次 HTTP 请求就完成了,检查 Response Body 就可以获取指令的执行结果了。
在 RPC 式调用里面,如果设备在一定时间内没有对指令进行回复,比如 5 秒,那么 IotHub Server API 不会一直等待下去,而是在 HTTP Response Boby 中放入错误信息(比如设备无响应)并返回给业务系统,所以指令一定是有有效期的,比如 5 秒。
通过这样的流程,一次 RPC 式的调用就完成了。我们可以用这样的操作来执行一些简单的、时效性要求又比较高的指令。

功能设计

我们会使用一个特定的主题来发布 RPC 式调用的指令,接着我们会使用 Redis 来实现对指令回复的等待功能,这个等待是带超时的。最后修改指令下发接口,使业务系统可以指定是否使用 RPC 式调用的方式来下发指令。

主题规划

这里我们使用下面的主题来发送 RPC 式调用的指令:
rpc/:ProductName/:DeviceName/:CommandName/:Encoding/:RequestID/:ExpiresAt
可以看到,这个主题和之前用于下发指令的主题相比,除了第一个层级从”cmd”变成了”rpc”之外,其他层级都是一模一样的。因为 RPC 式调用其实也是一种下发指令的操作
所以我们可以把下发指令的主题统一定义为:
:CommandType/:ProductName/:DeviceName/:CommandName/:Encoding/:RequestID/:ExpiresAt
CommandType 目前有两个值,”cmd”和”rpc”。
设备会把对 RPC 指令的回复发布到主题:
rpc_resp/:ProductName/:DeviceName/:CommandName/:RequestID/:MessageID
同样地,这个和之前回复指令的主题相比,除了第一个层级从”cmd_resp”变成了”rpc_resp”以外,其他层级都是一模一样的。
所以我们可以把指令回复的主题统一定义为:RespType/:ProductName/:DeviceName/:CommandName/:RequestID/:MessageID
RespType 目前有两个值:”cmd_resp”和”rpc_resp”。

等待指令回复

最后使用 Redis 来帮助 IotHub Server 等待指令的回复,步骤如下。

  1. Server API 的代码调用了 EMQ X 的 Publish 功能之后,调用 Redis 的 GET 指令来获取 Redis 中的 key: “cmd_resp/:RequestID”的value,如果 value 不为空,则将 value 作为指令的回复,返回给业务系统;如果 value 为空,则需要等待一小段时间,比如 10 毫秒以后,重复上述操作。
  2. IotHub Server 在收到设备对 RPC 指令的回复以后,调用 Redis 的 SET 指令将回复的 payload 保存到 Redis 的 key 中:”cmd_resp/:RequestID”。
  3. 如果 Server API 在指定的时间内仍然无法获取到 key:”cmd_resp/:RequestID”的 value 的话,返回”错误”给业务系统。

我们可以把检查 key:”cmd_resp/:RequestID”的过程封装起来:

  1. //IotHub_Server/services/utils_service.js
  2. const redisClient = require("../models/redis")
  3. class UtilsService {
  4. static waitKey(key, ttl, callback) {
  5. var end = Date.now() + ttl * 1000
  6. function checkKey() {
  7. if (Date.now() < end) {
  8. redisClient.get(key, function (err, val) {
  9. if (val != null) {
  10. callback(val)
  11. } else {
  12. setTimeout(checkKey, 10)
  13. }
  14. })
  15. } else {
  16. callback(null)
  17. }
  18. }
  19. checkKey()
  20. }
  21. }
  22. module.exports = UtilsService

waitKey 方法接收 ttl 参数作为等待超时时间,单位为秒。每隔 10 毫秒检查一次。
然后修改一下 Device 类的 sendCommand 方法,使它可以发送 RPC 式的指令:

  1. //IotHub_Server/model/device.js
  2. deviceSchema.methods.sendCommand = function ({commandName, data, encoding, ttl = undefined, commandType="cmd"}) {
  3. var requestId = new ObjectId().toHexString()
  4. var topic = `${commandType}/${this.product_name}/${this.device_name}/${commandName}/${encoding}/${requestId}`
  5. if (ttl != null) {
  6. topic = `${topic}/${Math.floor(Date.now() / 1000) + ttl}`
  7. }
  8. emqxService.publishTo({topic: topic, payload: data})
  9. return requestId
  10. }

接下来实现 RPC 式调用的 Server API 接口,我们在原有的下发指令接口上添加一个参数,来表明是否使用 RPC 式调用,如果使用RPC式调用,那么最多等待设备 5 秒,同时将指令的有效期也设为 5 秒:

  1. //IotHub_Server/routes/devices.js
  2. router.post("/:productName/:deviceName/command", function (req, res) {
  3. var productName = req.params.productName
  4. var deviceName = req.params.deviceName
  5. var useRpc = (req.body.use_rpc == "true")
  6. Device.findOne({"product_name": productName, "device_name": deviceName}, function (err, device) {
  7. if (err) {
  8. res.send(err)
  9. } else if (device != null) {
  10. var ttl = req.body.ttl != null ? parseInt(req.body.ttl) : null
  11. if(useRpc){
  12. ttl = 5
  13. }
  14. var requestId = device.sendCommand({
  15. commandName: req.body.command,
  16. data: req.body.data,
  17. encoding: req.body.encoding || "plain",
  18. ttl: ttl,
  19. commandType: useRpc ? "rpc" : "cmd"
  20. })
  21. if (useRpc) {
  22. UtilsService.waitKey(`cmd_resp/${requestId}`, ttl, function (val) {
  23. if(val == null){
  24. res.status(200).json({error: "device timeout"})
  25. }else{
  26. res.status(200).json({response: val.toString("base64")})
  27. }
  28. })
  29. } else {
  30. res.status(200).json({request_id: requestId})
  31. }
  32. } else {
  33. res.status(404).send("device not found")
  34. }
  35. })
  36. })

由于 IotHub 允许设备回复二进制数据,所以这里把设备的回复进行 Base64 编码以后再返回给调用者。

最后在 Hook 里面处理 RPC 式的指令回复,如果是 RPC 式的调用,那么将 payload 放入 Redis 对应的 key 中:

  1. //IotHub_Server/service/message_service
  2. static dispatchMessage({topic, payload, ts} = {}) {
  3. ...
  4. var cmdRespRule = "(cmd_resp|rpc_resp)/:productName/:deviceName/:commandName/:requestId/:messageId"
  5. const cmdRespRegx = pathToRegexp(cmdRespRule)
  6. var result = null;
  7. ...
  8. else if ((result = cmdRespRegx.exec(topic)) != null) {
  9. this.checkMessageDuplication(result[6], function (isDup) {
  10. if (!isDup) {
  11. var payloadBuffer = new Buffer(payload, 'base64');
  12. if (result[1] == "rpc_resp") {
  13. var key = `cmd_resp/${result[5]}`;
  14. redisClient.set(key, payloadBuffer)
  15. redisClient.expire(key, 5)
  16. } else {
  17. MessageService.handleCommandResp({
  18. productName: result[2],
  19. deviceName: result[3],
  20. ts: ts,
  21. command: result[4],
  22. requestId: result[5],
  23. payload: payloadBuffer
  24. })
  25. }
  26. }
  27. })
  28. }
  29. }

这里指令回复的主题的规则变成了: (cmd_resp|rpc_resp)/:productName/:deviceName/:commandName/:requestId/:messageId,多了一个变量(第一个层级,指令类型),所以之前变量在 result 数组中的 index 要依次 +1。

这一节我们定义了 RPC 式调用,并完成了 PRC 式调用的服务端功能,下一节我们将完成 RPC 式调用的设备端实现。