ThingsBoard 规则引擎是一个用于复杂事件处理的高度可定制和可配置的系统。使用规则引擎,您可以过滤(filter丰富(enrich 转换(transform 由 IoT 设备和相关资产发出的传入消息。您还可以触发各种操作,例如通知或与外部系统的通信。

规则引擎消息(Rule Engine Message)

规则引擎消息是一种可序列化、不可变的数据结构,代表系统中的各种消息。例如:

规则引擎消息包含以下信息:

  • 消息 ID:基于时间的通用唯一标识符;
  • 消息的发起者:设备、资产或其他实体标识符;
  • 消息类型:“发布遥测”或“不活动事件”等;Type of the message
  • 消息的有效载荷:带有实际消息有效载荷的 JSON 数据; Payload of the message
  • 元数据:包含有关消息的附加数据的键值对列表。Metadata

    规则节点

    规则节点是规则引擎的一个基本组件,它一次处理单个传入消息并生成一个或多个传出消息。规则节点是规则引擎的主要逻辑单元。规则节点可以过滤、丰富、转换传入消息、执行操作或与外部系统通信。

    规则节点关系

    规则节点可能与其他规则节点相关。每个关系都有关系类型,即用于标识关系逻辑含义的标签。当规则节点产生传出消息时,它总是指定用于将消息路由到下一个节点的关系类型。
    典型的规则节点关系是“成功”和“失败”。表示逻辑操作的规则节点可以使用“True”或“False”。一些特定的规则节点可能使用完全不同的关系类型,例如:“Post Telemetry”、“Attributes Updated”、“Entity Created”等。

    规则链

    规则链是规则节点及其关系的逻辑组。例如:

  • 将所有遥测消息保存到数据库中;

  • 如果消息中的温度字段将高于 50 度,则引发“高温警报”;
  • 如果消息中的温度字段低于-40 度,则引发“低温警报”;
  • 如果脚本中出现逻辑或语法错误,则将执行温度检查脚本失败记录到控制台。

0.1 规则引擎中的关键概念(上) - 图1
租户管理员能够定义一个根规则链和可选的多个其他规则链。根规则链处理所有传入的消息,并可能将它们转发到其他规则链以进行额外处理。其他规则链也可以将消息转发到不同的规则链。

  • 如果消息中的温度字段将高于 50 度,则引发“高温警报”;
  • 如果消息中的温度字段低于 50 度,则清除“高温警报”;
  • 将有关“已创建”和“已清除”警报的事件转发到处理通知给相应用户的外部规则链。

0.1 规则引擎中的关键概念(上) - 图2

消息处理结果

消息处理有三种可能的结果:成功失败超时。当处理链中的最后一个规则节点成功处理消息时,消息处理尝试被标记为“成功”。如果规则节点之一产生消息处理“失败”,并且没有规则节点来处理该失败,则消息处理尝试被标记为“失败”。当处理的总时间超过可配置的阈值时,消息处理尝试被标记为“超时”。 例如:
0.1 规则引擎中的关键概念(上) - 图3

如果“Transformation”脚本失败,消息不会被标记为“Failed”,因为有一个“Save to DB”节点与“Failure”关系相连。如果“转换”脚本成功,它将通过 REST API 调用推送到“外部系统”。如果外部系统过载,REST API 调用可能会“挂起”一段时间。让我们假设消息包处理的总超时时间为 20 秒。让我们忽略 Transformation 脚本执行时间,因为它 < 1ms。因此,如果“外部系统”在 20 秒内回复,则消息将被成功处理。类似地,如果“Save to DB”调用成功,则消息将被成功处理。但是,如果外部系统在 20 秒内没有回复,消息处理尝试将被标记为“超时”。

规则引擎队列

规则引擎在启动时订阅队列并轮询新消息。始终有“主main”topic用作新传入消息的入口点。您可以使用 thingsboard.yml 或环境变量配置多个队列。配置完成后,您可以使用“Checkpoint”节点将消息发送到另一个主题。这会自动确认当前主题中的相应消息。
队列的定义由以下参数组成:

  • name名称 - 用于统计和日志记录;
  • topic主题 - 队列实现用于生成和消费消息;
  • poll-interval - 如果没有新消息到达,轮询消息之间的持续时间(以毫秒为单位);
  • partitions - 与此队列关联的分区数。用于扩展可并行处理的消息数量;
  • pack-processing-timeout - 处理消费者返回的特定消息包的时间间隔(以毫秒为单位);
  • submit-strategy 提交策略- 定义向规则引擎提交消息的逻辑和顺序。
  • processing-strategy处理策略 - 定义消息确认的逻辑。请参阅下面的单独段落。

    队列提交策略

    规则引擎服务不断轮询特定主题的消息,一旦消费者返回消息列表,它就会创建 TbMsgPackProcessingContext 对象。队列提交策略如何控制将来自 TbMsgPackProcessingContext 的消息提交到规则链,有5种可用策略:

  • BURST - 所有消息都按照它们到达的顺序提交到规则链。

  • BATCH - 使用“queue.rule-engine.queues[queue index].batch-size”配置参数将消息分组为批次。在确认前一个批次之前,不会提交新批次。
  • SEQUENTIAL_BY_ORIGINATOR - 消息在特定实体(消息的发起者)内按顺序提交。直到设备 A 的先前消息被确认后,才会提交例如设备 A 的新消息。
  • SEQUENTIAL_BY_TENANT - 消息在租户(消息发起者的所有者)内按顺序提交。在租户 A 的先前消息被确认之前,不会提交例如租户 A 的新消息。
  • SEQUENTIAL - 消息按顺序提交。在确认前一条消息之前,不会提交新消息。这使得处理非常缓慢。

    队列处理策略

    处理策略控制如何重新处理失败或超时的消息,有5种可用策略:

  • SKIP_ALL_FAILURES - 忽略所有失败和超时。会导致消息“丢失”。例如,如果数据库关闭,消息将不会被持久化,但仍会被标记为“已确认”并从队列中删除。创建此策略主要是为了与以前的版本和开发/演示环境向后兼容。

  • RETRY_ALL - 重试处理包中的所有消息。如果 100 条消息中有 1 条失败,策略仍将重新处理(重新提交到规则引擎)100 条消息。
  • RETRY_FAILED - 重试来自处理包的所有失败消息。如果 100 条消息中有 1 条失败,则策略将仅重新处理(重新提交到规则引擎)1 条消息。不会重新处理超时消息。
  • RETRY_TIMED_OUT - 重试处理包中的所有超时消息。如果 100 条消息中有 1 条超时,则策略将仅重新处理(重新提交到规则引擎)1 条消息。不会重新处理失败的消息。
  • RETRY_FAILED_AND_TIMED_OUT - 重试来自处理包的所有失败和超时消息。

所有“RETRY*”策略都支持重要的配置参数:

  • retries - 重试次数,0 为无限制
  • failure-percentage - 如果失败或超时小于消息的 X 百分比,则跳过重试;
  • pause-between-retries - 重试前在消费者线程中等待的时间(以秒为单位);

    默认队列

    配置了三个默认队列:Main、HighPriority 和 SequentialByOriginator。它们因提交和处理策略而异。基本上,规则引擎处理来自Main主题的消息,并可以选择使用“checkpoint”规则节点将它们放入其他主题。默认情况下,Main主题只是忽略失败的消息。这样做是为了与以前的版本向后兼容。但是,您可以自行承担重新配置此操作的风险。请注意,如果由于规则节点脚本中的某些故障而未处理一条消息,则可能会阻止处理下一条消息。我们设计了特定的仪表板来监控规则引擎处理和故障。
    HighPriority 主题可用于传递警报或其他关键处理步骤。HighPriority 主题中的消息会在失败的情况下不断重新处理,直到消息处理成功。如果 SMTP 服务器或外部系统中断,这将非常有用。规则引擎将重试发送消息,直到它被处理。
    如果您想确保按正确顺序处理消息,则 SequentialByOriginator 主题很重要。来自同一实体的消息将按照它们到达队列的顺序进行处理。规则引擎不会向规则链提交新消息,直到确认相同实体 ID 的前一条消息。