在这一节中,我们来设计和实现设备生命周期管理中的禁用和删除功能。

设备禁用

设备禁用的逻辑很简单,业务系统可以通过一个接口暂停设备的接入认证,被禁用的设备无法接入 IotHub;业务系统也可以通过一个接口恢复设备的接入认证,使设备可以重新接入 IotHub,这里还暗含着另外一个操作,在禁用设备的时候,如果这个设备已经接入 IotHub,且是在线状态,那么需要将这个设备踢下线。接下来我们来一步一步实现这些功能。

Server API

首先在 Device 模型里加一个字段,来标识设备当前是否可以接入 IotHub:

  1. // IotHub_Server/models/Device.js
  2. const deviceSchema = new Schema({
  3. //ProductName
  4. product_name: {
  5. type: String,
  6. required: true
  7. },
  8. //DeviceName
  9. device_name: {
  10. type: String,
  11. required: true,
  12. },
  13. //接入EMQX时使用的username
  14. broker_username: {
  15. type: String,
  16. required: true
  17. },
  18. //secret
  19. secret: {
  20. type: String,
  21. required: true,
  22. },
  23. //可接入状态
  24. status: String
  25. })

创建设备时默认状态为 active:

  1. //IotHub_Server/routes/device.js
  2. router.post("/", function (req, res) {
  3. ...
  4. var device = new Device({
  5. product_name: productName,
  6. device_name: deviceName,
  7. secret: secret,
  8. broker_username: brokerUsername,
  9. status: "active"
  10. })
  11. ...
  12. })

然后添加两个 Server API 来设置设备的接入状态,禁用/恢复:

  1. // IotHub_Server/routes/device.js
  2. router.put("/:productName/:deviceName/suspend", function (req, res) {
  3. var productName = req.params.productName
  4. var deviceName = req.params.deviceName
  5. Device.findOneAndUpdate({"product_name": productName, "device_name": deviceName},
  6. {status: "suspended"}, {useFindAndModify: false}).exec(function (err) {
  7. if (err) {
  8. res.send(err)
  9. } else {
  10. res.status(200).send("ok")
  11. }
  12. })
  13. })
  14. router.put("/:productName/:deviceName/resume", function (req, res) {
  15. var productName = req.params.productName
  16. var deviceName = req.params.deviceName
  17. Device.findOneAndUpdate({"product_name": productName, "device_name": deviceName},
  18. {status: "active"}, {useFindAndModify: false}).exec(function (err) {
  19. if (err) {
  20. res.send(err)
  21. } else {
  22. res.status(200).send("ok")
  23. }
  24. })
  25. })

配置 MongoDB 认证插件

我们需要配置一下 MongoDB 认证,使它在认证接入的设备时会判断 Device 的 active 字段:

  1. # <EMQ X 安装目录>/etc/plugins/emqx_auth_mongo.conf
  2. auth.mongo.auth_query.selector = broker_username=%u, status=active

重新加载 MongoDB 认证插件:<EMQ X 安装目录>/bin/emqx_ctl plugins reload emqx_auth_mongo
这个时候我们可以来试一下,首先暂停设备的接入认证: curl http://localhost:3000/devices/IotApp/c-jOc-2qq/suspend -X PUT
然后运行 IotHub_Device/samples/connect_to_server.js,控制台的输出如下:

  1. Error: Connection refused: Not authorized
  2. ...

然后恢复这个设备的接入认证: curl http://localhost:3000/devices/IotApp/c-jOc-2qq/resume -X PUT,控制台的输出如下:

  1. device is online

主动断开设备连接

EMQ X 监控管理 API

当禁用设备的时候,如果设备已经连入了 IotHub,我们应该主动断开和这个设备的连接,MQTT 协议并没有在协议级别约定 Broker 如何来管理 Client 的连接,EMQ X Broker 提供了一套 Restful 的 API 来监控和管理 Broker,可以在这里看到这些 API 的文档,我们通过调用 EMQ X 提供的 API 来主动和设备断开连接。
EMQ X 使用 HTTP Basic 的认证方式对 API 的调用者进行认证,在调用这些 API 之前,需要创建一个账号,在EMQ X中称为创建一个 Application,创建 Application 的命令格式为:mgmt insert ,这里我们用 iothub 作为 AppId,Name 设为 MaqueIotHub:

  1. <EMQ X 安装目录>/bin/emqx_ctl mgmt insert iothub MaqueIotHub

控制台输出为:

AppSecret: Mjg3NDc3MDc1MjgxNTM3OTg2MTYwNjYwMjMyOTU2ODA1MTC

我们需要记录下这个 AppSecret。
EMQ X RestAPI 是通过插件的方式实现的,默认的访问地址是:http(s)://<host>:8080/api/v3/,可以在 etc/plugins/emqx_management.conf 进行配置,这个插件是默认加载的。
然后把这些信息都放入 .env 文件中:

# IotHub_Server/.env
EMQX_APP_ID=iothub
EMQX_APP_SECRET=Mjg3NDc3MDc1MjgxNTM3OTg2MTYwNjYwMjMyOTU2ODA1MTC
EMQX_API_URL=http://127.0.0.1:8080/api/v3/

调用 EMQ X API

EMQ X 断开 Client 连接的 API:
DELETE api/v3/connections/${clientid}
需要提供 ClientID 作为参数,在设备连接状态管理里面已经保存了设备的 ClientID,所以只用按需调用这个接口就可以了。
首先把对 EMQ X API 的调用进行一个封装:

// IotHub_Server/services/emqx_service.js
"use strict";
const request = require('request');

class EMQXService {
    static disconnectClient(clientId) {
        const apiUrl = `${process.env.EMQX_API_URL}/connections/${clientId}`
        request.delete(apiUrl, {
            "auth": {
                'user': process.env.EMQX_APP_ID,
                'pass': process.env.EMQX_APP_SECRET,
                'sendImmediately': true
            }
        }, function (error, response, body) {
            console.log('statusCode:', response && response.statusCode);
            console.log('body:', body);
        })
    }
}

module.exports = EMQXService

在 Device 模型里实现断开 Device 所有连接的功能:

// IotHub_Server/models/device.js

deviceSchema.methods.disconnect = function(){
    Connection.find({device: this._id}).exec(function (err, connections) {
        connections.forEach(function (conn) {
            emqxService.disconnectClient(conn.client_id)
        })
    })
}

然后在 suspend 接口里调用 disconnect 方法:

// IotHub_Server/routes/devices.js
...
router.put("/:productName/:deviceName/suspend", function (req, res) {
    var productName = req.params.productName
    var deviceName = req.params.deviceName
    Device.findOneAndUpdate({"product_name": productName, "device_name": deviceName},
        {status: "suspended"}, {useFindAndModify: false}).exec(function (err, device) {
        if (err) {
            res.send(err)
        } else {
            if (device != null) {
                device.disconnect()
            }
            res.status(200).send("ok")
        }
    })
})

这时我们再运行curl http://localhost:3000/devices/IotApp/c-jOc-2qq/suspend -X PUT,在运行connect_to_server.js的终端上会有以下输出:

device is online
device is offline
Error: Connection refused: Not authorized
...
  1. 设备连接上 IotHub 之后,输出device is online
  2. 被禁用以后,连接被 IotHub 断开,输出device is offline
  3. DeviceSDK 的自动重连功能触发,因为设备已被禁用,所以在连接时输出:Error: Connection refused: Not authorized

    删除设备

    完成了禁用设备的功能以后,删除设备对我们来说就非常简单了,基本上依葫芦画瓢就可以了。

  4. 将设备的所有连接断开;

  5. 删除设备;
  6. 删除设备的连接信息。
// IotHub_Server/routes/device.js

router.delete("/:productName/:deviceName", function (req, res) {
    var productName = req.params.productName
    var deviceName = req.params.deviceName
    Device.findOne({"product_name": productName, "device_name": deviceName}).exec(function (err, device) {
        if (err) {
            res.send(err)
        } else {
            if (device != null) {
                device.disconnect()
                device.remove()
                res.status(200).send("ok")
            } else {
                res.status(404).json({error: "Not Found"})
            }
        }
    })
})

同时,在删除设备时,删除所有的连接信息:

// IotHub_Server/models/device.js

deviceSchema.post("remove", function (device, next) {
    Connection.deleteMany({device: device._id}).exec()
    next()
})

可以运行curl http://localhost:3000/devices/IotApp/c-jOc-2qq -X DELETE 来看下效果。

这一节我们完成了设备的禁用和删除,下一节,我们来讨论一下设备的权限管理。