概述
在本教程中你将学习如何创建自定义节点:
- 过滤节点检查入站消息的遥测数据中是否存在key。如果所选密钥存在-通过True链发送消息,否则使用False链。


- 富集节点当字段以指定的Input Key开头时,将为每个设备分别计算遥测数据的总和,然后使用Output Key将结果添加到消息元数据中。


- 转换节点当字段以指定的Input Key开头时,将为每个设备分别计算遥测数据的总和,然后使用Output Key将总和添加到新的消息payload中。
先决条件
我们假设你已完成以下指南并查看了以下文章:
- 入门 guide.
- 规则引擎概述 article.
规则引擎架构 article.
定制
为了创建新的规则节点,你应该实现TbNode 接口:
package org.thingsboard.rule.engine.api;...public interface TbNode {void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException;void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException;void destroy();}
并使用以下引用运行时的多值注释来注释你的实现:
org.thingsboard.rule.engine.api.RuleNode
每个规则节点也可以具有实现NodeConfiguration接口的配置类。
package org.thingsboard.rule.engine.api;public interface NodeConfiguration<T extends NodeConfiguration> {T defaultConfiguration();}
TbKeyFilterNodeConfiguration 类:
package org.thingsboard.rule.engine.node.filter;import lombok.Data;import org.thingsboard.rule.engine.api.NodeConfiguration;@Datapublic class TbKeyFilterNodeConfiguration implements NodeConfiguration<TbKeyFilterNodeConfiguration> {private String key;@Overridepublic TbKeyFilterNodeConfiguration defaultConfiguration() {TbKeyFilterNodeConfiguration configuration = new TbKeyFilterNodeConfiguration();configuration.setKey(null);return configuration;}}
TbCalculateSumNodeConfiguration 类: ``` package org.thingsboard.rule.engine.node.transform;
import lombok.Data; import org.thingsboard.rule.engine.api.NodeConfiguration;
@Datapublic class TbCalculateSumNodeConfiguration implements NodeConfiguration<TbCalculateSumNodeConfiguration> {private String inputKey;private String outputKey;@Overridepublic TbCalculateSumNodeConfiguration defaultConfiguration() {TbCalculateSumNodeConfiguration configuration = new TbCalculateSumNodeConfiguration();configuration.setInputKey("temperature");configuration.setOutputKey("TemperatureSum");return configuration;}}
- [TbGetSumIntoMetadataConfiguration](https://github.com/thingsboard/rule-node-examples/blob/master/src/main/java/org/thingsboard/rule/engine/node/enrichment/TbGetSumIntoMetadataConfiguration.java) 类:
package org.thingsboard.rule.engine.node.enrichment;
import lombok.Data;import org.thingsboard.rule.engine.api.NodeConfiguration;@Datapublic class TbGetSumIntoMetadataConfiguration implements NodeConfiguration<TbGetSumIntoMetadataConfiguration> {private String inputKey;private String outputKey;@Overridepublic TbGetSumIntoMetadataConfiguration defaultConfiguration() {TbGetSumIntoMetadataConfiguration configuration = new TbGetSumIntoMetadataConfiguration();configuration.setInputKey("temperature");configuration.setOutputKey("TemperatureSum");return configuration;}}
配置类在规则节点类中定义。<a name="b964ee3b"></a>### 方法流程在本节中介绍**TbNode**接口实现每个方法的目的:<a name="ccacb5c7"></a>#### init方法
void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException;
创建规则节点后对其进行初始化的方法。上述每个规则节点的**init**方法的内容全部一样。我们将传入的JSON配置转换为特定的[NodeConfiguration](https://github.com/thingsboard/thingsboard/blob/release-2.0/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/NodeConfiguration.java)实现。- [**检查key**](https://github.com/thingsboard/rule-node-examples/blob/master/src/main/java/org/thingsboard/rule/engine/node/filter/TbKeyFilterNode.java):
private TbKeyFilterNodeConfiguration config; private String key;
@Overridepublic void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {this.config = TbNodeUtils.convert(configuration, TbKeyFilterNodeConfiguration.class);key = config.getKey();}
- [**求和计算**](https://github.com/thingsboard/rule-node-examples/blob/master/src/main/java/org/thingsboard/rule/engine/node/transform/TbCalculateSumNode.java):
private TbCalculateSumConfiguration config; private String inputKey; private String outputKey;
@Overridepublic void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {this.config = TbNodeUtils.convert(configuration, TbCalculateSumConfiguration.class);inputKey = config.getInputKey();outputKey = config.getOutputKey();}
- [**求和放入元数据**](https://github.com/thingsboard/rule-node-examples/blob/master/src/main/java/org/thingsboard/rule/engine/node/enrichment/TbGetSumIntoMetadata.java):
private TbGetSumIntoMetadataConfiguration config; private String inputKey; private String outputKey;
@Overridepublic void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {this.config = TbNodeUtils.convert(configuration, TbGetSumIntoMetadataConfiguration.class);inputKey = config.getInputKey();outputKey = config.getOutputKey();}
仅在创建或更新规则节点后才调用它,并接受两个输入参数:- [**TbContext**](https://github.com/thingsboard/thingsboard/blob/release-2.0/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java)是一个接口,可让规则节点访问大多数服务,例如,将遥测保存到数据库,并通过WebSockets通知实体数据更改的所有订阅:
ctx.getTelemetryService().saveAndNotify(msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg));
- [**TbNodeConfiguration**](https://github.com/thingsboard/thingsboard/blob/release-2.0/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java) TbNodeConfiguration是一个简单的类,只有一个在规则节点Web UI上处理的字段:
private final JsonNode data;
<a name="0a23ee4d"></a>#### onMsg方法
void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException;
处理接收到消息的方法。每当消息到达节点时都会调用它。并且还接受两个输入参数:- [**TbMsg**](https://github.com/thingsboard/thingsboard/blob/release-2.0/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java)是最终的序列化类,它使你可以访问消息中的字段,并且还允许你复制消息,将消息转换为ByteBuffer以及进行其他操作。<br />可用字段:<br />
msg.getData(); msg.getMetaData(); msg.getOriginator(); msg.getType(); msg.getId(); msg.getRuleNodeId(); msg.getRuleChainId(); msg.getClusterPartition(); msg.getDataType();
复制消息:
TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
**×注意**×在群集模式下使用。
- [**TbContext**](https://github.com/thingsboard/thingsboard/blob/release-2.0/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java)接口还具有在链上路由出站消息的方法,例如 <br />true & false:
ctx.tellNext(msg, mapper.readTree(msg.getData()).has(key) ? “True” : “False”);
failure:
ctx.tellFailure(msg, e);
ctx.tellNext(msg, FAILURE, new Exception());
tellSelf和updateSelf方法:
ctx.tellSelf(tickMsg, curDelay);
ctx.updateSelf(ruleNode);
```***注意***tellSelf()方法在基于特定延迟的规则节点中使用。每个更新规则节点上使用的updateSelf()方法。
此处, TbContext允许创建新消息:
String data = "{temperature:20, humidity:30}"ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), data);
并转换消息:
ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData());
***注意***这些方法之间的区别在于:TbMsg的newMsg方法使用新的messageId创建一条新消息;TbMsg的transformMsg方法转换已存在的消息;
destroy方法
void destroy();
构建
克隆规则节点示例项目:
git clone git@github.com:thingsboard/rule-node-examples.git
从rule-node-examples文件夹执行以下命令以构建项目:
重新启动ThingsBoard服务器端容器。请参考以下链接以查看如何执行相关操作: 服务器端容器运行.
**一旦ThingsBoard重新启动,你需要清除浏览器缓存并刷新网页以重新加载规则节点的用户界面**
Thingsboard服务:
你需要先执行以下命令将jar文件移动到Thingsboard扩展:
sudo mv rule-engine-1.0.0-custom-nodes.jar /usr/share/thingsboard/extensions/
接下来执行以下操作以将权限更改为Thingsboard:
sudo chown thingsboard:thingsboard /usr/share/thingsboard/extensions/*
重启Thingsboard服务:
sudo service thingsboard restart**一旦ThingsBoard重新启动,你需要清除浏览器缓存并刷新网页以重新加载规则节点的用户界面**
UI 配置
ThingsBoard规则节点UI在官方github仓库. 另一个项目。请参阅以下连接 查看相关说明。
在热部署模式下运行Rule Node UI容器
要以热重新部署方式运行Rule Node UI容器:
你需要先在server.js文件中将常量ruleNodeUiforwardPort从8080更改为5000,该常量应位于此处:
cd ${TB_WORK_DIR}/ui-ngx/proxy.conf.js
其次你需要在热部署模式下运行UI容器。请参考以下链接以了解如何执行此操作:以热重新部署模式运行UI容器。
接下来你需要将server.js文件中的常量forwardPort从8080更改为3000,应在此处:
cd ${TB_RULE_NODE_UI_WORK_DIR}/ui/server.js
最后一步是从本地目录TB_RULE_NODE_UI_WORK_DIR执行如下命令:
npm start
下一步
入门指南 - 这些指南提供了ThingsBoard主要功能的快速概述。
- 安装指南 - 了解如何在各种操作系统上安装ThingsBoard。
- 设备连接 - 了解如何根据您的连接方式或解决方案连接设备。
- 数据看板 - 这些指南包含有关如何配置复杂的ThingsBoard仪表板的说明。
- 数据处理 - 了解如何使用ThingsBoard规则引擎。
- 数据分析 - 了解如何使用规则引擎执行基本的分析任务。
- 硬件样品 - 了解如何将各种硬件平台连接到ThingsBoard。
- 高级功能 - 了解高级ThingsBoard功能。

