从这一节开始,我们来实现 IotHub 的上行数据处理功能:包括设备端和服务端的实现。在设备端我们会实现数据上传的接口,在服务端我们会实现元数据提取、消息去重、存储等功能。
DeviceSDK 端实现
DeviceSDK 端的实现比较简单,实现 uploadData 接口,按照上一节约定的主题名发布消息就可以了。在发布数据时,DeviceSDK 负责为消息生成唯一的 MessageID,生成的算法有很多选择,最简单的就是用 UUID,在本课程中,我们使用 BSON 的 ObjectID 作为 MessageID。
BSON,Binary JSON 的简写,是一种类 JSON 的二进制存储格式,我们在本课程的其他地方还会使用 BSON。
//IotHub_Device/sdk/iot_device.js
const objectId = require('bson').ObjectID;
class IotDevice extends EventEmitter{
.....
uploadData(data, type="default"){
if(this.client != null){
var topic = `upload_data/${this.productName}/${this.deviceName}/${type}/${new ObjectId().toHexString()}`
this.client.publish(topic, data, {
qos: 1
})
}
}
}
本课程里面统一使用 QoS1。
IotHub Server 端实现
IotHub Server 端在接收到上行数据时候需要以下几步:
- 从主题名中提取出上行数据的元数据;
- 消息去重;
- 将消息进行存储;
- 通过 RabbitMQ 通知业务系统。
配置 Payload 编码
在前面我们提到,因为我们使用的是 WebHook,所以需要把 payload 进行 Base64 编码,以应对 Payload 是二进制的情况,我们需要对emqx/_web/_hook
插件进行配置:
然后运行:## <EMQ X 安装目录>/etc/plugins/emqx_web_hook.conf
web.hook.encode_payload = base64
<EMQ X 安装目录>/bin/emqx_ctl plugins reload emqx_web_hook
。提取元数据
接下来我们需要在类似于"upload_data/IotApp/ODrvBHNaY/temperature/5ce4e36de3522c03b48a8f7f"
的主题名中将消息的元数据提取出来,这样的操作用正则表达式进行模式匹配是最好的,不太会写正则表达式的同学也不用担心,这里我们使用 path-to-regexp 来按照预先定义好的规则生成对应的正则表达式://IotHub_Server/services/message_service.js const pathToRegexp = require('path-to-regexp') class MessageService { static dispatchMessage({topic, payload, ts} = {}){ var dataTopicRule = "upload_data/:productName/:deviceName/:dataType/:messageId"; const topicRegx = pathToRegexp(dataTopicRule) var result = null; if ((result = topicRegx.exec(topic)) != null) { var productName = result[1] var deviceName = result[2] var dataType = result[3] var messageId = result[4] //接下来对上行数据进行处理 ... } } } module.exports = MessageService
path-to-regexp 的输出是一个通用的正则表达式,所以你不用 Node.js 的话也可以用 path-to-regex 按照主题名规则预先生成正则表达式,再复制到你的代码里面使用,这里每次都重新生成只是为了代码更好读一些。
消息去重
消息去重的原理比较简单,我们使用 Redis 来存储已接收到消息的 MessageID,当收到一条新消息时,先检查 Redis,如果 MessageID 已存在,则丢弃该消息,如果 MessageID 不存在,则将该消息的 MessageID 存入 Redis,然后进行后续处理。
创建 Redis 连接
//IotHub_Server/models/redis.js
const redis = require('redis');
const client = redis.createClient(process.env.REDIS_URL);
client.on("error", function (err) {
console.log("Error " + err);
});
module.exports = client;
消息去重
//IotHub_Server/services/message_service.js
const redisClient = require("../models/redis")
class MessageService {
...
static checkMessageDuplication(messageId, callback) {
var key = `/messageIDs/${messageId}`
redisClient.setnx(key, "", function (err, res) {
if (res == 1) {
redisClient.expire(key, 60 * 60 * 6)
callback.call(this, false)
} else {
callback.call(this, true)
}
})
}
}
...
首先使用 setnx 命令往 Redis 插入由MessageID组成的 key,如果说 setnx 返回 1,说明 key 不存在,那么消息可以进行后续处理;如果返回 0,则说明 MessageID 已经存在,应该要丢掉这个消息。在这里我们给 key 设置了 6个小时的有效期,理论上来你应该永久保存已接收到 MessageID,但是存储 MessageID 的空间不是无限的,需要一个清理的方式,在实际项目中,6 个小时的有效期已经可以应对 99.9% 的情况了,你也可以根据实际情况自行调整。
消息存储
消息存储就非常简单了,这里我们仍然做一个封装:
//IotHub_Server/services/message_service.js
static handleUploadData({productName, deviceName, ts, payload, messageId, dataType} = {}) {
var message = new Message({
product_name: productName,
device_name: deviceName,
payload: payload,
message_id: messageId,
data_type: dataType,
sent_at: ts
})
message.save()
}
代码联调
现在,我们把之前的代码都合并到一起。
//IotHub_Service/services/message_service.js
static dispatchMessage({topic, payload, ts} = {}) {
var dataTopicRule = "upload_data/:productName/:deviceName/:dataType/:messageId";
const topicRegx = pathToRegexp(dataTopicRule)
var result = null;
if ((result = topicRegx.exec(topic)) != null) {
this.checkMessageDuplication(result[4], function (isDup) {
if (!isDup) {
MessageService.handleUploadData({
productName: result[1],
deviceName: result[2],
dataType: result[3],
messageId: result[4],
ts: ts,
payload: Buffer.from(payload, 'base64')
})
}
})
}
}
使用
Buffer.from(payload, 'base64')
进行 Base64 解码。
然后在 WebHook 接口里面调用这个方法:
//IotHub_Server/routes/emqx_web_hook.js
router.post("/", function (req, res) {
switch (req.body.action){
case "client_connected":
Device.addConnection(req.body)
break
case "client_disconnected":
Device.removeConnection(req.body)
break;
case "message_publish":
messageService.dispatchMessage({
topic: req.body.topic,
payload: req.body.payload,
ts: req.body.ts
})
}
res.status(200).send("ok")
})
最后,我们需要在设备的 ACL 列表里面加入这个主题,使设备有权限发布数据到这个主题:
//IotHub_Server/models/device.js
deviceSchema.methods.getACLRule = function () {
const publish = [
`upload_data/${this.productName}/${this.deviceName}/+/+`
]
...
}
这些步骤做完后,我们可以写一端代码来验证下。我们需要事先重新注册一个设备来上传数据(或者你需要手动在 MongoDB 里更新老设备的 ACL 列表):
//IotHub_Device/samples/upload_data.js
var IotDevice = require("../sdk/iot_device")
require('dotenv').config()
var device = new IotDevice({
productName: process.env.PRODUCT_NAME,
deviceName: process.env.DEVICE_NAME,
secret: process.env.SECRET,
clientID: "upload_data.js"
})
device.on("online", function () {
console.log("device is online")
})
device.connect()
device.uploadData("this is a sample data", "sample")
运行upload_data.js
之后,查询 IotHub 数据库的 Messages collection,可以发现刚才发送的消息已经存入进来了,如下图所示。
这一节,我们完成了上行数据处理的大部分功能,但还缺少通知业务系统,以及消息数据的查询功能。下一节,我们来完成这些功能,并完善细节。