在这一节,我们讨论一下设备的权限管理。

什么是设备权限管理

设备权限管理是指对一个设备的 Publish 和 Subscribe 权限进行控制,设备只能 Publish 到它有 Publish 权限的主题上,同时它也只能 Subscribe 有 Subscribe 权限的主题。

为什么要控制 Publish 和 Subscribe

场景1 ClientA 订阅主题 command/ClientA 来接收服务器端的指令,这时 ClientB 接入,也同时订阅command/usernameA,那么在服务器向 ClientA 下发指令时,ClientB 也能收到,同时 ClientB 也可以将收到的指令再 Publish 到 command/ClientA, ClientA 无法肯定指令是否来自于正确的对象。
场景2 ClientA 向主题 data/ClientA 发布数据,这时 ClientB 接入,它也向 data/ClientA 发布数据,那么data/ClientA 的订阅者无法肯定数据来源是否为 ClientA。
在这两个场景下都存在安全和数据准确性的问题,但是如果我们能控制 Client 的权限,让 ClientA 才能订阅 command/ClientA,同时也只有 ClientA 才能发布到 /data/ClientA,那么刚才的问题就都不存在了。
这就是我们需要对设备的 Publish 权限和 Subscribe 权限进行控制的原因。

EMQ X 的 ACL(权限管理)功能

EMQ X 的 ACL 功能也是由插件来实现的,本课程里使用的 MongoDB 认证插件就包含 ACL 功能,在设备注册的章节里我们暂时关闭了这个功能,现在只需要打开就可以了。
MongoDB 插件的 ACL 功能很简单,Client 在 Publish 和 Subscribe 的时候会查询一个 Collection,找到该 Client 对应的 ACL 记录,这条记录应该包含 3 个字段:
publish: [“topic1”, “topic2”, …], subscribe: [“subtop1”, “subtop2”, …], pubsub: [“topic/#”, “topic1”, …]
publish 是指该 Client 可以 Publish 的主题列表,subscribe 是指该 Client可以 Subscribe 的主题列表,pubsub 是指 Client 可以同时 Subscribe 和 Publish 的主题列表,列表里的主题名可以使用通配符”+”和”#”,接下来我们写点代码来验证一下。

MongoDB 认证插件的 ACL 功能

编辑<EMQ X 安装目录>/etc/plugins/emqx_auth_mongo.conf:

  1. auth.mongo.acl_query = on,打开 ACL 功能;
  2. auth.mongo.acl_query.collection = mqtt_acl 在 mqtt_acl collection 中存放 ACL 信息;
  3. auth.mongo.acl_query.selector = username=%u 指明查询 ACL 记录的条件,%u 代表 Client 接入时使用的用户名。

然后重新加载插件:

  1. <EMQ X 安装目录>/bin/emqx_ctl plugins reload emqx_auth_mongo

接着在 MongDB 里创建 mqtt_acl collection,但是暂时不添加任何记录:

  1. ## MongoDB Shell
  2. db.createCollection("mqtt_acl")

写一点代码来验证一下对 Publish 的权限验证,首先实现一个Subscribe 端,使用 JWT 接入:

// IotHub_Device/samples/test_mqtt_sub.js
var jwt = require('jsonwebtoken')
var mqtt = require('mqtt')
require('dotenv').config()
var username = "username"
var password = jwt.sign({
    username: username,
    exp: Math.floor(Date.now() / 1000) + 10
}, process.env.JWT_SECRET)
var client = mqtt.connect('mqtt://127.0.0.1:1883', {
    username: username,
    password: password
})
client.on('connect', function (connack) {
    console.log(`return code: ${connack.returnCode}`)
    client.subscribe("/topic1")
})
client.on("message", function (_, message) {
    console.log(message.toString())
})
复制

接着实现一个 Publish 端,用已注册的设备的信息接入:

// IotHub_Device/samples/test_mqtt_pub.js
var mqtt = require('mqtt')
require('dotenv').config()
var client = mqtt.connect('mqtt://127.0.0.1:1883', {
    username: ${process.env.PRODUCT_NAME}/${process.env.DEVICE_NAME},
    password: process.env.SECRET
})
client.on('connect', function (connack) {
    console.log(`return code: ${connack.returnCode}`)
    client.publish("/topic1", "test", console.log)
})
  • 先运行test_mqtt_sub.js,然后再运行test_mqtt_pub.js,我们可以看到test_mqtt_sub.js输出test,说明 EMQ X 在查询不到对应的 ACL 记录时,对 Client的 Publish 和 Subscribe 权限是不限制的。可以在< EMQ X 安装目录>/etc/emqx.conf中对这一行为进行修改:

    ## Allow or deny if no ACL rules matched.
    ##
    ## Value: allow | deny
    acl_nomatch = allow
    
  • 在 mqtt_acl 里面插入一条记录:

    "username" : "xcYhpk6sB@IotApp",
      "publish" : [],
      "subscribe" : [],
      "pubsub" : []
    

    然后再重新运行test_mqtt_pub.js,这时test_mqtt_sub.js不会再输出test,说明 EMQ X 已经对 Client 的 Publish 权限进行了限制。
    这里的 publish 操作并没有报错,从安全性的角度来说忽略非法的操作比返回错误信息要好,也可以在< EMQ X 安装目录>/etc/emqx.conf中对这一行为进行修改:

    ## The action when acl check reject current operation
    ##
    ## Value: ignore | disconnect
    ## Default: ignore
    acl_deny_action = ignore
    
  • 修改这条记录,将 publish 字段改为: ["/topic1"],然后再重新运行test_mqtt_pub.js,这时test_mqtt_sub.js又会输出test,说明 EMQ X 已经按照 ACL 记录 Client 的 Publish 进行了限制。

Subscribe 权限验证就不做实验了,背后的逻辑是一样的。
如果启用了 ACL,那么 EMQ X 在 Publish 和 Subscribe 时候都会去查询 MongoDB 来进行验证,这会带来额外的开销,是需要去平衡和选择的,因为安全性和效率是冲突的。在一个完全可信的环境里,也可以不打开 ACL 来提升效率,这一切都取决于你的使用场景,在本课程中会保持 ACL 开启。
上面说的 ACL 会导致 EMQ X 在 Publish 和 Subscribe 时去查询 MongoDB 也不是完全正确,因为默认情况下 EMQ X 会缓存查询的结果,在< EMQ X 安装目录>/etc/emqx.conf可以找到这个 cache 相关的配置:

## Whether to enable ACL cache.
##
## If enabled, ACLs roles for each client will be cached in the memory
##
## Value: on | off
enable_acl_cache = on
## The maximum count of ACL entries can be cached for a client.
##
## Value: Integer greater than 0
## Default: 32
acl_cache_max_size = 32
## The time after which an ACL cache entry will be deleted
##
## Value: Duration
## Default: 1 minute
acl_cache_ttl = 1m

集成 EMQ X ACL

接下来我们把 EMQ X 的 ACL 功能集成到我们现有的体系当中:

  • 事先定义好设备可以订阅和发布的主题范围;
  • 在注册设备时,生成设备的 ACL 记录;
  • 在删除设备时,删除相应的ACL记录。

这里使用 device_acl collection 来存储 ACL 记录:

// IotHub_Server/models/device_acl.js
var mongoose = require('mongoose');
var Schema = mongoose.Schema;
const deviceACLSchema = new Schema({
    broker_username: String,
    publish: Array,
    subscribe: Array,
    pubsub: Array,
}, { collection: 'device_acl' })
const DeviceACL = mongoose.model("DeviceACL", deviceACLSchema);
module.exports = DeviceACL

我们在后面讲到处理上行数据和下发指令的课程时,再来规划设备可以订阅和发布的主题,所以目前暂时将设备的可订阅和可发布主题列表设为空:

// IotHub_Server/models/device.js
deviceSchema.methods.getACLRule = function () {
    const publish = []
    const subscribe = []
    const pubsub = []
    return {
        publish: publish,
        subscribe: subscribe,
        pubsub: pubsub
    }
}

设备注册的时候生成 ACL 记录:

// IotHub_Server/routes/devices.js
router.post("/", function (req, res) {
    ...
    device.save(function (err) {
        if (err) {
            res.status(500).send(err)
        } else {
            var aclRule = device.getACLRule()
            var deviceACL = new DeviceACL({
                broker_username: device.broker_username,
                publish: aclRule.publish,
                subscribe: aclRule.subscribe,
                pubsub: aclRule.pubsub
            })
            deviceACL.save(function () {
                res.json({product_name: productName, device_name: deviceName, secret: secret})
            })
        }
    })
})
复制

删除设备的时候也移除相应的 ACL 记录:

// IotHub_Server/models/device.js
deviceSchema.post("remove", function (device, next) {
    Connection.deleteMany({device: device._id}).exec()
    DeviceACL.deleteMany({broker_username: device.broker_username}).exec()
    next()
})
复制

我们也需要给 device_acl 加一个索引以提高查询速度:

## MongoDB Shell
use iothub
db.devices.createIndex({
    "broker_username" : 1,
})
复制

这样 Maque IotHub 的设备权限管理就完成了。

到这一节结束,我们就完成了 Maque IotHub 的设备从注册到删除的全生命周期管理,下一节,我们会讨论一个和设备管理无关的内容: Maque IotHub的扩展性,然后结束本课程的第一部分。