这一节我们来实现 IotHub 上行数据处理剩下的一下功能:通知业务系统以及 Server API 消息查询接口。然后完善一些细节。

通知业务系统

当上行数据到达 IotHub 时,IotHub 可以通过 RabbitMQ 来通知并发送新的上行数据给业务系统。这里我们做一个约定,当有新的上行数据达到时,IotHub 会向 RabbitMQ 名为”iothub.events.upload_data”的 Direct Exchage 的发送一条消息,RoutingKey 为设备的 ProductName。本课程使用 ampqlib 作为 RabbitMQ Client 端实现。

关于 RabbitMQ Routing 相关的概念可以查看 RabbitMQ Tutorials,本课程就不赘述了。

首先初始化 RabbitMQ Client,并确保对应的 Exchange 存在:

  1. //IotHub_Server/services/notify_service.js
  2. var amqp = require('amqplib/callback_api');
  3. var uploadDataExchange = "iothub.events.upload_data"
  4. var currentChannel = null;
  5. amqp.connect(process.env.RABBITMQ_URL, function (error0, connection) {
  6. if (error0) {
  7. console.log(error0);
  8. } else {
  9. connection.createChannel(function (error1, channel) {
  10. if (error1) {
  11. console.log(error1)
  12. } else {
  13. currentChannel = channel;
  14. channel.assertExchange(uploadDataExchange, 'direct', {durable: true})
  15. }
  16. });
  17. }
  18. });

然后实现通知业务系统的功能:

  1. //IotHub_Server/services/notify_service.js
  2. const bson = require('bson')
  3. class NotifyService {
  4. static notifyUploadData(message) {
  5. var data = bson.serialize({
  6. device_name: message.device_name,
  7. payload: message.payload,
  8. send_at: message.sendAt,
  9. data_type: message.dataType,
  10. message_id: message.message_id
  11. })
  12. if(currentChannel != null) {
  13. currentChannel.publish(uploadDataExchange, message.product_name, data, {
  14. persistent: true
  15. })
  16. }
  17. }
  18. }
  19. module.exports = NotifyService

这里使用的是 Bson 对上传数据的相关信息进行序列化之后,再发送到相应的 Exchange 上,所以业务系统获取到这个数据以后需要先用 Bson 反序列化。这个是 IotHub 和业务系统之间的约定。
最后在接收到上行数据的时候调用这个接口:

  1. //IotHub_Server/service/message_service.js
  2. static handleUploadData({productName, deviceName, ts, payload, messageId, dataType} = {}) {
  3. var message = new Message({
  4. product_name: productName,
  5. device_name: deviceName,
  6. payload: payload,
  7. message_id: messageId,
  8. data_type: dataType,
  9. sent_at: ts
  10. })
  11. message.save()
  12. NotifyService.notifyUploadData(message)

接下来我们可以写一小段代码来模拟业务系统从 IotHub 获取通知:

  1. //IotHub_Server/business_sim.js
  2. require('dotenv').config()
  3. const bson = require('bson')
  4. var amqp = require('amqplib/callback_api');
  5. var uploadDataExchange = "iothub.events.upload_data"
  6. amqp.connect(process.env.RABBITMQ_URL, function (error0, connection) {
  7. if (error0) {
  8. console.log(error0);
  9. } else {
  10. connection.createChannel(function (error1, channel) {
  11. if (error1) {
  12. console.log(error1)
  13. } else {
  14. channel.assertExchange(uploadDataExchange, 'direct', {durable: true})
  15. var queue = "iotapp_upload_data";
  16. channel.assertQueue(queue, {
  17. durable: true
  18. })
  19. channel.bindQueue(queue, uploadDataExchange, "IotApp")
  20. channel.consume(queue, function (msg) {
  21. var data = bson.deserialize(msg.content)
  22. console.log(`received from ${data.device_name}, messageId: ${data.message_id},payload: ${data.payload.toString()}`)
  23. channel.ack(msg)
  24. })
  25. }
  26. });
  27. }
  28. });

首先运行这段代码,然后再运行IotHub_Device/samples/upload_data.js,可以看到在运行business_sim.js的终端上会输出:

  1. received from QcdJPHjDR, messageId: 5ceb788f80124804aa1ea95b,payload: this is a sample data

那么通知业务系统的功能就完成了。

消息查询 Server API

消息查询 Server API 的实现就很简单了,可以根据产品、设备和 MessageID 进行查询:

//IotHub_Server/routes/messages.js
var express = require('express');
var router = express.Router();
var Message = require('../models/message')
router.get("/:productName", function (req, res) {
    var messageId = req.query.message_id
    var deviceName = req.query.device_name
    var productName = req.params.productName
    var query = {product_name: productName}
    if (messageId != null) {
        query.message_id = messageId
    }
    if (deviceName != null) {
        query.device_name = deviceName
    }
    Message.find(query, function (error, messages) {
        res.json({
            messages: messages.map(function (message) {
                return message.toJSONObject()
            })
        })
    })
})
module.exports = router
//IotHub_Server/app.js
var messageRouter = require('./routes/messages')
app.use('/messages', messageRouter)
//IotHub_Server/models/message.js
messageSchema.methods.toJSONObject = function () {
    return {
        product_name: this.product_name,
        device_name: this.device_name,
        send_at: this.send_at,
        data_type: this.data_type,
        message_id: this.message_id,
        payload: this.payload.toString("base64")
    }
}

调用接口curl http://localhost:3000/messages/IotApp\?device_name\=QcdJPHjDR\&message_id\=5ceb788f80124804aa1ea95b 会有以下输出:

{"messages":[{"product_name":"IotApp","device_name":"QcdJPHjDR","data_type":"sample","message_id":"5ceb788f80124804aa1ea95b","payload":"dGhpcyBpcyBhIHNhbXBsZSBkYXRh"}]}

唯一要注意的是,接口返回的 payload 字段是用 Base64 编码的。

使用可持久化的 Message Store

MQTT Client 在发布 QoS>1 的消息时,会先在本地存储这条消息,等收到 Receiver 的 ACK 之后,再删除这条消息。 同时,在现在大部分的 Client 实现里,也会把还没有发布出去的消息也缓存在本地,这样的话即使 Client 和Broker 的连接因为网络问题短信,也可以调用 publish 方法,在恢复连接之后,这部分消息再依次发布出去。 这些消息被称为 in-flight 消息,用于存储 in-flight 消息的叫 Message Store。
在 DeviceSDK 里面,in-flight 消息是存储在内存里面的,这是有问题的:设备断电之后,in-flight 消息就都丢了。所以我们需要可持久化的 Message Store。
Node.js 版的 MQTT Client 有几种可用的 Persistent Message Store:mqtt-level-storemqtt-nedbb-storemqtt-localforage-store,这里我们选择 mqtt-level-store 作为可持久化的 Message Store:

//IotHub_Device/sdk/iot_device.js
var levelStore = require('mqtt-level-store');
constructor({serverAddress = "127.0.0.1:8883", productName, deviceName, secret, clientID, storePath} = {}) {
        ...
        if(storePath != null) {
            this.manager = levelStore(storePath);
        }
    }
connect() {
        var opts = {
            rejectUnauthorized: false,
            username: this.username,
            password: this.secret,
            clientId: this.clientIdentifier,
            clean: false
        };
        if(this.manager != null){
            opts.incomingStore = this.manager.incoming
            opts.outgoingStore = this.manager.outgoing
        }
        this.client = mqtt.connect(this.serverAddress, opts)
        ...
}

然后修改一下samples/upload_data.js:

var device = new IotDevice({
    productName: process.env.PRODUCT_NAME,
    deviceName: process.env.DEVICE_NAME,
    secret: process.env.SECRET,
    clientID: path.basename(__filename, ".js"),
    storePath: `../tmp/${path.basename(__filename, ".js")}`
})

运行 upload_data.js,可以看到IotHub_Device/tmp/upload_data目录下生成了一些文件。 现在 in-flight 消息就可以实现可持久化了。

这里我们使用 JavaScript 的文件名来命名 Message Store 和 ClientID,这样的话 sample 目录下不同的 JavaScript 文件在运行时就不会产生冲突了。 在大多数语言的 MQTT Client 库都有类似的持久化 Message Store 实现,所以你在其他语言或者平台上开发的时,需要找到或者实现对应的持久化 Message Store。

这一节我们完成了上行数据处理的剩余功能,并完善了细节。下一节我们讨论并处理另外一种上行数据:设备状态上报