ThingsBoard规则引擎是对于处理复杂的事件具有灵活配置和高度定制化的特点。
你可以使用规则引擎的Filter、Enrichment和Transform节点通过设备和相关资产发出输入消息。
你可以使用规则引擎的Action、Externala节点触发各种操作与通信。

关键概念

规则引擎消息

规则引擎消息是可序列化的,不变的数据结构,代表系统中的各种消息。例如:

  • 来自设备的传入遥测属性更新RPC调用
  • 实体生命周期事件:创建,更新,删除,分配,未分配,属性更新;
  • 设备状态事件:已连接,已断开连接,活动,不活动等;
  • 其他系统事件。

规则引擎消息包含以下信息:

  • 消息ID:基于时间的通用唯一标识符;
  • 消息的发起者:设备,资产或其他实体标识符;
  • 消息类型:“后遥测”或“不活动事件”等;
  • 消息的有效负载:带有实际消息有效负载的JSON正文;
  • 元数据:键值对的列表以及有关消息的其他数据。

    规则节点

    规则节点是规则引擎的基本组件,它一次处理单个传入消息并生成一个或多个传出消息。规则节点是规则引擎的主要逻辑单元。规则节点可以过滤,丰富,转换传入消息,执行操作或与外部系统通信。

    规则节点关系

    规则节点可能与其他规则节点相关。每个关系都有关系类型,这是一个用于识别关系逻辑意义的标签。当规则节点生成传出消息时,它总是指定用于将消息路由到下一个节点的关系类型。
    典型的规则节点关系是 “Success” 和 “Failure”。表示逻辑操作的规则节点可以使用 “True” 或 “False”。一些特定的规则节点可能使用完全不同的关系类型,例如: “Post Telemetry” 、 “Attributes Updated” 、 “Entity Created” 等。

    规则链

    规则链是规则节点及其关系的逻辑组。例如,下面的规则链将:

  • 将所有遥测消息保存到数据库;

  • 如果消息中的温度场高于50度,则发出“高温警报”;
  • 如果消息中的温度场低于-40度,则发出“低温警报”;
  • 如果脚本中出现逻辑或语法错误,则无法执行温度检查脚本进行记录。

概述 - 图1
租户管理员能够定义一个 Root Rule Chain 和可选的多个其他规则链。根规则链处理所有传入消息,并可能将它们转发到其他规则链进行额外处理。其他规则链也可以将消息转发到不同的规则链。
例如,下面的规则链将:

  • 如果消息中的温度将高于 50 度,则触发 “高温警报”;
  • 如果消息中的温度低于 50 度,则清除 “高温警报”;
  • 将有关 “已创建” 和 “已清除” 警报的事件转发到外部规则链,该规则链处理向相应用户的通知。

概述 - 图2

消息处理结果

消息处理有三种可能的结果:成功,失败和超时。当处理链中的最后一个规则节点成功处理消息时,消息处理尝试将标记为“成功”。如果规则节点之一产生消息处理的“失败”,则消息处理尝试将标记为“失败”,并且没有规则节点可以处理该失败。当处理的总时间超过可配置的阈值时,消息处理尝试将标记为“超时”。
参见下图,让我们回顾一下可能的情况:
概述 - 图3
如果“转换”脚本失败,则该消息不会标记为“失败”,因为存在一个与“失败”关系连接的“保存到数据库”节点。如果“转换”脚本成功,则将通过REST API调用将其推送到“外部系统”。如果外部系统过载,则REST API调用可能会“挂起”一段时间。假设消息包处理的总体超时为20秒。让我们忽略转换脚本的执行时间,因为它小于1毫秒。因此,如果“外部系统”将在20秒内回复,则消息将被成功处理。同样,如果“保存到数据库”调用成功,则消息将被成功处理。但是,如果外部系统在20秒内未答复,则消息处理尝试将标记为“超时”。同样,如果“保存到数据库”调用失败,

规则引擎队列

规则引擎在启动时订阅队列并轮询新消息。总会有“主”主题用作新传入消息的主要入口点。您可以使用Thingsboard.yml或环境变量配置多个队列。配置完成后,您可以使用“检查点”节点将消息发送到其他主题。这将自动确认当前主题中的相应消息。
队列的定义由以下参数组成:

  • 名称-用于统计和记录;
  • 主题-队列实现用于生成和使用消息;
  • poll-interval-如果没有新消息到达,则两次轮询之间的持续时间(以毫秒为单位);
  • partitions-与该队列关联的分区数。用于扩展可以并行处理的消息数;
  • pack-processing-timeout-处理消费者返回的特定消息包的时间间隔(以毫秒为单位);
  • Submit-strategy-定义将消息提交到规则引擎的逻辑和顺序。请参阅下面的单独段落。
  • 处理策略-定义消息确认的逻辑。请参阅下面的单独段落。

    排队提交策略

    规则引擎服务不断轮询特定主题的消息,一旦使用者返回消息列表,它就会创建TbMsgPackProcessingContext对象。队列提交策略控制来自TbMsgPackProcessingContext的消息如何提交到规则链。有5种可用策略:

  • 突发-所有邮件按到达顺序提交到规则链。

  • BATCH-使用“ queue.rule-engine.queues [队列索引] .batch-size”配置参数将消息分组为批次。在确认之前的批次之前,不会提交新批次。
  • SEQUENTIAL_BY_ORIGINATOR-消息在特定实体(消息的发起者)内按顺序提交。在确认设备A的先前消息之前,不会提交例如设备A的新消息。
  • SEQUENTIAL_BY_TENANT-消息在租户(消息发起者的所有者)内按顺序提交。在确认租户A的先前消息之前,不会提交例如租户A的新消息。
  • 连续-消息按顺序提交。在确认之前的消息之前,不会提交新消息。这使得处理相当慢。

请参阅本指南,以获取提交策略用例的示例。

队列处理策略

处理策略控制失败或超时消息的重新处理方式。有5种可用策略:

  • SKIP_ALL_FAILURES-只需忽略所有故障和超时。将导致消息“丢失”。例如,如果数据库关闭,则消息将不会保留,但仍将标记为“已确认”并从队列中删除。创建该策略主要是为了与以前的版本和开发/演示环境向后兼容。
  • RETRY_ALL-重试处理包中的所有消息。如果每100条消息中有1条将失败,则策略仍将重新处理(重新提交到规则引擎)100条消息。
  • RETRY_FAILED-重试处理包中所有失败的消息。如果每100条消息中有1条失败,则策略将仅重新处理(重新提交到规则引擎)1条消息。超时的消息将不被重新处理。
  • RETRY_TIMED_OUT-重试处理包中的所有超时消息。如果每100条消息中有1条超时,则策略将仅重新处理(重新提交到规则引擎)1条消息。失败的消息将不会被重新处理。
  • RETRY_FAILED_AND_TIMED_OUT-重试处理包中所有失败和超时的消息。

所有“重试*”策略均支持重要的配置参数:

  • 重试-重试次数,0为无限
  • failure-percentage-如果失败或超时少于消息的X百分比,则跳过重试;
  • pause-between-retries-在重试之前在使用者线程中等待的时间(以秒为单位);

有关处理策略用例的示例,请参见本指南

默认队列

配置了三个默认队列:Main,HighPriority和SequentialByOriginator。它们基于提交和处理策略而不同。基本上,规则引擎处理来自主主题的消息,并可以选择使用“检查点”规则节点将其放入其他主题。默认情况下,主要主题只是忽略失败的消息。这样做是为了与以前的版本向后兼容。但是,您可能需要自行承担重新配置的风险。请注意,如果由于规则节点脚本中的某些故障而未处理一条消息,则可能会阻止处理下一条消息。我们设计了特定的仪表板来监视规则引擎的处理和故障。
HighPriority主题可用于传递警报或其他关键处理步骤。在发生故障的情况下,HighPriority主题中的消息会不断进行重新处理,直到消息处理成功为止。如果SMTP服务器或外部系统中断,这将很有用。规则引擎将重试发送消息,直到处理完为止。
如果您想确保以正确的顺序处理消息,则SequentialByOriginator主题很重要。来自同一实体的消息将按照到达队列的顺序进行处理。直到确认相同实体ID的先前消息,规则引擎才会向规则链提交新消息。

预定义的消息类型

下表显示了预定义的消息类型的列表:

讯息类型 显示名称 描述 消息元数据 邮件有效负载
POST_ATTRIBUTES_REQUEST 发布属性 来自设备的请求以发布客户端属性(请参阅属性api以获取参考) deviceName-发起方设备名称,
deviceType-发起方设备类型
键/值json:
{<br /> "currentState": "IDLE"<br />}
POST_TELEMETRY_REQUEST 遥测后 设备发出的发布遥测的请求(请参阅遥测上传API以供参考) DEVICENAME -发端设备名称,
设备类型-始发设备类型,
TS -时间戳(毫秒)
键/值json:
{<br /> "temperature": 22.7<br />}
TO_SERVER_RPC_REQUEST 来自设备的RPC请求 来自设备的RPC请求(请参阅客户端rpc以获取参考) deviceName-发起方设备名称,
deviceType-发起方设备类型,
requestId-客户端提供的RPC请求ID
json包含方法参数
{<br /> "method": "getTime",<br /> "params": { "param1": "val1" }<br />}
RPC_CALL_FROM_SERVER_TO_DEVICE RPC请求到设备 服务器到设备的RPC请求(请参阅服务器端rpc api以获取参考) requestUUID -使用sustem识别应答目标内部请求ID,
expirationTime -时间时请求将被过期,
单向-指定请求类型:-无响应,-与响应
json包含方法参数
{<br /> "method": "getGpioStatus",<br /> "params": { "param1": "val1" }<br />}
ACTIVITY_EVENT 活动事件 指示设备处于活动状态的事件 deviceName-发起方设备名称,
deviceType-发起方设备类型
包含设备活动信息的json:
{<br /> "active": true,<br /> "lastConnectTime": 1526979083267,<br /> "lastActivityTime": 1526979083270,<br /> "lastDisconnectTime": 1526978493963,<br /> "lastInactivityAlarmTime": 1526978512339,<br /> "inactivityTimeout": 10000<br />}
INACTIVITY_EVENT 不活动事件 指示设备不活动的事件 deviceName-发起方设备名称,
deviceType-发起方设备类型
包含设备活动信息的json,请参阅活动事件有效负载
CONNECT_EVENT 连接事件 连接设备时产生事件 deviceName-发起方设备名称,
deviceType-发起方设备类型
包含设备活动信息的json,请参阅活动事件有效负载
DISCONNECT_EVENT 断开事件 设备断开连接时产生的事件 deviceName-发起方设备名称,
deviceType-发起方设备类型
包含设备活动信息的json,请参阅活动事件有效负载
ENTITY_CREATED 实体已创建 在系统中创建新实体时产生的事件 userName-创建实体的用户名,
userId-用户ID
包含创建的实体详细信息的json:
{<br /> "id": {<br /> "entityType": "DEVICE",<br /> "id": "efc4b9e0-5d0f-11e8-8559-37a7f8cdca74"<br /> },<br /> "createdTime": 1526918366334,<br /> ...<br /> "name": "my-device",<br /> "type": "temp-sensor"<br />}
ENTITY_UPDATED 实体已更新 更新现有实体时产生的事件 userName-更新实体的用户名,
userId-用户ID
包含更新的实体详细信息的json,请参阅实体创建的有效负载
ENTITY_DELETED 实体已删除 删除现有实体时产生的事件 userName-删除实体的用户名,
userId-用户ID
包含已删除实体详细信息的json,请参阅实体创建的有效内容
ENTITY_ASSIGNED 实体分配 现有实体分配给客户时产生的事件 userName-执行分配操作的用户的名称,
userId-用户ID,
assignedCustomerName-分配的客户名称,
assignedCustomerId-分配的客户ID
包含已分配实体详细信息的json,请参阅实体创建的有效负载
ENTITY_UNASSIGNED 实体未分配 从客户取消分配现有实体时产生的事件 userName-执行取消分配操作的用户名,
userId-用户ID,
unassignedCustomerName-未分配的客户名,
unassignedCustomerId-未分配的客户的ID
包含未分配实体详细信息的json,请参阅实体创建的有效负载
ADDED_TO_ENTITY_GROUP 已添加到组 将实体添加到实体组时产生的事件。此消息类型特定于ThingsBoard PE userName的-谁执行任务操作,用户名
用户id的用户ID, -
addedToEntityGroupName -实体组名称,
addedToEntityGroupId -标识的实体组
空的json负载
REMOVED_FROM_ENTITY_GROUP 从组中删除 实体组中删除实体时产生的事件。此消息类型特定于ThingsBoard PE userName的-谁执行取消分配操作,用户名
用户id的用户ID, -
removedFromEntityGroupName -实体组名称,
removedFromEntityGroupId - Id的实体组
空的json负载
ATTRIBUTES_UPDATED 属性已更新 执行实体属性更新时产生的事件 userName的-谁执行属性更新的用户名,
用户id -用户ID,
范围-更新属性范围(可以是SERVER_SCOPESHARED_SCOPE
具有更新属性的键/值json:
{<br /> "softwareVersion": "1.2.3"<br />}
ATTRIBUTES_DELETED 属性已删除 删除某些实体属性时产生的事件 userName的-谁删除属性的用户名,
用户id -用户ID,
范围-删除属性范围(可以是SERVER_SCOPESHARED_SCOPE
具有属性字段的json,其中包含已删除的属性键的列表:
{<br /> "attributes": ["modelNumber", "serial"]<br />}
报警 警报事件 创建,更新或删除警报时产生的事件 原始消息元数据中的所有字段
isNewAlarm-如果刚刚创建了一个新 的警报,则为true
isExistingAlarm-如果已存在警报,则为 true
isClearedAlarm-如果清除了警报,则为true
包含创建的警报详细信息的json:
{<br /> "tenantId": {<br /> ...<br /> },<br /> "type": "High Temperature Alarm",<br /> "originator": {<br /> ...<br /> },<br /> "severity": "CRITICAL",<br /> "status": "CLEARED_UNACK",<br /> "startTs": 1526985698000,<br /> "endTs": 1526985698000,<br /> "ackTs": 0,<br /> "clearTs": 1526985712000,<br /> "details": {<br /> "temperature": 70,<br /> "ts": 1526985696000<br /> },<br /> "propagate": true,<br /> "id": "33cd8999-5dac-11e8-bbab-ad47060c9431",<br /> "createdTime": 1526985698000,<br /> "name": "High Temperature Alarm"<br />}
REST_API_REQUEST REST API向规则引擎的请求 用户执行REST API调用时产生的事件 requestUUID-唯一请求ID,
expirationTime-请求的到期时间
具有请求有效负载的json

规则节点类型

根据其性质将所有可用规则节点分组:

  • Filter Nodes 用于消息过滤和路由;
  • Enrichment Nodes 用于更新传入消息的元数据;
  • Transformation Nodes 用于更改传入消息字段,如发起人、类型、有效负载、元数据;
  • Action Nodes 根据传入消息执行各种操作;
  • External Nodes 用于与外部系统交互。

    组态

    每个规则节点可能都有特定的配置参数,这些参数取决于规则节点的实现。例如,“Filter-script” 规则节点可以通过处理传入数据的自定义 JS 函数进行配置。“外部发送电子邮件” 节点配置允许指定邮件服务器连接参数。
    通过双击规则链编辑器中的节点,可以打开规则节点配置窗口:
    概述 - 图4

    测试JavaScript函数

    一些规则节点具有特定的UI功能,允许用户测试JS函数。单击 Test Filter Function,您将看到JS编辑器,该编辑器允许您替换输入参数并验证该函数的输出。
    概述 - 图5
    您可以定义:

  • Message Type 在左上角字段中。

  • Message payload 在左消息部分。
  • Metadata 在右元数据部分。
  • 过滤器部分的实际 JS script。

按下 Test 输出后,将在右侧的 Output 部分返回。

规则引擎统计

ThingsBoard已经为“规则引擎”统计信息准备了“默认”仪表板。
将为每个租户自动加载此仪表板。统计信息收集默认情况下处于启用状态,并通过配置属性进行控制。
你可能会在下面的仪表板上注意到有关处理错误及其原因的见解:
概述 - 图6

调试

ThingsBoard 提供了查看每个规则节点的传入和传出消息的能力。要启用调试,用户需要确保在主配置窗口中选择了 “Debug mode” 复选框 (请参见配置部分的第一张图片)。
启用调试后,只要对应的关系类型,用户就可以看到传入和传出消息信息。有关调试消息视图示例,请参见下图:
概述 - 图7

导入和导出

您可以将规则链导出为JSON格式,并将其导入到相同或另一个ThingsBoard实例。
为了导出规则链,您应该进入“规则链”页面,然后单击位于特定规则链卡上的导出按钮。
概述 - 图8
类似地,要导入规则链,您应该进入到“规则链”页面,然后单击屏幕右下角的大“ +”按钮,然后单击导入按钮。

架构

要了解有关规则引擎内部的更多信息,请参阅架构页面。

自定义REST API调用规则引擎


ThingsBoard PE功能
专业版支持自定义规则引擎REST API调用功能。
使用ThingsBoard Cloud安装自己的平台实例。

ThingsBoard提供了将自定义REST API调用发送到规则引擎,处理请求的有效负载并在响应正文中返回处理结果的API。 这对于许多用例很有用。例如:

  • 通过自定义API调用扩展平台的现有REST API;
  • 利用设备/资产/客户的属性丰富REST API调用,并转发给外部系统以进行复杂处理;
  • 为你的自定义小部件提供自定义API.

要执行REST API调用,你可以使用规则引擎控制器REST APIs:
概述 - 图9
注意: 您在调用中指定的实体 id 将是规则引擎消息的发起者。如果不指定实体 id 参数,您的用户实体将成为消息的发起者。

讲解

ThingsBoard作者准备了一些教程,以帮助您通过示例设计规则链:

在此处查看更多教程。