说明
通过下面案例了解MQTT连接器的使用,包括遥测,属性,单向双向RPC。
案例:有个开关传感器,且集成一些温湿度,温湿度上传为设备遥测,型号等上传为设备属性。单向RPC控制开关,双向RPC获取开关状态。
BUG:tb网关的MQTT连接器双向RPC存在问题,无法成功,视频和文章会演示如何解决BUG。
过程演示
文字展示视频操作过程
修改网关配置
thingsboard_gateway/config/tb_gateway.yaml
thingsboard:host: 192.168.7.198 # TB mqtt ipport: 1883 # TB mqtt端口remoteShell: falseremoteConfiguration: falsesecurity:accessToken: LmX4G3nhJXRr0zOk6mzL # mqtt网关设备accesstokenqos: 0storage:type: memoryread_records_count: 100max_records_count: 100000connectors: # 打开MQTT连接器-name: MQTT Broker Connectortype: mqttconfiguration: mqtt-test.json #指定mqtt连接器配置文件
MQTT连接器配置
{"broker": {"name":"EMQX Broker","host":"192.168.7.190","port":1883,"clientId": "ThingsBoard_gateway","security": {"type": "basic","username": "admin","password": "admin"}},"mapping": [{"topicFilter": "sensor/data","converter": {"type": "json","deviceNameJsonExpression": "${serialNumber}","deviceTypeJsonExpression": "${sensorType}","timeout": 60000,"attributes": [{"type": "string","key": "model","value": "${sensorModel}"},{"type": "string","key": "${sensorModel}","value": "on"}],"timeseries": [{"type": "double","key": "temperature","value": "${temp}"},{"type": "double","key": "humidity","value": "${hum}"},{"type": "boolean","key": "occupancy","value": "${occ}"},{"type": "int","key": "state","value": "${state}"}]}}],"connectRequests": [{"topicFilter": "sensor/connect","deviceNameJsonExpression": "${serialNumber}"}],"disconnectRequests": [{"topicFilter": "sensor/disconnect","deviceNameJsonExpression": "${serialNumber}"}],"attributeUpdates": [{"deviceNameFilter": ".*","attributeFilter": "uploadFrequency","topicExpression": "sensor/${serialNumber}/${attributeKey}","valueExpression": "{\"${attributeKey}\":\"${attributeValue}\"}"}],"serverSideRpc": [{"deviceNameFilter": ".*","methodFilter": "getState","requestTopicExpression": "sensor/${deviceName}/request/${methodName}","responseTopicExpression": "sensor/${deviceName}/response/${methodName}","responseTimeout": 10000,"valueExpression": "${params}"},{"deviceNameFilter": ".*","methodFilter": "setState","requestTopicExpression": "sensor/${deviceName}/request/${methodName}","valueExpression": "${params}"}]}
JS模拟网关子设备
安装依赖 npm install mqtt
js文件名 mqtt_client.js
注意下面的日志格式,后面不在赘述。
var mqtt = require('mqtt');console.log('start mqtt_client,prepare connect');//连接emqx brokervar client = mqtt.connect('mqtt://192.168.7.190:1883', {username: 'admin',password: 'admin',});//开关状态state = 0;var preTimer;//设备连接回调client.on('connect', function () {console.log('connected');//发送连接请求publish('sensor/connect', {serialNumber: 'SN-002',});//订阅服务端RPCclient.subscribe('sensor/+/request/+');//订阅设备属性更新client.subscribe('sensor/+/+');//不断的上传遥测preTimer = telemetry();});//收到消息回调client.on('message', function (topic, message) {console.log('on message:');console.log(topic);console.log(message.toString());console.log('--------------------------------------');let data = JSON.parse(message);if (new RegExp('sensor/+/request/+'.replace('+', '[^/]+')).test(topic)) {console.log('RegExp success');var levels = topic.split('/');if (levels[3] === 'getState') {console.log('receive getState');//回复 twoway RPCpublish('sensor/' + levels[1] + '/response/' + levels[3], {state: state,});}if (levels[3] === 'setState') {console.log('receive setState');state = data.stateValue;//重新发送遥测 js没有双向绑定只能这样操作了if (preTimer != null) {clearInterval(preTimer);}preTimer = telemetry();}}});//发送遥测 返回定时器对象function telemetry() {return setInterval(() => {publish('sensor/data', {serialNumber: 'SN-002',sensorType: 'default',sensorModel: 'SN-model','SN-model': 'on',temp: Math.random(),hum: Math.random(),occ: true,state: state,});}, 3000);}//发送数据+打印日志function publish(topic, message) {client.publish(topic, JSON.stringify(message));log(topic, message);}//控制台打印function log(topic, message) {console.log('send message:');console.log(topic);console.log(JSON.stringify(message));console.log('===================================');}
添加网关设备
启动网关
网关日志打印 tb_device_mqtt - 141 - connection SUCCESSb 代表网关连接TB MQTT服务器成功。
""2021-04-23 13:17:12" - INFO - [tb_gateway_service.py] - tb_gateway_service - 138 - Gateway started."""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 235 - MQTT Broker Connector connected to 192.168.7.190:1883 - successfully."""2021-04-23 13:17:12" - INFO - [tb_loader.py] - tb_loader - 66 - Import JsonMqttUplinkConverter from /Users/weijixin/PycharmProjects/thingsboard-gateway/thingsboard_gateway/connectors/mqtt."""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 276 - Connector "MQTT Broker Connector" subscribe to sensor/data"""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/data, subscription message id = 1"""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/connect, subscription message id = 2"""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/disconnect, subscription message id = 3"""2021-04-23 13:17:12" - INFO - [tb_device_mqtt.py] - tb_device_mqtt - 141 - connection SUCCESS"""2021-04-23 13:17:12" - INFO - [tb_gateway_mqtt.py] - tb_gateway_mqtt - 176 - Subscribed to *|* with id 2"
启动js模拟设备
控制台执行 node mqtt_client.js
前俩行日志打印如下,则js模拟设备成功连接emqx broker MQTT服务器
start mqtt_client,prepare connectconnected
接下来会发送一条连接请求,主题sensor/connect到emqx broker,日志如下
send message:sensor/connect{"serialNumber":"SN-002"}===================================
对应mqtt-test.json配置
"connectRequests": [{"topicFilter": "sensor/connect","deviceNameJsonExpression": "${serialNumber}"}],
网关收到打印如下,SN-002就是网关解析的子设备名
""2021-04-23 13:17:14" - INFO - [mqtt_connector.py] - mqtt_connector - 402 - Connecting device SN-002 of type default"
此时打开tb页面,设备列表出现设备SN-002。
接着js会开始定时3s间隔不断的发送遥测,此时tb页面可以观察到客户端属性和遥测数据了。
创建开关小部件
创建Round switch``小部件,图中的ADCDE部分一一解释。
A
机器翻译:属性/时间序列值键(仅当订阅属性/时间序列方法时)
应该就是从遥测或属性中找state数据点,这个输入框我也比较迷糊,他的值用来做什么的?直接拿来做开关量的话,不需要转换函数吗?
B
通过发送输入框指定的双向RPC方法(有响应的请求),获取开关的值,并且用D部分的函数转换RPC返回值为true/false,从而改变开关状态。
C
通过发送输入框指定的RPC方法,改变子设备对应的开关量状态值,发送方法的参数用E部分的函数转换。
D
E
点击开关按钮后会给此函数传入value,点击开,传入value就是true;点击关传入value就是false。然后经过自己编写的函数转换后作为C方法的参数发送。
小部件创建好了,应该会自动发送一个getState双向RPC请求来确定自己目前是开还是关的状态,但是看看网关日志吧,报错了,反复调试后确定是BUG.下面说说这个BUG如何修复!
MQTT连接器双向RPC的BUG修复
开关小部件创建成功后,会发送getState的双向RPC,此时网关报错如下:

