功能设计
IotHub 的上行数据处理有以下一些功能。
- 存储上行数据: IotHub 接收设备端上传的数据,并将数据来源(设备的 ProductName 和 DeviceName)、消息 ID、消息类型、Payload 进行存储。
- 通知业务系统:当有新的上行数据到达时,IotHub 通知并将上行数据发送给业务系统,业务系统可以自行处理这些数据,例如通知用户,将数据和其他业务数据融合后存储在业务系统的数据库等。
设备数据查询:业务系统可以通过 IotHub Server API 查询某个设备上传的历史数据。
主题规划
从这里开始,我们需要对 IotHub 的设备可以订阅或者发布的主题进行规划,设备会发布和订阅很多主题,这里不一起规划完,而是一节一节、一个案例一个案例地说明。
如上一节所说,我们会把元数据放在主题名里面。设备用于发布数据的主题名格式为:upload_data/:ProductName/:DeviceName/:DataType/:MessageID
ProductName:设备的产品名。
- DeviceName:设备名。
- DataType:上传数据的类型,这个由业务系统和设备约定,比如传感器的温度数据可以设 DataType=”temperature”,在主题名中添加这一个层级的目的是为了使主题名尽量精确。(这是一个 MQTT 系统设计的 best practice)。
- MessageID:每一个消息的唯一 ID。
假设设备发布消息的主题名为:upload_data/IotApp/ODrvBHNaY/temperature/5ce4e36de3522c03b48a8f7f
,那么 IotHub Server 通过解析主题名,就可以获取该条消息的元数据:消息为设备上传的数据,来自设备(IotAPP, ODrvBHNaY),数据类型为 temperature,消息的 ID 为 5ce4e36de3522c03b48a8f7f。
存储上行数据
IotHub 会把消息存储在 MongoDB 中,这里我们来定义存储消息的 MongoDB collection:
//IotHub_Server/models/message.js
var mongoose = require('mongoose');
var Schema = mongoose.Schema;
const messageSchema = new Schema({
message_id: String,
product_name: String,
device_name: String,
data_type: String,
payload: Buffer,
sent_at: Number
})
const Message = mongoose.model("Message", messageSchema);
module.exports = Message
复制
消息可以根据 message_id 或者(ProductName, Device)查询,所以这里创建相应的索引:
# MongoDB Shell
use iothub
db.messages.createIndex({
"production_name" : 1,
"device_name" : 1
})
db.messages.createIndex({
"message_id" : 1
})
复制
因为 payload 可以是任意类型的数据,例如字符串或者二进制,所以这里将它定义为 buffer 类型。
通知业务系统
实际上有很多种方式可以在新的上行数据到达时通知业务系统,比如调用业务系统预先注册的回调 URL,使用队列系统等,这属于软件层面的架构设计,所以本课程选择一种简单的方式来进行演示就可以了,在本课程中,我们使用 RabbitMQ 进行通知,当有新上线数据到达时, IotHub 会向相应的 Exchange 中发布一条包含消息内容的数据。
阿里云 IoT 的规则引擎就可以很方便配置上行数据在到达物联网平台之后的后续流向,不需要写代码。但这属于软件层面的抽象和设计,所以在本课程不讨论规则引擎的设计和实现,让内容专注于协议级别的抽象。
数据查询
Server API 将提供接口供业务系统查询存储在 IotHub 的设备上行数据,可以通过 MessageID、(ProductName, DeviceName)进行查询。
由于 payload 可以是任意的二进制数据,所以当通过 HTTP 接口返回 payload 内容时,需要进行编码,本课程使用 Base64 进行编码。
上行数据处理流程
综合上面的设计,我们可以画出 IotHub 上行数据处理的流程:
- 物联网设备调用 DeviceSDK 的接口将数据发布到
upload_data/:ProductName/:DeviceName/:DataType/:MessageID
(MessageID 由 DeviceSDK 生成)。 - EMQ X Broker 通过 WebHook 将消息传递给 IotHub Server。
- IotHub 将消息存储到 MongoDB。
- IotHub 将数据放入对应的 RabbitMQ 队列。
- 业务系统从 RabbitMQ 获取新的上行数据;业务系统也可以调用 Server API 提供的接口查询设备的上行数据。
这一节我们完成了 IotHub 的上行数据处理功能,下面让我们开始实现这些功能并完善细节。