The ThingsBoard rule engine supports basic analysis of incoming telemetry data, for example, threshold crossing. The idea behind the rule engine is to route data from IoT devices to different plugins, based on device attributes or the data itself.
However, most real-life use cases also require support for advanced analytics: machine learning, predictive analytics, etc.
This tutorial will demonstrate how you can:

  • route device telemetry data from ThingsBoard to a Kafka topic using the built-in rule engine capabilities (works for both ThingsBoard CE and PE).
  • aggregate data from multiple devices using a simple Kafka Streams application.
  • push the results of the analytics back to ThingsBoard for persistence and visualization using the ThingsBoard PE Kafka Integration.

The analytics in this tutorial is deliberately simple; our goal is to highlight the integration steps.
Kafka - Figure 1
Let’s assume we have a large number of solar panels, each consisting of a number of solar modules. ThingsBoard is used to collect, store and visualize anomaly telemetry from the solar modules in each panel.
We detect anomalies by comparing the value produced by a solar module with the average value produced by all modules of the same panel and with the standard deviation of that value.
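In other words, a module is flagged as anomalous when |modulePower - panelAvgPower| / panelDeviation > Z; this tutorial uses Z = 1, i.e. one standard deviation (see the isAnomalyModule function in Step 2).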
Kafka - Figure 2
We will analyze real-time data from multiple devices using a Kafka Streams job with a 30-second (configurable) window.
In order to store and visualize the results of the analytics, we are going to create three virtual solar module devices for each solar panel.

Prerequisites

The following services must be up and running:

  • ThingsBoard PE v2.4.2+ instance
  • Kafka server

    Step 1. Rule Chain configuration

    During this step, we will configure three generator nodes that produce simulated data for testing during development. Typically you don’t need them in production, but they are very useful for debugging. We will generate data for three modules of a single panel. Two of the modules will produce the same value, and one will produce a much lower value. Of course, you should replace these generators with real data produced by real devices; this is just an example.
    Let’s create three devices with the type “solar-module”. If you are using ThingsBoard PE, you can add them to a new “Solar Modules” device group.
    Kafka - Figure 3
    Now, let’s create three device simulators that will push data to our local Kafka broker. The simulated data will be routed to the Kafka Rule Node, which is responsible for publishing it to a Kafka topic. Let’s configure the Kafka Rule Node first: we will use the local Kafka server (localhost:9092) and the topic “solar-module-raw”.
    Kafka - Figure 4
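    For reference, each simulated message lands in the “solar-module-raw” topic as a JSON document. The exact payload depends on your generator scripts; judging by the SolarModuleData fields used later in the Streams code (getPanel(), getName()), it may look similar to the following (the “power” field name here is an assumption):

        {
            "name": "Module 1",
            "panel": "Panel 1",
            "power": 5.0
        }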
    Now, let’s add a “generator” node for the first module and configure it to constantly “produce” 5 watts.
    Kafka - Figure 5
    Add a “generator” node for the second module, configured to constantly “produce” 5 watts as well.
    Finally, add a “generator” node for the third module and configure it to constantly “produce” 3.5 watts, which simulates module degradation.
    Kafka - Figure 6
    The resulting rule chain should look similar to this one:
    Kafka - Figure 7
    You can also download the rule chain JSON file and import it into your project.
    Once the rule chain is imported, check the debug output of the Kafka node. If your Kafka broker is up and running at localhost, you should see debug messages similar to the ones below. Notice the absence of errors in the debug log:
    Kafka - Figure 8

    Step 2. Launch the Kafka Streams application

    During this step, we will download and launch a sample application that analyzes the raw data from “solar-module-raw” and produces insights about module degradation. The sample application calculates the total amount of energy produced by each module of a panel within a (configurable) time window. It then calculates the average power produced by the modules of each panel and its standard deviation within the same window. Once this is done, the app compares each module’s value with the average; if the difference is greater than the deviation, the module is treated as an anomaly.
    The results of the anomaly calculations are pushed to the “solar-module-anomalies” topic. ThingsBoard subscribes to this topic using the Kafka Integration, generates alarms, and stores the anomalies in the database.

    Download the sample application

    Feel free to grab the code from the ThingsBoard repository and build the project with Maven:

        mvn clean install

    Go ahead and add that Maven project to your favorite IDE.
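    Once your local Kafka broker is reachable at localhost:9092 (the address hard-coded in the properties below), you can start the streaming job by simply running the SolarConsumer class from the IDE.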

    Dependencies review

    The main dependencies used in the project are:

        <dependencies>
            ...
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>${kafka.version}</version>
            </dependency>
            ...
        </dependencies>
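    The ${kafka.version} property is defined in the project’s pom.xml. Note that the code below relies on APIs such as Suppressed and the Duration-based TimeWindows.of, which require kafka-streams 2.1 or newer.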

    Source code review

    The Kafka Streams application logic is mainly concentrated in the SolarConsumer class.

        private static Properties getProperties() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            return props;
        }
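    Note that CACHE_MAX_BYTES_BUFFERING_CONFIG is set to 0, which disables record caching so that windowed aggregation updates are forwarded downstream immediately. This is convenient for a demo; in production you would typically keep some caching enabled to reduce downstream traffic.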
        private static final String IN_TOPIC = "solar-module-raw";
        private static final TopicNameExtractor<String, SolarModuleAggregatorJoiner> OUT_TOPIC =
                new StaticTopicNameExtractor<>("solar-module-anomalies");

        // time window used for aggregation and joins
        private static final Duration DURATION = Duration.ofSeconds(30);
        private static final TimeWindows TIME_WINDOWS = TimeWindows.of(DURATION);
        private static final JoinWindows JOIN_WINDOWS = JoinWindows.of(DURATION);

        private static final StreamsBuilder builder = new StreamsBuilder();

        // Serde = Serializer/Deserializer;
        // custom classes require custom Serializers/Deserializers
        private static final Serde<SolarModuleData> SOLAR_MODULE_DATA_SERDE =
                Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarModuleData.class));
        private static final Serde<SolarModuleAggregator> SOLAR_MODULE_AGGREGATOR_SERDE =
                Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarModuleAggregator.class));
        private static final Serde<SolarPanelAggregator> SOLAR_PANEL_AGGREGATOR_SERDE =
                Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarPanelAggregator.class));
        private static final Serde<SolarModuleKey> SOLAR_MODULE_KEY_SERDE =
                Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarModuleKey.class));
        private static final Serde<SolarPanelAggregatorJoiner> SOLAR_PANEL_AGGREGATOR_JOINER_SERDE =
                Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarPanelAggregatorJoiner.class));
        private static final Serde<SolarModuleAggregatorJoiner> SOLAR_MODULE_AGGREGATOR_JOINER_SERDE =
                Serdes.serdeFrom(new JsonPojoSerializer<>(), new JsonPojoDeserializer<>(SolarModuleAggregatorJoiner.class));
        private static final Serde<String> STRING_SERDE = Serdes.String();
        private static final Serde<Windowed<String>> WINDOWED_STRING_SERDE = Serdes.serdeFrom(
                new TimeWindowedSerializer<>(STRING_SERDE.serializer()),
                new TimeWindowedDeserializer<>(STRING_SERDE.deserializer(), TIME_WINDOWS.size()));

        // anomaly threshold: one standard deviation (sigma)
        private static final double Z = 1;
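    The JsonPojoSerializer and JsonPojoDeserializer used above are small helper classes bundled with the sample project. A minimal Jackson-based sketch of such a pair could look like the following (the actual classes in the repository may differ in details):

        import com.fasterxml.jackson.databind.ObjectMapper;
        import org.apache.kafka.common.errors.SerializationException;
        import org.apache.kafka.common.serialization.Deserializer;
        import org.apache.kafka.common.serialization.Serializer;

        // hypothetical minimal JSON serializer: POJO -> JSON bytes (each class in its own file)
        public class JsonPojoSerializer<T> implements Serializer<T> {
            private final ObjectMapper mapper = new ObjectMapper();

            @Override
            public byte[] serialize(String topic, T data) {
                if (data == null) {
                    return null;
                }
                try {
                    return mapper.writeValueAsBytes(data);
                } catch (Exception e) {
                    throw new SerializationException("Error serializing JSON", e);
                }
            }
        }

        // hypothetical minimal JSON deserializer: JSON bytes -> POJO of the given class
        public class JsonPojoDeserializer<T> implements Deserializer<T> {
            private final ObjectMapper mapper = new ObjectMapper();
            private final Class<T> clazz;

            public JsonPojoDeserializer(Class<T> clazz) {
                this.clazz = clazz;
            }

            @Override
            public T deserialize(String topic, byte[] bytes) {
                if (bytes == null) {
                    return null;
                }
                try {
                    return mapper.readValue(bytes, clazz);
                } catch (Exception e) {
                    throw new SerializationException("Error deserializing JSON", e);
                }
            }
        }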
        // source stream from Kafka
        final KStream<SolarModuleKey, SolarModuleData> source =
                builder
                        .stream(IN_TOPIC, Consumed.with(STRING_SERDE, SOLAR_MODULE_DATA_SERDE))
                        .map((k, v) -> KeyValue.pair(new SolarModuleKey(v.getPanel(), v.getName()), v));

        // calculating total and average power per module
        final KStream<Windowed<SolarModuleKey>, SolarModuleAggregator> aggPowerPerSolarModuleStream =
                source
                        .groupByKey(Grouped.with(SOLAR_MODULE_KEY_SERDE, SOLAR_MODULE_DATA_SERDE))
                        .windowedBy(TIME_WINDOWS)
                        .aggregate(SolarModuleAggregator::new,
                                (modelKey, value, aggregation) -> aggregation.updateFrom(value),
                                Materialized.with(SOLAR_MODULE_KEY_SERDE, SOLAR_MODULE_AGGREGATOR_SERDE))
                        .suppress(Suppressed.untilTimeLimit(DURATION, Suppressed.BufferConfig.unbounded()))
                        .toStream();

        // calculating total and average power per panel
        final KStream<Windowed<String>, SolarPanelAggregator> aggPowerPerSolarPanelStream =
                aggPowerPerSolarModuleStream
                        .map((k, v) -> KeyValue.pair(new Windowed<>(k.key().getPanelName(), k.window()), v))
                        .groupByKey(Grouped.with(WINDOWED_STRING_SERDE, SOLAR_MODULE_AGGREGATOR_SERDE))
                        .aggregate(SolarPanelAggregator::new,
                                (panelKey, value, aggregation) -> aggregation.updateFrom(value),
                                Materialized.with(WINDOWED_STRING_SERDE, SOLAR_PANEL_AGGREGATOR_SERDE))
                        .suppress(Suppressed.untilTimeLimit(DURATION, Suppressed.BufferConfig.unbounded()))
                        .toStream();

        // if the same KStream instance were used in more than one join,
        // a "TopologyException: Invalid topology" would be thrown, hence this re-keyed copy
        final KStream<Windowed<String>, SolarModuleAggregator> aggPowerPerSolarModuleForJoinStream =
                aggPowerPerSolarModuleStream
                        .map((k, v) -> KeyValue.pair(new Windowed<>(k.key().getPanelName(), k.window()), v));

        // joining aggregated panels with aggregated modules;
        // needed for calculating squaresSum and deviance
        final KStream<Windowed<String>, SolarPanelAggregatorJoiner> joinedAggPanelWithAggModule =
                aggPowerPerSolarPanelStream
                        .join(
                                aggPowerPerSolarModuleForJoinStream,
                                SolarPanelAggregatorJoiner::new, JOIN_WINDOWS,
                                Joined.with(WINDOWED_STRING_SERDE, SOLAR_PANEL_AGGREGATOR_SERDE, SOLAR_MODULE_AGGREGATOR_SERDE));

        // calculating squaresSum and deviance
        final KStream<Windowed<String>, SolarPanelAggregator> aggPowerPerSolarPanelFinalStream =
                joinedAggPanelWithAggModule
                        .groupByKey(Grouped.with(WINDOWED_STRING_SERDE, SOLAR_PANEL_AGGREGATOR_JOINER_SERDE))
                        .aggregate(SolarPanelAggregator::new,
                                (key, value, aggregation) -> aggregation.updateFrom(value),
                                Materialized.with(WINDOWED_STRING_SERDE, SOLAR_PANEL_AGGREGATOR_SERDE))
                        .suppress(Suppressed.untilTimeLimit(DURATION, Suppressed.BufferConfig.unbounded()))
                        .toStream();

        // joining aggregated modules with the final panel aggregates (which carry squaresSum and deviance);
        // needed for flagging modules with anomalous power values
        final KStream<Windowed<String>, SolarModuleAggregatorJoiner> joinedAggModuleWithAggPanel =
                aggPowerPerSolarModuleStream
                        .map((k, v) -> KeyValue.pair(new Windowed<>(k.key().getPanelName(), k.window()), v))
                        .join(
                                aggPowerPerSolarPanelFinalStream,
                                SolarModuleAggregatorJoiner::new, JOIN_WINDOWS,
                                Joined.with(WINDOWED_STRING_SERDE, SOLAR_MODULE_AGGREGATOR_SERDE, SOLAR_PANEL_AGGREGATOR_SERDE));

        // streaming the result data (modules with anomalous power values) to the output topic
        joinedAggModuleWithAggPanel
                .filter((k, v) -> isAnomalyModule(v))
                .map((k, v) -> KeyValue.pair(k.key(), v))
                .to(OUT_TOPIC, Produced.valueSerde(SOLAR_MODULE_AGGREGATOR_JOINER_SERDE));

        // starting the streams
        final KafkaStreams streams = new KafkaStreams(builder.build(), getProperties());
        streams.cleanUp();
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
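    Note the suppress(Suppressed.untilTimeLimit(DURATION, ...)) calls after each aggregation: they buffer intermediate window updates so that downstream stages see a consolidated update per window roughly every 30 seconds, rather than a new record for every incoming message.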

    Calculating anomaly data
    Kafka - Figure 9

        private static boolean isAnomalyModule(SolarModuleAggregatorJoiner module) {
            double currentZ = Math.abs(module.getSumPower() - module.getSolarPanelAggregator().getAvgPower())
                    / module.getSolarPanelAggregator().getDeviance();
            return currentZ > Z;
        }
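    For example, plugging in the values from the sample output below: Module 3 produced sumPower = 21.0, while the panel average is 27.0 with a deviance of 4.2, so currentZ = |21.0 - 27.0| / 4.2 ≈ 1.43 > 1 and the module is reported as an anomaly.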

    Sample application output

        ...SolarConsumer - PerSolarModule: [1572447720|Panel 1|Module 1]: 30.0:6
        ...SolarConsumer - PerSolarModule: [1572447720|Panel 1|Module 2]: 30.0:6
        ...SolarConsumer - PerSolarModule: [1572447720|Panel 1|Module 3]: 21.0:6
        ...SolarConsumer - PerSolarPanel: [1572447690|Panel 1]: 81.0:3
        ...SolarConsumer - PerSolarPanelFinal: [1572447660|Panel 1]: power:81.0 count:3 squareSum:54.0 variance:18.0 deviance:4.2
        ...SolarConsumer - ANOMALY module: [1572447660|Panel 1|Module 3]: sumPower:21.0 panelAvg:27.0 deviance:4.2
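    A quick sanity check of the PerSolarPanelFinal line: squareSum = (30 - 27)² + (30 - 27)² + (21 - 27)² = 54.0, variance = 54.0 / 3 = 18.0, and deviance = √18.0 ≈ 4.2.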

    Step 3. Configure the Kafka Integration

    Let’s configure ThingsBoard to subscribe to the “solar-module-anomalies” topic and create alarms. We will use the Kafka Integration, which has been available since ThingsBoard v2.4.2.

    Configure Uplink Converter

    Before setting up the Kafka integration, you need to create an uplink data converter, which is responsible for parsing the incoming anomaly data.
    Here is an example of an incoming message produced by our Kafka Streams application:

        {
            "moduleName": "Module 3",
            "panelName": "Panel 1",
            "count": 6,
            "sumPower": 21.0,
            "avgPower": 3.5,
            "solarPanelAggregator": {
                "panelName": "Panel 1",
                "count": 3,
                "sumPower": 81.0,
                "avgPower": 27.0,
                "squaresSum": 54.0,
                "variance": 18.0,
                "deviance": 4.2
            }
        }

    Paste the following script into the Decoder function section:

        // Decode an uplink message from a buffer
        // payload - array of bytes
        // metadata - key/value object

        /** Decoder **/

        // decode payload to JSON
        var msg = decodeToJson(payload);

        var deviceName = msg.moduleName;
        var deviceType = 'module';

        // Result object with device attributes/telemetry data
        var result = {
            deviceName: deviceName,
            deviceType: deviceType,
            attributes: {
                panel: msg.panelName
            },
            telemetry: {
                avgPower: msg.avgPower,
                sumPower: msg.sumPower,
                avgPowerFromPanel: msg.solarPanelAggregator.avgPower,
                deviance: msg.solarPanelAggregator.deviance
            }
        };

        /** Helper functions **/

        function decodeToString(payload) {
            return String.fromCharCode.apply(String, payload);
        }

        function decodeToJson(payload) {
            // convert payload to string
            var str = decodeToString(payload);
            // parse string to JSON
            var data = JSON.parse(str);
            return data;
        }

        return result;

    Kafka - Figure 10
    The purpose of the decoder function is to parse the incoming data and metadata into a format that ThingsBoard can consume. deviceName and deviceType are required, while attributes and telemetry are optional. Attributes and telemetry must be flat key-value objects; nested objects are not supported.
    Kafka - Figure 11
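    For the sample anomaly message shown above, this decoder should produce the following result object:

        {
            "deviceName": "Module 3",
            "deviceType": "module",
            "attributes": {
                "panel": "Panel 1"
            },
            "telemetry": {
                "avgPower": 3.5,
                "sumPower": 21.0,
                "avgPowerFromPanel": 27.0,
                "deviance": 4.2
            }
        }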

    Configure Kafka Integration

    Let’s create a Kafka integration that will subscribe to the “solar-module-anomalies” topic.
    Kafka - Figure 12
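    At a minimum, point the integration at the same broker the Streams application writes to (localhost:9092 in this tutorial), set the topic to “solar-module-anomalies”, and select the uplink converter created in the previous step; the remaining options can be left at their defaults.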

    Step 4. Configure Rule Engine to raise Alarms

    Follow the existing “Create and Clear Alarms” guide to raise alarms based on the “anomaly” boolean flag in the incoming telemetry, and use the “Send email on alarm” guide to send email notifications. Explore other guides to learn more.

    Step 5. Disable debug message logging

    Although the Debug mode is very useful for development and troubleshooting, leaving it enabled in production may significantly increase the disk space used by the database, because all debug data is stored there. It is highly recommended to turn Debug mode off once you are done debugging.

    Next steps

  • Getting started guides - These guides provide a quick overview of the main ThingsBoard features. Designed to be completed in 15-30 minutes.
  • Installation guides - Learn how to set up ThingsBoard on various available operating systems.
  • Data visualization - These guides contain instructions on how to configure complex ThingsBoard dashboards.
  • Data processing & actions - Learn how to use the ThingsBoard Rule Engine.
  • IoT Data analytics - Learn how to use the rule engine to perform basic analytics tasks.
  • Hardware samples - Learn how to connect various hardware platforms to ThingsBoard.
  • Advanced features - Learn about advanced ThingsBoard features.
  • Contribution and Development - Learn about contribution and development in ThingsBoard.