这一节我们开始实现 OTA 升级的服务端功能。在服务端我们会规划OTA相关的主题名,并存储设备上报的升级进度;服务端还需要提供接口,供业务系统下发OTA升级指令和查询升级进度。

主题规划

升级指令的下发使用已有的指令下发功能就好,但是我们需要增加一个主题名供设备上报升级进度,这里约定, 设备将使用主题: update\_ota\_status/:ProductName/:DeviceName/:messageID来上报升级进度。
这样的话,新的主题就可以和我们之前使用的状态上报的主题:
update_status/:ProductName/:DeviceName/:MessageID
统一为:(update\_ota\_status|update_status)/:ProductName/:DeviceName/:MessageID

添加 ACL 列表

将新的主题名加入 Device 的 ACL 列表:

  1. //IotHub_Server/models/device.js
  2. deviceSchema.methods.getACLRule = function () {
  3. const publish = [
  4. ...
  5. `update_ota_status/${this.product_name}/${this.device_name}/+`,
  6. ]
  7. ...
  8. }

你需要重新注册一个设备或者手动更新已注册设备存储在 MongoDB 的 ACL 列表中。

处理设备上报的升级进度

这里我们选择把设备的升级进度保存到 Redis 里面, 对 OTA 的操作我们做一个封装:

//IotHub_Server/services/ota_service.js
const redisClient = require("../models/redis")
class OTAService{
    static updateProgress(productName, deviceName, progress){
        redisClient.set(`ota_progress/${productName}/${deviceName}`, JSON.stringify(progress))
    }
}
module.exports = OTAService

然后在收到设备的升级进度数据时调用上面的方法:

//IotHub_Server/services/message_service.js
static dispatchMessage({topic, payload, ts} = {}) {
        ...
        var statusTopicRule = "(update_status|update_ota_status)/:productName/:deviceName/:messageId"
        ...
        const statusRegx = pathToRegexp(statusTopicRule)
       ...
        var result = null;
        ...
        else if ((result = statusRegx.exec(topic)) != null) {
            this.checkMessageDuplication(result[4], function (isDup) {
                if (!isDup) {
                    if (result[1] == "update_status") {
                        MessageService.handleUpdateStatus({
                            productName: result[2],
                            deviceName: result[3],
                            deviceStatus: payload.toString(),
                            ts: ts
                        })
                    } else if (result[1] == "update_ota_status") {
                        var progress = JSON.parse(payload.toString())
                        progress.ts = ts
                        OTAService.updateProgress(result[2], result[3], progress)
                    }
                }
            })
        }

发送OTA指令

这里我们使用$ota_upgrade作为OTA升级的指令名, 同时支持单一设备下发和批量下发:

//IotHub_Server/services/ota_service.js
const Device = require("../models/device")
static sendOTA({productName, deviceName = null, tag = null, fileUrl, version, size, md5, type}) {
        var data = JSON.stringify({
            url: fileUrl,
            version: version,
            size: size,
            md5: md5,
            type: type
        })
        if (deviceName != null) {
            Device.sendCommand({
                productName: productName,
                deviceName: deviceName,
                commandName: "ota_upgrade",
                data: data
            })
        }else if(tag != null){
            Device.sendCommandByTag({
                productName: productName,
                tag: tag,
                commandName: "ota_upgrade",
                data: data 
            })
        }
    }
//IotHub_Server/models/device.js
deviceSchema.statics.sendCommandByTag = function({productName, tag, commandName, data, encoding = "plain", ttl = undefined,qos = 1}){
    var requestId = new ObjectId().toHexString()
    var topic = `tags/${productName}/${tag}/cmd/${commandName}/${encoding}/${requestId}`
    if (ttl != null) {
        topic = `${topic}/${Math.floor(Date.now() / 1000) + ttl}`
    }
    emqxService.publishTo({topic: topic, payload: data, qos: qos})
}

Server API

IotHub Server API 提供两个接口供业务调用 OTA 功能:

下发 OTA 指令接口

//IotHub_Server/routes/ota.js
var express = require('express');
var router = express.Router();
var Device = require("../models/device")
var OTAService = require("../services/ota_service")
router.post("/:productName/:deviceName", function (req, res) {
    var productName = req.params.productName
    var deviceName = req.params.deviceName
    Device.findOne({product_name: productName, device_name: deviceName}, function (err, device) {
        if(err){
            res.send(err)
        }else if(device != null){
            OTAService.sendOTA({
                productName: device.product_name,
                deviceName: device.device_name,
                fileUrl: req.body.url,
                size: parseInt(req.body.size),
                md5: req.body.md5,
                version: req.body.version,
                type: req.body.type
            })
            res.status(200).send("ok")
        }else{
            res.status(400).send("device not found")
        }
    })
})
module.exports = router
//IotHub_Server/app.js
var otaRouter = require('./routes/ota')
app.use('/ota', otaRouter)

这个接口是向单一设备发送 OTA 接口,通过标签批量下发的接口实现也很简单,本课程在这里就跳过了。

获取 OTA 升级进度接口

最后是获取设备 OTA 升级进度的接口:

//IotHub_Server/services/ota_service.js
static getProgress(productName, deviceName, callback) {
        redisClient.get(`ota_progress/${productName}/${deviceName}`, function (err, value) {
            if (value != null) {
                callback(JSON.parse(value))
            } else {
                callback({})
            }
        })
    }
//IotHub_Server/routes/ota.js
router.get("/:productName/:deviceName", function (req, res) {
    var productName = req.params.productName
    var deviceName = req.params.deviceName
    OTAService.getProgress(productName, deviceName, function (progress) {
        res.status(200).json(progress)
    })
})

IotHub OTA 升级功能在服务端实现就完成了。

本课程,我们实现了 OTA 升级的服务端功能,下一节,我们来实现OTA升级的设备端功能,并对OTA功能进行测试。