说明
通过下面案例了解MQTT连接器的使用,包括遥测,属性,单向双向RPC。
案例:有个开关传感器,且集成一些温湿度,温湿度上传为设备遥测,型号等上传为设备属性。单向RPC控制开关,双向RPC获取开关状态。
BUG:tb网关的MQTT连接器双向RPC存在问题,无法成功,视频和文章会演示如何解决BUG。
过程演示
文字展示视频操作过程
修改网关配置
thingsboard_gateway/config/tb_gateway.yaml
thingsboard:
host: 192.168.7.198 # TB mqtt ip
port: 1883 # TB mqtt端口
remoteShell: false
remoteConfiguration: false
security:
accessToken: LmX4G3nhJXRr0zOk6mzL # mqtt网关设备accesstoken
qos: 0
storage:
type: memory
read_records_count: 100
max_records_count: 100000
connectors: # 打开MQTT连接器
-
name: MQTT Broker Connector
type: mqtt
configuration: mqtt-test.json #指定mqtt连接器配置文件
MQTT连接器配置
{
"broker": {
"name":"EMQX Broker",
"host":"192.168.7.190",
"port":1883,
"clientId": "ThingsBoard_gateway",
"security": {
"type": "basic",
"username": "admin",
"password": "admin"
}
},
"mapping": [
{
"topicFilter": "sensor/data",
"converter": {
"type": "json",
"deviceNameJsonExpression": "${serialNumber}",
"deviceTypeJsonExpression": "${sensorType}",
"timeout": 60000,
"attributes": [
{
"type": "string",
"key": "model",
"value": "${sensorModel}"
},
{
"type": "string",
"key": "${sensorModel}",
"value": "on"
}
],
"timeseries": [
{
"type": "double",
"key": "temperature",
"value": "${temp}"
},
{
"type": "double",
"key": "humidity",
"value": "${hum}"
},
{
"type": "boolean",
"key": "occupancy",
"value": "${occ}"
},
{
"type": "int",
"key": "state",
"value": "${state}"
}
]
}
}
],
"connectRequests": [
{
"topicFilter": "sensor/connect",
"deviceNameJsonExpression": "${serialNumber}"
}
],
"disconnectRequests": [
{
"topicFilter": "sensor/disconnect",
"deviceNameJsonExpression": "${serialNumber}"
}
],
"attributeUpdates": [
{
"deviceNameFilter": ".*",
"attributeFilter": "uploadFrequency",
"topicExpression": "sensor/${serialNumber}/${attributeKey}",
"valueExpression": "{\"${attributeKey}\":\"${attributeValue}\"}"
}
],
"serverSideRpc": [
{
"deviceNameFilter": ".*",
"methodFilter": "getState",
"requestTopicExpression": "sensor/${deviceName}/request/${methodName}",
"responseTopicExpression": "sensor/${deviceName}/response/${methodName}",
"responseTimeout": 10000,
"valueExpression": "${params}"
},
{
"deviceNameFilter": ".*",
"methodFilter": "setState",
"requestTopicExpression": "sensor/${deviceName}/request/${methodName}",
"valueExpression": "${params}"
}
]
}
JS模拟网关子设备
安装依赖 npm install mqtt
js文件名 mqtt_client.js
注意下面的日志格式,后面不在赘述。
var mqtt = require('mqtt');
console.log('start mqtt_client,prepare connect');
//连接emqx broker
var client = mqtt.connect('mqtt://192.168.7.190:1883', {
username: 'admin',
password: 'admin',
});
//开关状态
state = 0;
var preTimer;
//设备连接回调
client.on('connect', function () {
console.log('connected');
//发送连接请求
publish('sensor/connect', {
serialNumber: 'SN-002',
});
//订阅服务端RPC
client.subscribe('sensor/+/request/+');
//订阅设备属性更新
client.subscribe('sensor/+/+');
//不断的上传遥测
preTimer = telemetry();
});
//收到消息回调
client.on('message', function (topic, message) {
console.log('on message:');
console.log(topic);
console.log(message.toString());
console.log('--------------------------------------');
let data = JSON.parse(message);
if (new RegExp('sensor/+/request/+'.replace('+', '[^/]+')).test(topic)) {
console.log('RegExp success');
var levels = topic.split('/');
if (levels[3] === 'getState') {
console.log('receive getState');
//回复 twoway RPC
publish('sensor/' + levels[1] + '/response/' + levels[3], {
state: state,
});
}
if (levels[3] === 'setState') {
console.log('receive setState');
state = data.stateValue;
//重新发送遥测 js没有双向绑定只能这样操作了
if (preTimer != null) {
clearInterval(preTimer);
}
preTimer = telemetry();
}
}
});
//发送遥测 返回定时器对象
function telemetry() {
return setInterval(() => {
publish('sensor/data', {
serialNumber: 'SN-002',
sensorType: 'default',
sensorModel: 'SN-model',
'SN-model': 'on',
temp: Math.random(),
hum: Math.random(),
occ: true,
state: state,
});
}, 3000);
}
//发送数据+打印日志
function publish(topic, message) {
client.publish(topic, JSON.stringify(message));
log(topic, message);
}
//控制台打印
function log(topic, message) {
console.log('send message:');
console.log(topic);
console.log(JSON.stringify(message));
console.log('===================================');
}
添加网关设备
启动网关
网关日志打印 tb_device_mqtt - 141 - connection SUCCESSb
代表网关连接TB MQTT服务器成功。
""2021-04-23 13:17:12" - INFO - [tb_gateway_service.py] - tb_gateway_service - 138 - Gateway started."
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 235 - MQTT Broker Connector connected to 192.168.7.190:1883 - successfully."
""2021-04-23 13:17:12" - INFO - [tb_loader.py] - tb_loader - 66 - Import JsonMqttUplinkConverter from /Users/weijixin/PycharmProjects/thingsboard-gateway/thingsboard_gateway/connectors/mqtt."
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 276 - Connector "MQTT Broker Connector" subscribe to sensor/data"
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/data, subscription message id = 1"
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/connect, subscription message id = 2"
""2021-04-23 13:17:12" - INFO - [mqtt_connector.py] - mqtt_connector - 324 - "MQTT Broker Connector" subscription success to topic sensor/disconnect, subscription message id = 3"
""2021-04-23 13:17:12" - INFO - [tb_device_mqtt.py] - tb_device_mqtt - 141 - connection SUCCESS"
""2021-04-23 13:17:12" - INFO - [tb_gateway_mqtt.py] - tb_gateway_mqtt - 176 - Subscribed to *|* with id 2"
启动js模拟设备
控制台执行 node mqtt_client.js
前俩行日志打印如下,则js模拟设备成功连接emqx broker MQTT服务器
start mqtt_client,prepare connect
connected
接下来会发送一条连接请求,主题sensor/connect
到emqx broker,日志如下
send message:
sensor/connect
{"serialNumber":"SN-002"}
===================================
对应mqtt-test.json
配置
"connectRequests": [
{
"topicFilter": "sensor/connect",
"deviceNameJsonExpression": "${serialNumber}"
}
],
网关收到打印如下,SN-002
就是网关解析的子设备名
""2021-04-23 13:17:14" - INFO - [mqtt_connector.py] - mqtt_connector - 402 - Connecting device SN-002 of type default"
此时打开tb页面,设备列表出现设备SN-002
。
接着js会开始定时3s间隔不断的发送遥测,此时tb页面可以观察到客户端属性和遥测数据了。
创建开关小部件
创建Round switch``小部件
,图中的ADCDE部分一一解释。
A
机器翻译:属性/时间序列值键(仅当订阅属性/时间序列方法时)
应该就是从遥测或属性中找state
数据点,这个输入框我也比较迷糊,他的值用来做什么的?直接拿来做开关量的话,不需要转换函数吗?
B
通过发送输入框指定的双向RPC方法(有响应的请求),获取开关的值,并且用D部分的函数转换RPC返回值为true/false,从而改变开关状态。
C
通过发送输入框指定的RPC方法,改变子设备对应的开关量状态值,发送方法的参数用E部分的函数转换。
D
E
点击开关按钮后会给此函数传入value,点击开,传入value就是true;点击关传入value就是false。然后经过自己编写的函数转换后作为C方法的参数发送。
小部件创建好了,应该会自动发送一个getState双向RPC请求
来确定自己目前是开还是关的状态,但是看看网关日志吧,报错了,反复调试后确定是BUG.下面说说这个BUG如何修复!
MQTT连接器双向RPC的BUG修复
开关小部件创建成功后,会发送getState的双向RPC,此时网关报错如下: