接下来的几节课,我们会来设计和实现 Maque IotHub 的下行数据处理功能

定义下行数据

在物联网应用中,下行数据一般有两种,第一种是需要同步的数据,比如平台需要把训练好的模型部署到前端的摄像头上,那平台下发给设备的消息里面就包含模型数据的信息;第二种是指令,平台下发给设备,要求设备完成某种操作,比如共享单车的服务端下发给单车开锁的指令。
在 Maque IotHub 里,我们会把这两种下行数据统称为指令,因为第一种数据也可以被当作是要求设备完成”同步数据”这个操作的指令。 在大多数情况下,设备在收到指令后都应该向业务系统回复指令执行的结果(注意,不是回复指令已收到,因为使用 QoS>1 的消息在 MQTT 协议层面就已经保证设备一定能收到指令),比如文件有没有下载完毕、继电器有没有打开等。 是否回复以及怎么回复应该由业务逻辑决定,这个是业务系统和设备之间的约定, IotHub 只负责将业务系统下发的指令发送到设备,同时将设备对指令的回复再传送回业务系统。 IotHub 在实现一些功能时,也会向设备发送一些内部的指令。
和上行数据一样,指令也由元数据和负载组成,指令的元数据也是放在主题名中的,一般来说指令有以下一些元数据:

  • ProductName、DeviceName:用于标识指令发给哪个设备。
  • MessageID:作为消息的唯一标识,用于消息去重,也用于标识指令,设备回复指令时会用到。
  • 指令名称:用于标识指令的名称,比如单车开锁的指令可以叫作 unlock。
  • 指令类别:用于标识指令的类别,在后面的章节实现一些特殊的指令时会用到。
  • 过期时间:有些指令具有时效性,设备不应该执行超过有效时间的指令,比如单车开锁的指令,假设单车的网络断开,一个小时又恢复了,那么单车会收到在断线期间用户发出的开锁指令,单车不应该执行这些指令,所以需要给开锁指令加一个过期时间,比如 1 分钟。

在负载中则包含了执行指令需要的额外数据,比如前面说的部署模型到前端摄头的情景,指令的负载就应该是模型的数据,比如下载模型的 url,指令名称可以叫做”update_model”。
我们可以这样来理解 IotHub 里面的指令,下发指令相当于远程的在设备上执行一个函数 f(x),那么在主题名里的指令名称就相当于函数名 f,指令的负载就相当于函数的参数 x,设备还会将 f(x) 的返回值回复给业务系统或者 IotHub(如果 f(x) 有返回值的话)。

发送下行数据

在 MaqueIotHub 里,下行数据分为两种:

  • 业务系统下发给设备的消息,比如需要同步的数据、需要执行的指令等,这些数据会经由 IotHub 发送给设备;
  • IotHub 的和设备之间发送的消息,通常是为了实现 IotHub 相关的功能的内部消息,我们会在后面了解到。

就像我们在第二部分:上行数据处理开头说的那样,在 MQTT 的架构里面没有”服务端”和”客户端”的概念,所谓的上行和下行只是我们在实现 IotHub 时抽象出来的一个概念。从 IotHub Server 发送到设备的数据就是下行数据,反之就是上行数据; 而对 MQTT Broker 来说,IotHub Server和设备一样,都是 MQTT Client。 接下来我们来看一下发送下行数据可能的一些方案。

完全基于 MQTT 的方案

这种方案的逻辑非常直接,IotHub Server 以 MQTT Client 的身份接入 EMQ X Broker,使用 Publish 将数据发布到设备订阅的主题上。

  1. 设备端订阅某特定的主题,比如/cmd/ProductNameA/DeviceNameA
  2. IotHub Server 启动一个 MQTT Client 接入 EMQ X Broker
  3. 业务系统通过 IotHub Server API 的接口告知 IotHub Server 要发送的数据和设备
  4. IotHub 的 MQTT Client 将数据 Publish 到/cmd/ProductNameA/DeviceNameA

这是一个可行的方案,只不过目前存在一个单点故障的问题,如果 IotHub Server 用于 Publish 的 MQTT Client 挂了怎么办?
我们可以更进一步,IotHub Server 可以同时启用多个用于 Publish 的 MQTT Client,这些 Client 可以从一个 Work Queue(比如 RabbitMQ、Redis 等)里获取要发布的消息,然后将其发布到对应的设备,在每次 IotHub Server 需要发送数据到设备时,只需要往这个 Queue 里投递一条消息就可以了。具体流程如下图所示:
18.选择下行数据处理方案 - 图1
现在这个方案就具有较好的扩展性,也不存在单点故障了。 实际上我在几个项目中使用过这种方案,效果还是不错的。
这种方案的唯一缺点就是引入了多个 MQTT Client 这样额外的实体,提高了系统的复杂性,稍微提高了开发、部署和运维监控的成本。

基于 EMQ X 的方案

EMQ X 的管理监控 API (REST API) 中提供了一个 API, 可以供外部的应用系统向某个主题发布消息:
API 定义POST api/v3/mqtt/publish
API请求参数:

  1. {
  2. "topic": "test_topic",
  3. "payload": "hello",
  4. "qos": 1,
  5. "retain": false,
  6. "client_id": "mqttjs_ab9069449e"
  7. }

这种方案不需要维护多个用于发布的 MQTT Client,在开发和部署上的复杂度要低一些,这本课程中,我们会使用此方案向设备发送下行数据,如下图所示:
18.选择下行数据处理方案 - 图2

你可以从上述两个方案中选择一个更切合项目实际情况的方案,本课程选择基于 EMQ X REST API 方案的原因是这种方案开发和部署更简单。

这一节我们确定了下行数据的方案,并且定义了 IotHub 指令的内容,下一节我们开始设计 IotHub 的下发指令功能