设备间通信
到目前为止,我们在 MQTT 协议上抽象出了服务端和设备端,数据的流向是从服务端(业务系统,IotHub)到设备端,或者从设备到服务端。在某些场景下,接入 IotHub 的设备可能还需要和其他接入的设备进行通信,例如管理终端通过 P2P 的方式查看监控终端的实时视频,在建立 P2P 的连接之前,需要管理终端和监控终端进行通信,交换一些建立会话的数据。
两个不同的设备 DeviceA、DeviceB 作为 MQTT Client 接入 EMQ X Broker,他们直接进行通信的流程很简单,DeviceA 订阅主题 TopicA,DeviceB 订阅主题 TopicB,如果 DeviceA 想向 DeviceB 发送信息,只需要向 TopicB 发布消息就可以了,反之亦然。
不过 IotHub 和 DeviceSDK 需要对这个过程进行抽象和封装,DeviceSDK 需要对设备应用代码屏蔽掉 MQTT 协议层的细节,做到以下功能:
- 设备间以 DeviceName 作为标识发送消息;
- 当 DeviceA 收到 DeviceB 的消息时,它知道这个消息是来自 Device B 的,可以通过 Device B 的DeviceName对 Device B进行回复。
在 IotHub Server 端,需要控制设备间通信的范围,这里我们约定只有同一个 ProductName 下的设备可以相互通信。
主题名规划
为了接收其他设备发来的消息,设备会订阅主题:m2m/:ProductName/:DeviceName/:SenderDeviceName/:MessageID
。
其中:
- ProductName、DeviceName:和之前的使用方式一样,唯一标识一个设备(消息的接收方);
- SenderDeviceName:消息发送方的的设备名,表明消息的来源方,接收方在需要回复消息发送时使用;
- MessageID:消息的唯一 ID,以便对消息进行去重。
也就是说,在设备间通信这个场景下,设备需要同时发布和订阅主题:m2m/:ProductName/:DeviceName/:SenderDeviceName/:MessageID
。
至于设备如何获取可发送消息的消息接收方 DeviceName,就取决于设备和业务系统的业务逻辑了,业务系统可以通过指令下发、设备主动数据请求的方式等将这些信息告知设备。
添加 ACL 列表
在 IotHub Server 端将这个主题加入设备的 ACL 列表:
//Iothub_Server/models/device.js
deviceSchema.methods.getACLRule = function () {
const publish = [
...
`m2m/${this.product_name}/+/${this.device_name}/+`
]
...
}
你需要重新注册一个设备或者手动更新已注册设备存储在 MongoDB 的 ACL 列表。
服务端设备订阅
接下来配置服务端订阅,自动订阅这个主题:
## < E MQX 安装目录>/etc/emqx.config
module.subscription.3.topic = m2m/%u/+/+
module.subscription.3.qos = 1
然后运行< EMQ X 安装目录>/bin/emqx restart
设备端实现
向设备发送消息
DeviceSDK 提供一个方法,可以向指定的 DeviceName 发送消息:
//IotHub_Device/sdk/iot_device.js
sendToDevice(deviceName, payload){
if (this.client != null) {
var topic = `m2m/${this.productName}/${deviceName}/${this.deviceName}/${new ObjectId().toHexString()}`
this.client.publish(topic, payload, {
qos: 1
})
}
}
主题名里的 :DeviceName 层级使用消息接收方的 DeviceName; :SenderDeviceName 层级使用发送方,即设备自己的 DeviceName;:ProductName 层级使用发送方的 ProductName,保证了设备只能给属于同一ProductName的设备发送消息。
接受来自其他设备的消息
DeviceSDK 需要处理来自这个主题的消息,并用 event 的方式将消息传递给设备应用代码:
//IotHub_Device/sdk/iot_device.js
dispatchMessage(topic, payload) {
...
var m2mTopicRule = "m2m/:productName/:deviceName/:senderDeviceName/:MessageID"
var result
var self = this
...
else if ((result = pathToRegexp(m2mTopicRule).exec(topic)) != null) {
this.checkRequestDuplication(result[4], function (isDup) {
if (!isDup) {
self.emit("device_message", result[3], payload)
}
})
}
...
DeviceSDK 会将发送方的 DeviceName 和 消息内容通过 “device_message” 事件传递给设备应用代码。
代码联调
接下来我们写一段代码来验证这个功能,我们会实现 2 个设备端,互相发送 ping/pong:
//IotHub_Device/samples/m2m_pinger.js
...
device.on("online", function () {
console.log("device is online")
})
device.on("device_message", function (sender, payload) {
console.log(`received ${payload.toString()} from: ${sender}`)
setTimeout(function () {
device.sendToDevice(sender, "ping")
}, 1000)
})
device.connect()
device.sendToDevice(process.env.DEVICE_NAME2, "ping")
//IotHub_Device/samples/m2m_ponger.js
...
var device = new IotDevice({
productName: process.env.PRODUCT_NAME,
deviceName: process.env.DEVICE_NAME2,
secret: process.env.SECRET2,
...
})
device.on("online", function () {
console.log("device is online")
})
device.on("device_message", function (sender, payload) {
console.log(`received ${payload.toString()} from: ${sender}`)
setTimeout(function () {
device.sendToDevice(sender, "pong")
}, 1000)
})
device.connect()
这里需要同时运行两个不同的设备端,所以新增了环境变量 DEVICE_NAME2 和 SECRET2 来保存第二设备的 DeviceName 和 Secret,可以看到以下输出:
## m2m_pinger.js
received pong from: D_nSy7k7W
received pong from: D_nSy7k7W
received pong from: D_nSy7k7W
received pong from: D_nSy7k7W
...
## m2m_ponger.js
eceived ping from: M-lKbbY80
received ping from: M-lKbbY80
received ping from: M-lKbbY80
received ping from: M-lKbbY80
...
说明设备间的通信功能是正常工作的。
这一节我们设计和实现了 IotHub 的设备间通信功能,下一节我们来讨论一下设备的 OTA 升级。