概述
RelaperCloud 规则引擎是一个高度可定制和可配置的复杂事件处理系统。使用规则引擎,您可以过滤、丰富和转换来自物联网设备和相关资产的传入消息。您还可以触发各种操作,例如通知或与外部系统的通信。
关键概念
规则引擎消息
规则引擎消息是一种可序列化的、不可变的数据结构,表示系统中的各种消息。例如:
- 来自设备的传入遥测、属性更新或RPC 调用;
- 实体生命周期事件:创建、更新、删除、分配、未分配、属性更新;
- 设备状态事件:连接、断开、活动、非活动等;
- 其他系统事件。
规则引擎消息包含以下信息:
- 消息 ID:基于时间的、普遍唯一的标识符;
- 消息的发起者:设备、资产或其他实体标识符;
- 消息类型:“Post telemetry”或“Inactivity Event”等;
- 消息的有效负载:带有实际消息有效负载的 JSON 正文;
-
规则节点
规则节点是规则引擎的一个基本组件,它一次处理单个传入消息并产生一个或多个传出消息。规则节点是规则引擎的主要逻辑单元。规则节点可以过滤、丰富、转换传入消息、执行操作或与外部系统通信。
规则节点关系
规则节点可能与其他规则节点相关。每个关系都有关系类型,一个用于识别关系逻辑含义的标签。当规则节点产生传出消息时,它总是指定用于将消息路由到下一个节点的关系类型。
典型的规则节点关系是“成功”和“失败”。表示逻辑运算的规则节点可以使用“真”或“假”。一些特定的规则节点可能使用完全不同的关系类型,例如:“Post Telemetry”、“Attributes Updated”、“Entity Created”等。规则链
规则链是规则节点及其关系的逻辑组。例如,下面的规则链将:
将所有遥测消息保存到数据库;
- 如果消息中的温度字段高于 50 度,则发出“高温警报”;
- 如果消息中的温度字段低于 -40 度,则发出“低温警报”;
- 如果脚本中出现逻辑或语法错误,则将无法执行温度检查脚本记录到控制台。
消息处理结果
消息处理有三种可能的结果:成功、失败和超时。当处理链中的最后一个规则节点成功处理消息时,消息处理尝试被标记为“成功”。如果其中一个规则节点产生消息处理“失败”,并且没有规则节点来处理该失败,则消息处理尝试被标记为“失败”。当总的处理时间超过可配置的阈值时,消息处理尝试被标记为“超时”。
预定义的消息类型
下表列出了预定义的消息类型:
OST_ATTRIBUTES_REQUEST | 属性发布 | 发布设备客户端 属性 |
deviceName - 设备名称, deviceType - 设备类型 |
键/值 { “currentState”: “IDLE” } |
---|---|---|---|---|
POST_TELEMETRY_REQUEST | 遥测发布 | 发布设备遥测数据 | deviceName - 设备名称, deviceType - 设备类型, ts - 时间戳 (毫秒) |
键/值 { “temperature”: 22.7 } |
TO_SERVER_RPC_REQUEST | RPC Request from Device | 设备RPC请求 | deviceName - 设备名称, deviceType - 设备类型, requestId - RPC请求Id |
包含方法和参数的json: { “method”: “getTime”, “params”: { “param1”: “val1” } } |
RPC_CALL_FROM_SERVER_TO_DEVICE | 服务端RPC响应 | 响应RPC请求 | requestUUID - sustem表示内部应答的请求id, expirationTime - 请求过期时间, oneway - 指定请求类型: true - 无响应, false - 有响应 |
包含方法和参数的json: { “method”: “getGpioStatus”, “params”: { “param1”: “val1” } } |
ACTIVITY_EVENT | 活动事件 | 表明设备处于活动状态的事件 | deviceName - 设备名称, deviceType - 设备类型 |
包含设备活动信息的json: { “active”: true, “lastConnectTime”: 1526979083267, “lastActivityTime”: 1526979083270, “lastDisconnectTime”: 1526978493963, “lastInactivityAlarmTime”: 1526978512339, “inactivityTimeout”: 10000 } |
INACTIVITY_EVENT | 不活动事件 | 表示设备处理非活动状态的事件 | deviceName - 设备名称, deviceType - 设备类型 |
设备活动信息的json活动事件 payload |
CONNECT_EVENT | 连接事件 | 设备连接时的事件 | deviceName - 设备名称, deviceType - 设备类型 |
设备活动信息的json活动事件 payload |
DISCONNECT_EVENT | 断开事件 | 设备断开连接产生的事件 | deviceName - 设备名称, deviceType - 设备类型 |
设备活动信息的json活动事件 payload |
ENTITY_CREATED | 实体创建 | 实体创建产生的事件 | userName - 实体创建的用户名, userId - 用户Id |
实体详细信息的json: { “id”: { “entityType”: “DEVICE”, “id”: “efc4b9e0-5d0f-11e8-8559-37a7f8cdca74” }, “createdTime”: 1526918366334, … “name”: “my-device”, “type”: “temp-sensor” } |
ENTITY_UPDATED | 实体更新 | 更新实体产生的事件 | userName - 更新实体的用户名, userId - 用户Id |
实体详细信息的json:参见实体创建 payload |
ENTITY_DELETED | 实体删除 | 删除实体产生的事件 | userName - 删除实体的用户名, userId - 用户Id |
实体详细信息的json:参见实体创建 payload |
ENTITY_ASSIGNED | 实体分配 | 实体分配给客户时生的事件 | userName - 分配实体的用户名, userId - 用户Id, assignedCustomerName -分配的客户名, assignedCustomerId - 客户Id |
实体详细信息的json:参见实体创建 payload |
ENTITY_UNASSIGNED | 取消实体分配 | 取消实体对客户分配时产生的事件 | userName - 取消分配操作的用户名, userId - 用户Id, unassignedCustomerName - 取消配客户名称, unassignedCustomerId - 取消配客户Id |
实体详细信息的json:参见实体创建 payload |
ATTRIBUTES_UPDATED | 属性更新 | 实体属性更新时产生的事件 | userName - 操作的用户名, userId - 用户Id, scope - 属性更新作用 ( SERVER_SCOPE或SHARED_SCOPE) |
键/值json: { “softwareVersion”: “1.2.3” } |
ATTRIBUTES_DELETED | 属性删除 | 实体属性删除时产生的事件 | userName - 操作的用户名, userId - 用户Id, scope - 属性删除作用 (SERVER_SCOPE或SHARED_SCOPE) |
已删除的属性的keys列表: { “attributes”: [“modelNumber”, “serial”] } |
ALARM | 警报事件 | 创建、更新或删除警报时产生的事件 | 消息发起者元数据中的所有字段 isNewAlarm - 创建了一个新的Alram,则为true isExistingAlarm - 已存在警报,则为true isClearedAlarm - 清除了警报,则为true |
创建警报的json详细信息: { “tenantId”: { … }, “type”: “High Temperature Alarm”, “originator”: { … }, “severity”: “CRITICAL”, “status”: “CLEARED_UNACK”, “startTs”: 1526985698000, “endTs”: 1526985698000, “ackTs”: 0, “clearTs”: 1526985712000, “details”: { “temperature”: 70, “ts”: 1526985696000 }, “propagate”: true, “id”: “33cd8999-5dac-11e8-bbab-ad47060c9431”, “createdTime”: 1526985698000, “name”: “High Temperature Alarm” } |
REST_API_REQUEST | REST API请求到规则引擎 | 执行REST API调用时产生的事件 | requestUUID - 请求id, expirationTime - 请求过期时间 |
json请求的playload |
规则节点类型
根据其性质将所有可用规则节点分组:
- Filter Nodes用于消息过滤和路由;
- Enrichment Nodes用于更新传入消息的元数据;
- Transformation Nodes用于更改传入的消息字段,例如Originator, Type, Payload, Metadata;
- Action Nodes根据传入的消息执行各种动作;
External Nodes用于与外部系统进行交互.
配置
每一个规则节点具有特定的参数配置,例如:过滤脚本节点可以通过自定义JS函数。外部节点可以通过参数配置实现外部邮件服务器连接设置
可以通过在“规则链”编辑器中双击节点来打开“规则节点”配置窗口:Javascript函数
一些规则节点具有特定的UI功能,允许用户测试JS函数。单击Test Filter Function后,你将看到JS编辑器,可使用该编辑器替换输入参数并验证函数的输出。
你可以定义:Message Type 左上角.
- Message payload 左侧中间.
- Metadata 右上角.
- JS script 实际脚本.
点击Test按钮将在右侧Output返回值
调试
启用调试后,只要相应的关系类型,用户就可以查看传入和传出消息的信息。请参阅下图,获取示例调试消息视图:
导入导出
你可以将规则链导出为JSON格式,并将其导入到相同或其他RelaperCloud 实例。 为了导出规则链,你应该导航到规则链库页面,然后单击位于特定规则链卡上的导出按钮。
类似地,要导入规则链,你应该导航到规则链库页面,然后单击屏幕右上角的大“ +”按钮,然后单击导入按钮。
拖拽或以选择的方式添加规则链配置文件
典型案例
通过睿珀iot规则引擎配置的常见用例:
- 在保存到数据库之前对接收的遥测数据或属性进行验证和修改。
- 将遥测或属性从设备复制到相关资产以便可以汇总遥测。例如:可以将多个设备中的数据汇总到相关资产中。
- 根据定义的条件对alarms进行创建、更新、清除。
- 根据设备生命周期事件触发操作。例如:如果设备处于在线/离线状态,则创建警告。
- 加载所需的其他处理数据。例如:在客户设备或租户属性中定义的设备的playload温度阈值。
- 调用外部系统的REST API。
- 发生复杂事件时发送电子邮件,企业微信,钉钉并使用配置模板中的其他实体的属性。
- 在事件处理期间要考虑用户的偏好。
- 根据定义的条件进行RPC调用。
- 集成第三方消息队列例如:Kafka,Rabbitmq,Mqtt, AWS等。
简单案例
温度传感器上报数据过滤展示
使用RelaperCloud 将DHT22温度传感器采集的-30°C至 90°C温度值进行收集展示
1 修改规则链配置
进入RelaperCloud UI选择规则链库转到Root Rule Chain打开.
2 添加过滤脚本
拖动过滤脚本节点放入链中并配置如下脚本:
return typeof msg.temperature === 'undefined'
|| (msg.temperature >= -30 && msg.temperature <= 90);
如果传入温度值或属性值满足条件则脚本将返回True,否则将返回False。如果脚本返回True则传入消息将被关联到与True关系连接的下一个节点。
我们希望所有的telemetry requests(遥测)都通过此脚本进行验证. 删除消息类型切换节点和保存时间序列节点之间的Post Telemetry关系节点。
3 建立连接关系
将消息类型切换节点和过滤脚本使用Post Telemetry进行连接:
将过滤脚本节点与保存时间序列节点使用关系True进行连接:
将过滤脚本节点与日志节点使用关系False进行连接这样无效数据将被记录在系统日志中
点击保存按钮应用更新。
4 验证结果
创建设备并将遥测提交到RelaperCloud,点击菜单栏中的设备并创建新的设备:
设备详情页复制设备访问令牌,使用设备令牌进行rest api提交遥测数据
提交temperature = 99的值,可以进行Latest Telemetry中查看,发现并未加成功:
-----注意:替换掉$ACCESS_TOKEN为实际设备的Token-------
curl -v -X POST -d '{"temperature":99}' http://iot.relaper.com/api/v1/$ACCESS_TOKEN/telemetry --header "Content-Type:application/json"
提交temperature = 20可以看见遥测数据保存成功
-----注意:替换掉$ACCESS_TOKEN为实际设备的Token-------
curl -v -X POST -d '{"temperature":20}' http://iot.relaper.com/api/v1/$ACCESS_TOKEN/telemetry --header "Content-Type:application/json"