概述

在本教程中你将学习如何创建自定义节点:

  • 过滤节点检查入站消息的遥测数据中是否存在key。如果所选密钥存在-通过True链发送消息,否则使用False链。

规则引擎开发 - 图1
规则引擎开发 - 图2

  • 富集节点当字段以指定的Input Key开头时,将为每个设备分别计算遥测数据的总和,然后使用Output Key将结果添加到消息元数据中。

规则引擎开发 - 图3
规则引擎开发 - 图4

  • 转换节点当字段以指定的Input Key开头时,将为每个设备分别计算遥测数据的总和,然后使用Output Key将总和添加到新的消息payload中。

规则引擎开发 - 图5
规则引擎开发 - 图6

先决条件

我们假设你已完成以下指南并查看了以下文章:

  • 入门 guide.
  • 规则引擎概述 article.
  • 规则引擎架构 article.

    定制

    为了创建新的规则节点,你应该实现TbNode 接口:

    1. package org.thingsboard.rule.engine.api;
    2. ...
    3. public interface TbNode {
    4. void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException;
    5. void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException;
    6. void destroy();
    7. }

    并使用以下引用运行时的多值注释来注释你的实现:

    1. org.thingsboard.rule.engine.api.RuleNode

    每个规则节点也可以具有实现NodeConfiguration接口的配置类。

    1. package org.thingsboard.rule.engine.api;
    2. public interface NodeConfiguration<T extends NodeConfiguration> {
    3. T defaultConfiguration();
    4. }
  • TbKeyFilterNodeConfiguration 类:

    1. package org.thingsboard.rule.engine.node.filter;
    2. import lombok.Data;
    3. import org.thingsboard.rule.engine.api.NodeConfiguration;
    4. @Data
    5. public class TbKeyFilterNodeConfiguration implements NodeConfiguration<TbKeyFilterNodeConfiguration> {
    6. private String key;
    7. @Override
    8. public TbKeyFilterNodeConfiguration defaultConfiguration() {
    9. TbKeyFilterNodeConfiguration configuration = new TbKeyFilterNodeConfiguration();
    10. configuration.setKey(null);
    11. return configuration;
    12. }
    13. }
  • TbCalculateSumNodeConfiguration 类: ``` package org.thingsboard.rule.engine.node.transform;

    import lombok.Data; import org.thingsboard.rule.engine.api.NodeConfiguration;

  1. @Data
  2. public class TbCalculateSumNodeConfiguration implements NodeConfiguration<TbCalculateSumNodeConfiguration> {
  3. private String inputKey;
  4. private String outputKey;
  5. @Override
  6. public TbCalculateSumNodeConfiguration defaultConfiguration() {
  7. TbCalculateSumNodeConfiguration configuration = new TbCalculateSumNodeConfiguration();
  8. configuration.setInputKey("temperature");
  9. configuration.setOutputKey("TemperatureSum");
  10. return configuration;
  11. }
  12. }
  1. - [TbGetSumIntoMetadataConfiguration](https://github.com/thingsboard/rule-node-examples/blob/master/src/main/java/org/thingsboard/rule/engine/node/enrichment/TbGetSumIntoMetadataConfiguration.java) 类:

package org.thingsboard.rule.engine.node.enrichment;

  1. import lombok.Data;
  2. import org.thingsboard.rule.engine.api.NodeConfiguration;
  3. @Data
  4. public class TbGetSumIntoMetadataConfiguration implements NodeConfiguration<TbGetSumIntoMetadataConfiguration> {
  5. private String inputKey;
  6. private String outputKey;
  7. @Override
  8. public TbGetSumIntoMetadataConfiguration defaultConfiguration() {
  9. TbGetSumIntoMetadataConfiguration configuration = new TbGetSumIntoMetadataConfiguration();
  10. configuration.setInputKey("temperature");
  11. configuration.setOutputKey("TemperatureSum");
  12. return configuration;
  13. }
  14. }
  1. 配置类在规则节点类中定义。
  2. <a name="b964ee3b"></a>
  3. ### 方法流程
  4. 在本节中介绍**TbNode**接口实现每个方法的目的:
  5. <a name="ccacb5c7"></a>
  6. #### init方法

void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException;

  1. 创建规则节点后对其进行初始化的方法。上述每个规则节点的**init**方法的内容全部一样。我们将传入的JSON配置转换为特定的[NodeConfiguration](https://github.com/thingsboard/thingsboard/blob/release-2.0/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/NodeConfiguration.java)实现。
  2. - [**检查key**](https://github.com/thingsboard/rule-node-examples/blob/master/src/main/java/org/thingsboard/rule/engine/node/filter/TbKeyFilterNode.java):

private TbKeyFilterNodeConfiguration config; private String key;

  1. @Override
  2. public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
  3. this.config = TbNodeUtils.convert(configuration, TbKeyFilterNodeConfiguration.class);
  4. key = config.getKey();
  5. }
  1. - [**求和计算**](https://github.com/thingsboard/rule-node-examples/blob/master/src/main/java/org/thingsboard/rule/engine/node/transform/TbCalculateSumNode.java):

private TbCalculateSumConfiguration config; private String inputKey; private String outputKey;

  1. @Override
  2. public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
  3. this.config = TbNodeUtils.convert(configuration, TbCalculateSumConfiguration.class);
  4. inputKey = config.getInputKey();
  5. outputKey = config.getOutputKey();
  6. }
  1. - [**求和放入元数据**](https://github.com/thingsboard/rule-node-examples/blob/master/src/main/java/org/thingsboard/rule/engine/node/enrichment/TbGetSumIntoMetadata.java):

private TbGetSumIntoMetadataConfiguration config; private String inputKey; private String outputKey;

  1. @Override
  2. public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
  3. this.config = TbNodeUtils.convert(configuration, TbGetSumIntoMetadataConfiguration.class);
  4. inputKey = config.getInputKey();
  5. outputKey = config.getOutputKey();
  6. }
  1. 仅在创建或更新规则节点后才调用它,并接受两个输入参数:
  2. - [**TbContext**](https://github.com/thingsboard/thingsboard/blob/release-2.0/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java)是一个接口,可让规则节点访问大多数服务,例如,将遥测保存到数据库,并通过WebSockets通知实体数据更改的所有订阅:

ctx.getTelemetryService().saveAndNotify(msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg));

  1. - [**TbNodeConfiguration**](https://github.com/thingsboard/thingsboard/blob/release-2.0/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeConfiguration.java) TbNodeConfiguration是一个简单的类,只有一个在规则节点Web UI上处理的字段:

private final JsonNode data;

  1. <a name="0a23ee4d"></a>
  2. #### onMsg方法

void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException;

  1. 处理接收到消息的方法。每当消息到达节点时都会调用它。并且还接受两个输入参数:
  2. - [**TbMsg**](https://github.com/thingsboard/thingsboard/blob/release-2.0/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java)是最终的序列化类,它使你可以访问消息中的字段,并且还允许你复制消息,将消息转换为ByteBuffer以及进行其他操作。<br />可用字段:<br />

msg.getData(); msg.getMetaData(); msg.getOriginator(); msg.getType(); msg.getId(); msg.getRuleNodeId(); msg.getRuleChainId(); msg.getClusterPartition(); msg.getDataType();

  1. 复制消息:

TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);

  1. **×注意**×在群集模式下使用。
  1. - [**TbContext**](https://github.com/thingsboard/thingsboard/blob/release-2.0/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java)接口还具有在链上路由出站消息的方法,例如
  2. ![](https://cdn.nlark.com/yuque/0/2021/png/667839/1610765583934-e72c4526-8a7b-49d9-939e-664725b6de1e.png#align=left&display=inline&height=350&margin=%5Bobject%20Object%5D&originHeight=350&originWidth=680&size=0&status=done&style=none&width=680) ![](https://cdn.nlark.com/yuque/0/2021/png/667839/1610765583943-167e717e-d1bd-41c6-b1a7-b111a2bf2abc.png#align=left&display=inline&height=350&margin=%5Bobject%20Object%5D&originHeight=350&originWidth=680&size=0&status=done&style=none&width=680)<br />true & false:

ctx.tellNext(msg, mapper.readTree(msg.getData()).has(key) ? “True” : “False”);

  1. failure:

ctx.tellFailure(msg, e);

  1. ctx.tellNext(msg, FAILURE, new Exception());
  1. tellSelfupdateSelf方法:

ctx.tellSelf(tickMsg, curDelay);

  1. ctx.updateSelf(ruleNode);
  1. ```
  2. ***注意***tellSelf()方法在基于特定延迟的规则节点中使用。每个更新规则节点上使用的updateSelf()方法。

此处, TbContext允许创建新消息:

  1. String data = "{temperature:20, humidity:30}"
  2. ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), data);

并转换消息:

  1. ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData());
  1. ***注意***这些方法之间的区别在于:
  2. TbMsgnewMsg方法使用新的messageId创建一条新消息;
  3. TbMsgtransformMsg方法转换已存在的消息;

destroy方法

  1. void destroy();

仅在规则节点停止或更新并且没有输入参数之后才调用的此方法。

构建

  • 克隆规则节点示例项目:

    1. git clone git@github.com:thingsboard/rule-node-examples.git
  • 从rule-node-examples文件夹执行以下命令以构建项目:

    • 注意 你需要先从Thingsboard文件夹执行此命令 。
      1. mvn clean install

      将可执行的jar文件导入到ThingsBoard实例中

      将jar文件作为依赖库导入到Thingsboard项目中,应该在这里:
      1. ./target/rule-engine-1.0.0-custom-nodes.jar

      使用IDE的Thingsboard:

  • 请参阅IDEAEclipse相关说明.

重新启动ThingsBoard服务器端容器。请参考以下链接以查看如何执行相关操作: 服务器端容器运行.

  1. **一旦ThingsBoard重新启动,你需要清除浏览器缓存并刷新网页以重新加载规则节点的用户界面**

Thingsboard服务:

  • 你需要先执行以下命令将jar文件移动到Thingsboard扩展:

    1. sudo mv rule-engine-1.0.0-custom-nodes.jar /usr/share/thingsboard/extensions/
  • 接下来执行以下操作以将权限更改为Thingsboard:

    1. sudo chown thingsboard:thingsboard /usr/share/thingsboard/extensions/*

    重启Thingsboard服务:

    1. sudo service thingsboard restart
    2. **一旦ThingsBoard重新启动,你需要清除浏览器缓存并刷新网页以重新加载规则节点的用户界面**

    UI 配置

    ThingsBoard规则节点UI在官方github仓库. 另一个项目。请参阅以下连接 查看相关说明。

    在热部署模式下运行Rule Node UI容器

    要以热重新部署方式运行Rule Node UI容器:

  • 你需要先在server.js文件中将常量ruleNodeUiforwardPort从8080更改为5000,该常量应位于此处:

    1. cd ${TB_WORK_DIR}/ui-ngx/proxy.conf.js
  • 其次你需要在热部署模式下运行UI容器。请参考以下链接以了解如何执行此操作:以热重新部署模式运行UI容器

  • 接下来你需要将server.js文件中的常量forwardPort8080更改为3000,应在此处:

    1. cd ${TB_RULE_NODE_UI_WORK_DIR}/ui/server.js
  • 最后一步是从本地目录TB_RULE_NODE_UI_WORK_DIR执行如下命令:

    1. npm start

    这会将规则节点UI请求转发到在4200端口上侦听的服务器。

    下一步

  • 入门指南 - 这些指南提供了ThingsBoard主要功能的快速概述。

  • 安装指南 - 了解如何在各种操作系统上安装ThingsBoard。
  • 设备连接 - 了解如何根据您的连接方式或解决方案连接设备。
  • 数据看板 - 这些指南包含有关如何配置复杂的ThingsBoard仪表板的说明。
  • 数据处理 - 了解如何使用ThingsBoard规则引擎。
  • 数据分析 - 了解如何使用规则引擎执行基本的分析任务。
  • 硬件样品 - 了解如何将各种硬件平台连接到ThingsBoard。
  • 高级功能 - 了解高级ThingsBoard功能。