在 Elasticsearch 中,写入单个文档的请求称为 Index 请求,批量写入的请求称为 Bulk 请求。写入单个文档和多个文档在 Elasticsearch 内部使用相同的处理逻辑,请求被统一封装为 BulkRequest。写操作必须先在主分片执行成功后才能复制到相关的副分片,写单个文档的基本流程如下图所示:
image.png
(1)客户端向 NODE1 发送写请求。

(2)NODE1 使用文档 ID 来确定文档属于分片 0,通过集群状态中的内容路由表信息获知分片 0 的主分片位于 NODE3,因此请求被转发到 NODE3 上。

(3)NODE3 上的主分片执行写操作。如果写入成功,则它将请求并行转发到 NODE1 和 NODE2 并等待返回结果。当所有的副分片都报告成功,NODE3 将向协调节点报告成功,协调节点再向客户端报告成功。

Index/Bulk 详细流程

在写入过程中,不同角色的节点执行的任务如下图所示:
image.png
下面分别讨论在各个节点上执行的流程

1. 协调节点流程

协调节点负责创建索引、转发请求到主分片节点、等待响应、回复客户端。源码实现为 TransportBulkAction

1)参数检查
协调节点收到用户请求后首先检测请求的合法性,把检查操作放在处理流程的第一步,有问题就直接拒绝,这样对异常请求的处理代价是最小的。检查操作会对 index、type、id、source、contentType 等参数检查,每项检查遇到异常都会拒绝当前请求。

2)处理 pipeline 请求
协调节点默认也是预处理(Ingest)节点,数据预处理工作通过定义 pipeline 和 processors 实现。如果 Index 或 Bulk 请求中指定了 pipeline 参数,则先使用相应的 pipeline 进行处理。如果本节点不具备预处理资格,则将请求随机转发到其他具备预处理资格的节点。

3)自动创建索引
如果配置为允许自动创建索引(默认允许),则计算请求涉及的索引中哪些索引是不存在的,然后创建它。如果部分索引创建失败,则涉及创建失败索引的请求被标记为失败,其他索引正常执行写流程。

创建索引的请求会被发送到 Master 节点,而 Master 节点在执行完创建索引的工作并将新的 clusterState 发布完毕(默认收到半数以上的节点 Response 就认为发布成功)才会返回 Response,协调节点会等待收到全部索引创建请求的 Response(无论成功还是失败的)后才进入下一个流程。

4)对请求的预先处理
这里不同于对数据的预处理,对请求的预先处理只是检查参数、自动生成 id、处理 routing 等。由于上一步可能有创建索引操作,所以在此先获取最新集群状态信息,然后遍历所有请求,从集群状态中获取对应索引的元信息对 mapping、routing、id 等信息进行检查。如果 id 不存在,则自动生成一个 UUID。

5)检测集群状态
协调节点在开始处理时会先检测集群状态,若集群异常则取消写入。例如,Master 节点不存在时会阻塞等待 Master 节点的响应直至超时。

6)构建基于 shard 的请求
将用户的 BulkRequest 重新组织为基于 shard 的请求列表。例如,原始的用户请求可能有 10 个写操作,如果这些文档的主分片都属于同一个,则写请求会被合并为 1 个。所以这里本质上是合并请求的过程。

7)路由算法
路由算法就是根据 routing 和文档 id 计算目标 shardid 的过程。一般情况下,路由计算方式为下面的公式:

  1. shard = hash(routing) % number_of_primary_shards

默认 routing 值为文档 id,Elasticsearch 使用随机文档 id 和 Hash 算法来确保文档会均匀地分配给集群内的各个分片。但要注意如果使用自定义 id 或 routing 时,id 或 routing 的值可能不够随机,造成数据倾斜。

8)转发请求并等待响应
主要是根据集群状态中的内容路由表确定主分片所在节点,转发请求并等待响应。如果主分片不在本机,则转发到相应的节点,否则在本地执行。

该过程会遍历所有需要写的 shard,将位于某个 shard 的请求封装为 BulkShardRequest 类,发送到对应 shard 上执行并等待响应,每个响应也是以 shard 为单位的。如果某个 shard 的响应中部分文档写失败了,则将异常信息填充到 Response 中,整体请求做成功处理。

等收到所有 shard 的响应后(无论成功还是失败的)再回复给客户端。

2. 主分片节点流程

主分片所在节点负责在本地写主分片,写成功后,转发写副本分片的请求,然后等待响应并回复协调节点。

1)检查请求
主分片所在节点收到协调节点发来的请求后也是先做了校验工作,主要检测要写的是否是主分片,Allocationid 是否符合预期,索引是否处于关闭状态等。

2)是否延迟执行
判断请求是否需要延迟执行,如果需要延迟则放入队列,否则继续下面的流程。

3)判断主分片是否已发生迁移
如果己经发生迁移,则转发请求到迁移的节点。

4)检测写一致性
在开始写之前,检测本次写操作涉及的 shard 以及活跃 shard 数量是否足够,不足则不执行写入。默认为 1 表示只要主分片可用就执行写入。

5)写 Lucene 和事务日志
遍历请求,逐条对文档进行索引。Elasticsearch 的 Engine 接口封装了 Lucene 和 translog 的调用,对外提供读写接口。在写入 Lucene 前先生成 seq_no 和 version,seq_no 每次递增 1,version 则根据当前文档的最大版本加 1。索引过程为先写 Lucene 再写 translog,因为 Lucene 写入时会对数据进行检查,写操作有可能会失败,所以要确保写入 Lucene 成功后再去写 translog。

文档会先被写入到内存中,然后 Elasticsearch 默认每秒执行一次 refresh 操作将文档写到 Segment 中,此时会先写入到文件系统缓存,写入文件系统缓存后文档就能够被读取了,只是还没有持久化,之后会通过定时执行的 flush 操作调用 fsync 异步刷新到磁盘,并记录 Segment 的提交点。

在写入 Lucene 成功后,translog 也会将文档信息追加写入,在 ES 2.x 版本后,每次写请求(index、delete、update、bulk)后都会强制刷新 translog 到磁盘中以保证数据不丢失。
image.png
index.translog.durability 属性还可以配置为 ASYNC 类型,表示定时异步刷新磁盘,默认每隔 5s 会 fsync 到磁盘(index.translog.sync_interval)。所以此时如果节点异常宕机,最多丢失 5 秒的数据(即使 Segment 还没有持久化到磁盘),节点重启后会重放 translog。

参考链接:https://www.elastic.co/guide/cn/elasticsearch/guide/current/translog.html

6)flush translog
根据配置的 translog flush 策略进行刷盘控制。

Elasticsearch 默认每隔 30 分钟执行一次 flush 操作或 translog 太大(默认 512M)也会执行,该操作会先清空内存缓冲区,将未合并到 Segment 上的文档进行合并,然后调用 fsync 将文件系统缓存中的 Segment 文件持久化到磁盘,并记录 Segment 的提交点,最后清空 translog。
image.png
7)写副分片
统计要写的副本 shard 列表,循环处理每个 shard 并跳过 unassigned 状态的 shard,然后向目标节点发送请求并等待响应。这个过程是异步并行的。本节点发出了多少个 Request 就要等待多少个 Response,无论这些响应成功的还是失败的,直到超时为止。当收集到全部的 Response 后将其返回给协调节点,告知哪些节点执行成功、哪些节点执行失败。

在主分片写入过程中,写入是阻塞的。只有写入成功,才会发起写副本请求。如果分片写入失败,则整个请求被认为处理失败。如果有部分副本分片写入失败,则整个请求还是被认为处理成功。

8)处理分片写入失败的情况
写入期间可能会发生很多错误——硬盘损坏、节点离线或者某些配置错误,这些错误都可能导致无法在副分片上执行某个操作,虽然这比较少见,但是主分片必须汇报这些错误信息。

对于主分片自身错误的情况,它所在的节点会发送一个消息到 Master 节点。这个索引操作会等待(默认一分钟)Master 节点提升一个副分片为主分片,然后把这个写入操作转发给新的主分片。

当副本分片写入失败时,主分片所在节点将发送一个 shardFailed 请求给 Master 节点,然后 Master 会更新集群状态,在新的集群状态中,这个 shard 将:

  • 从 in_sync_allocations 列表中删除
  • 在 routing_table 的 shard 列表中将 state 由 STARTED 更改为 UNASSIGNED
  • 添加到 routingNodes 的 unassignedShards 列表

3. refresh

image.png
在写操作中,一般会先在 ES 的 JVM 内存中(Index Buffer)缓冲一段数据,再将这些数据写入硬盘,每次写入硬盘的这批数据被称为一个分段(Segment)。一般情况下,通过操作系统 write 接口写到磁盘的数据先到达文件系统缓存,write 函数返回成功时,数据未必被刷到磁盘。随后可以通过手工调用 flush 或者操作系统通过一定策略将系统缓存刷到磁盘中。这种策略大幅提升了写入效率。从 write 函数返回成功开始,无论数据有没有被刷到磁盘,该数据已经对读取是可见的了。

ES 正是利用这种特性实现了近实时搜索。默认每秒产生一个新分段,可通过 index.refresh_interval 参数修改,新段先写入文件系统缓存,稍后再执行 flush 刷盘操作,因此写操作很快会执行完,一旦写成功,就可以像其他文件一样被打开和读取了,数据就可以被搜索到了。

由于文档先写 Index Buffer 再写 Segment 且新段写入后不会立即刷到磁盘中,在这两个过程中如果节点异常宕机则存在丢失数据的风险。为了保证数据不会丢失,Elasticsearch 会在索引文档时,同时记录事务日志,事务日志默认 5s 刷盘或者每次写请求完成后执行(而 flush 默认每 30 分钟执行一次),每个分片都有一个对应的事务日志。当 Elasticsearch 重新启动时,会重放 translog 中所有在最后一次提交后发生的变更操作。

Index Buffer 被占满时也会触发 Refresh,默认值为 JVM 的 10%

4. merge

在 ES 中,每秒会清空一次内存中的写缓冲,然后将这些数据写入文件,每次写入会创建一个新的 Lucene 段。但是 Segment 数量太多会带来较大的麻烦,每个 Segment 都会消耗文件句柄、内存。并且每个搜索请求都需要轮流检查每个 Segment,查询完再对结果进行合并;所以 Segment 越多,搜索也就越慢。

因此需要通过一定的策略将这些较小的 Segment 合并为大的 Segment,在 Elasticsearch 后台会有一个线程专门进行 Segment 的合并,通常是选择大小相似的 Segment 进行合并。这些 Segment 既可以是未提交的也可以是已提交的。
image.png
Segment 一旦创建就不可改变,如果我们要删除或更新文档时,Elasticsearch 会先进行逻辑更新或删除,待更新或删除的文档会先保存在 .del 文件中,查询时会进行过滤。在 merge 时,标记为删除的数据不会写入到新的 Segment 中,这样当 merge 结束后,旧的 Segment 被删除,新的 Segment 被 Flush 到磁盘上,这样之前标记删除的数据才从磁盘上真正删除掉了。
image.png
HBase、Cassandra 等系统都有类似的分段机制,写过程中先在内存缓冲一批数据,不时地将这些数据写入文件作为一个分段,分段具有不变性,再通过一些策略合并分段。分段合并过程中,新段的产生需要一定的磁盘空间,所以我们要保证系统有足够的剩余可用空间。

5. flush

image.png

  • 调用 refresh,Index Buffer 会清空并生成 Segment 写入文件系统缓存
  • 调用 fsync,将文件系统缓存中的 Segments 写入磁盘
  • 清空(删除)旧的 translog
  • 默认 30 分钟调用一次,如果 translog 满了也会触发(默认 512 MB)

写入性能优化

Elasticsearch 提供的默认设置,是综合考虑了数据可靠性、搜索实时性、写入速度等多种因素的。当更改默认设置、需要追求极致的写入速度时,很多时候是以牺牲可靠性和搜索实时性为代价的。但有时业务上对数据可靠性和搜索实时性要求并不高,反而对写入速度要求很高,此时可以调整一些策略,最大化写入速度。

1. Translog flush 间隔调整

从 ES 2.x 开始,translog 的默认持久化策略为:每个写请求(index、bulk、delete)完成都 sync 到磁盘

  1. index.translog.durability: request

这是影响 ES 写入速度的最大因素。但是只有这样,写操作才有可能是可靠的。

如果系统可以接受一定概率的数据丢失,例如,数据写入主分片成功但尚未复制到副分片时节点宕机,此时数据既没有刷到 Lucene 也没有刷到 translog 上,恢复时 translog 中没有这个数据,导致数据丢失。我们可以通过调整 translog 持久化策略为 async 并为异步刷新设置一个间隔(默认 5s)来降低系统 iops、writeblock。

  1. index.translog.durability: async

设置为 async 表示 translog 的刷盘策略按 index.translog.sync_interval 配置的时间周期进行

  1. index.translog.sync_interval: 120s

此外,当 translog 文件过大(默认 512M)时也会进行 flush 操作,我们可以通过如下配置适当调大:

  1. index.translog.flush_threshold_size: 512MB

2. refresh_intervaI 间隔调整

默认情况下索引的 refresh_interval 为 1 秒,这意味着数据写入 1 秒后就可以被搜索到,每次索引的 refresh 会产生一个新的 Lucene 段,而分段过多会导致频繁的 segment merge 行为,如果不需要这么高的搜索实时性可以降低索引的 refresh 周期,这样可以降低系统 I/O 及 segment merge 的频率。

  1. index.refresh_interval: 120s

如果设置为 -1 会禁止自动 refresh。

3. segment merge 优化

segment merge 操作对系统 I/O 和内存占用都比较高,从 ES 2.0 开始,merge 行为不再由 ES 控制,而是交由 Lucene 控制,ES 控制 merge 操作的线程数由如下配置控制:

  1. index.merge.scheduler.max_thread_count

最大线程数 max_thread_count 的默认值为:

  1. Math.max(1, Math.min(4, Runtime.getRuntime().availableProcessors() / 2))

以上是一个比较理想的值,如果只有一块硬盘并且非 SSD,则应该把它设置为 1,因为在旋转存储介质上并发写的话,由于寻址的原因只会降低写入的速度。

此外,我们还可以降低最大分段的大小,让 Segment 在达到一定大小时就不再进行 merge 操作,避免了较大的 Segment 还继续参与 merge 操作,节省系统资源。

  1. # 默认为 10,表示每次合并的分段数,值越小就需要越多次的合并操作
  2. index.merge.policy.segments_per_tier: 10
  3. # 默认 5GB,Segment达到此大小后就不在参与合并操作
  4. index.merge.policy.max_merged_segment: 5GB

4. indexing buffer 优化

我们知道文档在写入时会先写入到 Index Buffer 中,然后通过 refresh_interval 配置的间隔定时刷新到一批文档到 Lucene 中作为一个 Segment。但如果还未到刷新间隔,Index Buffer 却已经被占满也会触发 refresh,这是另一个生成新 Segment 的机会。

每个 shard 有自己的 Index Buffer,默认每个节点上占用的 Index Buffer 大小为整个堆空间的 10%,由这个节点上的所有 shard 平分。

  1. indices.memory.index_buffer_size

在执行大量的索引操作时,indices.memory.index_buffer_size 的默认设置可能不够,这和可用堆内存、单节点上的 shard 数量相关,此时我们可以考虑适当增大该值。

5. 使用 bulk 请求

批量写比一个索引请求只写单个文档的效率高得多,但是要注意 bulk 请求的整体字节数不要太大,太大的请求可能会给集群带来内存压力。官方建议大约 5~15 MB。

bulk 写请求是个长任务,请求超时时间需要足够长。同时为了给系统增加足够的写入压力,写入过程应该多个客户端、多线程地并行执行,如果要验证系统的极限写入能力,那么目标就是把 CPU 压满。磁盘 util、内存等一般都不是瓶颈。如果 CPU 没有压满,则应该提高写入端的并发数量。

但是要注意 bulk 线程池队列的 reject 情况,出现 reject 代表 ES 的 bulk 队列已满,客户端请求被拒绝,此时客户端会收到 429 错误(TOO_MANY_REQUESTS),客户端对此的处理策略应该是延迟重试并且可以适当调整线程数量。不可忽略这个异常,否则写入系统的数据会少于预期。

即使客户端正确处理了 429 错误,我们仍然应尽量避免产生 reject。因此,在评估极限的写入能力时,客户端的极限写入并发量应该控制在不产生 reject 前提下的最大值为宜。

6. 索引过程调整和优化

1)自动生成 doc ID
通过 ES 写入流程可以看出,写入 doc 时如果在外部显式指定了文档 id,则 ES 会先尝试读取原来 doc 的版本号以判断是否需要进行更新。这会涉及一次读取磁盘的操作,通过自动生成 doc ID 可以避免这个环节。

2)调整字段 Mappings
将不需要建立索引的字段 index 属性设置为 not_analyzed 或 no。对字段不分词或者不索引可以减少很多运算操作,降低 CPU 占用。尤其是 binary 类型,默认情况下占用 CPU 非常高,而对这种字段类型进行分词通常没有什么意义。还可以对字段的 index_options 属性进行优化,控制在创建倒排索引时,哪些内容会被添加到倒排索引中。

此外,尽量根据业务场景减少字段数量及内容长度,如果原始数据的大段内容无须全部建立索引,则可以尽量减少不必要的内容。并且文档的字段应尽量保证相同的顺序,可以提高文档的压缩率。

3)调整 _source 字段
_source 字段用于存储 doc 原始数据,对于部分不需要存储的字段,可以通过 includes excludes 属性过滤或直接将 _source 禁用,这样可以降低 I/O 压力,不过实际场景中大多不会禁用 _source,而即使过滤掉某些字段对于写入速度的提升作用也不大。因为在满负荷写入情况下,基本是 CPU 先跑满了,瓶颈在于 CPU。

4)禁用 _all 字段
从 ES 6.0 开始,_all 字段默认为不启用,而在此前的版本中,_all 字段默认是开启的。_all 字段中包含所有字段分词后的关键词,作用是可以在搜索的时候不指定特定字段,从所有字段中检索。在 ES 6.0 默认禁用 _all 字段的主要有以下几点原因:

  • 由于需要从其他的全部字段复制所有字段值,导致 _all 字段占用非常大的空间。
  • _all 字段有自己的分析器,在进行某些查询时(例如,同义词)结果不符合预期,因为没有匹配对应字段的同一个分析器。
  • 由于数据重复引起的额外建立索引的开销。
  • 有更好的替代方法。

在 ES 6.0 之前的版本中,可以在 mapping 中将 enabled 设置为 false 来禁用 _all 字段:

  1. curl -XPUT "localhost:9200/my_index" -H 'Content-Type:application/json' -d'
  2. {
  3. "mappings" : {
  4. "_all" : {
  5. "enabled" : false
  6. },
  7. ......
  8. }
  9. }

关于这个问题的更多讨论可以参考:https://github.com/elastic/elasticsearch/issues/19784。禁用 _all 字段可以明显降低对 CPU 和 I/O 的压力。

5)对 Analyzed 的字段禁用 Norms
Norms 用于在搜索时计算 doc 的评分,如果不需要评分,则可以将其禁用。

6)禁用 dynamic
不要对字符串使用默认的 dynamic mapping,ES 默认会对字符串字段生成 text 类型及 keyword 子类型,如果字段数量过多会对性能产生比较大的影响。