在前面的课程中,我们完成了 IotHub 上行数据处理的功能。截止目前,IotHub 使用 MongoDB 作为数据存储,这很好,不过在物联网的应用中,在某些情况下,我们可能还会用到别的存储方案,比如说时序数据库,那么这一节我们就来聊聊时序数据库。

时序数据

首先让我们来看一下什么是时序数据。时序数据是一类按照时间维度进行索引的数据,它记录了某个被测量的实体在一定时间范围内,每个时间点上的一组测试值。比如说,传感器上传的大棚每小时的湿度和温度数据、A 股中某支股票每个时间点的股价、计算机系统的监控数据等,都属于时序数据,时序数据有以下一些特点:

  • 数据量较大,写入操作是持续且平稳的,而且写多读少
  • 只有写入操作,几乎没有更新操作,比如去修改大棚温度和湿度的历史数据,那是没有什么意义的;
  • 没有随机删除,即使删除也是按照时间范围进行删除。 删除大棚 08:35 的温度记录没有任何实际意义,但是删除 6 个月以前的记录是有意义的。
  • 数据实时性和时效性很强,数据随着时间的推移不断追加,旧数据很快失去意义。
  • 大部分以时间和实体为维度进行查询,很少以测试值为维度查询,比如会去查询某个大棚某个时间段的温度数据,但是很少会去查温度高于多少度的记录。

如果说你的业务数据符合上面的条件,比如,你的业务数据属于监控、运维类的数据,或者你的数据需要用折线图之类的进行可视化,那么你就可以考虑使用时序数据库。

时序数据库

时序数据库就是用来存储时序数据的数据库,时序数据库相较于传统的关系型数据和非关系型数据库而言,专门优化了对时序数据的存储,开源的时序数据库有 InfluxDB、OpenTSDB、TimeScaleDB 等。在本课程中,我们使用 InfluxDB 数据库进行演示。
时序数据库有以下几个概念。

  • Metric:度量,可以当作关系型数据库中的表(table)。
  • Data Point:数据点,可以当作关系型数据库的中的行(row)。
  • Timestamp:时间戳,数据点生成时的时间戳
  • Field:测量值,比如温度和湿度
  • Tag:标签,用于标识数据点,通常用来标识数据点的来源,比如温度和湿度数据来自哪个大棚,可以当作关系型数据库表的主键

我画了一张图来方便大家理解这几个概念:
17.时序数据库 - 图1
Vents:度量 Metric,存储所有大棚的温度和湿度数据
Humidiity 和 Temperature:测量值 Field
Vent No. 和 Vent Section: Tag 标签,标识测量值来自于哪个大棚
Time:时间戳 Timestamp
框起来的这部分就是一个 Data Point,包含 Timestamp、Field、Tag

收集设备连接状态变化的数据

本课程选择将 IotHub 中的一些状态监控的数据存入 InfluxDB 来演示如何使用时序数据库,目前我们的数据中,设备连接状态变化(上线/下线)可以算作一种时序数据,我们将这个数据加入时序数据库,这样就可以看到设备连接状态的变化情况,对故障排查也是有帮助的。

安装 InfluxDB

根据 InfluxDB 1.7 documentation 下载安装对应平台的 InfluxDB,本课程里使用的 InfluxDB 版本为 1.7.6。
请按照文档里的步骤配置并运行 InfluxDB,默认情况下 InfluxDB 运行在localhost:8086
首先,我们创建一个数据库:

  • 运行 InfluxDB CLI:influx

    1. onnected to http://localhost:8086 version 1.7.6
    2. InfluxDB shell version: 1.7.6
    3. Enter an InfluxQL query
  • 然后输入create database iothub,回车,数据库就创建好了。

    存储数据

    接下来做的事情就比较简单了,IotHub 在获取到设备上线和下线时间时,将对应的数据写入 InfluxDB,这里我们使用 InfluxDB 的 Node.js 库 Influx

    在 InfluxDB 中,度量 Metric 被称为 Measurement。

  1. //IotHub_Server/services/influxdb_service.js
  2. const Influx = require('influx')
  3. const influx = new Influx.InfluxDB({
  4. host: process.env.INFLUXDB,
  5. database: 'iothub',
  6. schema: [
  7. {
  8. measurement: 'device_connections',
  9. fields: {
  10. connected: Influx.FieldType.BOOLEAN
  11. },
  12. tags: [
  13. 'product_name', 'device_name'
  14. ]
  15. }
  16. ]
  17. })
  18. class InfluxDBService{
  19. sstatic writeConnectionData({productName, deviceName, connected, ts}) {
  20. var timestamp = ts == null ? Math.floor(Date.now() / 1000) : ts
  21. influx.writePoints([
  22. {
  23. measurement: 'device_connections',
  24. tags: {product_name: productName, device_name: deviceName},
  25. fields: {connected: connected},
  26. timestamp: timestamp
  27. }
  28. ], {
  29. precision: 's',
  30. }).catch(err => {
  31. console.error(`Error saving data to InfluxDB! ${err.stack}`)
  32. })
  33. }
  34. }
  35. module.exports = InfluxDBService

这里使用 Measurement:”device_connections” 来存储设备连接状态变化的数据,Tag 为一个二元组(ProductName, DeviceName),可以唯一标识一台设备,Field 是一个布尔值,标识设备的连接状态。我们在设备上线/下线时,调用相应的方法:

  1. //IotHub_Server/models/device.js
  2. deviceSchema.statics.addConnection = function (event) {
  3. var username_arr = event.username.split("/")
  4. let productName = username_arr[0];
  5. let deviceName = username_arr[1];
  6. this.findOne({product_name: productName, device_name: deviceName}, function (err, device) {
  7. if (err == null && device != null) {
  8. ...
  9. influxDBService.writeConnectionData({
  10. productName: productName,
  11. deviceName: deviceName,
  12. connected: true,
  13. ts: event.connected_at
  14. })
  15. }
  16. })
  17. }
  18. deviceSchema.statics.removeConnection = function (event) {
  19. var username_arr = event.username.split("/")
  20. let productName = username_arr[0];
  21. let deviceName = username_arr[1];
  22. this.findOne({product_name: productName, device_name: deviceName}, function (err, device) {
  23. if (err == null && device != null) {
  24. ...
  25. influxDBService.writeConnectionData({
  26. productName: productName,
  27. deviceName: deviceName,
  28. connected: false
  29. })
  30. }
  31. })
  32. }

最后运行IotHub_Device/samples/connect_to_server.js,等看到device is online的输出以后,按Ctrl+C终止程序,重复操作几次之后运行:influx -precision -s,接着查询device_connections:

  1. use iothub
  2. select * from device_connections

这时,我们会看到,设备的连接状态变化已经被存入了 InfluxDB:

  1. Connected to http://localhost:8086 version 1.7.6
  2. InfluxDB shell version: 1.7.6
  3. Enter an InfluxQL query
  4. > use iothub
  5. Using database iothub
  6. > select * from device_connections
  7. name: device_connections
  8. time connected device_name product_name
  9. ---- --------- ----------- ------------
  10. 1559046440 true 60de4bqyu IotApp
  11. 1559046442 false 60de4bqyu IotApp
  12. 1559046443 true 60de4bqyu IotApp
  13. 1559046444 false 60de4bqyu IotApp
  14. >

influx -precision -s 告诉 InfluxDB 使用 UNIX 时间戳格式来显示 time 字段。

通常我们可以将这些监控数据可视化,但这已经超出了本课程的范围,就留给大家自行去实现了。

本课程的第二部分,上行数据处理就结束了。接下来我们进入第三部分: 下行数据处理