  • OpenJDK 8
  • Maven 3.6.0+
  • Any modern Java IDE, although we recommend IntelliJ IDEA
  • [Optional] Install the Lombok plugin in your favorite IDE.


    git clone https://github.com/thingsboard/rule-node-examples
    cd rule-node-examples

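By default, the sample project is configured to use the API of ThingsBoard Community Edition. This makes your rule nodes compatible with both the Community and Professional editions of the platform.
If you want to use some proprietary Professional Edition APIs (for example, to work with entity groups), you should change the "thingsboard.version" parameter in pom.xml: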

  nano pom.xml

For example, the property below is set to the 3.2.0PE Professional Edition:

  ...
  <properties>
      ...
      <thingsboard.version>3.2.0PE</thingsboard.version>
      ...
  </properties>
  ...


  mvn clean install


  ...
  [INFO] ------------------------------------------------------------------------
  [INFO] ------------------------------------------------------------------------
  [INFO] Total time: 1.431 s
  [INFO] Finished at: 2020-08-18T11:01:40+03:00
  [INFO] ------------------------------------------------------------------------




To create a new rule node, you should implement the TbNode interface and annotate it with the RuleNode annotation.

  @RuleNode(
          type = ComponentType.FILTER,
          name = "check key",
          relationTypes = {"True", "False"},
          configClazz = TbKeyFilterNodeConfiguration.class,
          nodeDescription = "Checks the existence of the selected key in the message payload.",
          nodeDetails = "If the selected key exists - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used.",
          uiResources = {"static/rulenode/custom-nodes-config.js"},
          configDirective = "tbFilterNodeCheckKeyConfig")
  public class TbKeyFilterNode implements TbNode {

      private static final ObjectMapper mapper = new ObjectMapper();

      private TbKeyFilterNodeConfiguration config;
      private String key;

      @Override
      public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
          // Parse the raw JSON configuration into a typed configuration object
          this.config = TbNodeUtils.convert(configuration, TbKeyFilterNodeConfiguration.class);
          key = config.getKey();
      }

      @Override
      public void onMsg(TbContext ctx, TbMsg msg) {
          try {
              // Route via "True" if the payload contains the configured key, otherwise via "False"
              ctx.tellNext(msg, mapper.readTree(msg.getData()).has(key) ? "True" : "False");
          } catch (IOException e) {
              ctx.tellFailure(msg, e);
          }
      }

      @Override
      public void destroy() {
      }
  }




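  • type - one of the available rule node types. This parameter affects which section of the rule chain editor will contain your rule node;
  • name - any reasonable name of the rule node that will be used in the rule chain editor and in debug messages;
  • nodeDescription - a short description of your node. Visible in the rule chain editor;
  • nodeDetails - a full description of your node with HTML tag support. Visible in the rule chain editor;
  • configClazz - the full class name of the class that describes the configuration JSON;
  • relationTypes - an array of strings with predefined relation types. These values should correspond to the ones used in the TbContext.tellNext method;
  • customRelations - a boolean value that indicates that you will use custom relations in the TbContext.tellNext method;
  • configDirective - the name of the Angular-based UI directive that allows users to edit the configuration of the rule node. This is optional and may be empty. In this case, the user will see a raw JSON editor;
  • uiResources - the path to the Angular UI file that contains the configuration directive. This is optional and may be empty. In this case, the user will see a raw JSON editor;
  • icon - the icon name from the Angular Material package;
  • iconUrl - the full URL of the icon that will be used to display the rule node in the list of nodes in the rule chain editor;
  • docUrl - a link to the documentation page for the current rule node that will be available in the rule chain editor.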
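To make the role of these parameters more concrete, the sketch below shows how a runtime annotation can be declared and read back through reflection. DemoRuleNode, DemoKeyFilterNode, and AnnotationDemo are hypothetical stand-ins introduced only for illustration; they model a few of the parameters listed above and are not the platform's actual RuleNode annotation or its scanning logic.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;

// Hypothetical stand-in for the RuleNode annotation; only a few parameters are modeled.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoRuleNode {
    String name();
    String[] relationTypes();
    // Assumption for this sketch: an empty directive means "fall back to the raw JSON editor".
    String configDirective() default "";
}

// Hypothetical node class that carries the metadata.
@DemoRuleNode(name = "check key", relationTypes = {"True", "False"})
class DemoKeyFilterNode {
}

class AnnotationDemo {
    // Reads the annotation via reflection and builds a short description of the node.
    static String describe(Class<?> nodeClass) {
        DemoRuleNode meta = nodeClass.getAnnotation(DemoRuleNode.class);
        if (meta == null) {
            throw new IllegalArgumentException(nodeClass.getName() + " is not annotated");
        }
        String editor = meta.configDirective().isEmpty() ? "raw JSON editor" : meta.configDirective();
        return meta.name() + " " + Arrays.toString(meta.relationTypes()) + " via " + editor;
    }

    public static void main(String[] args) {
        System.out.println(describe(DemoKeyFilterNode.class));
    }
}
```

Because the annotation is retained at runtime, any tool that scans the classpath can discover the node's name and relation types without instantiating the class.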


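    The "init" method is called by the rule engine when a new rule node is created. This may happen if someone adds the rule node to the rule chain or if the system is restarted. This method is mainly used to parse the configuration, which is a JSON object, or to obtain a local copy of the TbContext. "TbNodeUtils.convert" parses the raw configuration into a Java object of the specified class.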



    /**
     * Indicates that message was successfully processed by the rule node.
     * Sends message to all Rule Nodes in the Rule Chain
     * that are connected to the current Rule Node using "Success" relationType.
     *
     * @param msg
     */
    void tellSuccess(TbMsg msg);

    /**
     * Sends message to all Rule Nodes in the Rule Chain
     * that are connected to the current Rule Node using specified relationType.
     *
     * @param msg
     * @param relationType
     */
    void tellNext(TbMsg msg, String relationType);

    /**
     * Sends message to all Rule Nodes in the Rule Chain
     * that are connected to the current Rule Node using one of specified relationTypes.
     *
     * @param msg
     * @param relationTypes
     */
    void tellNext(TbMsg msg, Set<String> relationTypes);

    If message processing fails, the rule node implementation must call the "tellFailure" method:

    /**
     * Notifies Rule Engine about failure to process current message.
     *
     * @param msg - message
     * @param th - exception
     */
    void tellFailure(TbMsg msg, Throwable th);
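As a rough illustration of how these calls divide the work, the sketch below routes a message via "True" or "False" and reports a failure when the payload is missing. DemoContext and RecordingContext are hypothetical, heavily simplified stand-ins for the routing part of TbContext (messages are plain strings and payloads are maps), so treat this as a sketch of the control flow rather than the platform API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the routing methods of TbContext.
interface DemoContext {
    void tellNext(String msgId, String relationType);
    void tellFailure(String msgId, Throwable th);
}

// Records every routing decision so it can be inspected afterwards.
class RecordingContext implements DemoContext {
    final List<String> routed = new ArrayList<>();

    @Override
    public void tellNext(String msgId, String relationType) {
        routed.add(relationType);
    }

    @Override
    public void tellFailure(String msgId, Throwable th) {
        routed.add("Failure");
    }
}

class RoutingDemo {
    // Routes via "True"/"False" depending on key presence; a missing payload is reported as a failure.
    static void onMsg(DemoContext ctx, String msgId, Map<String, ?> payload, String key) {
        try {
            if (payload == null) {
                throw new IllegalArgumentException("Payload is missing");
            }
            ctx.tellNext(msgId, payload.containsKey(key) ? "True" : "False");
        } catch (RuntimeException e) {
            ctx.tellFailure(msgId, e);
        }
    }

    static List<String> run() {
        RecordingContext ctx = new RecordingContext();
        onMsg(ctx, "m1", Collections.singletonMap("temperature", 21.5), "temperature");
        onMsg(ctx, "m2", Collections.singletonMap("humidity", 40), "temperature");
        onMsg(ctx, "m3", null, "temperature");
        return ctx.routed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each message ends in exactly one call: either a routing call or a failure notification, never both and never neither.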




    // Allows to work with entity attributes: get and save them;
    AttributesService getAttributesService();
    // Allows CRUD (Create, Read, Update, Delete) operations over the customer entities;
    CustomerService getCustomerService();
    // Allows CRUD operations over users;
    UserService getUserService();
    // Allows CRUD operations over assets;
    AssetService getAssetService();
    // Allows CRUD operations over devices;
    DeviceService getDeviceService();
    // Allows CRUD operations over entity views;
    EntityViewService getEntityViewService();
    // Allows to programmatically create and manage dashboards;
    DashboardService getDashboardService();
    // Allows to create and clear alarms;
    RuleEngineAlarmService getAlarmService();
    // Allows to programmatically create and manage rule chains;
    RuleChainService getRuleChainService();
    // Allows to send RPC commands to devices;
    RuleEngineRpcService getRpcService();
    // Allows to store telemetry to the database and push notifications to the dashboards via WebSockets;
    RuleEngineTelemetryService getTelemetryService();
    // Allows to find telemetry and save it to the database without notifications to the dashboards;
    TimeseriesService getTimeseriesService();
    // Allows to programmatically query and manage entity relations;
    RelationService getRelationService();

    ThingsBoard PE users can access additional services using the TbContext.getPeContext() method. TbPeContext provides access to the following services:

    // Allows to programmatically create and manage integrations;
    IntegrationService getIntegrationService();
    // Allows to programmatically create and manage entity groups;
    EntityGroupService getEntityGroupService();
    // Allows to programmatically create reports;
    ReportService getReportService();
    // Allows to programmatically manage blob entities;
    BlobEntityService getBlobEntityService();
    // Allows to programmatically manage group permissions;
    GroupPermissionService getGroupPermissionService();
    // Allows to programmatically manage roles;
    RoleService getRoleService();
    // Get entity owner (TenantId or CustomerId)
    EntityId getOwner(TenantId tenantId, EntityId entityId);
    // Clear entity owners cache
    void clearOwners(EntityId entityId);
    // Get all sub-customers of the current entity
    Set<EntityId> getChildOwners(TenantId tenantId, EntityId parentOwnerId);
    // Allows to change entity owner. Expects TenantId or CustomerId as targetOwnerId
    void changeDashboardOwner(TenantId tenantId, EntityId targetOwnerId, Dashboard dashboard) throws ThingsboardException;
    void changeUserOwner(TenantId tenantId, EntityId targetOwnerId, User user) throws ThingsboardException;
    void changeCustomerOwner(TenantId tenantId, EntityId targetOwnerId, Customer customer) throws ThingsboardException;
    void changeEntityViewOwner(TenantId tenantId, EntityId targetOwnerId, EntityView entityView) throws ThingsboardException;
    void changeAssetOwner(TenantId tenantId, EntityId targetOwnerId, Asset asset) throws ThingsboardException;
    void changeDeviceOwner(TenantId tenantId, EntityId targetOwnerId, Device device) throws ThingsboardException;
    void changeEntityOwner(TenantId tenantId, EntityId targetOwnerId, EntityId entityId, EntityType entityType) throws ThingsboardException;
    // Allows to push custom downlink message to the integration
    void pushToIntegration(IntegrationId integrationId, TbMsg tbMsg, FutureCallback<Void> callback);



    @Override
    public void onMsg(TbContext ctx, TbMsg msg) {
        EntityId msgOriginator = msg.getOriginator();
        // Checking that the message originator is a Customer;
        if (EntityType.CUSTOMER.equals(msgOriginator.getEntityType())) {
            CustomerId customerId = new CustomerId(msgOriginator.getId());
            boolean hasNext = true;
            // Creating the page link to iterate through the devices;
            PageLink pageLink = new PageLink(1024);
            while (hasNext) {
                // Using the Device Service to get devices from the database;
                PageData<Device> devices = ctx.getDeviceService().findDevicesByTenantIdAndCustomerId(ctx.getTenantId(), customerId, pageLink);
                hasNext = devices.hasNext();
                pageLink = pageLink.nextPageLink();
                for (Device device : devices.getData()) {
                    // Creating new message with different originator
                    TbMsg newMsg = TbMsg.newMsg(msg.getQueueName(), msg.getType(), device.getId(), msg.getMetaData(), msg.getData());
                    // Pushing new message to the queue instead of tellNext to make sure that the message will be persisted;
                    ctx.enqueueForTellNext(newMsg, "Success");
                }
            }
            // Don't forget to acknowledge original message or use ctx.tellSuccess(msg);
            ctx.ack(msg);
        } else {
            ctx.tellFailure(msg, new IllegalArgumentException("Msg originator is not Customer!"));
        }
    }
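The page-by-page loop in the example above can be isolated into a small, library-free sketch. DemoPage and fetchPage are hypothetical stand-ins for PageData and a paged service call; the only point is the shape of the loop: fetch a page, consume its data, then continue while the page reports that more data exists.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for PageData: one page of results plus a "has next" flag.
class DemoPage {
    final List<String> data;
    final boolean hasNext;

    DemoPage(List<String> data, boolean hasNext) {
        this.data = data;
        this.hasNext = hasNext;
    }
}

class PagingDemo {
    // Hypothetical stand-in for a paged service call backed by an in-memory list.
    static DemoPage fetchPage(List<String> all, int page, int pageSize) {
        int from = Math.min(page * pageSize, all.size());
        int to = Math.min(from + pageSize, all.size());
        return new DemoPage(all.subList(from, to), to < all.size());
    }

    // Visits every item page by page, mirroring the while (hasNext) loop.
    static List<String> collectAll(List<String> all, int pageSize) {
        List<String> visited = new ArrayList<>();
        int page = 0;
        boolean hasNext = true;
        while (hasNext) {
            DemoPage current = fetchPage(all, page, pageSize);
            visited.addAll(current.data);
            hasNext = current.hasNext;
            page++;
        }
        return visited;
    }

    public static void main(String[] args) {
        List<String> devices = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            devices.add("device-" + i);
        }
        System.out.println(collectAll(devices, 2));
    }
}
```

Bounding the page size keeps memory use predictable even when a customer owns a very large number of devices.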


    void enqueue(TbMsg msg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure);


    void enqueueForTellNext(TbMsg msg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure);


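    The rule engine is an implementation of the actor model that calls the TbNode.onMsg method sequentially for each new message in the rule node mailbox. So, if you process the message in the same thread, your implementation is thread-safe.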
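The following sketch illustrates why sequential delivery makes plain state safe. It is not the rule engine's implementation: MailboxDemo is a hypothetical class in which a single-threaded executor plays the role of the mailbox, so a non-synchronized counter is only ever touched by one thread even though several producer threads send messages concurrently.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MailboxDemo {
    // Plain, non-thread-safe state: safe only because the single mailbox thread touches it.
    private int processed = 0;
    // A single-threaded executor stands in for the rule node mailbox.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    // Any thread may send a message; handling always happens on the mailbox thread.
    // The message content is ignored in this sketch.
    void tell(String msg) {
        mailbox.execute(() -> processed++);
    }

    // Reads the counter through the mailbox, so the read also runs on the mailbox thread.
    int processedCount() {
        try {
            return mailbox.submit(() -> processed).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    void stop() {
        mailbox.shutdown();
    }

    // Starts several producer threads and returns how many messages were handled.
    static int run(int producers, int perProducer) {
        MailboxDemo node = new MailboxDemo();
        Thread[] threads = new Thread[producers];
        for (int i = 0; i < producers; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perProducer; j++) {
                    node.tell("msg");
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
            return node.processedCount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            node.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000));
    }
}
```

If the handler instead handed work to another thread pool and mutated the counter there, the guarantee would no longer hold and the state would need explicit synchronization.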

    @Override
    public void onMsg(TbContext ctx, TbMsg msg) {
        try {
            // Parsing the incoming message;
            ObjectNode json = (ObjectNode) mapper.readTree(msg.getData());
            // Converting temperature from °F to °C
            double temperatureF = json.get("temperature").asDouble();
            double temperatureC = (temperatureF - 32) * 5 / 9;
            // Creating the telemetry data point
            TsKvEntry tsKvEntry = new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry("temperature", temperatureC));
            // Using async API call to save telemetry with the callback
            ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), Collections.singletonList(tsKvEntry), new FutureCallback<Void>() {
                @Override
                public void onSuccess(@Nullable Void aVoid) {
                    // Telemetry is saved, now we can acknowledge the message;
                    ctx.tellSuccess(msg);
                }

                @Override
                public void onFailure(Throwable throwable) {
                    // Telemetry is not saved, we need rule engine to reprocess the message;
                    ctx.tellFailure(msg, throwable);
                }
            });
        } catch (JsonProcessingException e) {
            ctx.tellFailure(msg, e);
        }
    }
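The essential pattern in the example above is that the outcome is reported from the callback, after the asynchronous save has finished, and not right after the save is started. The sketch below reproduces that pattern with the standard CompletableFuture class. saveAsync is a hypothetical stand-in for an asynchronous save that fails for NaN values, and the final join() exists only so that the demo can print a result; a real rule node would not block.

```java
import java.util.concurrent.CompletableFuture;

class AsyncAckDemo {
    // Same °F to °C formula as in the example above.
    static double toCelsius(double fahrenheit) {
        return (fahrenheit - 32) * 5 / 9;
    }

    // Hypothetical stand-in for an asynchronous save: completes exceptionally for NaN.
    static CompletableFuture<Void> saveAsync(double value) {
        return CompletableFuture.runAsync(() -> {
            if (Double.isNaN(value)) {
                throw new IllegalArgumentException("Value is not a number");
            }
        });
    }

    // Decides between "Success" and "Failure" inside the completion callback.
    static String process(double temperatureF) {
        return saveAsync(toCelsius(temperatureF))
                .handle((ignored, error) -> error == null ? "Success" : "Failure")
                .join(); // Blocking here is for demonstration only.
    }

    public static void main(String[] args) {
        System.out.println(process(212.0));
        System.out.println(process(Double.NaN));
    }
}
```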



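    As a rule node developer, you can override the default method TbNode.onPartitionChangeMsg to react to changes in the cluster topology. This is useful for stateful nodes that decide to cache information based on the originator (device/asset) ID of the message. To determine whether the current entity ID belongs to the list of currently assigned partitions, use TbContext.isLocalEntity. See the full example below: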

    package org.thingsboard.rule.engine.node.filter;

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import lombok.extern.slf4j.Slf4j;
    import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
    import org.thingsboard.rule.engine.api.RuleNode;
    import org.thingsboard.rule.engine.api.TbContext;
    import org.thingsboard.rule.engine.api.TbNode;
    import org.thingsboard.rule.engine.api.TbNodeConfiguration;
    import org.thingsboard.rule.engine.api.TbNodeException;
    import org.thingsboard.server.common.data.DataConstants;
    import org.thingsboard.server.common.data.id.EntityId;
    import org.thingsboard.server.common.data.kv.AttributeKvEntry;
    import org.thingsboard.server.common.data.plugin.ComponentType;
    import org.thingsboard.server.common.msg.TbMsg;
    import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;

    import java.util.Optional;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ExecutionException;

    @Slf4j
    @RuleNode(
            type = ComponentType.FILTER,
            name = "Cache example",
            relationTypes = {"True", "False"},
            configClazz = EmptyNodeConfiguration.class,
            nodeDescription = "Checks that the incoming value exceeds certain threshold",
            nodeDetails = "If temperature is too high - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used.",
            uiResources = {"static/rulenode/rulenode-core-config.js"},
            configDirective = "tbNodeEmptyConfig")
    public class TbCacheExampleNode implements TbNode {

        private static final ObjectMapper mapper = new ObjectMapper();

        private ConcurrentMap<EntityId, Double> cache;

        @Override
        public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
            this.cache = new ConcurrentHashMap<>();
        }

        @Override
        public void onMsg(TbContext ctx, TbMsg msg) {
            try {
                // Parsing the incoming message;
                ObjectNode json = (ObjectNode) mapper.readTree(msg.getData());
                double temperature = json.get("temperature").asDouble();
                // Fetching temperatureThreshold attribute from cache or from the database
                Double temperatureThreshold = getCacheValue(ctx, msg.getOriginator(), "temperatureThreshold", 42);
                // Compare and do something with the result of comparison;
                ctx.tellNext(msg, temperature > temperatureThreshold ? "True" : "False");
            } catch (JsonProcessingException e) {
                ctx.tellFailure(msg, e);
            }
        }

        @Override
        public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
            // Cleanup the cache for all entities that are no longer assigned to current server partitions
            cache.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey()));
        }

        private double getCacheValue(TbContext ctx, EntityId entityId, String attributeKey, double defaultValue) {
            // Get value from cache or from the database.
            return cache.computeIfAbsent(entityId, id -> {
                try {
                    Optional<AttributeKvEntry> attr = ctx.getAttributesService().find(ctx.getTenantId(), entityId, DataConstants.SERVER_SCOPE, attributeKey).get();
                    if (attr.isPresent()) {
                        return attr.get().getDoubleValue().orElse(defaultValue);
                    } else {
                        return defaultValue;
                    }
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
        }

        @Override
        public void destroy() {
            // In case you have changed the configuration, it is good idea to clear the entire cache.
            cache.clear();
        }
    }
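The cache eviction step of the example above can be tried in isolation. In the sketch below, entity IDs are plain strings and the locality check is passed in as a predicate; both are simplifications assumed for illustration, standing in for EntityId and TbContext.isLocalEntity.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;

class PartitionCacheDemo {
    // Per-entity cache, keyed by a simplified string entity ID.
    final ConcurrentMap<String, Double> cache = new ConcurrentHashMap<>();

    // Mirrors onPartitionChangeMsg: drops entries for entities that are no longer local.
    void onPartitionChange(Predicate<String> isLocalEntity) {
        cache.entrySet().removeIf(entry -> !isLocalEntity.test(entry.getKey()));
    }

    static int run() {
        PartitionCacheDemo node = new PartitionCacheDemo();
        node.cache.put("device-a", 42.0);
        node.cache.put("device-b", 37.5);
        node.cache.put("device-c", 50.0);
        // Assumption for the demo: after rebalancing only "device-a" stays on this server.
        node.onPartitionChange(id -> id.equals("device-a"));
        return node.cache.size();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Evicting on partition changes keeps the node from serving stale thresholds for entities that another server now owns.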



    mvn clean install


    target/rule-engine-1.0.0-custom-nodes.jar


  • If ThingsBoard is installed as a service, use step 4.1.

  • If ThingsBoard is built from source and launched locally from the IDE, use step 4.2.


  • First, execute the following command to copy the jar file to the ThingsBoard extensions directory:

    sudo cp rule-engine-1.0.0-custom-nodes.jar /usr/share/thingsboard/extensions/
  • Next, execute the following command to change the owner to ThingsBoard:

    sudo chown thingsboard:thingsboard /usr/share/thingsboard/extensions/*


    sudo service thingsboard restart



  • See the separate instructions for IDEA and Eclipse.




  # Plugins configuration parameters
  plugins:
    # Comma separated package list used during classpath scanning for plugins
    scan_packages: "${PLUGINS_SCAN_PACKAGES:org.thingsboard.server.extensions,org.thingsboard.rule.engine,com.example.rule.engine}"
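The value above follows the common "${NAME:default}" placeholder form: the part before the first colon names an environment variable, and the part after it is the fallback used when that variable is not set. The sketch below is a simplified illustration of that reading and not the platform's actual resolver; ScanPackagesDemo and its resolve method are hypothetical names, and the override value com.acme.nodes is made up for the demo.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class ScanPackagesDemo {
    // Resolves a "${NAME:default}" placeholder against the given environment
    // and splits the resulting comma-separated value into package names.
    static List<String> resolve(String placeholder, Map<String, String> env) {
        // Strip the leading "${" and the trailing "}".
        String body = placeholder.substring(2, placeholder.length() - 1);
        int colon = body.indexOf(':');
        String name = body.substring(0, colon);
        String fallback = body.substring(colon + 1);
        String value = env.getOrDefault(name, fallback);
        return Arrays.asList(value.split(","));
    }

    public static void main(String[] args) {
        String placeholder = "${PLUGINS_SCAN_PACKAGES:org.thingsboard.server.extensions,"
                + "org.thingsboard.rule.engine,com.example.rule.engine}";
        // Without an override, the default package list is used.
        System.out.println(resolve(placeholder, Collections.emptyMap()));
        // With an override, the environment value replaces the whole list.
        System.out.println(resolve(placeholder,
                Collections.singletonMap("PLUGINS_SCAN_PACKAGES", "com.acme.nodes")));
    }
}
```

Under this reading, an override replaces the whole list, so it would need to repeat the default packages in addition to your own.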




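To run the Rule Node UI container in hot redeploy mode:

  • First, change the constant ruleNodeUiforwardPort from 8080 to 5000 in the file proxy.conf.js, which should be located here:

    nano ${TB_WORK_DIR}/ui-ngx/proxy.conf.js
  • Second, run the UI container in hot redeploy mode. Please refer to the following link to see how to do this: Running UI container in hot redeploy mode

  • The last step is to execute the following command from your local directory TB_RULE_NODE_UI_WORK_DIR:

    npm start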


