这一节我们开始实现 OTA 升级的服务端功能。在服务端我们会规划OTA相关的主题名,并存储设备上报的升级进度;服务端还需要提供接口,供业务系统下发OTA升级指令和查询升级进度。
主题规划
升级指令的下发使用已有的指令下发功能就好,但是我们需要增加一个主题名供设备上报升级进度,这里约定, 设备将使用主题: update\_ota\_status/:ProductName/:DeviceName/:messageID
来上报升级进度。
这样的话,新的主题就可以和我们之前使用的状态上报的主题:update_status/:ProductName/:DeviceName/:MessageID
统一为:(update\_ota\_status|update_status)/:ProductName/:DeviceName/:MessageID
添加 ACL 列表
将新的主题名加入 Device 的 ACL 列表:
//IotHub_Server/models/device.js
deviceSchema.methods.getACLRule = function () {
const publish = [
...
`update_ota_status/${this.product_name}/${this.device_name}/+`,
]
...
}
你需要重新注册一个设备或者手动更新已注册设备存储在 MongoDB 的 ACL 列表中。
处理设备上报的升级进度
这里我们选择把设备的升级进度保存到 Redis 里面, 对 OTA 的操作我们做一个封装:
//IotHub_Server/services/ota_service.js
const redisClient = require("../models/redis")
class OTAService{
static updateProgress(productName, deviceName, progress){
redisClient.set(`ota_progress/${productName}/${deviceName}`, JSON.stringify(progress))
}
}
module.exports = OTAService
然后在收到设备的升级进度数据时调用上面的方法:
//IotHub_Server/services/message_service.js
static dispatchMessage({topic, payload, ts} = {}) {
...
var statusTopicRule = "(update_status|update_ota_status)/:productName/:deviceName/:messageId"
...
const statusRegx = pathToRegexp(statusTopicRule)
...
var result = null;
...
else if ((result = statusRegx.exec(topic)) != null) {
this.checkMessageDuplication(result[4], function (isDup) {
if (!isDup) {
if (result[1] == "update_status") {
MessageService.handleUpdateStatus({
productName: result[2],
deviceName: result[3],
deviceStatus: payload.toString(),
ts: ts
})
} else if (result[1] == "update_ota_status") {
var progress = JSON.parse(payload.toString())
progress.ts = ts
OTAService.updateProgress(result[2], result[3], progress)
}
}
})
}
发送OTA指令
这里我们使用$ota_upgrade
作为OTA升级的指令名, 同时支持单一设备下发和批量下发:
//IotHub_Server/services/ota_service.js
const Device = require("../models/device")
static sendOTA({productName, deviceName = null, tag = null, fileUrl, version, size, md5, type}) {
var data = JSON.stringify({
url: fileUrl,
version: version,
size: size,
md5: md5,
type: type
})
if (deviceName != null) {
Device.sendCommand({
productName: productName,
deviceName: deviceName,
commandName: "ota_upgrade",
data: data
})
}else if(tag != null){
Device.sendCommandByTag({
productName: productName,
tag: tag,
commandName: "ota_upgrade",
data: data
})
}
}
//IotHub_Server/models/device.js
deviceSchema.statics.sendCommandByTag = function({productName, tag, commandName, data, encoding = "plain", ttl = undefined,qos = 1}){
var requestId = new ObjectId().toHexString()
var topic = `tags/${productName}/${tag}/cmd/${commandName}/${encoding}/${requestId}`
if (ttl != null) {
topic = `${topic}/${Math.floor(Date.now() / 1000) + ttl}`
}
emqxService.publishTo({topic: topic, payload: data, qos: qos})
}
Server API
IotHub Server API 提供两个接口供业务调用 OTA 功能:
下发 OTA 指令接口
//IotHub_Server/routes/ota.js
var express = require('express');
var router = express.Router();
var Device = require("../models/device")
var OTAService = require("../services/ota_service")
router.post("/:productName/:deviceName", function (req, res) {
var productName = req.params.productName
var deviceName = req.params.deviceName
Device.findOne({product_name: productName, device_name: deviceName}, function (err, device) {
if(err){
res.send(err)
}else if(device != null){
OTAService.sendOTA({
productName: device.product_name,
deviceName: device.device_name,
fileUrl: req.body.url,
size: parseInt(req.body.size),
md5: req.body.md5,
version: req.body.version,
type: req.body.type
})
res.status(200).send("ok")
}else{
res.status(400).send("device not found")
}
})
})
module.exports = router
//IotHub_Server/app.js
var otaRouter = require('./routes/ota')
app.use('/ota', otaRouter)
这个接口是向单一设备发送 OTA 接口,通过标签批量下发的接口实现也很简单,本课程在这里就跳过了。
获取 OTA 升级进度接口
最后是获取设备 OTA 升级进度的接口:
//IotHub_Server/services/ota_service.js
static getProgress(productName, deviceName, callback) {
redisClient.get(`ota_progress/${productName}/${deviceName}`, function (err, value) {
if (value != null) {
callback(JSON.parse(value))
} else {
callback({})
}
})
}
//IotHub_Server/routes/ota.js
router.get("/:productName/:deviceName", function (req, res) {
var productName = req.params.productName
var deviceName = req.params.deviceName
OTAService.getProgress(productName, deviceName, function (progress) {
res.status(200).json(progress)
})
})
IotHub OTA 升级功能在服务端实现就完成了。
本课程,我们实现了 OTA 升级的服务端功能,下一节,我们来实现OTA升级的设备端功能,并对OTA功能进行测试。