ThingsBoard规则引擎是对于处理复杂的事件具有灵活配置和高度定制化的特点。
你可以使用规则引擎的Filter、Enrichment和Transform节点通过设备和相关资产发出输入消息。
你可以使用规则引擎的Action、Externala节点触发各种操作与通信。
关键概念
规则引擎消息
规则引擎消息是可序列化的,不变的数据结构,代表系统中的各种消息。例如:
规则引擎消息包含以下信息:
- 消息ID:基于时间的通用唯一标识符;
- 消息的发起者:设备,资产或其他实体标识符;
- 消息类型:“后遥测”或“不活动事件”等;
- 消息的有效负载:带有实际消息有效负载的JSON正文;
-
规则节点
规则节点是规则引擎的基本组件,它一次处理单个传入消息并生成一个或多个传出消息。规则节点是规则引擎的主要逻辑单元。规则节点可以过滤,丰富,转换传入消息,执行操作或与外部系统通信。
规则节点关系
规则节点可能与其他规则节点相关。每个关系都有关系类型,这是一个用于识别关系逻辑意义的标签。当规则节点生成传出消息时,它总是指定用于将消息路由到下一个节点的关系类型。
典型的规则节点关系是 “Success” 和 “Failure”。表示逻辑操作的规则节点可以使用 “True” 或 “False”。一些特定的规则节点可能使用完全不同的关系类型,例如: “Post Telemetry” 、 “Attributes Updated” 、 “Entity Created” 等。规则链
规则链是规则节点及其关系的逻辑组。例如,下面的规则链将:
将所有遥测消息保存到数据库;
- 如果消息中的温度场高于50度,则发出“高温警报”;
- 如果消息中的温度场低于-40度,则发出“低温警报”;
- 如果脚本中出现逻辑或语法错误,则无法执行温度检查脚本进行记录。
租户管理员能够定义一个 Root Rule Chain 和可选的多个其他规则链。根规则链处理所有传入消息,并可能将它们转发到其他规则链进行额外处理。其他规则链也可以将消息转发到不同的规则链。
例如,下面的规则链将:
- 如果消息中的温度将高于 50 度,则触发 “高温警报”;
- 如果消息中的温度低于 50 度,则清除 “高温警报”;
- 将有关 “已创建” 和 “已清除” 警报的事件转发到外部规则链,该规则链处理向相应用户的通知。
消息处理结果
消息处理有三种可能的结果:成功,失败和超时。当处理链中的最后一个规则节点成功处理消息时,消息处理尝试将标记为“成功”。如果规则节点之一产生消息处理的“失败”,则消息处理尝试将标记为“失败”,并且没有规则节点可以处理该失败。当处理的总时间超过可配置的阈值时,消息处理尝试将标记为“超时”。
参见下图,让我们回顾一下可能的情况:
如果“转换”脚本失败,则该消息不会标记为“失败”,因为存在一个与“失败”关系连接的“保存到数据库”节点。如果“转换”脚本成功,则将通过REST API调用将其推送到“外部系统”。如果外部系统过载,则REST API调用可能会“挂起”一段时间。假设消息包处理的总体超时为20秒。让我们忽略转换脚本的执行时间,因为它小于1毫秒。因此,如果“外部系统”将在20秒内回复,则消息将被成功处理。同样,如果“保存到数据库”调用成功,则消息将被成功处理。但是,如果外部系统在20秒内未答复,则消息处理尝试将标记为“超时”。同样,如果“保存到数据库”调用失败,
规则引擎队列
规则引擎在启动时订阅队列并轮询新消息。总会有“主”主题用作新传入消息的主要入口点。您可以使用Thingsboard.yml或环境变量配置多个队列。配置完成后,您可以使用“检查点”节点将消息发送到其他主题。这将自动确认当前主题中的相应消息。
队列的定义由以下参数组成:
- 名称-用于统计和记录;
- 主题-队列实现用于生成和使用消息;
- poll-interval-如果没有新消息到达,则两次轮询之间的持续时间(以毫秒为单位);
- partitions-与该队列关联的分区数。用于扩展可以并行处理的消息数;
- pack-processing-timeout-处理消费者返回的特定消息包的时间间隔(以毫秒为单位);
- Submit-strategy-定义将消息提交到规则引擎的逻辑和顺序。请参阅下面的单独段落。
-
排队提交策略
规则引擎服务不断轮询特定主题的消息,一旦使用者返回消息列表,它就会创建TbMsgPackProcessingContext对象。队列提交策略控制来自TbMsgPackProcessingContext的消息如何提交到规则链。有5种可用策略:
突发-所有邮件按到达顺序提交到规则链。
- BATCH-使用“ queue.rule-engine.queues [队列索引] .batch-size”配置参数将消息分组为批次。在确认之前的批次之前,不会提交新批次。
- SEQUENTIAL_BY_ORIGINATOR-消息在特定实体(消息的发起者)内按顺序提交。在确认设备A的先前消息之前,不会提交例如设备A的新消息。
- SEQUENTIAL_BY_TENANT-消息在租户(消息发起者的所有者)内按顺序提交。在确认租户A的先前消息之前,不会提交例如租户A的新消息。
- 连续-消息按顺序提交。在确认之前的消息之前,不会提交新消息。这使得处理相当慢。
请参阅本指南,以获取提交策略用例的示例。
队列处理策略
处理策略控制失败或超时消息的重新处理方式。有5种可用策略:
- SKIP_ALL_FAILURES-只需忽略所有故障和超时。将导致消息“丢失”。例如,如果数据库关闭,则消息将不会保留,但仍将标记为“已确认”并从队列中删除。创建该策略主要是为了与以前的版本和开发/演示环境向后兼容。
- RETRY_ALL-重试处理包中的所有消息。如果每100条消息中有1条将失败,则策略仍将重新处理(重新提交到规则引擎)100条消息。
- RETRY_FAILED-重试处理包中所有失败的消息。如果每100条消息中有1条失败,则策略将仅重新处理(重新提交到规则引擎)1条消息。超时的消息将不被重新处理。
- RETRY_TIMED_OUT-重试处理包中的所有超时消息。如果每100条消息中有1条超时,则策略将仅重新处理(重新提交到规则引擎)1条消息。失败的消息将不会被重新处理。
- RETRY_FAILED_AND_TIMED_OUT-重试处理包中所有失败和超时的消息。
所有“重试*”策略均支持重要的配置参数:
- 重试-重试次数,0为无限
- failure-percentage-如果失败或超时少于消息的X百分比,则跳过重试;
- pause-between-retries-在重试之前在使用者线程中等待的时间(以秒为单位);
有关处理策略用例的示例,请参见本指南。
默认队列
配置了三个默认队列:Main,HighPriority和SequentialByOriginator。它们基于提交和处理策略而不同。基本上,规则引擎处理来自主主题的消息,并可以选择使用“检查点”规则节点将其放入其他主题。默认情况下,主要主题只是忽略失败的消息。这样做是为了与以前的版本向后兼容。但是,您可能需要自行承担重新配置的风险。请注意,如果由于规则节点脚本中的某些故障而未处理一条消息,则可能会阻止处理下一条消息。我们设计了特定的仪表板来监视规则引擎的处理和故障。
HighPriority主题可用于传递警报或其他关键处理步骤。在发生故障的情况下,HighPriority主题中的消息会不断进行重新处理,直到消息处理成功为止。如果SMTP服务器或外部系统中断,这将很有用。规则引擎将重试发送消息,直到处理完为止。
如果您想确保以正确的顺序处理消息,则SequentialByOriginator主题很重要。来自同一实体的消息将按照到达队列的顺序进行处理。直到确认相同实体ID的先前消息,规则引擎才会向规则链提交新消息。
预定义的消息类型
下表显示了预定义的消息类型的列表:
讯息类型 | 显示名称 | 描述 | 消息元数据 | 邮件有效负载 |
---|---|---|---|---|
POST_ATTRIBUTES_REQUEST | 发布属性 | 来自设备的请求以发布客户端属性(请参阅属性api以获取参考) | deviceName-发起方设备名称, deviceType-发起方设备类型 |
键/值json:{<br /> "currentState": "IDLE"<br />} |
POST_TELEMETRY_REQUEST | 遥测后 | 设备发出的发布遥测的请求(请参阅遥测上传API以供参考) | DEVICENAME -发端设备名称, 设备类型-始发设备类型, TS -时间戳(毫秒) |
键/值json:{<br /> "temperature": 22.7<br />} |
TO_SERVER_RPC_REQUEST | 来自设备的RPC请求 | 来自设备的RPC请求(请参阅客户端rpc以获取参考) | deviceName-发起方设备名称, deviceType-发起方设备类型, requestId-客户端提供的RPC请求ID |
json包含方法和参数:{<br /> "method": "getTime",<br /> "params": { "param1": "val1" }<br />} |
RPC_CALL_FROM_SERVER_TO_DEVICE | RPC请求到设备 | 服务器到设备的RPC请求(请参阅服务器端rpc api以获取参考) | requestUUID -使用sustem识别应答目标内部请求ID, expirationTime -时间时请求将被过期, 单向-指定请求类型:真-无响应,假-与响应 |
json包含方法和参数:{<br /> "method": "getGpioStatus",<br /> "params": { "param1": "val1" }<br />} |
ACTIVITY_EVENT | 活动事件 | 指示设备处于活动状态的事件 | deviceName-发起方设备名称, deviceType-发起方设备类型 |
包含设备活动信息的json:{<br /> "active": true,<br /> "lastConnectTime": 1526979083267,<br /> "lastActivityTime": 1526979083270,<br /> "lastDisconnectTime": 1526978493963,<br /> "lastInactivityAlarmTime": 1526978512339,<br /> "inactivityTimeout": 10000<br />} |
INACTIVITY_EVENT | 不活动事件 | 指示设备不活动的事件 | deviceName-发起方设备名称, deviceType-发起方设备类型 |
包含设备活动信息的json,请参阅活动事件有效负载 |
CONNECT_EVENT | 连接事件 | 连接设备时产生事件 | deviceName-发起方设备名称, deviceType-发起方设备类型 |
包含设备活动信息的json,请参阅活动事件有效负载 |
DISCONNECT_EVENT | 断开事件 | 设备断开连接时产生的事件 | deviceName-发起方设备名称, deviceType-发起方设备类型 |
包含设备活动信息的json,请参阅活动事件有效负载 |
ENTITY_CREATED | 实体已创建 | 在系统中创建新实体时产生的事件 | userName-创建实体的用户名, userId-用户ID |
包含创建的实体详细信息的json:{<br /> "id": {<br /> "entityType": "DEVICE",<br /> "id": "efc4b9e0-5d0f-11e8-8559-37a7f8cdca74"<br /> },<br /> "createdTime": 1526918366334,<br /> ...<br /> "name": "my-device",<br /> "type": "temp-sensor"<br />} |
ENTITY_UPDATED | 实体已更新 | 更新现有实体时产生的事件 | userName-更新实体的用户名, userId-用户ID |
包含更新的实体详细信息的json,请参阅实体创建的有效负载 |
ENTITY_DELETED | 实体已删除 | 删除现有实体时产生的事件 | userName-删除实体的用户名, userId-用户ID |
包含已删除实体详细信息的json,请参阅实体创建的有效内容 |
ENTITY_ASSIGNED | 实体分配 | 现有实体分配给客户时产生的事件 | userName-执行分配操作的用户的名称, userId-用户ID, assignedCustomerName-分配的客户名称, assignedCustomerId-分配的客户ID |
包含已分配实体详细信息的json,请参阅实体创建的有效负载 |
ENTITY_UNASSIGNED | 实体未分配 | 从客户取消分配现有实体时产生的事件 | userName-执行取消分配操作的用户名, userId-用户ID, unassignedCustomerName-未分配的客户名, unassignedCustomerId-未分配的客户的ID |
包含未分配实体详细信息的json,请参阅实体创建的有效负载 |
ADDED_TO_ENTITY_GROUP | 已添加到组 | 将实体添加到实体组时产生的事件。此消息类型特定于ThingsBoard PE。 | userName的-谁执行任务操作,用户名 用户id的用户ID, - addedToEntityGroupName -实体组名称, addedToEntityGroupId -标识的实体组 |
空的json负载 |
REMOVED_FROM_ENTITY_GROUP | 从组中删除 | 从实体组中删除实体时产生的事件。此消息类型特定于ThingsBoard PE。 | userName的-谁执行取消分配操作,用户名 用户id的用户ID, - removedFromEntityGroupName -实体组名称, removedFromEntityGroupId - Id的实体组 |
空的json负载 |
ATTRIBUTES_UPDATED | 属性已更新 | 执行实体属性更新时产生的事件 | userName的-谁执行属性更新的用户名, 用户id -用户ID, 范围-更新属性范围(可以是SERVER_SCOPE或SHARED_SCOPE) |
具有更新属性的键/值json:{<br /> "softwareVersion": "1.2.3"<br />} |
ATTRIBUTES_DELETED | 属性已删除 | 删除某些实体属性时产生的事件 | userName的-谁删除属性的用户名, 用户id -用户ID, 范围-删除属性范围(可以是SERVER_SCOPE或SHARED_SCOPE) |
具有属性字段的json,其中包含已删除的属性键的列表:{<br /> "attributes": ["modelNumber", "serial"]<br />} |
报警 | 警报事件 | 创建,更新或删除警报时产生的事件 | 原始消息元数据中的所有字段 isNewAlarm-如果刚刚创建了一个新 的警报,则为true isExistingAlarm-如果已存在警报,则为 true isClearedAlarm-如果清除了警报,则为true |
包含创建的警报详细信息的json:{<br /> "tenantId": {<br /> ...<br /> },<br /> "type": "High Temperature Alarm",<br /> "originator": {<br /> ...<br /> },<br /> "severity": "CRITICAL",<br /> "status": "CLEARED_UNACK",<br /> "startTs": 1526985698000,<br /> "endTs": 1526985698000,<br /> "ackTs": 0,<br /> "clearTs": 1526985712000,<br /> "details": {<br /> "temperature": 70,<br /> "ts": 1526985696000<br /> },<br /> "propagate": true,<br /> "id": "33cd8999-5dac-11e8-bbab-ad47060c9431",<br /> "createdTime": 1526985698000,<br /> "name": "High Temperature Alarm"<br />} |
REST_API_REQUEST | REST API向规则引擎的请求 | 用户执行REST API调用时产生的事件 | requestUUID-唯一请求ID, expirationTime-请求的到期时间 |
具有请求有效负载的json |
规则节点类型
根据其性质将所有可用规则节点分组:
- Filter Nodes 用于消息过滤和路由;
- Enrichment Nodes 用于更新传入消息的元数据;
- Transformation Nodes 用于更改传入消息字段,如发起人、类型、有效负载、元数据;
- Action Nodes 根据传入消息执行各种操作;
External Nodes 用于与外部系统交互。
组态
每个规则节点可能都有特定的配置参数,这些参数取决于规则节点的实现。例如,“Filter-script” 规则节点可以通过处理传入数据的自定义 JS 函数进行配置。“外部发送电子邮件” 节点配置允许指定邮件服务器连接参数。
通过双击规则链编辑器中的节点,可以打开规则节点配置窗口:
测试JavaScript函数
一些规则节点具有特定的UI功能,允许用户测试JS函数。单击 Test Filter Function,您将看到JS编辑器,该编辑器允许您替换输入参数并验证该函数的输出。
您可以定义:Message Type 在左上角字段中。
- Message payload 在左消息部分。
- Metadata 在右元数据部分。
- 过滤器部分的实际 JS script。
按下 Test 输出后,将在右侧的 Output 部分返回。
规则引擎统计
ThingsBoard已经为“规则引擎”统计信息准备了“默认”仪表板。
将为每个租户自动加载此仪表板。统计信息收集默认情况下处于启用状态,并通过配置属性进行控制。
你可能会在下面的仪表板上注意到有关处理错误及其原因的见解:
调试
ThingsBoard 提供了查看每个规则节点的传入和传出消息的能力。要启用调试,用户需要确保在主配置窗口中选择了 “Debug mode” 复选框 (请参见配置部分的第一张图片)。
启用调试后,只要对应的关系类型,用户就可以看到传入和传出消息信息。有关调试消息视图示例,请参见下图:
导入和导出
您可以将规则链导出为JSON格式,并将其导入到相同或另一个ThingsBoard实例。
为了导出规则链,您应该进入“规则链”页面,然后单击位于特定规则链卡上的导出按钮。
类似地,要导入规则链,您应该进入到“规则链”页面,然后单击屏幕右下角的大“ +”按钮,然后单击导入按钮。
架构
要了解有关规则引擎内部的更多信息,请参阅架构页面。
自定义REST API调用规则引擎
ThingsBoard PE功能 仅专业版支持自定义规则引擎REST API调用功能。 使用ThingsBoard Cloud或安装自己的平台实例。 |
---|
ThingsBoard提供了将自定义REST API调用发送到规则引擎,处理请求的有效负载并在响应正文中返回处理结果的API。 这对于许多用例很有用。例如:
- 通过自定义API调用扩展平台的现有REST API;
- 利用设备/资产/客户的属性丰富REST API调用,并转发给外部系统以进行复杂处理;
- 为你的自定义小部件提供自定义API.
要执行REST API调用,你可以使用规则引擎控制器REST APIs:
注意: 您在调用中指定的实体 id 将是规则引擎消息的发起者。如果不指定实体 id 参数,您的用户实体将成为消息的发起者。
讲解
ThingsBoard作者准备了一些教程,以帮助您通过示例设计规则链:
在此处查看更多教程。