这一节我们来设计 IotHub 的下行数据处理功能。

功能设计

Maque IotHub 的指令下发系统有以下一些功能。

  1. 业务系统可以通过 IotHub Server API 提供的接口向指定的设备发送指令,指令可以包含任意格式的数据,比如字符串和二进制数据。
  2. 指令可设置过期时间,过期的指令将不会被执行。
  3. 业务系统可在设备离线时下发指令,设备在上线以后可以接收到离线时由业务系统下发的指令。
  4. 设备可以向业务系统回复指令的执行结果,IotHub 会把设备的回复通知到业务系统,通知包括:哪个设备回复了哪条指令,回复的内容是什么。

    DeviceSDK 端实现

    主题规划

    和上行数据处理一样, IotHub 会把指令的元数据放在主题名中,为了接收下发的指令,设备将订阅以下的主题:
    cmd/:ProductName/:DeviceName/:CommandName/:Encoding/:RequestID/:ExpiresAt
    这个主题的第一层级代表的是指令的类别,目前固定为 cmd,代表普通的下行指令,后面我们还会看到其他类型的指令。后面的各个层级代表一种指令的元数据,下面把这些元数据的意义都解释下:
  • ProductName、DeviceName:这两个元数据很好理解,代表接受指令的设备名称,设备用自己的 ProductName 和 DeviceName 进行订阅。
  • CommandName指令的名称,比如重启设备的名称可叫做 reboot。
  • Encoding指令数据的编码格式,由于 IotHub 提供给业务系统的接口是 HTTP 的,同时 IotHub Server 也是调用 EMQ X 的 RestAPI 发布指令,所以如果发布的指令携带的是二进制数据,就需要对这个二进制数据进行编码,让它变成一个字符串。当 DeviceSDK 接收到二进制的指令数据时,需要按照相应的编码方式解析出原始的二进制数据。Encoding 可以有两种值,plain 或者 base64,plain 代表未编码,指令数据为字符串的时候使用,base64 代表 Base64 编码,指令数据为二进制数据时使用。
  • RequestID指令的编号,有两层意义,第一,和上行数据的 MessageID 一样,用来做消息去重;第二,用于唯一标识一条指令,设备在回复指令的时候需要带上指令的 RequestID
  • ExpiresAt可选,指令的过期时间,格式为 Unix 时间戳。如果指定了 ExpiresAt,那么 DeviceSDK 在收到指令时会检查当前时间是否大于 ExpiresAt,如果是,就直接丢弃掉这条指令。

    如果说我们统一都对 Payload 进行 Base64 编码的话就可以省去 Encoding 这个层级,我们增加这个层级的目的是尽量减少设备端不必要的计算,如果发送的指令数据是 ASCII 字符串就不用再 Decode 了。你可以根据具体情况来决定是否需要这个层级。

如何订阅主题

一般来说,按照 MQTT 的方式,DeviceSDK 可以这样来订阅上面的主题:

  1. client.on('connect', function (connack) {
  2. if(connack.returnCode == 0) {
  3. if (connack.sessionPresent == false) {
  4. client.subscribe(`cmd/${this.productName}/${this.DeviceName}/+/+/+/#`, {
  5. qos: 1
  6. }, function (err, granted) {
  7. if (err != undefined) {
  8. console.log("subscribe failed")
  9. } else {
  10. console.log(`subscribe succeeded with ${granted[0].topic}, qos: ${granted[0].qos}`)
  11. }
  12. })
  13. }
  14. }else {
  15. console.log(`Connection failed: ${connack.returnCode}`)
  16. }
  17. })

cmd/${this.productName}/${this.DeviceName}/+/+/+/#正好可以匹配到接受指令用的主题,因为 ExpiresAt 这个层级是可选的,所以用放在最后的#号来匹配,我们以后在设计主题的时候,尽量把可选的层级放在最后面。

除了这种方式以外,我们还可以利用 EMQ X 的”服务端订阅”功能更高效和灵活地进行订阅。服务端订阅指的是,当 MQTT Client 连接到 EMQ X Broker 时候,EMQ X 会按照预先定义好的规则自动为 Client 订阅主题。用这种方式设备不需要再发送 Subscribe 了,增加和减少设备订阅的主题也不需要改动设备的代码了。
在后面实现的课程中,我们会看到如何使用 EMQ X 的”服务端订阅”功能。

消息去重

回想一下在处理上行数据的时候 IotHub 是怎么实现消息去重的:IotHub 会把已收到消息的 MessageID 存入 Redis 中,每次收到新消息的时候都会拿新消息的 MessageID 去和已收到的比较,如果找到有相同的,就丢弃收到的消息。同时,限制 MessageID 的存储,不会让存储空间无限增大。
同样,在 DeviceSDK 这端,我们也需要找到这样一个存储 RequestID 的缓存来帮助我们进行消息去重,这个缓存最好有以下的特性:

  • 是 key-value 存储;
  • 可以设置 key 的有效期;
  • 可持久化,设备掉电也不会丢失已存储的 RequestID。

由于本课程使用的是 Node.js 来实现 DeviceSDK,我们会用一个 Node.js 版本并满足上述要求的缓存实现。如果你是在其他语言或者平台上进行设备端的开发,你需要找到或者自己实现一个类似的缓存。

回复指令

当设备需要回复指令时,它需要向一个特定的主题发布一个消息。同样地,我们把这个回复的元数据放在主题名中,回复的数据放在消息的负载中:
cmd_resp/:ProductName/:DeviceName/:CommandName/:RequestID/:MessageID

  • ProductName、DeviceName:标识回复来自于哪个设备。
  • CommandName:所回复的指令名称。
  • RequestID:所回复的指令的 RequestID。
  • MessageID:因为回复本身也是一条上行数据,也需要 MessageID 来唯一标识自己,以实现消息去重。

    IotHub Server 端实现

    IotHub Server 有个功能,提供接口让业务系统调用来向指定的设备发送指令;将设备对指令的回复转发回业务系统。

    IotHub Server API

    IotHub Server API 提供一个 HTTP 接口,业务系统通过该接口向设备下发指令。业务系统需要提供设备的 ProductName 和 DeviceName,指令名称和指令数据。这里要主要的是,因为 IotHub Server 提供的是基于 HTTP 的 REST API,所以在下发二进制指令数据时,业务系统需要预先将二进制数据进行 Base64 编码,并且提供指令数据的编码格式 Base64 或 Plain(未编码,用于传输字符串类型的指令数据)

    我的建议是尽量使用字符串类型的指令数据,简单、好调试,如果要下发较大的二进制文件,可以用另外的途径,在 OTA 升级这一节我们再作具体介绍。

指令回复通知业务系统

和处理上行数据一样, IotHub 使用 RabbitMQ 对业务系统进行通知,当 IotHub 收到设备对指令的回复时,会向名为 “iothub.events.cmd_resp” 的 Exchange 发布一条消息,包含 DeviceName、指令名、指令的 RequestID 及回复数据等内容。 Exchange 的类型为 Direct,RoutingKey 为设备的 ProductName。

这一节设计了 IotHub 的下发指令功能,接下来我们开始来实现这些功能。