前言
市面多数设备支持阿里云物联网平台的ALink协议,假设这类网关无法更改上下行协议,如何接入tb平台?
文章是个人调研过程中的理解和分享,顺便通过偏实战的方式,更全面的实践tb网关的mqtt连接器使用。看官网介绍的时候觉得很简单,真正用起来,细节和文档缺失的介绍还是很多的(比如官网只介绍了使用tb网关时,设备如何订阅共享属性更新,却没有介绍设备还可以主动请求共享属性)。
由于thingsboard平台和阿里物联网平台对物联网中的相关定义及功能设计不一致。以个人浅显理解,使用下面的对应关系做接入。
ALink协议文档
tb-gateway mqtt连接器文档
ALink协议 | ThingsBoard协议 [tb-gateway] |
---|---|
设备身份注册 | 自动注册 |
管理拓扑关系 | 不需要 |
子设备上下线 | 连接/断开连接请求 |
设备只读属性 | 客户端属性/遥测 |
设备读写属性 | 上行客户端属性 下行RPC |
设备上报事件 | 遥测+规则链/客户端属性 ? |
设备服务(异步同步) | 服务端RPC(oneway/twoway) |
设备期望属性值 | 待完善 |
设备禁用/启用/删除 | 上行客户端属性(属性名method)下行RPC |
设备标签 | 上行客户端属性 下行RPC |
ack | 遥测+规则链 |
TSL模板 | 共享属性 待完善 |
OTA | 待完善 |
远程配置 | 共享属性 待完善 |
设备日志上报 | 待完善 |
设备网络状态 | 待完善 |
设备分发 | 待完善 |
设备任务 | 待完善 |
整体规划
- ALink网关及网关子设备 用js模拟(硬件没到,手动调研吧)
需要依赖 npm install mqtt
- 使用tb-gateway接入ALink网关
-
调用流程
数据上传:(上行)
ALink网关js 发送mqtt到EMQX,tb-gateway订阅EMQX相关主题,并将数据格式及上下行协议转为tb格式 发送到tb
数据下发:(下行)
tb服务端下发指定 到 tb-gateway,tb-gateway转换主题和数据格式 发送到 EMQX,js订阅EMQX收到消息
下行如果需要回复:js 收到消息后发送数据到EMQX,tb-gateway订阅回复主题 并转换消息及上行协议格式 发送数据到tb开始
tb web添加设备
阿里云ALink协议网关
并勾选是网关
- 点击
复制访问令牌
将令牌粘贴到 tb-gateway的tb_gateway.yaml
中,并设置tb的地址
和端口
以及打开mqtt连接器
- 大体看下js中规划的功能,接下来根据各个功能配置mqtt.json文件 ```javascript //设备读写属性 var power = ‘on’; var publish_telemetry_interval = 10; var publish_label_interval = 30; //设备只读属性 var temperature; //设备标签 var model = ‘AHT15’//型号 var vendor = ‘jin_shi_da’;//厂商 var area = ‘beijing’; //设备禁用/启用/删除 var status = ‘enable’;
/ topic定义 / //下行 alink属性 var set_attributes_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/service/property/set’; //下行 alink属性响应 var set_attributes_topic_reply = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/service/property/set_reply’; //上行 alink属性 var property_post_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/event/property/post’; //上行 子设备上线 var device_login_topic = ‘/ext/session/‘ + productKey + ‘/‘ + deviceName + ‘/combine/login’; //上行 子设备下线 var device_logout_topic = ‘/ext/session/‘ + productKey + ‘/‘ + deviceName + ‘/combine/logout’; //下行 子设备禁用/启用/删除 var device_disable_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/disable’; var device_enable_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/enable’; var device_delete_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/delete’; //上行 设备标签上报 var label_upload_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/deviceinfo/update’; //下行 设备标签上报 var label_upload_reply_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/deviceinfo/update_reply’; //上行 删除标签信息 var label_delete_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/deviceinfo/delete’; //下行 删除标签信息 var label_delete_reply_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/deviceinfo/delete_reply’; //下行 服务调用 var service_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/service/+’; //上行 服务调用响应 var service_reply_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/service/+’ + ‘_reply’;
4. 配置mqtt.json [json文件不能注释,下面这个文件加了注释,文章最后贴出无注释的mqtt.json文件]
模拟演示一个温度传感器经过ALink网关上传数据:<br />有`temperature`属性作为遥测,通过ALink上行属性topic上传。<br />`power`属性作为客户端属性,通过ALink上行属性topic上传。tb通过服务改变`power`的值,控制温度传感器的工作与否。<br />`publish_telemetry_interval`属性作为客户端属性,通过ALink上行属性topic上传。下行属性控制ALink上行属性topic上传间隔。<br />`model`型号`vendor`厂商`area`地址,作为客户端属性,通过ALink上行标签topic上传。<br />`status`设备状态,作为客户端属性,通过ALink下行 子设备禁用/启用/删除topic控制。<br />子设备上线、子设备下线<br />设置属性<br />调用服务<br />下行响应
```json
{
"broker": {
"name": "Default Local Broker", //外部mqtt服务器名称 这里使用主流 EMQX Broker
"host": "192.168.7.190", //外部mqtt服务器地址
"port": 1883, //外部mqtt服务器端口
"clientId": "ThingsBoard_gateway", //客户端ID
"security": {
"type": "basic",
"username": "admin", //emqx客户端账号
"password": "admin" //emqx客户端密码, 没有可以删掉这一项
}
},
"mapping": [
//这个对象将ALink网关的上行属性,根据属性特征转换为tb的客户端属性和遥测消息.
//ALink属性上行协议params中没有deviceName、productKey俩字段,由于目前没有时间研究deviceNameTopicExpression,就简单使用deviceNameJsonExpression
//所以在属性上传时params加了deviceName、productKey俩个字段。应该从topic获取就完美了,后面有时间补上
{
"topicFilter": "/sys/+/+/thing/event/property/post",
"subscriptionQos": 0, //qos可以不设置,默认1,只支持 0,1
"converter": {
"type": "json",
"deviceNameJsonExpression": "${params.deviceName}",
"deviceTypeJsonExpression": "${params.productKey}",
"timeout": 60000,
"attributes": [
{
"type": "string",
"key": "power",
"value": "${params.power.value}"
},
{
"type": "int",
"key": "publish_telemetry_interval",
"value": "${params.publish_telemetry_interval.value}"
},
{
"type": "int",
"key": "publish_label_interval",
"value": "${params.publish_telemetry_interval.value}"
},
{
"type": "string",
"key": "status",
"value": "${params.status.value}"
}
],
"timeseries": [
{
"type": "float",
"key": "temperature",
"value": "${params.temperature.value}"
}
]
}
},
{
"topicFilter": "/sys/+/+/thing/deviceinfo/update",
"subscriptionQos": 0,
"converter": {
"type": "json",
"deviceNameJsonExpression": "${sys.deviceName}",
"deviceTypeJsonExpression": "${sys.productKey}",
"timeout": 60000,
"attributes": [
{
"type": "string",
"key": "model",
"value": "${params[0].attrValue}"
},
{
"type": "string",
"key": "vendor",
"value": "${params[1].attrValue}"
},
{
"type": "string",
"key": "area",
"value": "${params[2].attrValue}"
}
]
}
}
],
"connectRequests": [
{
"topicFilter": "/ext/session/+/+/combine/login",
"deviceNameJsonExpression": "${params.deviceName}",
"deviceTypeJsonExpression": "${params.productKey}"
}
],
"disconnectRequests": [
{
"topicFilter": "/ext/session/+/+/combine/logout",
"deviceNameJsonExpression": "${params.deviceName}",
"deviceTypeJsonExpression": "${params.productKey}"
}
],
"attributeUpdates": [
],
"attributeRequests": [
],
"serverSideRpc": [
{
"deviceNameFilter": ".*",
"methodFilter": "setStatus",
"requestTopicExpression": "/sys/${deviceType}/${deviceName}/thing/${params}",
"responseTopicExpression": "/sys/${deviceType}/${deviceName}/thing/${params}_reply",
"responseTimeout": 10000,
"valueExpression": "{\"id\":\"123\",\"version\":\"1.0\",\"params\":{},\"method\":\"thing.${params}\"}"
},
{
"deviceNameFilter": ".*",
"methodFilter": "setProperty",
"requestTopicExpression": "/sys/${deviceType}/${deviceName}/thing/service/property/set",
"responseTopicExpression": "/sys/${deviceType}/${deviceName}/thing/service/property/set_reply",
"responseTimeout": 10000,
"valueExpression": "{\"id\":\"123\",\"version\":\"1.0\",\"params\":${params},\"method\":\"thing.service.property.set\"}"
},
{
"deviceNameFilter": ".*",
"methodFilter": "replyLabelUpdate",
"requestTopicExpression": "/sys/${deviceType}/${deviceName}/thing/deviceinfo/update_reply",
"valueExpression": "{\"id\":\"123\",\"code\":200,\"data\":{}}"
},
{
"deviceNameFilter": ".*",
"methodFilter": "service.*",
"requestTopicExpression": "/sys/${deviceType}/${deviceName}/thing/service/${methodName}",
"valueExpression": "{\"id\":\"123\",\"version\":\"1.0\",\"params\":${params},\"method\":\"thing.service.${methodName}\"}"
}
]
}
- 完整js ```javascript var mqtt = require(‘mqtt’); var client = mqtt.connect(‘mqtt://localhost:1884’, { username: ‘admin’, password: ‘admin’ }); // 设备预置参数 var productKey = “a1CpLSEvPUk”; var deviceName = “RoomTempSensor”; var version = “1.0”; var clientId = ‘asdasdasdaedqwe123123123123’; //设备读写属性 var power = ‘on’; var publish_telemetry_interval = 10; var publish_label_interval = 30; //设备只读属性 var temperature; //设备标签 var model = ‘AHT15’//型号 var vendor = ‘jin_shi_da’;//厂商 var area = ‘beijing’; // var model = ‘LM35DZ’; // var model = ‘LM35’; //设备禁用/启用/删除 var status = ‘enable’;
/ topic定义 / //下行 alink属性 var set_attributes_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/service/property/set’; //下行 alink属性响应 var set_attributes_topic_reply = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/service/property/set_reply’; //上行 alink属性 var property_post_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/event/property/post’; //上行 子设备上线 var device_login_topic = ‘/ext/session/‘ + productKey + ‘/‘ + deviceName + ‘/combine/login’; //上行 子设备下线 var device_logout_topic = ‘/ext/session/‘ + productKey + ‘/‘ + deviceName + ‘/combine/logout’; //下行 子设备禁用/启用/删除 var device_disable_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/disable’; var device_enable_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/enable’; var device_delete_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/delete’; //上行 设备标签上报 var label_upload_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/deviceinfo/update’; //下行 设备标签上报 var label_upload_reply_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/deviceinfo/update_reply’; //上行 删除标签信息 var label_delete_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/deviceinfo/delete’; //下行 删除标签信息 var label_delete_reply_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/deviceinfo/delete_reply’; //下行 服务调用 var service_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/service/+’; //上行 服务调用响应 var service_reply_topic = ‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/service/+’ + ‘_reply’;
//程序变量 var id = 0; var pre_interval_property; var pre_interval_label; var success = JSON.stringify({“id”: “123”, “code”: 200, “data”: {}}) //设备连接回调 client.on(‘connect’, function () { console.log(‘connected’); connect(); client.subscribe(set_attributes_topic) client.subscribe(device_disable_topic) client.subscribe(device_enable_topic) client.subscribe(device_delete_topic) client.subscribe(service_topic) pre_interval_property = setPublishPropertyInterval(null, publish_telemetry_interval 1000); pre_interval_label = setPublishLabelInterval(null, publish_label_interval 1000); }); //收到消息回调 client.on(‘message’, function (topic, message) { console.log(‘on message:’) console.log(topic); console.log(message.toString()); console.log(‘———————————————————‘) let data = JSON.parse(message); //下行 服务调用正则验证 var service_topic_regex = new RegExp(service_topic.replace(“+”, “[^/]+”)) switch (topic) { case set_attributes_topic: if (data.params.hasOwnProperty(‘publish_telemetry_interval’)) { publish_telemetry_interval = data.params.publish_telemetry_interval; pre_interval_property = setPublishPropertyInterval(pre_interval_property, data.params.publish_telemetry_interval 1000); } if (data.params.hasOwnProperty(‘publish_label_interval’)) { publish_label_interval = data.params.publish_label_interval; pre_interval_label = setPublishPropertyInterval(pre_interval_label, data.params.publish_label_interval 1000); } if (data.params.hasOwnProperty(‘power’)) { power = data.params.power; } //响应设置设备属性 publish(set_attributes_topic_reply, success) break; case device_disable_topic: case device_enable_topic: case device_delete_topic: //模拟设备禁用启用删除 if (data.hasOwnProperty(‘method’)) { status = data.method.split(‘.’)[1]; //响应设备禁用启用删除 publish(‘/sys/‘ + productKey + ‘/‘ + deviceName + ‘/thing/‘ + status + ‘_reply’, success) } break; case label_delete_reply_topic: //下行 删除设备标签响应 break; case label_upload_reply_topic: //下行 标签信息上报响应 break; } //下行 服务调用 if (service_topic_regex.test(topic)) { if (data.hasOwnProperty(‘method’)) { var methodName = data.method.split(‘.’)[2] if (methodName === “power”) { power = data.params.power; } //上行 服务调用响应 publish(service_reply_topic.replace(‘+’, data.method), success);
}
}
}); //ctrl+c回调 client.on(‘SIGINT’, function () { console.log(‘disconnected’); disconnect(); client.end(); console.log(‘Exited!’); process.exit(2); }); //捕获异常 process.on(‘uncaughtException’, function (e) { console.log(‘Uncaught Exception…’); console.log(e.stack); process.exit(99); });
//alink属性上传 function publishTelemetry() { var message = { “id”: ++id, “version”: version, “sys”: { “ack”: 0 }, “params”: { “productKey”: productKey, “deviceName”: deviceName, “power”: { //alink读写属性 “value”: power, “time”: new Date().getTime() }, “publish_telemetry_interval”: { //alink读写属性 “value”: publish_telemetry_interval, “time”: new Date().getTime() }, “publish_label_interval”: { //alink读写属性 “value”: publish_label_interval, “time”: new Date().getTime() }, “temperature”: { //alink只读属性 “value”: genNextValue(23.6, 0, 45), “time”: new Date().getTime() }, ‘status’: { //alink设备启用禁用删除 ‘value’: status, ‘time’: new Date().getTime() }, “model”: { “value”: model, “time”: new Date().getTime() }, “vendor”: { “value”: vendor, “time”: new Date().getTime() }, “area”: { “value”: area, “time”: new Date().getTime() } }, “method”: “thing.event.property.post” } publish(property_post_topic, message); }
//alink标签上传 function publishLabel() { var message = { “id”: ++id, “version”: version, “sys”: { “ack”: 0, “productKey”: productKey, “deviceName”: deviceName }, “params”: [ { “attrKey”: “model”, “attrValue”: model }, { “attrKey”: “vendor”, “attrValue”: vendor }, { “attrKey”: “area”, “attrValue”: area } ], “method”: “thing.deviceinfo.update” } publish(label_upload_topic, message); }
//子设备上线 function connect() { var message = { “id”: ++id, “params”: { “productKey”: productKey, “deviceName”: deviceName, “clientId”: clientId, “timestamp”: new Date().getTime(), “signMethod”: “hmacmd5”, “sign”: “9B9C732412A4F84B981E1AB97CAB**“, “cleanSession”: “true” } } publish(device_login_topic, message); }
//子设备下线 function disconnect() { var message = { “id”: ++id, “params”: { “productKey”: productKey, “deviceName”: deviceName } } publish(device_logout_topic, JSON.stringify(message)); }
//发送数据+打印日志 function publish(topic, message) { client.publish(topic, JSON.stringify(message)); log(topic, message); }
//定时上传属性数据 function setPublishPropertyInterval(preInterval, interval) { if (status === ‘enable’) { if (preInterval != null) { clearInterval(preInterval); } return setInterval(publishTelemetry, interval); } return null; }
//定时上传标签数据 function setPublishLabelInterval(preInterval, interval) { if (status === ‘enable’) { if (preInterval != null) { clearInterval(preInterval); } return setInterval(publishLabel, interval); } return null; }
// Generates new random value that is within 3% range from previous value function genNextValue(prevValue, min, max) { var value = prevValue + ((max - min) (Math.random() - 0.5)) 0.03; value = Math.max(min, Math.min(max, value)); return Math.round(value * 10) / 10; }
//控制台打印 function log(topic, message) { console.log(‘send message:’) console.log(topic) console.log(JSON.stringify(message)) console.log(‘===================================’) }
6. 完整mqtt.json
```json
{
"broker": {
"name": "emqx",
"host": "localhost",
"port": 1884,
"clientId": "ThingsBoard_gateway",
"security": {
"type": "basic",
"username": "admin",
"password": "admin"
}
},
"mapping": [
{
"topicFilter": "/sys/+/+/thing/event/property/post",
"subscriptionQos": 0,
"converter": {
"type": "json",
"deviceNameJsonExpression": "${params.deviceName}",
"deviceTypeJsonExpression": "${params.productKey}",
"timeout": 60000,
"attributes": [
{
"type": "string",
"key": "power",
"value": "${params.power.value}"
},
{
"type": "int",
"key": "publish_telemetry_interval",
"value": "${params.publish_telemetry_interval.value}"
},
{
"type": "int",
"key": "publish_label_interval",
"value": "${params.publish_telemetry_interval.value}"
},
{
"type": "string",
"key": "status",
"value": "${params.status.value}"
}
],
"timeseries": [
{
"type": "float",
"key": "temperature",
"value": "${params.temperature.value}"
}
]
}
},
{
"topicFilter": "/sys/+/+/thing/deviceinfo/update",
"subscriptionQos": 0,
"converter": {
"type": "json",
"deviceNameJsonExpression": "${sys.deviceName}",
"deviceTypeJsonExpression": "${sys.productKey}",
"timeout": 60000,
"attributes": [
{
"type": "string",
"key": "model",
"value": "${params[0].attrValue}"
},
{
"type": "string",
"key": "vendor",
"value": "${params[1].attrValue}"
},
{
"type": "string",
"key": "area",
"value": "${params[2].attrValue}"
}
]
}
}
],
"connectRequests": [
{
"topicFilter": "/ext/session/+/+/combine/login",
"deviceNameJsonExpression": "${params.deviceName}",
"deviceTypeJsonExpression": "${params.productKey}"
}
],
"disconnectRequests": [
{
"topicFilter": "/ext/session/+/+/combine/logout",
"deviceNameJsonExpression": "${params.deviceName}",
"deviceTypeJsonExpression": "${params.productKey}"
}
],
"attributeUpdates": [
],
"attributeRequests": [
],
"serverSideRpc": [
{
"deviceNameFilter": ".*",
"methodFilter": "setStatus",
"requestTopicExpression": "/sys/${deviceType}/${deviceName}/thing/${params}",
"responseTopicExpression": "/sys/${deviceType}/${deviceName}/thing/${params}_reply",
"responseTimeout": 10000,
"valueExpression": "{\"id\":\"123\",\"version\":\"1.0\",\"params\":{},\"method\":\"thing.${params}\"}"
},
{
"deviceNameFilter": ".*",
"methodFilter": "setProperty",
"requestTopicExpression": "/sys/${deviceType}/${deviceName}/thing/service/property/set",
"responseTopicExpression": "/sys/${deviceType}/${deviceName}/thing/service/property/set_reply",
"responseTimeout": 10000,
"valueExpression": "{\"id\":\"123\",\"version\":\"1.0\",\"params\":${params},\"method\":\"thing.service.property.set\"}"
},
{
"deviceNameFilter": ".*",
"methodFilter": "replyLabelUpdate",
"requestTopicExpression": "/sys/${deviceType}/${deviceName}/thing/deviceinfo/update_reply",
"valueExpression": "{\"id\":\"123\",\"code\":200,\"data\":{}}"
},
{
"deviceNameFilter": ".*",
"methodFilter": "service.*",
"requestTopicExpression": "/sys/${deviceType}/${deviceName}/thing/service/${methodName}",
"valueExpression": "{\"id\":\"123\",\"version\":\"1.0\",\"params\":${params},\"method\":\"thing.${methodName}\"}",
"responseTopicExpression": "/sys/${deviceType}/${deviceName}/thing/service/thing.${methodName}_reply",
"responseTimeout": 10000
}
]
}
- tb-gateway代码修改。有几处不能满足要求(ALink协议必须要productKey),添加一行,有些bug修复。这里暂时就不一一指出了。
- post模拟服务端RPC
全部使用twoway有响应的RPC调用
获取token
设备启用禁用删除
tb页面 客户端属性上报也会变
设置属性
服务调用
总结
要使用mqtt连接器,要熟悉源码,方便调试甚至更改源码BUG(bug不多)。在调试过程中可能有一些隐蔽的自身程序的问题,但看起来像tb网关出问题了,这类问题先找自身程序原因,不要有问题就说网关有bug,除非能从源码断定。
要熟练掌握mqtt.json的配置方式,还有源码,因为有些值不提供但你一定要用,就需要自己做些修改,比如alink协议topic上的productKey。以及掌握交互流程,这个比较容易晕头,因为子设备到tb,有俩个网关,俩个mqtt服务器,中间还要做格式转换。
了解俩种平台的上下行协议及功能对应。
后续还有些功能没有来得及实现,找机会补上,或者不补了吧。
没有什么不可能。