这一节课,我们来完成指令下发剩余部分的功能, 当设备对指令进行回复以后,IotHub 会通过 RabbitMQ 将设备的回复通知到业务系统,最后我们将 IotHub Server 端的代码和 DeviceSDK 的代码进行联调。

通知业务系统

指令处理的最后一步就是将设备对指令的回复再转发到业务服务器,具体流程:

  1. IotHub Server 通过 WebHook 获取设备对指令的回复消息;
  2. IotHub Server 通过解析消息的主题名获取指令回复的元数据;
  3. IotHub 通过 RabbitMQ 对指令的回复转发到对应的业务系统。

首先在 WebHook 里添加对指令回复消息的处理:

  1. //IotHub_Server/messages/message_service.js
  2. static dispatchMessage({topic, payload, ts} = {}) {
  3. ...
  4. const cmdRespRegx = pathToRegexp(cmdRespRule)
  5. var result = null;
  6. if ((result = topicRegx.exec(topic)) != null) {
  7. ...
  8. } else if ((result = statusRegx.exec(topic)) != null) {
  9. ...
  10. } else if ((result = cmdRespRegx.exec(topic)) != null) {
  11. this.checkMessageDuplication(result[5], function (isDup) {
  12. if (!isDup) {
  13. MessageService.handleCommandResp({
  14. productName: result[1],
  15. deviceName: result[2],
  16. ts: ts,
  17. command: result[3],
  18. requestId: result[4],
  19. payload: new Buffer(payload, 'base64')
  20. })
  21. }
  22. })
  23. }
  24. }

接下来再把指令回复通过 RabbitMQ 转发到对应的业务系统:

  1. //IotHub_Server/service/message_service.js
  2. static handleCommandResp({productName, deviceName, command, requestId, ts, payload}) {
  3. NotifyService.notifyCommandResp({
  4. productName: productName,
  5. deviceName: deviceName,
  6. command: command,
  7. requestId: requestId,
  8. ts: ts,
  9. payload: payload
  10. })
  11. }
  1. //IotHub_Server/service/notify_service.js
  2. static notifyCommandResp({productName, deviceName, command, requestId, ts, payload}){
  3. var data = bson.serialize({
  4. device_name: deviceName,
  5. command: command,
  6. request_id: requestId,
  7. send_at: ts,
  8. payload: payload
  9. })
  10. if(currentChannel != null){
  11. currentChannel.publish(commandRespExchange, productName, data)
  12. }
  13. }

到这里,最后一步就完成了。

代码联调

现在,我们可以总结整个指令下发的流程了:
22.服务端实现(二) - 图1

  1. 业务系统调用 Server API 发送指令。
  2. IotHub Server 调用 EMQ X 的 Publish API(RESTful)。
  3. EMQ X Broker Publish 消息到设备订阅的主题。
  4. DeviceSDK 提取出指令的信息并通过 Event 的方式传递到设备应用代码。
  5. 设备应用代码执行完指令要求的操作后,通过 Callback(闭包)的方式要求 DeviceSDK 对指令进行回复。
  6. DeviceSDK Publish 包含指令回复的消息到 EMQ X Broker。
  7. EMQ X Broker 通过 WebHook 将指令回复传递到 IotHub Server。
  8. IotHub Server 将指令回复放入到 RabbitMQ 对应的队列中。
  9. 业务系统从 RabbitMQ 的对应队列获得指令的回复。

下面我们来写点代码来验证这个流程。

现在开始我们把 Server 端的示例代码放在 IotHub_Server/samples 下面。

首先我们实现一段模拟业务系统的代码,它有以下的功能:

  • 调用 IotHub Server API,向设备发送指令 “ping”,指令数据为当前的时间戳,以二进制格式传输;
  • 可以通过命令行参数指定指令的 TTL,默认情况下指令无有效期的限制;
  • 从 RabbitMQ 中获取设备对指令的回复,并打印出来。
    1. //IotHub_Server/samples/ping.js
    2. require('dotenv').config({path: "../.env"})
    3. const bson = require('bson')
    4. const request = require("request")
    5. var amqp = require('amqplib/callback_api');
    6. var exchange = "iothub.events.cmd_resp"
    7. amqp.connect(process.env.RABBITMQ_URL, function (error0, connection) {
    8. if (error0) {
    9. console.log(error0);
    10. } else {
    11. connection.createChannel(function (error1, channel) {
    12. if (error1) {
    13. console.log(error1)
    14. } else {
    15. channel.assertExchange(exchange, 'direct', {durable: true})
    16. var queue = "iotapp_cmd_resp";
    17. channel.assertQueue(queue, {
    18. durable: true
    19. })
    20. channel.bindQueue(queue, exchange, process.env.TARGET_PRODUCT_NAME)
    21. channel.consume(queue, function (msg) {
    22. var data = bson.deserialize(msg.content)
    23. if(data.command == "ping") {
    24. console.log(`received from ${data.device_name}, requestId: ${data.request_id},payload: ${data.payload.buffer.readUInt32BE(0)}`)
    25. }
    26. channel.ack(msg)
    27. })
    28. }
    29. });
    30. }
    31. });
    32. const buf = Buffer.alloc(4);
    33. buf.writeUInt32BE(Math.floor(Date.now())/1000, 0);
    34. var formData = {
    35. command: "ping",
    36. data: buf.toString("base64"),
    37. encoding: "base64"
    38. }
    39. if(process.argv[2] != null){
    40. formData.ttl = process.argv[2]
    41. }
    42. request.post(`http://127.0.0.1:3000/devices/${process.env.TARGET_PRODUCT_NAME}/${process.env.TARGET_DEVICE_NAME}/command`, {
    43. form: formData
    44. }, function (error, response, body) {
    45. if (error) {
    46. console.log(error)
    47. } else {
    48. console.log('statusCode:', response && response.statusCode);
    49. console.log('body:', body);
    50. }
    51. })
    然后实现一段设备端应用代码,当接受到”ping”指令时,回复设备当前的时间戳,使用二进制格式进行传输:
    1. //IotHub_Device/samples/pong.js
    2. var IotDevice = require("../sdk/iot_device")
    3. require('dotenv').config()
    4. var path = require('path');
    5. var device = new IotDevice({
    6. productName: process.env.PRODUCT_NAME,
    7. deviceName: process.env.DEVICE_NAME,
    8. secret: process.env.SECRET,
    9. clientID: path.basename(__filename, ".js"),
    10. storePath: `../tmp/${path.basename(__filename, ".js")}`
    11. })
    12. device.on("online", function () {
    13. console.log("device is online")
    14. })
    15. device.on("command", function (command, data, respondCommand) {
    16. if (command == "ping") {
    17. console.log(`get ping with: ${data.readUInt32BE(0)}`)
    18. const buf = Buffer.alloc(4);
    19. buf.writeUInt32BE(Math.floor(Date.now())/1000, 0);
    20. respondCommand(buf)
    21. }
    22. })
    23. device.connect()

    这两段代码是有实际意义的,业务系统和设备可以通过一次指令的交互来了解他们之间数据传输的延迟状况(包括网络和 IotHub 处理的耗时)。

现在我们来运行上面的两段代码。

  • 首先运行node ping.js,然后再运行node pong.js可以看到以下输出:

    1. ## node ping.js
    2. statusCode: 200
    3. body: {"request_id":"5cf25cce5cb7dc80277d4641"}
    4. received from HBG84L_M6, requestId: 5cf25cce5cb7dc80277d4641,payload: 1559387342
    ## node pong.js
    device is online
    get ping with: 1559387342
    

    这说明设备可以接受离线消息并回复,业务系统也正确地接收了设备对指令的回复,设备回复里的 RequestID 和业务系统下发指令时的 RequestID 是一致的。

  • 首先运行node ping.js 10,设定指令有效期为 10 秒,然后在 10 秒内运行node pong.js,我们可以看到和第一步一致的输出。

  • 首先运行node ping.js 10,设定指令有效期为 10 秒,然后等待 10 秒,再运行node pong.js,在控制台上不会有任何和指令相关的输出,说明指令的有效期设置是生效的。

到这一节为止,IotHub 的下行数据处理功能就完成了。目前 IotHub 可以正确地处理上行数据和下行数据了,在第四部分的课程里,我们基于 IotHub 上行和下行数据处理的框架,做进一步的抽象,实现一些更高级的功能。