在这一节我们将 CoAP 协议接入 IotHub,IotHub 的 CoAP 包含以下功能:

  • 允许设备用 CoAP 协议接入,并上传数据和状态;
  • DeviceSDK 仍然需要向设备应用屏蔽底层的协议细节;
  • CoAP 设备使用和 MQTT 设备同样的认证和权限系统。

    由于我只建议用 CoAP 来做数据上传功能,所以在这里只实现上行数据的功能。

EMQ X 的 CoAP 插件

EMQ X 提供一个 emqx_coap 插件来提供 CoAP 协议的接入,这个插件其实是一个 CoAP Gateway,和上一节提到的 CoAP HTTP Gateway 类似,不过它会把 CoAP 请求按照一定规则转换成 MQTT 的 Publish/Subscribe:
45.IotHub 接入 CoAP - 图1
以下的 CoAP 请求会被转换成 MQTT Publish
方法: PUT URL: coap://:/mqtt/?c=&u=&p=
例如 CoAP 请求 PUT coap://127.0.0.1:5683/mqtt/topic/test?c=c1&u=u1&p=p1 会被转换成主题为topic/test的 MQTT Publish 消息,使用的 username/password 为 u1/p1,ClientID 为 c1。
以下的CoAP请求会被转换成Subscribe请求
方法: GET URLcoap://<host>:<port>/mqtt/<topic>?c=<client_id>&u=<username>&p=<password> 例如 CoAP 请求 GET coap://127.0.0.1:5683/mqtt/topic/test?c=c1&u=u1&p=p1 会被转换成主题为topic/test的 MQTT Subscribe 消息,使用的 username/password 为 u1/p1,ClientID 为 c1。

本课程,我们只关心 Publish 消息的转换。

运行< EMQ X 安装目录>/bin/emqx_ctl plugins load emqx_coap加载 emqx_coap 插件,默认配置下使用端口 5683 接收 CoAP 数据。

CoAP 设备端代码

本课程使用 node-coap 作为 CoAP 库,在 DeviceSDK 中新建一个 IotCoAPDevice 类作为 CoAP 设备接入的入口:

  1. //IotHub_DeviceSDK/sdk/iot_coap_device.js
  2. class IotCoAPDevice {
  3. constructor({serverAddress = "127.0.0.1", serverPort = 5683, productName, deviceName, secret, clientID} = {}) {
  4. this.serverAddress = serverAddress
  5. this.serverPort = serverPort
  6. this.productName = productName
  7. this.deviceName = deviceName
  8. this.secret = secret
  9. this.username = `${this.productName}/${this.deviceName}`
  10. if (clientID != null) {
  11. this.clientIdentifier = `${this.username}/${clientID}`
  12. } else {
  13. this.clientIdentifier = this.username
  14. }
  15. }
  16. }

这里使用现有设备的 ProductName 和 DeviceName 进行接入认证。
上传数据和状态的代码很简单,按照转换规则去构造 CoAP 请求就可以了:

  1. //IotHub_DeviceSDK/sdk/iot_coap_device.js
  2. class IotCoAPDevice {
  3. ...
  4. publish(topic, payload) {
  5. var req = coap.request({
  6. hostname: this.serverAddress,
  7. port: this.serverPort,
  8. method: "put",
  9. pathname: `mqtt/${topic}`,
  10. query: `c=${this.clientIdentifier}&u=${this.username}&p=${this.secret}`
  11. })
  12. req.end(Buffer.from(payload))
  13. }
  14. uploadData(data, type){
  15. var topic = `upload_data/${this.productName}/${this.deviceName}/${type}/${new ObjectId().toHexString()}`
  16. this.publish(topic, data)
  17. }
  18. updateStatus(status){
  19. var topic = `update_status/${this.productName}/${this.deviceName}/${new ObjectId().toHexString()}`
  20. this.publish(topic, JSON.stringify(status))
  21. }
  22. }

代码联调

我们可以写一小段代码来测试 CoAP 的功能:

  1. //IotHub_DeviceSDK/samples/upload_coap.js
  2. var IotCoapDevice = require("../sdk/iot_coap_device")
  3. require('dotenv').config()
  4. var path = require('path');
  5. var device = new IotCoapDevice({
  6. productName: process.env.PRODUCT_NAME,
  7. deviceName: process.env.DEVICE_NAME,
  8. secret: process.env.SECRET,
  9. clientID: path.basename(__filename, ".js"),
  10. })
  11. device.updateStatus({coap: true})
  12. device.uploadData("this is a sample data", "coapSample")

运行node upload_coap.js,然后检查 MongoDB 里面对应设备的状态数据和消息数据,可以发现 CoAP 功能在正确的工作。

CoAP 的连接状态

CoAP 是基于 UDP 的,按道理来说是没有连接的,但是如果我们查看对应设备的连接状态:

  1. curl http://localhost:3000/devices/IotApp/H9rTa3uSm
  2. ...
  3. {"connected":true,"client_id":"IotApp/H9rTa3uSm/upload_coap","ipaddress":"127.0.0.1","connected_at":1560432017}
  4. ...

会发现对应的设备下多了一个已连接的 connection,而 upload_coap.js 早就执行完毕退出了,这是为什么呢?
因为经过 emqx_coap 的转换之后,对于 EMQ X Broker 来说,它认为是一个 MQTT Client 接入并发布数据,所以会保留一个 MQTT connection, 而 upload_coap.js 执行完毕退出的时候,并没有发送 DISCONNECT 数据包,所以这个 MQTT connection 状态是已连接。
那什么时候这个连接会变成未连接呢,按照 MQTT 协议的规范,当超过 keep_alive 的时间间隔内没有收到来自该连接的消息,就会认为该连接已关闭。在< EMQ X 安装目录>/etc/plugins/emqx_coap.conf中可以配置这个 keep_alive 的值,默认为 120 秒:

coap.keep_alive = 120s

等待超过 120 秒以后,再次查询设备的连接状态:

curl http://localhost:3000/devices/IotApp/H9rTa3uSm
...
{"connected":false,"client_id":"IotApp/H9rTa3uSm/upload_coap","ipaddress":"127.0.0.1","connected_at":1560432017,"disconnect_at":1560432197}
...

这个连接的状态已变为未连接。

这一节我们配置 EMQ X Broker 支持 CoAP,并用现有的 IotHub 的设备体系来支持 CoAP 设备的接入和数据上传。本课程的内容到此就结束了。