ES写数据过程
ElasticSearch会先给任意一个节点去写数据,这个节点就是协调节点,协调节点会对这个数据进行hash操作,hash计算完了之后会根据hash结果来分配这个数据是写到哪个shard上面去.然后就开始写到这个shard的primary,写完了primary shard之后就会给数据同步到replica shard上. 等数据同步完了之后就返回响应结果给客户端.
1、客户端选择一个 coordinating node (协调节点)发送请求过去
2、协调节点对document(文档)进行路由,将请求转发给对应的node(有primary shard)
路由算法:shard = hash(routing) % number_of_primary_shards
3、实际node上的primary shard处理请求,然后将数据同步倒replica node
4、协调节点如果发现primary shard和所有的replica shard都搞定的时候,就返回响应结果给客户端
写数据底层原理
1)先写入buffer;同时将数据写入translog日志文件,
因为你给数据只是写到buffer里面,如果此时宕机了,那么数据就丢掉了,所以就会同时将数据写入到translog日志文件件里面去.
当数据暂存在buffer和translog里面的时候,如果有客户端来搜索这条数据暂时是搜索不到的.
2)如果buffer快满了,或者到一定时间(默认情况下是每隔一秒种),就会将buffer数据refresh到一个新的segment file(本地磁盘里面)中,但是此时数据不是直接进入segment file的磁盘文件的,而是先进入os cache的。这个过程就是refresh。
每隔1秒钟,es将buffer中的数据写入一个新的segment file,每秒钟会产生一个新的磁盘文件,segment file,这个segment file中就存储最近1秒内buffer中写入的数据
但是如果buffer里面此时没有数据,那当然不会执行refresh操作咯,每秒创建换一个空的segment file,如果buffer里面有数据,默认1秒钟执行一次refresh操作,刷入一个新的segment file中
操作系统里面,磁盘文件其实都有一个东西,叫做os cache,操作系统缓存,就是说数据写入磁盘文件之前,会先进入os cache,先进入操作系统级别的一个内存缓存中去.
只要buffer中的数据被refresh操作,刷入os cache中,就代表这个数据就可以被外部客户端搜索到了.
为什么叫es是准实时的?NRT,near real-time,准实时。默认是每隔1秒refresh一次的,所以es是准实时的,因为写入的数据1秒之后才能被看到。
可以通过es的restful api或者java api,手动执行一次refresh操作,就是手动将buffer中的数据刷入os cache中,让数据立马就可以被搜索到。
只要数据被输入os cache中,buffer就会被清空了,因为不需要保留buffer了,数据在translog里面已经持久化到磁盘去一份了
3)只要数据进入os cache,此时就可以让这个segment file的数据对外提供搜索了
4)重复1~3步骤,新的数据不断进入buffer和translog,不断将buffer数据写入一个又一个新的segment file中去,每次refresh完buffer清空,translog保留。随着这个过程推进,translog会变得越来越大。当translog达到一定长度的时候,就会触发commit操作。
buffer中的数据,倒是好,每隔1秒就被刷到os cache中去,然后这个buffer就被清空了。所以说这个buffer的数据始终是可以保持住不会填满es进程的内存的。
每次一条数据写入buffer,同时会写入一条日志到translog日志文件中去,所以这个translog日志文件是不断变大的,当translog日志文件大到一定程度的时候,就会执行commit操作。
5)commit操作发生第一步,就是将buffer中现有数据refresh到os cache中去,清空buffer
6)将一个commit point写入磁盘文件,里面标识着这个commit point对应的所有segment file
7)强行将os cache中目前所有的数据都fsync到磁盘文件中去
translog日志文件的作用是什么?就是在你执行commit操作之前,数据要么是停留在buffer中,要么是停留在os cache中,无论是buffer还是os cache都是内存,一旦这台机器死了,内存中的数据就全丢了。
所以需要将数据对应的操作写入一个专门的日志文件,translog日志文件中,一旦此时机器宕机,再次重启的时候,es会自动读取translog日志文件中的数据,恢复到内存buffer和os cache中去。
commit操作:1、写commit point;2、将os cache数据fsync强刷到磁盘上去;3、清空translog日志文件
8)将现有的translog清空,然后再次重启启用一个translog,此时commit操作完成。默认每隔30分钟会自动执行一次commit,但是如果translog过大,也会触发commit。整个commit的过程,叫做flush操作。我们可以手动执行flush操作,就是将所有os cache数据刷到磁盘文件中去。
不叫做commit操作,flush操作。es中的flush操作,就对应着commit的全过程。我们也可以通过es api,手动执行flush操作,手动将os cache中的数据fsync强刷到磁盘上去,记录一个commit point,清空translog日志文件。
9)translog其实也是先写入os cache的,默认每隔5秒刷一次到磁盘中去,所以默认情况下,可能有5秒的数据会仅仅停留在buffer或者translog文件的os cache中,如果此时机器挂了,会丢失5秒钟的数据。但是这样性能比较好,最多丢5秒的数据。也可以将translog设置成每次写操作必须是直接fsync到磁盘,但是性能会差很多。
实际上你在这里,如果面试官没有问你es丢数据的问题,你可以在这里给面试官炫一把,你说,其实es第一是准实时的,数据写入1秒后可以搜索到;可能会丢失数据的,你的数据有5秒的数据,停留在buffer、translog os cache、segment file os cache中,有5秒的数据不在磁盘上,此时如果宕机,会导致5秒的数据丢失。
如果你希望一定不能丢失数据的话,你可以设置个参数,官方文档,百度一下。每次写入一条数据,都是写入buffer,同时写入磁盘上的translog,但是这会导致写性能、写入吞吐量会下降一个数量级。本来一秒钟可以写2000条,现在你一秒钟只能写200条,都有可能。
- 如果是删除操作,commit操作的时候就会生成一个.del文件,将这个document标识为deleted状态,在搜索的搜索的时候就不会被搜索到了。
12. 如果是更新操作,就是将原来的document标识为deleted状态,然后新写入一条数据
13. buffer每次refresh一次,就会产生一个segment file,所以默认情况下是1秒钟一个segmentfile,segment file会越来越多,当躲到一定程度的时候,es就会自动触发merge(合并)造作,将所有segment file文件 merge成一个segment file,并同时物理删除掉标识为deleted的doc,
es 第一是准实时的,数据写入 1 秒后可以搜索到;可能会丢失数据的。有 5 秒的数据,停留在 buffer、translog os cache、segment file os cache 中,而不在磁盘上,此时如果宕机,会导致 5 秒的数据丢失。
总结一下,数据先写入内存 buffer,然后每隔 1s,将数据 refresh 到 os cache,到了 os cache 数据就能被搜索到(所以我们才说 es 从写入到能被搜索到,中间有 1s 的延迟)。每隔 5s,将数据写入 translog 文件(这样如果机器宕机,内存数据全没,最多会有 5s 的数据丢失),translog 大到一定程度,或者默认每隔 30mins,会触发 commit 操作,将缓冲区的数据都 flush 到 segment file 磁盘文件中。
数据写入 segment file 之后,同时就建立好了倒排索引。
一个segment是一个完备的lucene倒排索引,而倒排索引是通过词典 (Term Dictionary)到文档列表(Postings List)的映射关系,快速做查询的。由于词典的size会很大,全部装载到heap里不现实,因此Lucene为词典做了一层前缀索引(Term Index),这个索引在Lucene4.0以后采用的数据结构是FST (Finite State Transducer)。这种数据结构占用空间很小,Lucene打开索引的时候将其全量装载到内存中,加快磁盘上词典查询速度的同时减少随机磁盘访问次数。
每个segment都有会一些索引数据驻留在heap里。因此segment越多,瓜分掉的heap也越多,并且这部分heap是无法被GC掉的
更新和删除文档过程
删除和更新也都是写操作,但是Elasticsearch中的文档是不可变的,因此不能被删除或者改动以展示其变更;
删除文档过程:
磁盘上的每个段都有一个相应的.del文件。当删除请求发送后,文档并没有真的被删除,而是在.del文件中被标记为删除。该文档依然能匹配查询,但是会在结果中被过滤掉,从而不让用户看到。当段合并时,在.del文件中被标记为删除的文档将不会被写入新段。
更新文档过程:
1如果是更新操作,就是将原来的doc标识为deleted状态,然后新写入一条数据
2.buffer每次refresh一次,就会产生一个segment file,所以默认情况下是1秒钟一个segment file,segment file会越来越多,此时会定期执行merge
3.每次merge的时候,会将多个segment file合并成一个,同时这里会将标识为deleted的doc给物理删除掉,然后将新的segment file写入磁盘,这里会写一个commit point,标识所有新的segment file,然后打开segment file供搜索使用,同时删除旧的segment file。
es里的写流程,有4个底层的核心概念,refresh、flush、translog、merge
当segment file多到一定程度的时候,es就会自动触发merge(合并)操作,将多个segment file给merge成一个segment file,在merge的时候,会看一下如果某条数据被标识成了.del删除标识,那么在merge后的新文件里面,这条数据就没了.
ES读数据过程
查询,GET某一条数据,
比如说写入了某个document,这个document会自动给你分配一个全局唯一的id,doc id,同时也是根据doc id进行hash路由到对应的primary shard上面去。也可以手动指定doc id,比如用订单id,用户id。
你可以通过doc id来查询,会根据doc id进行hash,判断出来当时把doc id分配到了哪个shard上面去,从那个shard去查询
- 客户端发送请求到任意一个 node,成为
coordinate node
(协调节点) 。 协调节点
对doc id
进行哈希路由,将请求转发到对应的 node,此时会使用round-robin
随机轮询算法,在primary shard
以及其所有 replica 中随机选择一个,让读请求负载均衡。- 接收请求的 node 返回 document 给
协调节点
。 协调节点
返回 document 给客户端。
ES搜索数据的过程
ElasticSearch是一个分布式的存储和检索系统,在存储的时候默认是根据每条记录的_id字段做路由分发的,这意味着ElasticSearch服务端是准确知道每个document分布在那个shard上的。
相对比于CURD上操作,search一个比较复杂的执行模式,因为我们不知道哪些document会被匹配到,任何一个shard上都有可能,所以一个search请求必须查询一个索引或多个索引里面的所有shard才能完整的查询到我们想要的结果。
找到所有匹配的结果是查询的第一步,来自多个shard上的数据集在分页返回到客户端的之前会被合并到一个排序后的list列表,由于需要经过一步取top N的操作,所以search需要进过两个阶段才能完成,分别是query和fetch。
比如你有三条数据:"java 真好玩儿啊" "java 好难学啊" "j2ee 特别牛"
你根据 java
关键词来搜索,将包含 java
的 document
给搜索出来。es 就会给你返回:”java 真好玩儿啊,java 好难学啊”。
- 客户端发送请求到一个协调节点 。
- 协调节点将搜索请求转发到所有的 shard 对应的
primary shard
或replica shard
,都可以。 - query phase:每个 shard 将自己的搜索结果(其实就是一些
doc id
)返回给协调节点,由协调节点进行数据的合并、排序、分页等操作,产出最终结果。 - fetch phase:接着由协调节点根据
doc id
去各个节点上拉取实际的document
数据,最终返回给客户端。
写请求是写入 primary shard,然后同步给所有的 replica shard;读请求可以从 primary shard 或 replica shard 读取,采用的是随机轮询算法。
详细说明
一.query(查询阶段)
当一个search请求发出的时候,这个query会被广播到索引里面的每一个shard(主shard或副本shard),每个shard会在本地执行查询请求后会生成一个命中文档的优先级队列。
这个队列是一个排序好的top N数据的列表,它的size等于from+size的和,也就是说如果你的from是10,size是10,那么这个队列的size就是20,所以这也是为什么深度分页不能用from+size这种方式,因为from越大,性能就越低。
es里面分布式search的查询流程如下:
客户端发送一个search请求到Node 3上,然后Node 3会创建一个优先级队列它的大小=from+size2,接着Node 3转发这个search请求到索引里面每一个主shard或者副本shard上,每个shard会在本地查询然后添加结果到本地的排序好的优先级队列里面。3,每个shard返回docId和所有参与排序字段的值例如_score到优先级队列里面,然后再返回给coordinating节点也就是Node 3,然后Node 3负责将所有shard里面的数据给合并到一个全局的排序的列表。
上面提到一个术语叫coordinating node,这个节点是当search请求随机负载的发送到一个节点上,然后这个节点就会成为一个coordinating node,它的职责是广播search请求到所有相关的shard上,然后合并他们的响应结果到一个全局的排序列表中然后进行第二个fetch阶段,注意这个结果集仅仅包含docId和所有排序的字段值,search请求可以被主shard或者副本shard处理,这也是为什么我们说增加副本的个数就能增加搜索吞吐量的原因,coordinating节点将会通过round-robin的方式自动负载均衡。
二.fetch(读取阶段)
流程如下:
1,coordinating 节点标识了那些document需要被拉取出来,并发送一个批量的mutil get请求到相关的shard上 2,每个shard加载相关document,如果需要他们将会被返回到coordinating 节点上 3,一旦所有的document被拉取回来,coordinating节点将会返回结果集到客户端上。
这里需要注意,coordinating节点拉取的时候只拉取需要被拉取的数据,比如from=90,size=10,那么fetch只会读取需要被读取的10条数据,这10条数据可能在一个shard上,也可能在多个shard上所以 coordinating节点会构建一个multi-get请求并发送到每一个shard上,每个shard会根据需要从_source字段里面获取数据,一旦所有的数据返回,coordinating节点会组装数据进入单个response里面然后将其返回给最终的client。
三.总结:
本文介绍了es的分布式search的查询流程分为query和fetch两个阶段,在query阶段会从所有的shard上读取相关document的docId及相关的排序字段值,并最终在coordinating节点上收集所有的结果数进入一个全局的排序列表后,然后获取根据from+size指定page页的数据,获取这些docId后再构建一个multi-get请求发送相关的shard上从_source里面获取需要加载的数据,最终再返回给client端,至此整个search请求流程执行完毕