概述
在本教程中你将学习如何创建自定义节点:
- 过滤节点检查入站消息的遥测数据中是否存在key。如果所选密钥存在-通过True链发送消息,否则使用False链。
- 富集节点当字段以指定的Input Key开头时,将为每个设备分别计算遥测数据的总和,然后使用Output Key将结果添加到消息元数据中。
- 转换节点当字段以指定的Input Key开头时,将为每个设备分别计算遥测数据的总和,然后使用Output Key将总和添加到新的消息payload中。
先决条件
我们假设你已完成以下指南并查看了以下文章:
- 入门 guide.
- 规则引擎概述 article.
规则引擎架构 article.
定制
为了创建新的规则节点,你应该实现TbNode 接口:
package org.thingsboard.rule.engine.api;
...
public interface TbNode {
void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException;
void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException;
void destroy();
}
并使用以下引用运行时的多值注释来注释你的实现:
org.thingsboard.rule.engine.api.RuleNode
每个规则节点也可以具有实现NodeConfiguration接口的配置类。
package org.thingsboard.rule.engine.api;
public interface NodeConfiguration<T extends NodeConfiguration> {
T defaultConfiguration();
}
TbKeyFilterNodeConfiguration 类:
package org.thingsboard.rule.engine.node.filter;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
@Data
public class TbKeyFilterNodeConfiguration implements NodeConfiguration<TbKeyFilterNodeConfiguration> {
private String key;
@Override
public TbKeyFilterNodeConfiguration defaultConfiguration() {
TbKeyFilterNodeConfiguration configuration = new TbKeyFilterNodeConfiguration();
configuration.setKey(null);
return configuration;
}
}
TbCalculateSumNodeConfiguration 类: ``` package org.thingsboard.rule.engine.node.transform;
import lombok.Data; import org.thingsboard.rule.engine.api.NodeConfiguration;
@Data
public class TbCalculateSumNodeConfiguration implements NodeConfiguration<TbCalculateSumNodeConfiguration> {
private String inputKey;
private String outputKey;
@Override
public TbCalculateSumNodeConfiguration defaultConfiguration() {
TbCalculateSumNodeConfiguration configuration = new TbCalculateSumNodeConfiguration();
configuration.setInputKey("temperature");
configuration.setOutputKey("TemperatureSum");
return configuration;
}
}
- [TbGetSumIntoMetadataConfiguration](https://github.com/thingsboard/rule-node-examples/blob/master/src/main/java/org/thingsboard/rule/engine/node/enrichment/TbGetSumIntoMetadataConfiguration.java) 类:
package org.thingsboard.rule.engine.node.enrichment;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
@Data
public class TbGetSumIntoMetadataConfiguration implements NodeConfiguration<TbGetSumIntoMetadataConfiguration> {
private String inputKey;
private String outputKey;
@Override
public TbGetSumIntoMetadataConfiguration defaultConfiguration() {
TbGetSumIntoMetadataConfiguration configuration = new TbGetSumIntoMetadataConfiguration();
configuration.setInputKey("temperature");
configuration.setOutputKey("TemperatureSum");
return configuration;
}
}
配置类在规则节点类中定义。
<a name="b964ee3b"></a>
### 方法流程
在本节中介绍**TbNode**接口实现每个方法的目的:
<a name="ccacb5c7"></a>
#### init方法
void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException;
创建规则节点后对其进行初始化的方法。上述每个规则节点的**init**方法的内容全部一样。我们将传入的JSON配置转换为特定的[NodeConfiguration](https://github.com/thingsboard/thingsboard/blob/release-2.0/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/NodeConfiguration.java)实现。
- [**检查key**](https://github.com/thingsboard/rule-node-examples/blob/master/src/main/java/org/thingsboard/rule/engine/node/filter/TbKeyFilterNode.java):
private TbKeyFilterNodeConfiguration config; private String key;
@Override
public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbKeyFilterNodeConfiguration.class);
key = config.getKey();
}
- [**求和计算**](https://github.com/thingsboard/rule-node-examples/blob/master/src/main/java/org/thingsboard/rule/engine/node/transform/TbCalculateSumNode.java):
private TbCalculateSumConfiguration config; private String inputKey; private String outputKey;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbCalculateSumConfiguration.class);
inputKey = config.getInputKey();
outputKey = config.getOutputKey();
}
- [**求和放入元数据**](https://github.com/thingsboard/rule-node-examples/blob/master/src/main/java/org/thingsboard/rule/engine/node/enrichment/TbGetSumIntoMetadata.java):
private TbGetSumIntoMetadataConfiguration config; private String inputKey; private String outputKey;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbGetSumIntoMetadataConfiguration.class);
inputKey = config.getInputKey();
outputKey = config.getOutputKey();
}
仅在创建或更新规则节点后才调用它,并接受两个输入参数:
- [**TbContext**](https://github.com/thingsboard/thingsboard/blob/release-2.0/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java)是一个接口,可让规则节点访问大多数服务,例如,将遥测保存到数据库,并通过WebSockets通知实体数据更改的所有订阅:
ctx.getTelemetryService().saveAndNotify(msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg));
- [**TbNodeConfiguration**](https://github.com/thingsboard/thingsboard/blob/release-2.0/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java) TbNodeConfiguration是一个简单的类,只有一个在规则节点Web UI上处理的字段:
private final JsonNode data;
<a name="0a23ee4d"></a>
#### onMsg方法
void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException;
处理接收到消息的方法。每当消息到达节点时都会调用它。并且还接受两个输入参数:
- [**TbMsg**](https://github.com/thingsboard/thingsboard/blob/release-2.0/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java)是最终的序列化类,它使你可以访问消息中的字段,并且还允许你复制消息,将消息转换为ByteBuffer以及进行其他操作。<br />可用字段:<br />
msg.getData(); msg.getMetaData(); msg.getOriginator(); msg.getType(); msg.getId(); msg.getRuleNodeId(); msg.getRuleChainId(); msg.getClusterPartition(); msg.getDataType();
复制消息:
TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
**×注意**×在群集模式下使用。
- [**TbContext**](https://github.com/thingsboard/thingsboard/blob/release-2.0/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java)接口还具有在链上路由出站消息的方法,例如
![](https://cdn.nlark.com/yuque/0/2021/png/667839/1610765583934-e72c4526-8a7b-49d9-939e-664725b6de1e.png#align=left&display=inline&height=350&margin=%5Bobject%20Object%5D&originHeight=350&originWidth=680&size=0&status=done&style=none&width=680) ![](https://cdn.nlark.com/yuque/0/2021/png/667839/1610765583943-167e717e-d1bd-41c6-b1a7-b111a2bf2abc.png#align=left&display=inline&height=350&margin=%5Bobject%20Object%5D&originHeight=350&originWidth=680&size=0&status=done&style=none&width=680)<br />true & false:
ctx.tellNext(msg, mapper.readTree(msg.getData()).has(key) ? “True” : “False”);
failure:
ctx.tellFailure(msg, e);
ctx.tellNext(msg, FAILURE, new Exception());
tellSelf和updateSelf方法:
ctx.tellSelf(tickMsg, curDelay);
ctx.updateSelf(ruleNode);
```
***注意***tellSelf()方法在基于特定延迟的规则节点中使用。每个更新规则节点上使用的updateSelf()方法。
此处, TbContext允许创建新消息:
String data = "{temperature:20, humidity:30}"
ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), data);
并转换消息:
ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData());
***注意***这些方法之间的区别在于:
TbMsg的newMsg方法使用新的messageId创建一条新消息;
TbMsg的transformMsg方法转换已存在的消息;
destroy方法
void destroy();
构建
克隆规则节点示例项目:
git clone git@github.com:thingsboard/rule-node-examples.git
从rule-node-examples文件夹执行以下命令以构建项目:
重新启动ThingsBoard服务器端容器。请参考以下链接以查看如何执行相关操作: 服务器端容器运行.
**一旦ThingsBoard重新启动,你需要清除浏览器缓存并刷新网页以重新加载规则节点的用户界面**
Thingsboard服务:
你需要先执行以下命令将jar文件移动到Thingsboard扩展:
sudo mv rule-engine-1.0.0-custom-nodes.jar /usr/share/thingsboard/extensions/
接下来执行以下操作以将权限更改为Thingsboard:
sudo chown thingsboard:thingsboard /usr/share/thingsboard/extensions/*
重启Thingsboard服务:
sudo service thingsboard restart
**一旦ThingsBoard重新启动,你需要清除浏览器缓存并刷新网页以重新加载规则节点的用户界面**
UI 配置
ThingsBoard规则节点UI在官方github仓库. 另一个项目。请参阅以下连接 查看相关说明。
在热部署模式下运行Rule Node UI容器
要以热重新部署方式运行Rule Node UI容器:
你需要先在server.js文件中将常量ruleNodeUiforwardPort从8080更改为5000,该常量应位于此处:
cd ${TB_WORK_DIR}/ui-ngx/proxy.conf.js
其次你需要在热部署模式下运行UI容器。请参考以下链接以了解如何执行此操作:以热重新部署模式运行UI容器。
接下来你需要将server.js文件中的常量forwardPort从8080更改为3000,应在此处:
cd ${TB_RULE_NODE_UI_WORK_DIR}/ui/server.js
最后一步是从本地目录TB_RULE_NODE_UI_WORK_DIR执行如下命令:
npm start
下一步
入门指南 - 这些指南提供了ThingsBoard主要功能的快速概述。
- 安装指南 - 了解如何在各种操作系统上安装ThingsBoard。
- 设备连接 - 了解如何根据您的连接方式或解决方案连接设备。
- 数据看板 - 这些指南包含有关如何配置复杂的ThingsBoard仪表板的说明。
- 数据处理 - 了解如何使用ThingsBoard规则引擎。
- 数据分析 - 了解如何使用规则引擎执行基本的分析任务。
- 硬件样品 - 了解如何将各种硬件平台连接到ThingsBoard。
- 高级功能 - 了解高级ThingsBoard功能。