这一节我们开始实现 IotHub 指令下发的 IotHub Server 端的实现。这一节里面我们会实现使用 EMQ X 的 API 来发布消息,并实现指令下发接口供业务系统调用,最后使用 EMQ X 的服务器订阅功能,来实现设备的自动订阅。

添加 ACL 列表

设备端需要在回复指令的时候 Publish 到:
cmd_resp/:ProductName/:DeviceName/:CommandName/:RequestID/:MessageID。所以我们需要先把这个主题加到设备的 ACL 列表里:

  1. //IotHub_Server/models/device.js
  2. deviceSchema.methods.getACLRule = function () {
  3. const publish = [
  4. `upload_data/${this.product_name}/${this.device_name}/+/+`,
  5. `update_status/${this.product_name}/${this.device_name}/+`,
  6. `cmd_resp/${this.product_name}/${this.device_name}/+/+/+`
  7. ]
  8. ...
  9. }

你需要重新注册一个设备或者手动更新已注册设备存储在 MongoDB 的 ACL 列表。

EMQ X Publish 功能

这里我们调用 EMQ X Publish API 接口,来实现消息发布的功能,当 IotHub Server 需要向某个设备下发一个指令的时候会用到。
和前面用的 EMQ X RestAPI 一样,我们将调用 EMQ X Publish API 的代码封装到 Service 类里面:

  1. //IotHub_Server/service/emqx_service
  2. var shortid = require("shortid")
  3. static publishTo({topic, payload, qos = 1, retained = false}) {
  4. const apiUrl = `${process.env.EMQX_API_URL}/mqtt/publish`
  5. request.post(apiUrl, {
  6. "auth": {
  7. 'user': process.env.EMQX_APP_ID,
  8. 'pass': process.env.EMQX_APP_SECRET,
  9. 'sendImmediately': true
  10. },
  11. json:{
  12. topic: topic,
  13. payload: payload,
  14. qos: qos,
  15. retained: retained,
  16. client_id: shortid.generate()
  17. }
  18. }, function (error, response, body) {
  19. console.log('statusCode:', response && response.statusCode);
  20. console.log('body:', body);
  21. })
  22. }

因为 IotHub 支持离线指令下发,所以 Publish 的 QoS=1,同时 EMQ X 的 Publish API 还需要提供一个 ClientID,这里我们随机生成一个就好了。

IotHub Server API

IotHub Server API 需要提供一个接口供业务系统向设备下发指令,业务系统可以通过调用这个接口向指定的设备下发一条指令,业务系统可以指定指令的名称和指令附带的数据。
首先我们在 Device 类里面做一个封装,实现一个下发指令的方法:

  1. //IotHub_Server/models/device.js
  2. const ObjectId = require('bson').ObjectID;
  3. deviceSchema.methods.sendCommand = function ({commandName, data, encoding, ttl = undefined}) {
  4. var requestId = new ObjectId().toHexString()
  5. var topic = `cmd/${this.product_name}/${this.device_name}/${commandName}/${encoding}/${requestId}`
  6. if (ttl != null) {
  7. topic = `${topic}/${Math.floor(Date.now() / 1000) + ttl}`
  8. }
  9. emqxService.publishTo({topic: topic, payload: data})
  10. return requestId
  11. }

注意这里如果指定了 TTL(有效期,单位为秒)的话,是用当前时间的时间戳加上 TTL 作为指令的过期时间。
这里仍然使用 BSON 的 ObjectID 作为指令的 RequestID。 同时需要将这个 RequestID 返回给调用者。
业务系统在请求 IotHub 给指定设备下发指令的时候,需要提供设备的 ProductName、DeviceName、指令的名称、指令数据以及指令的 TTL。如果说指令数据为二进制的数据,那么业务系统需要在请求前将指令数据使用 Base64 进行编码,并在请求时指明编码格式(Encoding)为 Base64:

  1. router.post("/:productName/:deviceName/command", function (req, res) {
  2. var productName = req.params.productName
  3. var deviceName = req.params.deviceName
  4. Device.findOne({"product_name": productName, "device_name": deviceName}, function (err, device) {
  5. if (err) {
  6. res.send(err)
  7. } else if (device != null) {
  8. var requestId = device.sendCommand({
  9. commandName: req.body.command,
  10. data: req.body.data,
  11. encoding: req.body.encoding || "plain",
  12. ttl: req.body.ttl != null ? parseInt(req.body.ttl) : null
  13. })
  14. res.status(200).json({request_id: requestId})
  15. }else{
  16. res.status(404).send("device not found")
  17. }
  18. })
  19. })

指令发送成功以后, IotHub 会把指令的 RequestID 返回给业务系统,业务系统应该保存这个 RequestID,以便在收到设备对指令的回复时候进行匹配。
例如,用户可以远程地让家里的路由器下载一个文件,并希望下载完成后在手机上能收到通知,那么业务系统在调用 IotHub Server API 下发指令到路由器后应该保存 RequestID 和用户 ID,路由器下载完之后便回复该指令,业务系统收到后用回复里的 RequestID 去匹配它保存的 RequestID 和用户 ID,如果匹配成功,则使用对应的用户 ID 去通知用户。

因为我们需要用 command 参数来拼接主题名,所以 command 参数不应该包含有# / +以及 IotHub 预留的一些字符,这里为了演示起见,跳过了输入参数的校验,但是在实际项目中,是需要加上的。

服务器订阅

在 DeviceSDK 里面没有任何订阅接收指令主题的代码,我们用 EMQ X 的服务器订阅来实现设备的自动订阅。
EMQ X 的服务器订阅是在<EMQ X 安装目录>/etc/emqx.conf里进行配置的,下面我们把这些配置项讲解一下。
首先,打开 EMQ X 的服务器订阅功能:

  1. module.subscription = on

然后配置需要自动订阅的主题名,以及 QoS:

module.subscription.1.topic = topics
module.subscription.1.qos = 1

module.subscription.1.topic这个配置项支持两个占位符,%u 代表 Client 接入时使用的 username,%c 代表 Client 接入时使用的 Client Identifier。在 MaqueIotHub 中,设备接入 EMQ X Broker 时使用的用户名为 ProductName/DeviceName,那么这里我们就可以这样配置自动订阅的主题名:

module.subscription.1.topic = cmd/%u/+/+/+/#

目前 EMQ X 只支持这种方式定义服务器订阅列表,如果你需要更灵活的配置方式,需要用插件的方式来扩展或者让设备进行自行订阅。 这就是为什么使用 ProductName/DeviceName 作为设备接入 Broker 的 username 的原因了,这算一个小小的取巧。

如果你需要配置更多的订阅主题,可以这样做:

module.subscription.1.topic = xxx
module.subscription.1.qos = xx
module.subscription.2.topic = xxx
module.subscription.2.qos = xx
module.subscription.3.topic = xxx
module.subscription.3.qos = xx
...

配置完成以后需要重新启动 EMQ X:<EMQ X 安装目录>/bin/emqx restart
我们可以使用 EMQ X 的 Management Web Console 来验证服务器自动订阅是否生效了。

EMQ X 默认情况下会在 http://host:18083 启动一个 Web 服务器,可以通过 Web UI 来查看和管理 EMQ X 的状态,登入这个 Web Console 的默认用户名和密码是 admin/public。

运行IotHub_Device/samples/connect_to_server.js,然后访问http://<host>:18083/#/subscriptions,你会发现服务器订阅已经生效了:
21.服务端实现(一) - 图1
因为这个主题是服务器自动订阅的,并不是由一个真实的 MQTT Client 去发起 Subscribe,所以不会触发 ACL 校验,我们无需将这个主题存入设备的ACL列表中。

这一节我们完成了下行数据处理 IotHub Server 端的大部分功能,下一节我们将介绍通知业务系统的部分,并把设备端和服务端的代码连在一起跑一下。