:::info
匹配版本:V3.20
页面状态:未阅读
原文地址:点击跳转
:::
概述
在本教程中,您将学习如何创建自定义规则节点并将其添加到ThingsBoard服务器实例。我们将审查三种不同类型的规则节点:过滤器,扩展和转换。
先决条件
我们假设您已完成以下指南并查看了下面列出的文章:
我们还假定您已安装以下第三方:
- OpenJDK的8
- Maven 3.6.0+
- 任何现代的Java IDE,尽管我们建议使用IntelliJ IDEA
- [可选]将Lombok插件安装到您喜欢的IDE。
步骤1.下载并构建示例项目
克隆存储库并导航到repo文件夹:git clone https://github.com/thingsboard/rule-node-examples
cd rule-node-examples
默认情况下,示例项目配置为使用ThingsBoard Community Edition的API。这使您的规则节点与平台的社区版和专业版都兼容。
如果您想使用某些专有的Professional Edition API(例如使用实体组等),则应在Thingsboard.yml中更改“ thingsboard.version”参数:
nano pom.xml
例如,下面的属性设置为3.2.0PE Professional Edition:
...
<properties>
...
<thingsboard.version>3.2.0PE</thingsboard.version>
...
</properties>
...
最后,构建项目:
mvn clean install
预期产量:
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.431 s
[INFO] Finished at: 2020-08-18T11:01:40+03:00
[INFO] ------------------------------------------------------------------------
步骤2.将项目导入到IDE
确保将Lombok插件安装到您喜欢的IDE中。将项目作为Maven项目导入到您喜欢的IDE中。
步骤3.创建您的规则节点
为了创建新的规则节点,您应该实现 TbNode接口,并使用 RuleNode批注对其进行批注。
例如,您可以查看一个非常简单的规则节点,该规则节点根据消息有效负载中密钥的存在来过滤传入的消息。该规则节点是您在上一步中下载的项目的一部分。
@RuleNode(
type = ComponentType.FILTER,
name = "check key",
relationTypes = {"True", "False"},
configClazz = TbKeyFilterNodeConfiguration.class,
nodeDescription = "Checks the existence of the selected key in the message payload.",
nodeDetails = "If the selected key exists - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used.",
uiResources = {"static/rulenode/custom-nodes-config.js"},
configDirective = "tbFilterNodeCheckKeyConfig")
public class TbKeyFilterNode implements TbNode {
private static final ObjectMapper mapper = new ObjectMapper();
private TbKeyFilterNodeConfiguration config;
private String key;
@Override
public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbKeyFilterNodeConfiguration.class);
key = config.getKey();
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
try {
ctx.tellNext(msg, mapper.readTree(msg.getData()).has(key) ? "True" : "False");
} catch (IOException e) {
ctx.tellFailure(msg, e);
}
}
@Override
public void destroy() {
}
}
@RuleNode批注
@RuleNode注释定义节点类型,名称,描述,UI形式和出站[关系]。
让我们来看看可用的参数:
- type是可用的“规则节点类型”之一。此参数影响“规则链编辑器”的哪个部分将包含您的“规则节点”。
- 名称-用于规则链编辑器和调试消息的规则节点的任何合理名称;
- nodeDescription-您的节点的简短描述。在规则链编辑器中可见;
- nodeDetails-带有html标签支持的节点的完整描述。在规则链编辑器中可见;
- configClazz-描述配置json的类的完整类名。
- RelationTypes-具有预定义关系类型的字符串数组;该值应对应于TbContext.tellNext方法中使用的值;
- customRelations-布尔值,指示您将在TbContext.tellNext方法中使用任何自定义关系;
- configDirective-基于Angular的UI指令的名称,该指令将允许用户编辑规则节点的配置。这是可选的,可以为空。在这种情况下,用户将看到原始的JSON编辑器;
- uiResources-包含配置指令的Angular UI文件的路径。这是可选的,可以为空。在这种情况下,用户将看到原始的JSON编辑器;
- 图标-来自角材料包的图标名称;
- iconUrl-图标的完整URL,将用于在“规则链编辑器”中的节点列表中显示规则节点;
docUrl-链接到当前规则节点的文档页面,该页面将在“规则链编辑器”中提供。
规则节点生命周期
该“初始化”方法是通过规则引擎创建了新的规则节点时调用。如果有人将规则节点添加到规则链或系统停止,则可能会发生这种情况。此方法主要用于解析作为JSON对象的配置或获取TbContext的本地副本。“ TbNodeUtils.convert”正在将原始配置解析为指定类的java对象。
在“消灭”方法是由规则引擎时,该规则节点被破坏调用。如果有人从规则链中删除规则节点或系统停止,则可能会发生这种情况。
当用户决定更改现有规则节点的配置时,规则引擎将依次调用“销毁”和“初始化”方法。处理传入消息
规则节点实现必须使用以下方法之一来通知规则引擎消息已成功处理:
/**
* Indicates that message was successfully processed by the rule node.
* Sends message to all Rule Nodes in the Rule Chain
* that are connected to the current Rule Node using "Success" relationType.
*
* @param msg
*/
void tellSuccess(TbMsg msg);
/**
* Sends message to all Rule Nodes in the Rule Chain
* that are connected to the current Rule Node using specified relationType.
*
* @param msg
* @param relationType
*/
void tellNext(TbMsg msg, String relationType);
/**
* Sends message to all Rule Nodes in the Rule Chain
* that are connected to the current Rule Node using one of specified relationTypes.
*
* @param msg
* @param relationTypes
*/
void tellNext(TbMsg msg, Set<String> relationTypes);
如果消息处理失败,则规则节点实现必须调用“ tellFailure”方法:
/**
* Notifies Rule Engine about failure to process current message.
*
* @param msg - message
* @param th - exception
*/
void tellFailure(TbMsg msg, Throwable th);
如果规则节点实现不会调用上面列出的任何方法,则规则引擎将等待可配置的超时并阻止其他消息的处理,并最终将当前消息标记为失败。
使用ThingsBoard服务
该TbContext包含了很多有用的服务“干将”。请不要忘记在您喜欢的IDE中按“下载源代码”,以简化这些服务的界面浏览;下面列出了可用的服务获取程序的简短列表:
// Allows to work with entity attributes: get and save them;
AttributesService getAttributesService();
// Allows CRUD (Create, Read, Updat, Delete) operations over the customer entities;
CustomerService getCustomerService();
// Allows CRUD operations over users;
UserService getUserService();
// Allows CRUD operations over assets;
AssetService getAssetService();
// Allows CRUD operations over devices;
DeviceService getDeviceService();
// Allows CRUD operations over entity views;
EntityViewService getEntityViewService();
// Allows to programmatically create and manage dashboards;
DashboardService getDashboardService();
// Allows to create and clear alarms;
RuleEngineAlarmService getAlarmService();
// Allows to programmatically create and manage rule chains;
RuleChainService getRuleChainService();
// Allows to send RPC commands to devices;
RuleEngineRpcService getRpcService();
// Allows to store telemetry to the database and push notifications to the dashbaords via WebSockets;
RuleEngineTelemetryService getTelemetryService();
// Allows to find telemetry and save it to the database without notifications to the dashboards;
TimeseriesService getTimeseriesService();
// Allows to programmatically query and manage entity relations;
RelationService getRelationService();
ThingsBoard PE用户可以使用TbContext.getPeContext()方法访问其他服务。TbPeContext提供对以下服务的访问:
// Allows to programmatically create and manage integrations;
IntegrationService getIntegrationService();
// Allows to programmatically create and manage entity groups;
EntityGroupService getEntityGroupService();
// Allows to programmatically create reports;
ReportService getReportService();
// Allows to programmatically manage blob entities;
BlobEntityService getBlobEntityService();
// Allows to programmatically manage group permissions;
GroupPermissionService getGroupPermissionService();
// Allows to programmatically manage roles;
RoleService getRoleService();
// Get entity owner (TenantId or CustomerId)
EntityId getOwner(TenantId tenantId, EntityId entityId);
// Clear entity owners cache
void clearOwners(EntityId entityId);
// Get all sub-customers of the current entity
Set<EntityId> getChildOwners(TenantId tenantId, EntityId parentOwnerId);
// Allows to change entity owner. Expects TenantId or CustomerId as targetOwnerId
void changeDashboardOwner(TenantId tenantId, EntityId targetOwnerId, Dashboard dashboard) throws ThingsboardException;
void changeUserOwner(TenantId tenantId, EntityId targetOwnerId, User user) throws ThingsboardException;
void changeCustomerOwner(TenantId tenantId, EntityId targetOwnerId, Customer customer) throws ThingsboardException;
void changeEntityViewOwner(TenantId tenantId, EntityId targetOwnerId, EntityView entityView) throws ThingsboardException;
void changeAssetOwner(TenantId tenantId, EntityId targetOwnerId, Asset asset) throws ThingsboardException;
void changeDeviceOwner(TenantId tenantId, EntityId targetOwnerId, Device device) throws ThingsboardException;
void changeEntityOwner(TenantId tenantId, EntityId targetOwnerId, EntityId entityId, EntityType entityType) throws ThingsboardException;
// Allows to push custom downlink message to the integration
void pushToIntegration(IntegrationId integrationId, TbMsg tbMsg, FutureCallback<Void> callback);
从规则节点创建新消息
可能有必要创建并将当前消息派生的消息并将其推送到规则引擎。例如,让我们编写一个自定义规则节点,该节点将消息从当前客户复制到所有客户设备:
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
EntityId msgOriginator = msg.getOriginator();
// Checking that the message originator is a Customer;
if (EntityType.CUSTOMER.equals(msgOriginator.getEntityType())) {
CustomerId customerId = new CustomerId(msgOriginator.getId());
boolean hasNext = true;
// Creating the page link to iterate through the devices;
PageLink pageLink = new PageLink(1024);
while (hasNext) {
// Using the Device Service to get devices from the database;
PageData<Device> devices = ctx.getDeviceService().findDevicesByTenantIdAndCustomerId(ctx.getTenantId(), customerId, pageLink);
hasNext = devices.hasNext();
pageLink = pageLink.nextPageLink();
for (Device device : devices.getData()) {
// Creating new message with different originator
TbMsg newMsg = TbMsg.newMsg(msg.getQueueName(), msg.getType(), device.getId(), msg.getMetaData(), msg.getData());
// Pushing new message to the queue instead of tellNext to make sure that the message will be persisted;
ctx.enqueueForTellNext(newMsg, "Success");
}
}
// Don't forget to acknowledge original message or use ctx.tellSuccess(msg);
ctx.ack(msg);
} else {
ctx.tellFailure(msg, new IllegalArgumentException("Msg originator is not Customer!"));
}
}
您可能会注意到,我们已经使用TbContext.enqueueForTellNext方法将新消息推送到规则引擎。该消息将基于关系类型被推送到相关的规则节点。另一种选择是将消息放在处理的开始,基本上放在根规则链上。
void enqueue(TbMsg msg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure);
另外,您可以使用略有不同的方法,该方法还允许您接收确认新消息已成功推送到队列的确认:
void enqueueForTellNext(TbMsg msg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure);
多线程
规则引擎是参与者模型的实现,该参与者模型针对规则节点邮箱中的每个新消息顺序调用 TbNode.onMsg方法。因此,如果您在同一线程中处理消息,则您的实现是线程安全的。
但是,出于性能原因,大多数API调用是在单独的线程中执行的。例如,让我们回顾一下如何保存传入消息中的遥测:@Override
public void onMsg(TbContext ctx, TbMsg msg) {
try {
// Parsing the incoming message;
ObjectNode json = (ObjectNode) mapper.readTree(msg.getData());
// Converting temperature from °F to °C
double temperatureF = json.get("temperature").asDouble();
double temperatureC = (temperatureF - 32) * 5 / 9;
// Creating the telemetry data point
TsKvEntry tsKvEntry = new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry("temperature", temperatureC));
// Using async API call to save telemetry with the callback
ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), Collections.singletonList(tsKvEntry), new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void aVoid) {
// Telemetry is saved, now we can acknowledge the message;
ctx.tellSuccess(msg);
}
@Override
public void onFailure(Throwable throwable) {
// Telemetry is not saved, we need rule engine to reprocess the message;
ctx.tellFailure(msg, throwable);
}
});
} catch (JsonProcessingException e) {
ctx.tellFailure(msg, e);
}
}
您可能会注意到,我们通过TbContext.tellSuccess在回调线程而非主线程中“确认”或“转发”消息。
集群模式
为每个规则引擎微服务启动规则节点的单个实例。例如,如果您有三个规则引擎实例,则每个实例将启动一个RuleNode实例。规则引擎消息是根据消息的始发者ID(设备或资产ID)进行分区的。因此,来自一台设备的消息将始终转到特定规则引擎微服务上的同一规则节点实例。唯一的例外情况是添加或删除规则节点时。在这种情况下,将发生“重新分区”事件。
作为规则节点开发人员,您可以覆盖默认方法TbNode.onPartitionChangeMsg 以对集群拓扑的更改做出反应。这对于有状态节点根据消息的始发者(设备/资产)ID决定缓存信息很有用。为了确定当前实体ID属于当前分配的分区列表,可以使用TbContext.isLocalEntity。请参阅下面的完整示例:package org.thingsboard.rule.engine.node.filter;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@Slf4j
@RuleNode(
type = ComponentType.FILTER,
name = "Cache example",
relationTypes = {"True", "False"},
configClazz = EmptyNodeConfiguration.class,
nodeDescription = "Checks that the incoming value exceeds certain threshold",
nodeDetails = "If temperature is too high - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used.",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbNodeEmptyConfig")
public class TbCacheExampleNode implements TbNode {
private static final ObjectMapper mapper = new ObjectMapper();
private ConcurrentMap<EntityId, Double> cache;
@Override
public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
this.cache = new ConcurrentHashMap<>();
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
try {
// Parsing the incoming message;
ObjectNode json = (ObjectNode) mapper.readTree(msg.getData());
double temperature = json.get("temperature").asDouble();
// Fetching temperatureThreshold attribute from cache or from the database
Double temperatureThreshold = getCacheValue(ctx, msg.getOriginator(), "temperatureThreshold", 42);
// Compare and do something with the result of comparison;
ctx.tellNext(msg, temperature > temperatureThreshold ? "True" : "False");
} catch (JsonProcessingException e) {
ctx.tellFailure(msg, e);
}
}
@Override
public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
// Cleanup the cache for all entities that are no longer assigned to current server partitions
cache.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey()));
}
private double getCacheValue(TbContext ctx, EntityId entityId, String attributeKey, double defaultValue) {
// Get value from cache or from the database.
return cache.computeIfAbsent(entityId, id -> {
try {
Optional<AttributeKvEntry> attr = ctx.getAttributesService().find(ctx.getTenantId(), entityId, DataConstants.SERVER_SCOPE, attributeKey).get();
if (attr.isPresent()) {
return attr.get().getDoubleValue().orElse(defaultValue);
} else {
return defaultValue;
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
@Override
public void destroy() {
// In case you have changed the configuration, it is good idea to clear the entire cache.
cache.clear();
}
}
步骤4.将自定义规则节点导入ThingsBoard实例
完成规则节点的编码后,再次执行build命令:
mvn clean install
然后,将jar文件作为依赖关系库定位到ThingsBoard项目。构建结果位于此处:
target/rule-engine-1.0.0-custom-nodes.jar
现在,您可以将带有规则节点的jar文件添加到ThingsBoard实例中:
如果将ThingsBoard作为服务安装,请使用步骤4.1。
如果ThingsBoard是从源代码构建的,并且是从IDE本地启动的,则使用步骤4.2
步骤4.1将JAR文件添加到作为服务安装的ThingsBoard
首先,您需要执行以下命令将jar文件复制到ThingsBoard扩展:
sudo cp rule-engine-1.0.0-custom-nodes.jar /usr/share/thingsboard/extensions/
接下来,执行以下操作以将所有者更改为ThingsBoard:
sudo chown thingsboard:thingsboard /usr/share/thingsboard/extensions/*
重新启动Thingsboard服务:
sudo service thingsboard restart
重启ThingsBoard之后,您需要清除浏览器缓存并刷新网页以重新加载规则节点的用户界面
步骤4.2将JAR文件添加到使用IDE启动的本地ThingsBoard
重新启动ThingsBoard服务器端容器。请参考以下链接以了解如何执行此操作:运行服务器端容器。
重启ThingsBoard之后,您需要清除浏览器缓存并刷新网页以重新加载规则节点的用户界面
步骤5.将自定义包名称添加到Thingsboard.yml
注意:如果您已将软件包名称从org.thingsboard.rule.engine更改为公司软件包名称,例如com.example.rule.engine,则还需要在插件部分的Thingsboard.yml文件中添加软件包名称:
# Plugins configuration parameters
plugins:
# Comma separated package list used during classpath scanning for plugins
scan_packages: "${PLUGINS_SCAN_PACKAGES:org.thingsboard.server.extensions,org.thingsboard.rule.engine,com.example.rule.engine}"
步骤6.对规则节点进行故障排除
验证自定义规则节点的最简单方法是创建一个生成器规则节点,并将其连接到您的自定义规则节点。这将生成可配置的传入消息流。完成此操作后,您应该为自定义规则节点启用调试,以验证节点输出并检查它们是否存在错误。
步骤7.规则节点UI自定义(可选)
ThingsBoard规则节点UI在官方github存储库中配置了另一个项目。请参考以下链接查看构建说明。
要以热重新部署方式运行Rule Node UI容器:
首先,您需要在文件proxy.conf.js中将常数ruleNodeUiforwardPort从8080更改为5000,该文件应位于此处:
nano ${TB_WORK_DIR}/ui-ngx/proxy.conf.js
其次,您需要在热部署模式下运行UI容器。请参考以下链接以了解如何执行此操作:在热部署模式下运行UI容器。
最后一步是从本地目录TB_RULE_NODE_UI_WORK_DIR执行以下命令:
npm start
下一步
入门指南-这些指南提供了ThingsBoard主要功能的快速概述。设计在15至30分钟内完成。
- 安装指南-了解如何在各种可用的操作系统上设置ThingsBoard。
- 连接设备-了解如何根据连接技术或解决方案连接设备。
- 数据可视化-这些指南包含有关如何配置复杂的ThingsBoard仪表板的说明。
- 数据处理和操作-了解如何使用ThingsBoard规则引擎。
- 物联网数据分析-了解如何使用规则引擎执行基本分析任务。
- 硬件样本-了解如何将各种硬件平台连接到ThingsBoard。
- 高级功能-了解高级ThingsBoard功能。