设备状态数据
上一节,我们完成了对设备上行数据的处理,假设说我们有一台装有温度传感器的设备,那么它可以使用这个功能将每个时刻统计到的温度数据上报到 IotHub,IotHub 会记录每一条温度数据并通知业务系统,业务系统可以自行存储温度数据也可以使用 IotHub 提供的接口来查询不同时刻的温度数据。
除了温度读数,设备可能还会需要上报一些其他数据,比如当前使用的软件/硬件版本、传感器状态(有没有坏掉)、电池电量等,这些属于设备的状态数据,通常我们不会关心这些数据的记录,只关心当前的状态,那么用我们在前面实现的上报数据功能来管理设备的状态就稍微有点不合适了。
设备状态上报
IotHub 需要对设备的状态进行单独处理,我们这样来设计 IotHub 的设备状态管理功能。
- 设备用 JSON 的格式将当前的状态发布到主题:
update_status/:productName/:deviceName/:messageId
。 - IotHub 将设备的状态用 JSON 的格式存储在 Devices Collection 中。
- IotHub 将设备的状态通知到业务系统,业务系统再做后续的处理,比如通知相关运维人员等。
- IotHub 提供接口供业务系统查询设备的当前状态。
为了对消息进行去重,设备状态消息也会带 MessageID; 设备状态消息一定是单向,设备端状态更改,然后经由 IotHub 通知到业务系统,如果一个状态是业务系统/IotHub 和设备端都有可能更改的,那么使用之后我们要讲的设备影子可能会更好; 如果说业务系统需要记录设备状态的历史记录,那么使用前面实现的上行数据管理功能就可以了。把设备的状态也看作一般上行数据。
DeviceSDK 端实现
DeviceSDK 需要实现一个接口,将状态数据发布到指定的主题:
//IotHub_Device/sdk/iot_device.js
updateStatus(status){
if(this.client != null){
var topic = `update_status/${this.productName}/${this.deviceName}/${new ObjectId().toHexString()}`
this.client.publish(topic, JSON.stringify(status), {
qos: 1
})
}
}
IotHub Server 端实现
增加 ACL 列表
首先需要将上报状态的主题加入设备的 ACL 列表:
//IotHub_Server/models/device.js
deviceSchema.methods.getACLRule = function () {
const publish = [
`upload_data/${this.product_name}/${this.device_name}/+/+`,
`update_status/${this.product_name}/${this.device_name}/+`
]
}
你可能需要重新注册一个设备,进行后续测试(或者手动在 MongoDB 里更新老设备的 ACL 列表)
存储状态数据
接下来在 Devices Collection 里面添加一个字段来存储设备状态:
//IotHub_Server/models/devices
const deviceSchema = new Schema({
...
device_status: {
type: String,
default: "{}"
},
last_status_update: Number //最后一次状态更新的时间
})
然后在 WebHook 里判断是否是设备上报的状态数据,如果是,则进行存储:
//IotHub_Server/services/message_service.js
static dispatchMessage({topic, payload, ts} = {}) {
var dataTopicRule = "upload_data/:productName/:deviceName/:dataType/:messageId";
var statusTopicRule = "update_status/:productName/:deviceName/:messageId"
const topicRegx = pathToRegexp(dataTopicRule)
const statusRegx = pathToRegexp(statusTopicRule)
var result = null;
if ((result = topicRegx.exec(topic)) != null) {
//处理上报数据
...
} else if ((result = statusRegx.exec(topic)) != null) {
this.checkMessageDuplication(result[3], function (isDup) {
if (!isDup) {
MessageService.handleUpdateStatus({
productName: result[1],
deviceName: result[2],
deviceStatus: new Buffer(payload, 'base64').toString(),
ts: ts
})
}
})
}
}
虽然 MQTT 协议可以保证数据包是按序到达的,但是在 WebHook 并发处理时有可能会乱序,所以我们只更新时间更近的状态数据:
//IotHub_Server/services/message_service.js
static handleUpdateStatus({productName, deviceName, deviceStatus, ts}) {
Device.findOneAndUpdate({product_name: productName, device_name: deviceName,
"$or":[{last_status_update:{"$exists":false}}, {last_status_update:{"$lt":ts}}]},
{device_status: deviceStatus, last_status_update: ts}, {useFindAndModify: false}).exec()
}
通知业务系统
同上报数据一样,设备上报状态时 IotHub 也是通过 RabbmitMQ 来通知业务系统,IotHub 会向 RabbitMQ 名为”iothub.events.update_status”的 Direct Exchage 发送一条消息,RoutingKey 为设备的 ProductName,消息格式依然是 BSON:
//IotHub_Server/services/notify_service.js
var updateStatusExchange = "iothub.events.update_status"
static notifyUpdateStatus({productName, deviceName, deviceStatus}){
var data = bson.serialize({
device_name: deviceName,
device_status: deviceStatus
})
if(currentChannel != null) {
currentChannel.publish(updateStatusExchange, productName, data, {
persistent: true
})
}
}
//IotHub_Server/services/message_service.js
static handleUpdateStatus({productName, deviceName, deviceStatus, ts}) {
Device.findOneAndUpdate({product_name: productName, device_name: deviceName,
"$or":[{last_status_update:{"$exists":false}}, {last_status_update:{"$lt":ts}}]},
{device_status: deviceStatus, last_status_update: ts}, {useFindAndModify: false}).exec(function (error, device) {
if (device != null) {
NotifyService.notifyUpdateStatus({
productName: productName,
deviceName: deviceName,
deviceStatus: deviceStatus
})
}
})
}
查询设备状态
我们只需在设备详情接口里面添加状态字段就可以了:
//IotHub_Server/models/device.js
deviceSchema.methods.toJSONObject = function () {
return {
product_name: this.product_name,
device_name: this.device_name,
secret: this.secret,
device_status: JSON.parse(this.device_status)
}
}
验证流程
我们可以在business_sim.js
里加一小段代码,来验证整个流程:
//IotHub_Server/business_sim.js
var updateStatusExchange = "iothub.events.update_status"
channel.assertExchange(updateStatusExchange, 'direct', {durable: true})
var queue = "iotapp_update_status";
channel.assertQueue(queue, {durable: true})
channel.bindQueue(queue, updateStatusExchange, "IotApp")
channel.consume(queue, function (msg) {
var data = bson.deserialize(msg.content)
console.log(`received from ${data.device_name}, status: ${data.device_status}`)
channel.ack(msg)
})
再写一小段代码调用 DeviceSDK 来上报设备状态:
//IotHub_Device/samples/update_status.js
...
device.connect()
device.updateStatus({lights: "on"})
运行IotHub_Server/business_sim.js
,然后运行update_status.js
,再运行business_sim.js
的终端会输出:
received from 60de4bqyu, status: {"lights":"on"}
接着再调用设备详情接口curl http://localhost:3000/devices/IotApp/60de4bqyu
:
{"product_name":"IotApp","device_name":"60de4bqyu","secret":"sVDhDJZhm7","device_status":{"lights":"on"},"connections":[{"connected":true,"client_id":"IotApp/60de4bqyu/update_status","ipaddress":"127.0.0.1","connected_at":1558964672,"disconnect_at":1558964668}]}%
为何不用 Retained Message ?
看过《MQTT 协议快速入门》的同学可能还记得,在那门课程中,我提到过用 Retained Message 来记录设备的状态,这是个不错的方案,但是这在 Maque IotHub 里目前是行不通的,我们来看一下假设设备在向某个主题 TopicA 发送一条 Retained Message 来表明自己的状态时,会发生什么:
- 设备 A 向 TopicA 发送一条消息 M,标记为 Retained,QoS=1;
- EMQ X Broker 收到 M,回复设备 A PUBACK;
- EMQ X 为 TopicA 保存下 Retained 消息 M_retained;
- EMQ X 通过 WebHook 将消息传递给 IotHub Server;
- EMQ X 发现没有任何 Client 订阅 TopicA,丢弃掉 M。
大家可以看到,因为在 IotHub 中使用的是基于 Hook 的方式来获取设备发布的消息,没有实际的 Client 订阅设备发布状态的主题,所以即使发送 Retained Message,也只是白浪费 Broker 的存储空间罢了。
那么设备在什么时候需要上报状态呢,在 DeviceSDK 里面没有强制的约定,不过我的建议是:
- 在设备每次开机时;
- 在状态发生变化时。
这一节我们完成了设备状态上报的功能。下一节,我们来了解一下一种非常适合于物联网数据存储的数据库:时序数据库。