这一节我们来实现 IotHub 上行数据处理剩下的一下功能:通知业务系统以及 Server API 消息查询接口。然后完善一些细节。
通知业务系统
当上行数据到达 IotHub 时,IotHub 可以通过 RabbitMQ 来通知并发送新的上行数据给业务系统。这里我们做一个约定,当有新的上行数据达到时,IotHub 会向 RabbitMQ 名为”iothub.events.upload_data”的 Direct Exchage 的发送一条消息,RoutingKey 为设备的 ProductName。本课程使用 ampqlib 作为 RabbitMQ Client 端实现。
关于 RabbitMQ Routing 相关的概念可以查看 RabbitMQ Tutorials,本课程就不赘述了。
首先初始化 RabbitMQ Client,并确保对应的 Exchange 存在:
//IotHub_Server/services/notify_service.js
var amqp = require('amqplib/callback_api');
var uploadDataExchange = "iothub.events.upload_data"
var currentChannel = null;
amqp.connect(process.env.RABBITMQ_URL, function (error0, connection) {
if (error0) {
console.log(error0);
} else {
connection.createChannel(function (error1, channel) {
if (error1) {
console.log(error1)
} else {
currentChannel = channel;
channel.assertExchange(uploadDataExchange, 'direct', {durable: true})
}
});
}
});
然后实现通知业务系统的功能:
//IotHub_Server/services/notify_service.js
const bson = require('bson')
class NotifyService {
static notifyUploadData(message) {
var data = bson.serialize({
device_name: message.device_name,
payload: message.payload,
send_at: message.sendAt,
data_type: message.dataType,
message_id: message.message_id
})
if(currentChannel != null) {
currentChannel.publish(uploadDataExchange, message.product_name, data, {
persistent: true
})
}
}
}
module.exports = NotifyService
这里使用的是 Bson 对上传数据的相关信息进行序列化之后,再发送到相应的 Exchange 上,所以业务系统获取到这个数据以后需要先用 Bson 反序列化。这个是 IotHub 和业务系统之间的约定。
最后在接收到上行数据的时候调用这个接口:
//IotHub_Server/service/message_service.js
static handleUploadData({productName, deviceName, ts, payload, messageId, dataType} = {}) {
var message = new Message({
product_name: productName,
device_name: deviceName,
payload: payload,
message_id: messageId,
data_type: dataType,
sent_at: ts
})
message.save()
NotifyService.notifyUploadData(message)
接下来我们可以写一小段代码来模拟业务系统从 IotHub 获取通知:
//IotHub_Server/business_sim.js
require('dotenv').config()
const bson = require('bson')
var amqp = require('amqplib/callback_api');
var uploadDataExchange = "iothub.events.upload_data"
amqp.connect(process.env.RABBITMQ_URL, function (error0, connection) {
if (error0) {
console.log(error0);
} else {
connection.createChannel(function (error1, channel) {
if (error1) {
console.log(error1)
} else {
channel.assertExchange(uploadDataExchange, 'direct', {durable: true})
var queue = "iotapp_upload_data";
channel.assertQueue(queue, {
durable: true
})
channel.bindQueue(queue, uploadDataExchange, "IotApp")
channel.consume(queue, function (msg) {
var data = bson.deserialize(msg.content)
console.log(`received from ${data.device_name}, messageId: ${data.message_id},payload: ${data.payload.toString()}`)
channel.ack(msg)
})
}
});
}
});
首先运行这段代码,然后再运行IotHub_Device/samples/upload_data.js
,可以看到在运行business_sim.js
的终端上会输出:
received from QcdJPHjDR, messageId: 5ceb788f80124804aa1ea95b,payload: this is a sample data
消息查询 Server API
消息查询 Server API 的实现就很简单了,可以根据产品、设备和 MessageID 进行查询:
//IotHub_Server/routes/messages.js
var express = require('express');
var router = express.Router();
var Message = require('../models/message')
router.get("/:productName", function (req, res) {
var messageId = req.query.message_id
var deviceName = req.query.device_name
var productName = req.params.productName
var query = {product_name: productName}
if (messageId != null) {
query.message_id = messageId
}
if (deviceName != null) {
query.device_name = deviceName
}
Message.find(query, function (error, messages) {
res.json({
messages: messages.map(function (message) {
return message.toJSONObject()
})
})
})
})
module.exports = router
//IotHub_Server/app.js
var messageRouter = require('./routes/messages')
app.use('/messages', messageRouter)
//IotHub_Server/models/message.js
messageSchema.methods.toJSONObject = function () {
return {
product_name: this.product_name,
device_name: this.device_name,
send_at: this.send_at,
data_type: this.data_type,
message_id: this.message_id,
payload: this.payload.toString("base64")
}
}
调用接口curl http://localhost:3000/messages/IotApp\?device_name\=QcdJPHjDR\&message_id\=5ceb788f80124804aa1ea95b
会有以下输出:
{"messages":[{"product_name":"IotApp","device_name":"QcdJPHjDR","data_type":"sample","message_id":"5ceb788f80124804aa1ea95b","payload":"dGhpcyBpcyBhIHNhbXBsZSBkYXRh"}]}
唯一要注意的是,接口返回的 payload 字段是用 Base64 编码的。
使用可持久化的 Message Store
MQTT Client 在发布 QoS>1 的消息时,会先在本地存储这条消息,等收到 Receiver 的 ACK 之后,再删除这条消息。 同时,在现在大部分的 Client 实现里,也会把还没有发布出去的消息也缓存在本地,这样的话即使 Client 和Broker 的连接因为网络问题短信,也可以调用 publish 方法,在恢复连接之后,这部分消息再依次发布出去。 这些消息被称为 in-flight 消息,用于存储 in-flight 消息的叫 Message Store。
在 DeviceSDK 里面,in-flight 消息是存储在内存里面的,这是有问题的:设备断电之后,in-flight 消息就都丢了。所以我们需要可持久化的 Message Store。
Node.js 版的 MQTT Client 有几种可用的 Persistent Message Store:mqtt-level-store、mqtt-nedbb-store 和 mqtt-localforage-store,这里我们选择 mqtt-level-store 作为可持久化的 Message Store:
//IotHub_Device/sdk/iot_device.js
var levelStore = require('mqtt-level-store');
constructor({serverAddress = "127.0.0.1:8883", productName, deviceName, secret, clientID, storePath} = {}) {
...
if(storePath != null) {
this.manager = levelStore(storePath);
}
}
connect() {
var opts = {
rejectUnauthorized: false,
username: this.username,
password: this.secret,
clientId: this.clientIdentifier,
clean: false
};
if(this.manager != null){
opts.incomingStore = this.manager.incoming
opts.outgoingStore = this.manager.outgoing
}
this.client = mqtt.connect(this.serverAddress, opts)
...
}
然后修改一下samples/upload_data.js:
var device = new IotDevice({
productName: process.env.PRODUCT_NAME,
deviceName: process.env.DEVICE_NAME,
secret: process.env.SECRET,
clientID: path.basename(__filename, ".js"),
storePath: `../tmp/${path.basename(__filename, ".js")}`
})
运行 upload_data.js,可以看到IotHub_Device/tmp/upload_data
目录下生成了一些文件。 现在 in-flight 消息就可以实现可持久化了。
这里我们使用 JavaScript 的文件名来命名 Message Store 和 ClientID,这样的话 sample 目录下不同的 JavaScript 文件在运行时就不会产生冲突了。 在大多数语言的 MQTT Client 库都有类似的持久化 Message Store 实现,所以你在其他语言或者平台上开发的时,需要找到或者实现对应的持久化 Message Store。
这一节我们完成了上行数据处理的剩余功能,并完善了细节。下一节我们讨论并处理另外一种上行数据:设备状态上报。