在前面的课程中,我们完成了 IotHub 上行数据处理的功能。截止目前,IotHub 使用 MongoDB 作为数据存储,这很好,不过在物联网的应用中,在某些情况下,我们可能还会用到别的存储方案,比如说时序数据库,那么这一节我们就来聊聊时序数据库。
时序数据
首先让我们来看一下什么是时序数据。时序数据是一类按照时间维度进行索引的数据,它记录了某个被测量的实体在一定时间范围内,每个时间点上的一组测试值。比如说,传感器上传的大棚每小时的湿度和温度数据、A 股中某支股票每个时间点的股价、计算机系统的监控数据等,都属于时序数据,时序数据有以下一些特点:
- 数据量较大,写入操作是持续且平稳的,而且写多读少;
- 只有写入操作,几乎没有更新操作,比如去修改大棚温度和湿度的历史数据,那是没有什么意义的;
- 没有随机删除,即使删除也是按照时间范围进行删除。 删除大棚 08:35 的温度记录没有任何实际意义,但是删除 6 个月以前的记录是有意义的。
- 数据实时性和时效性很强,数据随着时间的推移不断追加,旧数据很快失去意义。
- 大部分以时间和实体为维度进行查询,很少以测试值为维度查询,比如会去查询某个大棚某个时间段的温度数据,但是很少会去查温度高于多少度的记录。
如果说你的业务数据符合上面的条件,比如,你的业务数据属于监控、运维类的数据,或者你的数据需要用折线图之类的进行可视化,那么你就可以考虑使用时序数据库。
时序数据库
时序数据库就是用来存储时序数据的数据库,时序数据库相较于传统的关系型数据和非关系型数据库而言,专门优化了对时序数据的存储,开源的时序数据库有 InfluxDB、OpenTSDB、TimeScaleDB 等。在本课程中,我们使用 InfluxDB 数据库进行演示。
时序数据库有以下几个概念。
- Metric:度量,可以当作关系型数据库中的表(table)。
- Data Point:数据点,可以当作关系型数据库的中的行(row)。
- Timestamp:时间戳,数据点生成时的时间戳。
- Field:测量值,比如温度和湿度
- Tag:标签,用于标识数据点,通常用来标识数据点的来源,比如温度和湿度数据来自哪个大棚,可以当作关系型数据库表的主键。
我画了一张图来方便大家理解这几个概念:
Vents:度量 Metric,存储所有大棚的温度和湿度数据
Humidiity 和 Temperature:测量值 Field
Vent No. 和 Vent Section: Tag 标签,标识测量值来自于哪个大棚
Time:时间戳 Timestamp
框起来的这部分就是一个 Data Point,包含 Timestamp、Field、Tag
收集设备连接状态变化的数据
本课程选择将 IotHub 中的一些状态监控的数据存入 InfluxDB 来演示如何使用时序数据库,目前我们的数据中,设备连接状态变化(上线/下线)可以算作一种时序数据,我们将这个数据加入时序数据库,这样就可以看到设备连接状态的变化情况,对故障排查也是有帮助的。
安装 InfluxDB
根据 InfluxDB 1.7 documentation 下载安装对应平台的 InfluxDB,本课程里使用的 InfluxDB 版本为 1.7.6。
请按照文档里的步骤配置并运行 InfluxDB,默认情况下 InfluxDB 运行在localhost:8086
。
首先,我们创建一个数据库:
运行 InfluxDB CLI:
influx
onnected to http://localhost:8086 version 1.7.6
InfluxDB shell version: 1.7.6
Enter an InfluxQL query
然后输入
create database iothub
,回车,数据库就创建好了。存储数据
接下来做的事情就比较简单了,IotHub 在获取到设备上线和下线时间时,将对应的数据写入 InfluxDB,这里我们使用 InfluxDB 的 Node.js 库 Influx。
在 InfluxDB 中,度量 Metric 被称为 Measurement。
//IotHub_Server/services/influxdb_service.js
const Influx = require('influx')
const influx = new Influx.InfluxDB({
host: process.env.INFLUXDB,
database: 'iothub',
schema: [
{
measurement: 'device_connections',
fields: {
connected: Influx.FieldType.BOOLEAN
},
tags: [
'product_name', 'device_name'
]
}
]
})
class InfluxDBService{
sstatic writeConnectionData({productName, deviceName, connected, ts}) {
var timestamp = ts == null ? Math.floor(Date.now() / 1000) : ts
influx.writePoints([
{
measurement: 'device_connections',
tags: {product_name: productName, device_name: deviceName},
fields: {connected: connected},
timestamp: timestamp
}
], {
precision: 's',
}).catch(err => {
console.error(`Error saving data to InfluxDB! ${err.stack}`)
})
}
}
module.exports = InfluxDBService
这里使用 Measurement:”device_connections” 来存储设备连接状态变化的数据,Tag 为一个二元组(ProductName, DeviceName),可以唯一标识一台设备,Field 是一个布尔值,标识设备的连接状态。我们在设备上线/下线时,调用相应的方法:
//IotHub_Server/models/device.js
deviceSchema.statics.addConnection = function (event) {
var username_arr = event.username.split("/")
let productName = username_arr[0];
let deviceName = username_arr[1];
this.findOne({product_name: productName, device_name: deviceName}, function (err, device) {
if (err == null && device != null) {
...
influxDBService.writeConnectionData({
productName: productName,
deviceName: deviceName,
connected: true,
ts: event.connected_at
})
}
})
}
deviceSchema.statics.removeConnection = function (event) {
var username_arr = event.username.split("/")
let productName = username_arr[0];
let deviceName = username_arr[1];
this.findOne({product_name: productName, device_name: deviceName}, function (err, device) {
if (err == null && device != null) {
...
influxDBService.writeConnectionData({
productName: productName,
deviceName: deviceName,
connected: false
})
}
})
}
最后运行IotHub_Device/samples/connect_to_server.js
,等看到device is online
的输出以后,按Ctrl+C
终止程序,重复操作几次之后运行:influx -precision -s
,接着查询device_connections
:
use iothub
select * from device_connections
这时,我们会看到,设备的连接状态变化已经被存入了 InfluxDB:
Connected to http://localhost:8086 version 1.7.6
InfluxDB shell version: 1.7.6
Enter an InfluxQL query
> use iothub
Using database iothub
> select * from device_connections
name: device_connections
time connected device_name product_name
---- --------- ----------- ------------
1559046440 true 60de4bqyu IotApp
1559046442 false 60de4bqyu IotApp
1559046443 true 60de4bqyu IotApp
1559046444 false 60de4bqyu IotApp
>
influx -precision -s 告诉 InfluxDB 使用 UNIX 时间戳格式来显示 time 字段。
通常我们可以将这些监控数据可视化,但这已经超出了本课程的范围,就留给大家自行去实现了。
本课程的第二部分,上行数据处理就结束了。接下来我们进入第三部分: 下行数据处理。