说明

通过下面案例了解MQTT连接器的使用,包括遥测,属性,单向双向RPC。
案例:有个开关传感器,且集成一些温湿度,温湿度上传为设备遥测,型号等上传为设备属性。单向RPC控制开关,双向RPC获取开关状态。
BUG:tb网关的MQTT连接器双向RPC存在问题,无法成功,视频和文章会演示如何解决BUG。

过程演示

mqtt连接器演示.mp4 (32.19MB)

文字展示视频操作过程

注意注释部分,代码块滚动查看

修改网关配置

thingsboard_gateway/config/tb_gateway.yaml

  1. thingsboard:
  2. host: 192.168.7.198 # TB mqtt ip
  3. port: 1883 # TB mqtt端口
  4. remoteShell: false
  5. remoteConfiguration: false
  6. security:
  7. accessToken: LmX4G3nhJXRr0zOk6mzL # mqtt网关设备accesstoken
  8. qos: 0
  9. storage:
  10. type: memory
  11. read_records_count: 100
  12. max_records_count: 100000
  13. connectors: # 打开MQTT连接器
  14. -
  15. name: MQTT Broker Connector
  16. type: mqtt
  17. configuration: mqtt-test.json #指定mqtt连接器配置文件

MQTT连接器配置

  1. {
  2. "broker": {
  3. "name":"EMQX Broker",
  4. "host":"192.168.7.190",
  5. "port":1883,
  6. "clientId": "ThingsBoard_gateway",
  7. "security": {
  8. "type": "basic",
  9. "username": "admin",
  10. "password": "admin"
  11. }
  12. },
  13. "mapping": [
  14. {
  15. "topicFilter": "sensor/data",
  16. "converter": {
  17. "type": "json",
  18. "deviceNameJsonExpression": "${serialNumber}",
  19. "deviceTypeJsonExpression": "${sensorType}",
  20. "timeout": 60000,
  21. "attributes": [
  22. {
  23. "type": "string",
  24. "key": "model",
  25. "value": "${sensorModel}"
  26. },
  27. {
  28. "type": "string",
  29. "key": "${sensorModel}",
  30. "value": "on"
  31. }
  32. ],
  33. "timeseries": [
  34. {
  35. "type": "double",
  36. "key": "temperature",
  37. "value": "${temp}"
  38. },
  39. {
  40. "type": "double",
  41. "key": "humidity",
  42. "value": "${hum}"
  43. },
  44. {
  45. "type": "boolean",
  46. "key": "occupancy",
  47. "value": "${occ}"
  48. },
  49. {
  50. "type": "int",
  51. "key": "state",
  52. "value": "${state}"
  53. }
  54. ]
  55. }
  56. }
  57. ],
  58. "connectRequests": [
  59. {
  60. "topicFilter": "sensor/connect",
  61. "deviceNameJsonExpression": "${serialNumber}"
  62. }
  63. ],
  64. "disconnectRequests": [
  65. {
  66. "topicFilter": "sensor/disconnect",
  67. "deviceNameJsonExpression": "${serialNumber}"
  68. }
  69. ],
  70. "attributeUpdates": [
  71. {
  72. "deviceNameFilter": ".*",
  73. "attributeFilter": "uploadFrequency",
  74. "topicExpression": "sensor/${serialNumber}/${attributeKey}",
  75. "valueExpression": "{\"${attributeKey}\":\"${attributeValue}\"}"
  76. }
  77. ],
  78. "serverSideRpc": [
  79. {
  80. "deviceNameFilter": ".*",
  81. "methodFilter": "getState",
  82. "requestTopicExpression": "sensor/${deviceName}/request/${methodName}",
  83. "responseTopicExpression": "sensor/${deviceName}/response/${methodName}",
  84. "responseTimeout": 10000,
  85. "valueExpression": "${params}"
  86. },
  87. {
  88. "deviceNameFilter": ".*",
  89. "methodFilter": "setState",
  90. "requestTopicExpression": "sensor/${deviceName}/request/${methodName}",
  91. "valueExpression": "${params}"
  92. }
  93. ]
  94. }

JS模拟网关子设备

安装依赖 npm install mqtt
js文件名 mqtt_client.js
注意下面的日志格式,后面不在赘述。

  1. var mqtt = require('mqtt');
  2. console.log('start mqtt_client,prepare connect');
  3. //连接emqx broker
  4. var client = mqtt.connect('mqtt://192.168.7.190:1883', {
  5. username: 'admin',
  6. password: 'admin',
  7. });
  8. //开关状态
  9. state = 0;
  10. var preTimer;
  11. //设备连接回调
  12. client.on('connect', function () {
  13. console.log('connected');
  14. //发送连接请求
  15. publish('sensor/connect', {
  16. serialNumber: 'SN-002',
  17. });
  18. //订阅服务端RPC
  19. client.subscribe('sensor/+/request/+');
  20. //订阅设备属性更新
  21. client.subscribe('sensor/+/+');
  22. //不断的上传遥测
  23. preTimer = telemetry();
  24. });
  25. //收到消息回调
  26. client.on('message', function (topic, message) {
  27. console.log('on message:');
  28. console.log(topic);
  29. console.log(message.toString());
  30. console.log('--------------------------------------');
  31. let data = JSON.parse(message);
  32. if (new RegExp('sensor/+/request/+'.replace('+', '[^/]+')).test(topic)) {
  33. console.log('RegExp success');
  34. var levels = topic.split('/');
  35. if (levels[3] === 'getState') {
  36. console.log('receive getState');
  37. //回复 twoway RPC
  38. publish('sensor/' + levels[1] + '/response/' + levels[3], {
  39. state: state,
  40. });
  41. }
  42. if (levels[3] === 'setState') {
  43. console.log('receive setState');
  44. state = data.stateValue;
  45. //重新发送遥测 js没有双向绑定只能这样操作了
  46. if (preTimer != null) {
  47. clearInterval(preTimer);
  48. }
  49. preTimer = telemetry();
  50. }
  51. }
  52. });
  53. //发送遥测 返回定时器对象
  54. function telemetry() {
  55. return setInterval(() => {
  56. publish('sensor/data', {
  57. serialNumber: 'SN-002',
  58. sensorType: 'default',
  59. sensorModel: 'SN-model',
  60. 'SN-model': 'on',
  61. temp: Math.random(),
  62. hum: Math.random(),
  63. occ: true,
  64. state: state,
  65. });
  66. }, 3000);
  67. }
  68. //发送数据+打印日志
  69. function publish(topic, message) {
  70. client.publish(topic, JSON.stringify(message));
  71. log(topic, message);
  72. }
  73. //控制台打印
  74. function log(topic, message) {
  75. console.log('send message:');
  76. console.log(topic);
  77. console.log(JSON.stringify(message));
  78. console.log('===================================');
  79. }

添加网关设备

image.png

启动网关

网关日志打印 tb_device_mqtt - 141 - connection SUCCESSb 代表网关连接TB MQTT服务器成功。

  1. ""2021-04-23 13:17:12" - INFO - [tb_gateway_service.py] - tb_gateway_service - 138 - Gateway started."
  2. ""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 235 - MQTT Broker Connector connected to 192.168.7.190:1883 - successfully."
  3. ""2021-04-23 13:17:12" - INFO - [tb_loader.py] - tb_loader - 66 - Import JsonMqttUplinkConverter from /Users/weijixin/PycharmProjects/thingsboard-gateway/thingsboard_gateway/connectors/mqtt."
  4. ""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 276 - Connector "MQTT Broker Connector" subscribe to sensor/data"
  5. ""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/data, subscription message id = 1"
  6. ""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/connect, subscription message id = 2"
  7. ""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/disconnect, subscription message id = 3"
  8. ""2021-04-23 13:17:12" - INFO - [tb_device_mqtt.py] - tb_device_mqtt - 141 - connection SUCCESS"
  9. ""2021-04-23 13:17:12" - INFO - [tb_gateway_mqtt.py] - tb_gateway_mqtt - 176 - Subscribed to *|* with id 2"

启动js模拟设备

控制台执行 node mqtt_client.js
前俩行日志打印如下,则js模拟设备成功连接emqx broker MQTT服务器

  1. start mqtt_client,prepare connect
  2. connected

接下来会发送一条连接请求,主题sensor/connect到emqx broker,日志如下

  1. send message:
  2. sensor/connect
  3. {"serialNumber":"SN-002"}
  4. ===================================

对应mqtt-test.json配置

  1. "connectRequests": [
  2. {
  3. "topicFilter": "sensor/connect",
  4. "deviceNameJsonExpression": "${serialNumber}"
  5. }
  6. ],

网关收到打印如下,SN-002就是网关解析的子设备名

  1. ""2021-04-23 13:17:14" - INFO - [mqtt_connector.py] - mqtt_connector - 402 - Connecting device SN-002 of type default"

此时打开tb页面,设备列表出现设备SN-002


接着js会开始定时3s间隔不断的发送遥测,此时tb页面可以观察到客户端属性和遥测数据了。

创建开关小部件

创建Round switch``小部件,图中的ADCDE部分一一解释。
image.png

A

机器翻译:属性/时间序列值键(仅当订阅属性/时间序列方法时)
应该就是从遥测或属性中找state数据点,这个输入框我也比较迷糊,他的值用来做什么的?直接拿来做开关量的话,不需要转换函数吗?

B

通过发送输入框指定的双向RPC方法(有响应的请求),获取开关的值,并且用D部分的函数转换RPC返回值为true/false,从而改变开关状态。

C

通过发送输入框指定的RPC方法,改变子设备对应的开关量状态值,发送方法的参数用E部分的函数转换。

D

参考B的解释

E

点击开关按钮后会给此函数传入value,点击开,传入value就是true;点击关传入value就是false。然后经过自己编写的函数转换后作为C方法的参数发送。


小部件创建好了,应该会自动发送一个getState双向RPC请求来确定自己目前是开还是关的状态,但是看看网关日志吧,报错了,反复调试后确定是BUG.下面说说这个BUG如何修复!
不点赞美女哭了.jpg

MQTT连接器双向RPC的BUG修复

开关小部件创建成功后,会发送getState的双向RPC,此时网关报错如下: