Prometheus Prometheus 作为云原生时代的时序数据库, 是当下最流行的监控平台之一,尽管其整体架构一直没怎么变,但其底层的存储引擎却演进了几个版本,感兴趣的读者可参考 Prometheus 存储层的演进(https://tech.ipalfish.com/blog/2020/03/31/the-evolution-of-prometheus-storage-layer/)。本文主要介绍 Prometheus V2(即现在使用的)版本的存储格式细节,以及查询是如何定位到符合条件的数据,旨在通过本文的分析,对 Prometheus 的存储引擎有更深入了解。 说明:本文并不会涉及到查询的解析与函数求值过程。代码分析基于 v2.25.2 版本。

背景知识

时序特点

时序数据的特点可以用一话概括:垂直写(最新数据),水平查。

Prometheus 存储引擎分析 - 图1

对于云原生场景来说,另一个特点是数据生命周期短,一次容器的扩缩容会导致时间线膨胀一倍。了解这两个特点后,来看看 Prometheus 是如何存储数据来迎合上述模式:
  1. ├── 01BKGV7JC0RY8A6MACW02A2PJD // block 的 ULID
  2. ├── chunks
  3. └── 000001
  4. ├── tombstones
  5. ├── index
  6. └── meta.json
  7. ├── chunks_head
  8. └── 000001
  9. └── wal
  10. ├── 000000002
  11. └── checkpoint.00000001
  12. └── 00000000
可以看到,数据目录主要有以下几部分:
  • block,一个时间段内(默认 2 小时)的所有数据,只读,用 ULID 命名。每一个 block 内主要包括: - chunks 固定大小(最大 128M)的 chunks 文件 - index 索引文件,主要包含倒排索引的信息 - meta.json 元信息,主要包括 block 的 minTime/maxTime,方便查询时过滤
  • chunks_head,当前在写入的 block 对应的 chunks 文件,只读,最多 120 个数据点,时间跨度最大 2 小时。
  • wal,Prometheus 采用攒批的方式来异步刷盘,因此需要 WAL 来保证数据可靠性

Prometheus 存储引擎分析 - 图2

通过上面的目录结构,不难看出 Prometheus 的设计思路:
  • 通过数据按时间分片的方式来解决数据生命周期短的问题
  • 通过内存攒批的方式来对应只写最新数据的场景

数据模式

Prometheus 支持的模式比较简单,只支持单值模式,如下:
  1. cpu_usage{core="1", ip="130.25.175.171"} 14.04 1618137750
  2. metric labels value timesample

倒排索引

索引是支持多维搜索的主要手段,时序中的索引结构和搜索引擎的类似,是个倒排索引,可参考下图

Prometheus 存储引擎分析 - 图3

在一次查询中,会对涉及到的 label 分别求对应的 postings lists(即时间线集合),然后根据 filter 类型进行集合运算,最后根据运算结果得出的时间线,去查相应数据即可。

磁盘存储格式

数据格式

  1. ┌──────────────────────────────┐
  2. magic(0x0130BC91) <4 byte>
  3. ├──────────────────────────────┤
  4. version(1) <1 byte>
  5. ├──────────────────────────────┤
  6. padding(0) <3 byte>
  7. ├──────────────────────────────┤
  8. ┌──────────────────────────┐
  9. Chunk 1
  10. ├──────────────────────────┤
  11. ...
  12. ├──────────────────────────┤
  13. Chunk N
  14. └──────────────────────────┘
  15. └──────────────────────────────┘
  16. # 单个 chunk 内的结构
  17. ┌─────────────────────┬───────────────────────┬───────────────────────┬───────────────────┬───────────────┬──────────────┬────────────────┐
  18. | series ref <8 byte> | mint <8 byte, uint64> | maxt <8 byte, uint64> | encoding <1 byte> | len <uvarint> | data <bytes> CRC32 <4 byte>
  19. └─────────────────────┴───────────────────────┴───────────────────────┴───────────────────┴───────────────┴──────────────┴────────────────┘
chunk 为数据在磁盘中的最小组织单元,需要明确以下两点:
  1. 单个 chunk 的时间跨度默认是 2 小时,Prometheus 后台会有合并操作,把时间相邻的 block 合到一起
  2. series ref 为时间线的唯一标示,由 8 个字节组成,前 4 个表示文件 id,后 4 个表示在文件内的 offset,需配合后文的索引结构来实现数据的定位

索引格式

  1. ┌────────────────────────────┬─────────────────────┐
  2. magic(0xBAAAD700) <4b> version(1) <1 byte>
  3. ├────────────────────────────┴─────────────────────┤
  4. ┌──────────────────────────────────────────────┐
  5. Symbol Table
  6. ├──────────────────────────────────────────────┤
  7. Series
  8. ├──────────────────────────────────────────────┤
  9. Label Index 1
  10. ├──────────────────────────────────────────────┤
  11. ...
  12. ├──────────────────────────────────────────────┤
  13. Label Index N
  14. ├──────────────────────────────────────────────┤
  15. Postings 1
  16. ├──────────────────────────────────────────────┤
  17. ...
  18. ├──────────────────────────────────────────────┤
  19. Postings N
  20. ├──────────────────────────────────────────────┤
  21. Label Offset Table
  22. ├──────────────────────────────────────────────┤
  23. Postings Offset Table
  24. ├──────────────────────────────────────────────┤
  25. TOC
  26. └──────────────────────────────────────────────┘
  27. └──────────────────────────────────────────────────┘
在一个索引文件中,最主要的是以下几部分(从下往上):
  1. TOC 存储的是其他部分的 offset
  2. Postings Offset Table,用来存储倒排索引,Key 为 label name/value 序对,Value 为 Postings 在文件中的 offset。
  3. Postings N,存储的是具体的时间线序列
  4. Series,存储的是当前时间线,对应的 chunk 文件信息
  5. Label Offset Table 与 Label Index 目前在查询时没有使用到,这里不再讲述
每个部分的具体编码格式,可参考官方文档 Index Disk Format,这里重点讲述一次查询是如何找到符合条件的数据的:
  • 首先在 Posting Offset Table 中,找到对应 label 的 Postings 位置

Prometheus 存储引擎分析 - 图4

  • 然后再根据 Postings 中的 series 信息,找到对应的 chunk 位置,即上文中的 series ref。

Prometheus 存储引擎分析 - 图5

使用方式

Prometheus 在启动时,会去加载数据元信息到内存中。主要有下面两部分:
  • block 的元信息,最主要的是 mint/maxt,用来确定一次查询是否需要查看当前 block 文件,之后把 chunks 文件以 mmap 方式打开
  1. // open all blocks
  2. bDirs, err := blockDirs(dir)
  3. for _, bDir := range bDirs {
  4. meta, _, err := readMetaFile(bDir)
  5. // See if we already have the block in memory or open it otherwise.
  6. block, open := getBlock(loaded, meta.ULID)
  7. if !open {
  8. block, err = OpenBlock(l, bDir, chunkPool)
  9. if err != nil {
  10. corrupted[meta.ULID] = err
  11. continue
  12. }
  13. }
  14. blocks = append(blocks, block)
  15. }
  16. // open chunk files
  17. for _, fn := range files {
  18. f, err := fileutil.OpenMmapFile(fn)
  19. if err != nil {
  20. return nil, tsdb_errors.NewMulti(
  21. errors.Wrap(err, "mmap files"),
  22. tsdb_errors.CloseAll(cs),
  23. ).Err()
  24. }
  25. cs = append(cs, f)
  26. bs = append(bs, realByteSlice(f.Bytes()))
  27. }
  • block 对应的索引信息,主要是倒排索引。由于单个 label 对应的 Postings 可能会非常大,Prometheus 不是全量加载,而是每隔 32 个加载,来减轻内存压力。并且保证第一个与最后一个一定被加载,查询时采用类似跳表的方式进行 posting 定位。
    下面代码为 DB 启动时,读入 postings 的逻辑:
  1. // For the postings offset table we keep every label name but only every nth
  2. // label value (plus the first and last one), to save memory.
  3. ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, _ uint64, off int) error {
  4. if _, ok := r.postings[key[0]]; !ok {
  5. // Next label name.
  6. r.postings[key[0]] = []postingOffset{}
  7. if lastKey != nil {
  8. // Always include last value for each label name.
  9. r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff})
  10. }
  11. lastKey = nil
  12. valueCount = 0
  13. }
  14. if valueCount%32 == 0 {
  15. r.postings[key[0]] = append(r.postings[key[0]], postingOffset{value: key[1], off: off})
  16. lastKey = nil
  17. } else {
  18. lastKey = key
  19. lastOff = off
  20. }
  21. valueCount++
  22. }
  23. if lastKey != nil {
  24. r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff})
  25. }
下面代码为根据 label 查询 postings 的逻辑,完整可见 index 的 Postings 方法:
  1. e, ok := r.postings[name] // name 为 label key
  2. if !ok || len(values) == 0 { // values 为当前需要查询的 label values
  3. return EmptyPostings(), nil
  4. }
  5. res := make([]Postings, 0, len(values))
  6. skip := 0
  7. valueIndex := 0
  8. for valueIndex < len(values) && values[valueIndex] < e[0].value {
  9. // Discard values before the start.
  10. valueIndex++
  11. }
  12. for valueIndex < len(values) {
  13. value := values[valueIndex]
  14. // 用二分查找,找到当前 value 在 postings 中的位置
  15. i := sort.Search(len(e), func(i int) bool { return e[i].value >= value })
  16. if i == len(e) {
  17. // We're past the end.
  18. break
  19. }
  20. if i > 0 && e[i].value != value { // postings 中没有该 value,需要用前面一个来在文件中搜索
  21. // Need to look from previous entry.
  22. i--
  23. }
  24. // Don't Crc32 the entire postings offset table, this is very slow
  25. // so hope any issues were caught at startup.
  26. d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
  27. d.Skip(e[i].off)
  28. // Iterate on the offset table.
  29. var postingsOff uint64 // The offset into the postings table.
  30. for d.Err() == nil {
  31. // ... skip 逻辑省略
  32. v := d.UvarintBytes() // Label value.
  33. postingsOff = d.Uvarint64() // Offset.
  34. for string(v) >= value {
  35. if string(v) == value {
  36. // Read from the postings table.
  37. d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
  38. _, p, err := r.dec.Postings(d2.Get())
  39. res = append(res, p)
  40. }
  41. valueIndex++
  42. if valueIndex == len(values) {
  43. break
  44. }
  45. value = values[valueIndex]
  46. }
  47. if i+1 == len(e) || value >= e[i+1].value || valueIndex == len(values) {
  48. // Need to go to a later postings offset entry, if there is one.
  49. break
  50. }
  51. }
  52. }

内存结构

Block 在 Prometheus 实现中,主要分为两类:
  • 当前正在写入的,称为 head。当超过 2 小时或超过 120 个点时,head 会将 chunk 写入到本地磁盘中,并使用 mmap 映射到内存中,保存在下文的 mmappedChunk 中。
  • 历史只读的,存放在一数组中
  1. type DB struct {
  2. blocks []*Block
  3. head *Head
  4. // ... 忽略其他字段
  5. }
  6. // Block 内的主要字段是 IndexReader,其内部主要是 postings,即倒排索引
  7. // Map of LabelName to a list of some LabelValues's position in the offset table.
  8. // The first and last values for each name are always present.
  9. postings map[string][]postingOffset
  10. type postingOffset struct {
  11. value string // label value
  12. off int // posting 在对于文件中的 offset
  13. }
在上文磁盘结构中介绍过,postingOffset 不是全量加载,而是每隔 32 个。

Head

  1. type DB struct {
  2. blocks []*Block
  3. head *Head
  4. // ... 忽略其他字段
  5. }
  6. // Block 内的主要字段是 IndexReader,其内部主要是 postings,即倒排索引
  7. // Map of LabelName to a list of some LabelValues's position in the offset table.
  8. // The first and last values for each name are always present.
  9. postings map[string][]postingOffset
  10. type postingOffset struct {
  11. value string // label value
  12. off int // posting 在对于文件中的 offset
  13. }
  • MemPostings 是 Head 中的索引结构,与 Block 的 postingOffset 不同,posting 是全量加载的,毕竟 Head 保存的数据较小,对内存压力也小。
  1. type stripeSeries struct {
  2. size int
  3. series []map[uint64]*memSeries
  4. hashes []seriesHashmap
  5. locks []stripeLock
  6. seriesLifecycleCallback SeriesLifecycleCallback
  7. }
  8. type memSeries struct {
  9. sync.RWMutex
  10. mmappedChunks []*mmappedChunk // 只读
  11. headChunk *memChunk // 读写
  12. ...... // 省略其他字段
  13. }
  14. type mmappedChunk struct {
  15. // 数据文件在磁盘上的位置,即上文中的 series ref
  16. ref uint64
  17. numSamples uint16
  18. minTime, maxTime int64
  19. }
  • stripeSeries 是比较的核心结构,series 字段的 key 为时间线,采用自增方式生成;value 为 memSeries,内部有存储具体数据的 chunk,采用分段锁思路来减少锁竞争。

使用方式

对于一个查询,大概涉及的步骤:
  1. 根据 label 查出所涉及到的时间线,然后根据 filter 类型,进行集合运算,找出符合要求的时间线
  2. 根据时间线信息与时间范围信息,去 block 内查询符合条件的数据
在第一步主要在 PostingsForMatchers 函数中完成,主要有下面几个优化点:
  • 对于取反的 filter( != !~ ),转化为等于的形式,这样因为等于形式对应的时间线往往会少于取反的效果,最后在合并时,减去这些取反的时间线即可。
  • 不同 label 的时间线合并时,利用了时间线有序的特点,采用类似 mergesort 的方式来惰性合并,大致过程如下:
  1. type intersectPostings struct {
  2. arr []Postings // 需要合并的时间线数组
  3. cur uint64 // 当前的时间线
  4. }
  5. func (it *intersectPostings) doNext() bool {
  6. Loop:
  7. for {
  8. for _, p := range it.arr {
  9. if !p.Seek(it.cur) {
  10. return false
  11. }
  12. if p.At() > it.cur {
  13. it.cur = p.At()
  14. continue Loop
  15. }
  16. }
  17. return true
  18. }
  19. }
  20. func (it *intersectPostings) Next() bool {
  21. for _, p := range it.arr {
  22. if !p.Next() {
  23. return false
  24. }
  25. if p.At() > it.cur {
  26. it.cur = p.At()
  27. }
  28. }
  29. return it.doNext()
  30. }
在第一步查出符合条件的 chunk 所在文件以及 offset 信息之后,第二步的取数据则相对简单,直接使用 mmap 读数据即可,这间接利用操作系统的 page cache 来做缓存,自身不需要再去实现 Buffer Pool 之类的数据结构。

总结

通过上文的分析,大体上把 Prometheus 的存储结构以及查询流程分析了一遍,还有些细节没再展开去介绍,比如为了节约内存使用,label 使用了字典压缩,但这并不妨碍读者理解其原理。 此外,Prometheus 默认 2 小时一个 Block 对大时间范围查询不友好,因此其后台会对定期 chunk 文件进行 compaction,合并后的文件大小为 <font style="color:rgb(38, 38, 38);">min(31d, retention_time * 0.1)</font> ,相关细节后面有机会再单独介绍吧。

参考

Prometheus时序数据库-数据的查询 https://my.oschina.net/alchemystar/blog/4985328

Prometheus时序数据库-磁盘中的存储结构 https://my.oschina.net/alchemystar/blog/4965684

Prometheus TSDB (Part 1): The Head Block https://ganeshvernekar.com/blog/prometheus-tsdb-the-head-block/

Prometheus TSDB (Part 4): Persistent Block and its Index https://ganeshvernekar.com/blog/prometheus-tsdb-persistent-block-and-its-index/

Prometheus TSDB (Part 5): Queries https://ganeshvernekar.com/blog/prometheus-tsdb-queries/

Prometheus: The Unicorn in Metrics https://www.alibabacloud.com/blog/prometheus-the-unicorn-in-metrics_595168

Writing a Time Series Database from Scratch https://fabxc.org/tsdb/

https://github.com/prometheus/prometheus/blob/main/tsdb/docs/format/index.md