接下来的的几节课,我们会来设计和实现 Maque IotHub 的上行数据处理和存储的功能

接收上行数据

在《MQTT 协议快速入门》的讨论群里,问的一个比较多的一个问题就是:”服务端怎么接收客户端发送的数据呢?”

这里我想先做一个说明,在 MQTT 协议的架构里面,是没有”服务端”和”客户端”的概念的,只有”Broker”和”Client”,所以 EMQ X 说自己是一个 MQTT Broker,而不是 MQTT 的 Server。而服务端和客户端,是我们在 MQTT 协议的基础上构建的 C/S 结构的平台或者业务系统里面的概念,所以我们需要做一些抽象,让这两组不太相干的逻辑实体能匹配起来。

在我们的架构里,IotHub Server 就是服务端,设备就是客户端,IotHub Server 有一个最基础的功能就是接收设备的数据并存储,那么怎么来实现呢?我们来看一下可能的几种方案。

Pure MQTT 的方案

这个方案使用 MQTT 协议框架内的实体来实现设备上行数据的接收功能。

像前面说的一样,MQTT 协议架构里没有”服务端”和”客户端”,那么 IotHub Server 需要接收设备端的数据,它需要和设备一样,以 MQTT Client 的身份接入 EMQ X Broker,订阅相关的主题来获取数据:

  1. 设备端发布消息到特定主题,例如 “data/client/:DeviceName”;
  2. IotHub Server 启动一个 MQTT Client,接入 EMQ X Broker,并订阅主题 “data/client/+”;
  3. IotHub Server 的 MQTT Client 接收到消息后,将消息存入数据库。

这是一个可行的方案,但是存在单点故障的问题。MQTT Client 挂了怎么办?当数据量很大时,这个 Client 是否能够处理得过来,这会不会成为系统的瓶颈?

我们可以往前走一步,将设备分片,设备在订阅的时候先随机生成一个 1 ~ 20 的随机整数 SliceID(假设这里我们要分为 20 片),然后设备将数据发布到 data/client/:SliceId/:DeviceName。在 IotHub Server 端,可以启动最多 20 个 MQTT Client,分别订阅 data/client/1/+ 到 data/client/20/+ 这20个主题。通过这样的方式,我们将数据流从一个 Client,分散到了最多 20 个 Client,减少了这块成为系统瓶颈的可能性。但是这个解决方案仍然有一个问题,当 IotHub Server 端的某一个 MQTT Client 挂了以后,有一部分的设备数据上传会受到影响,直到这个 MQTT Client 恢复为止。

我们还可以更进一步。EMQ X Broker支持一个功能,称为共享订阅,可以在这里找到关于共享订阅的文档。共享订阅的实现很简单,只需要订阅具有特殊前缀的的题即可,目前共享订阅支持2种前缀"$queue/""$share/<group>/",且支持通配符”#”和”+”,我们可以来做个实验:
在两个终端上运行:mosquitto_sub -t '$share/group/topic/+' ,并在第三个终端上运行: mosquitto_pub -t "topic/1" -m "test" --repeat 10
我们会发现再运行 mosquitto_sub 的两个终端上会分别打印出 test,加起来一共十次。

‘$share/group/topic/+’ 中 group 可以为任何有意义字符串; 在 Publish 的时候不再需要加上共享订阅的前缀; 这里为了方便验证,将 EMQ X 设置成允许匿名登录; 可以在 mosquitto.org 找到如何在你的系统上安装 mosquitto MQTT client。

EMQ X Broker 会按照某种顺序依次把消息分发给共享订阅者,实现某种意义上的订阅者负载均衡,可以在< EMQ X 安装目录>/etc/emqx.conf中修改配置项broker.shared_subscription_strategy,对分发的策略进行配置。

  • random:默认值,所有共享订阅者随机选择分发。
  • round_robin:按照共享订阅者订阅的顺序分发。
  • sticky:分发给上次分发的订阅者
  • hash:根据发布 Client 的 ClientID 进行分发。

如果使用共享订阅的方式来实现 Server 端接收设备端数据,我们就可以根据数据量动态的增添共享订阅者,这就不存在单点故障,也具有良好的扩展性。唯一的缺点是,这种方案引入了多个 MQTT Client 这样额外的实体,提高了系统的复杂性,稍稍提高了开发、部署和运维监控的成本。

Hook based 方案

这个方案会使用 EMQ X 的 Hook 机制实现设备上行数据的接收功能
在第一部分,讲设备连接状态时,我们已经了解 EMQ X 的 Hook 设计,并用到了 EMQ X 自带的 WebHook 插件。和设备上线和下线一样,EMQ X 会在有 Publish 的时候将 Publish 的信息通过 Hook 传递出来:

  1. {
  2. "action":"message_publish",
  3. "from_client_id":"C_1492410235117",
  4. "from_username":"C_1492410235117",
  5. "topic":"world",
  6. "qos":0,
  7. "retain":true,
  8. "payload":"Hello world!",
  9. "ts":1492412774
  10. }

这时,我们就可以对数据进行存储和处理,实现接收设备的上行数据。基于 Hook 的方案不用在 Server 端建立和管理连接到 Broker 的 MQTT Client,系统复杂度要低一些。在本课程中,Maque IotHub 会使用基于 Hook 的方案来实现上线数据的接收。

基于共享订阅和基于 Hook 的方式都是在生产环境可以用的解决方案,Maque IotHub 使用基于 Hook 的方式只是因为这样开发和部署要简单一些,而不是因为基于 Hook 的方案相较于共享订阅有明显的,决定性的优势。

定义数据格式

最后让我们来定义一下上行数据的格式,在 IotHub 里,上行数据由两部分组成:负载和元数据

  • 负载: 负载(Payload)是指消息所携带的数据本身,比如传感器在某一时刻的读数。负载可以是任意格式,例如 JSON 字符,或者二进制数据。它的格式是由业务系统和设备之间约定的,IotHub 不对负载进行解析,只负责在业务系统和设备之前传递负载数据并存储。这部分数据包含在 Publish 包的消息体中。
  • 元数据:元数据(Metadata),是指描述消息的数据,包括消息发布者的身份(ProductName, DeviceName)、数据类型等,在 IotHub 中,上线数据使用 QoS1,所以还需要在接收端对消息进行手动去重,那么在元数据中还会包含消息的唯一 ID,在后面我们还会看到更多的元数据类型。元数据的内容包含在 Publish 包的 Topic Name 里面。 IotHub 会对元数据进行解析,以便做后续的处理。

这一节,我们选择了接收上行数据的实现方案,并定义了上行数据的格式。下一节,我们开始设计 IotHub 的上行数据处理功能