从这一节开始,我们来实现 IotHub 的上行数据处理功能:包括设备端和服务端的实现。在设备端我们会实现数据上传的接口,在服务端我们会实现元数据提取、消息去重、存储等功能

DeviceSDK 端实现

DeviceSDK 端的实现比较简单,实现 uploadData 接口,按照上一节约定的主题名发布消息就可以了。在发布数据时,DeviceSDK 负责为消息生成唯一的 MessageID,生成的算法有很多选择,最简单的就是用 UUID,在本课程中,我们使用 BSON 的 ObjectID 作为 MessageID。

BSON,Binary JSON 的简写,是一种类 JSON 的二进制存储格式,我们在本课程的其他地方还会使用 BSON。

  1. //IotHub_Device/sdk/iot_device.js
  2. const objectId = require('bson').ObjectID;
  3. class IotDevice extends EventEmitter{
  4. .....
  5. uploadData(data, type="default"){
  6. if(this.client != null){
  7. var topic = `upload_data/${this.productName}/${this.deviceName}/${type}/${new ObjectId().toHexString()}`
  8. this.client.publish(topic, data, {
  9. qos: 1
  10. })
  11. }
  12. }
  13. }

本课程里面统一使用 QoS1。

IotHub Server 端实现

IotHub Server 端在接收到上行数据时候需要以下几步:

  • 从主题名中提取出上行数据的元数据;
  • 消息去重;
  • 将消息进行存储;
  • 通过 RabbitMQ 通知业务系统。

    配置 Payload 编码

    在前面我们提到,因为我们使用的是 WebHook,所以需要把 payload 进行 Base64 编码,以应对 Payload 是二进制的情况,我们需要对emqx/_web/_hook插件进行配置:
    1. ## <EMQ X 安装目录>/etc/plugins/emqx_web_hook.conf
    2. web.hook.encode_payload = base64
    然后运行:<EMQ X 安装目录>/bin/emqx_ctl plugins reload emqx_web_hook

    提取元数据

    接下来我们需要在类似于"upload_data/IotApp/ODrvBHNaY/temperature/5ce4e36de3522c03b48a8f7f"的主题名中将消息的元数据提取出来,这样的操作用正则表达式进行模式匹配是最好的,不太会写正则表达式的同学也不用担心,这里我们使用 path-to-regexp 来按照预先定义好的规则生成对应的正则表达式:
    //IotHub_Server/services/message_service.js
    const pathToRegexp = require('path-to-regexp')
    class MessageService {
         static dispatchMessage({topic, payload, ts} = {}){
               var dataTopicRule = "upload_data/:productName/:deviceName/:dataType/:messageId";
               const topicRegx = pathToRegexp(dataTopicRule)
               var result = null;
               if ((result = topicRegx.exec(topic)) != null) {
                   var productName = result[1]
                   var deviceName = result[2]
                   var dataType = result[3]
                   var messageId = result[4]
                   //接下来对上行数据进行处理
                   ...
          }
      }
    } 
    module.exports = MessageService
    

    path-to-regexp 的输出是一个通用的正则表达式,所以你不用 Node.js 的话也可以用 path-to-regex 按照主题名规则预先生成正则表达式,再复制到你的代码里面使用,这里每次都重新生成只是为了代码更好读一些。

消息去重

消息去重的原理比较简单,我们使用 Redis 来存储已接收到消息的 MessageID,当收到一条新消息时,先检查 Redis,如果 MessageID 已存在,则丢弃该消息,如果 MessageID 不存在,则将该消息的 MessageID 存入 Redis,然后进行后续处理。
创建 Redis 连接

//IotHub_Server/models/redis.js
const redis = require('redis');
const client = redis.createClient(process.env.REDIS_URL);
client.on("error", function (err) {
    console.log("Error " + err);
});
module.exports = client;

消息去重

//IotHub_Server/services/message_service.js
const redisClient = require("../models/redis")
class MessageService {
    ...
    static checkMessageDuplication(messageId, callback) {
        var key = `/messageIDs/${messageId}`
        redisClient.setnx(key, "", function (err, res) {
            if (res == 1) {
                redisClient.expire(key, 60 * 60 * 6)
                callback.call(this, false)
            } else {
                callback.call(this, true)
            }
        })
    }
}
...

首先使用 setnx 命令往 Redis 插入由MessageID组成的 key,如果说 setnx 返回 1,说明 key 不存在,那么消息可以进行后续处理;如果返回 0,则说明 MessageID 已经存在,应该要丢掉这个消息。在这里我们给 key 设置了 6个小时的有效期,理论上来你应该永久保存已接收到 MessageID,但是存储 MessageID 的空间不是无限的,需要一个清理的方式,在实际项目中,6 个小时的有效期已经可以应对 99.9% 的情况了,你也可以根据实际情况自行调整。

消息存储

消息存储就非常简单了,这里我们仍然做一个封装:

//IotHub_Server/services/message_service.js
static handleUploadData({productName, deviceName, ts, payload, messageId, dataType} = {}) {
        var message = new Message({
            product_name: productName,
            device_name: deviceName,
            payload: payload,
            message_id: messageId,
            data_type: dataType,
            sent_at: ts
        })
        message.save()
    }

代码联调

现在,我们把之前的代码都合并到一起。

//IotHub_Service/services/message_service.js
static dispatchMessage({topic, payload, ts} = {}) {
        var dataTopicRule = "upload_data/:productName/:deviceName/:dataType/:messageId";
        const topicRegx = pathToRegexp(dataTopicRule)
        var result = null;
        if ((result = topicRegx.exec(topic)) != null) {
            this.checkMessageDuplication(result[4], function (isDup) {
                if (!isDup) {
                    MessageService.handleUploadData({
                        productName: result[1],
                        deviceName: result[2],
                        dataType: result[3],
                        messageId: result[4],
                        ts: ts,
                        payload: Buffer.from(payload, 'base64')
                    })
                }
            })
        }
    }

使用Buffer.from(payload, 'base64')进行 Base64 解码。

然后在 WebHook 接口里面调用这个方法:

//IotHub_Server/routes/emqx_web_hook.js
router.post("/", function (req, res) {
    switch (req.body.action){
        case "client_connected":
            Device.addConnection(req.body)
            break
        case "client_disconnected":
            Device.removeConnection(req.body)
            break;
        case "message_publish":
            messageService.dispatchMessage({
                topic: req.body.topic,
                payload: req.body.payload,
                ts: req.body.ts
            })
    }
    res.status(200).send("ok")
})

最后,我们需要在设备的 ACL 列表里面加入这个主题,使设备有权限发布数据到这个主题:

//IotHub_Server/models/device.js
deviceSchema.methods.getACLRule = function () {
    const publish = [
        `upload_data/${this.productName}/${this.deviceName}/+/+`
    ]
    ...
}

这些步骤做完后,我们可以写一端代码来验证下。我们需要事先重新注册一个设备来上传数据(或者你需要手动在 MongoDB 里更新老设备的 ACL 列表):

//IotHub_Device/samples/upload_data.js
var IotDevice = require("../sdk/iot_device")
require('dotenv').config()
var device = new IotDevice({
    productName: process.env.PRODUCT_NAME,
    deviceName: process.env.DEVICE_NAME,
    secret: process.env.SECRET,
    clientID: "upload_data.js"
})
device.on("online", function () {
    console.log("device is online")
})
device.connect()
device.uploadData("this is a sample data", "sample")

运行upload_data.js之后,查询 IotHub 数据库的 Messages collection,可以发现刚才发送的消息已经存入进来了,如下图所示。
14.实现(一) - 图1

这一节,我们完成了上行数据处理的大部分功能,但还缺少通知业务系统,以及消息数据的查询功能。下一节,我们来完成这些功能,并完善细节。