这一节我们来设计和实现 IotHub 设备间通信功能。

设备间通信

到目前为止,我们在 MQTT 协议上抽象出了服务端和设备端,数据的流向是从服务端(业务系统,IotHub)到设备端,或者从设备到服务端。在某些场景下,接入 IotHub 的设备可能还需要和其他接入的设备进行通信,例如管理终端通过 P2P 的方式查看监控终端的实时视频,在建立 P2P 的连接之前,需要管理终端和监控终端进行通信,交换一些建立会话的数据。
两个不同的设备 DeviceA、DeviceB 作为 MQTT Client 接入 EMQ X Broker,他们直接进行通信的流程很简单,DeviceA 订阅主题 TopicA,DeviceB 订阅主题 TopicB,如果 DeviceA 想向 DeviceB 发送信息,只需要向 TopicB 发布消息就可以了,反之亦然。
不过 IotHub 和 DeviceSDK 需要对这个过程进行抽象和封装,DeviceSDK 需要对设备应用代码屏蔽掉 MQTT 协议层的细节,做到以下功能:

  • 设备间以 DeviceName 作为标识发送消息;
  • 当 DeviceA 收到 DeviceB 的消息时,它知道这个消息是来自 Device B 的,可以通过 Device B 的DeviceName对 Device B进行回复。

在 IotHub Server 端,需要控制设备间通信的范围,这里我们约定只有同一个 ProductName 下的设备可以相互通信。

主题名规划

为了接收其他设备发来的消息,设备会订阅主题:
m2m/:ProductName/:DeviceName/:SenderDeviceName/:MessageID
其中:

  • ProductName、DeviceName:和之前的使用方式一样,唯一标识一个设备(消息的接收方);
  • SenderDeviceName:消息发送方的的设备名,表明消息的来源方,接收方在需要回复消息发送时使用;
  • MessageID:消息的唯一 ID,以便对消息进行去重。

也就是说,在设备间通信这个场景下,设备需要同时发布和订阅主题:
m2m/:ProductName/:DeviceName/:SenderDeviceName/:MessageID

至于设备如何获取可发送消息的消息接收方 DeviceName,就取决于设备和业务系统的业务逻辑了,业务系统可以通过指令下发、设备主动数据请求的方式等将这些信息告知设备。

接下来我们开始实现这个功能。

添加 ACL 列表

在 IotHub Server 端将这个主题加入设备的 ACL 列表:

  1. //Iothub_Server/models/device.js
  2. deviceSchema.methods.getACLRule = function () {
  3. const publish = [
  4. ...
  5. `m2m/${this.product_name}/+/${this.device_name}/+`
  6. ]
  7. ...
  8. }

你需要重新注册一个设备或者手动更新已注册设备存储在 MongoDB 的 ACL 列表。

服务端设备订阅

接下来配置服务端订阅,自动订阅这个主题:

  1. ## < E MQX 安装目录>/etc/emqx.config
  2. module.subscription.3.topic = m2m/%u/+/+
  3. module.subscription.3.qos = 1

然后运行< EMQ X 安装目录>/bin/emqx restart

设备端实现

向设备发送消息

DeviceSDK 提供一个方法,可以向指定的 DeviceName 发送消息:

//IotHub_Device/sdk/iot_device.js
sendToDevice(deviceName, payload){
        if (this.client != null) {
            var topic = `m2m/${this.productName}/${deviceName}/${this.deviceName}/${new ObjectId().toHexString()}`
            this.client.publish(topic, payload, {
                qos: 1
            })
        }
    }

主题名里的 :DeviceName 层级使用消息接收方的 DeviceName; :SenderDeviceName 层级使用发送方,即设备自己的 DeviceName;:ProductName 层级使用发送方的 ProductName,保证了设备只能给属于同一ProductName的设备发送消息。

接受来自其他设备的消息

DeviceSDK 需要处理来自这个主题的消息,并用 event 的方式将消息传递给设备应用代码:

//IotHub_Device/sdk/iot_device.js
dispatchMessage(topic, payload) {
        ...
        var m2mTopicRule = "m2m/:productName/:deviceName/:senderDeviceName/:MessageID"
        var result
        var self = this
        ...
        else if ((result = pathToRegexp(m2mTopicRule).exec(topic)) != null) {
            this.checkRequestDuplication(result[4], function (isDup) {
                if (!isDup) {
                    self.emit("device_message", result[3], payload)
                }
            })
        }
         ...

DeviceSDK 会将发送方的 DeviceName 和 消息内容通过 “device_message” 事件传递给设备应用代码。

代码联调

接下来我们写一段代码来验证这个功能,我们会实现 2 个设备端,互相发送 ping/pong:

//IotHub_Device/samples/m2m_pinger.js
...
device.on("online", function () {
    console.log("device is online")
})
device.on("device_message", function (sender, payload) {
    console.log(`received ${payload.toString()} from: ${sender}`)
    setTimeout(function () {
        device.sendToDevice(sender, "ping")
    }, 1000)
})
device.connect()
device.sendToDevice(process.env.DEVICE_NAME2, "ping")
//IotHub_Device/samples/m2m_ponger.js
...
var device = new IotDevice({
    productName: process.env.PRODUCT_NAME,
    deviceName: process.env.DEVICE_NAME2,
    secret: process.env.SECRET2,
   ...
})
device.on("online", function () {
    console.log("device is online")
})
device.on("device_message", function (sender, payload) {
    console.log(`received ${payload.toString()} from: ${sender}`)
    setTimeout(function () {
        device.sendToDevice(sender, "pong")
    }, 1000)
})
device.connect()

这里需要同时运行两个不同的设备端,所以新增了环境变量 DEVICE_NAME2 和 SECRET2 来保存第二设备的 DeviceName 和 Secret,可以看到以下输出:

## m2m_pinger.js
received pong from: D_nSy7k7W
received pong from: D_nSy7k7W
received pong from: D_nSy7k7W
received pong from: D_nSy7k7W
...
## m2m_ponger.js
eceived ping from: M-lKbbY80
received ping from: M-lKbbY80
received ping from: M-lKbbY80
received ping from: M-lKbbY80
...

说明设备间的通信功能是正常工作的。

这一节我们设计和实现了 IotHub 的设备间通信功能,下一节我们来讨论一下设备的 OTA 升级。