集群原理

集群就是多个node统一对外提供服务。这样,就避免了单节点故障带来的服务的中断,保证了服务的高可用,同时,也因为多台节点的协同运作,提高了集群服务的计算能力和吞吐量。ES服务在实际应用中也是以集群的方式存在。

集群架构

image.png
对于用户来说, ES是一个无中心化的集群,ES集群内部运行原理是对外面来说是透明的。你操作一个节点跟操作一个集群是一样的。也就是说,ES集群没有中心节点,任何一个节点出现故障都不会影响其它节点。这是由ES本身特性所决定的。这是它的典型特性。但是通过集群内部来看ES是有节点的。

在ElasticSearch的架构中,有三类角色,分别是Client Node、Data Node和Master Node。搜索查询的请求一般是经过Client Node来向Data Node获取数据,而索引查询首先请求Master Node节点,然后Master Node将请求分配到多个Data Node节点完成一次索引查询。
image.png
集群中各节点的作用:

  • Master Node:用于元数据的处理,比如索引的新增、删除、分片分配等,以及管理集群各个节点的状态包括集群节点的协调、调度。elasticsearch集群中可以定义多个主节点,但是,在同一时刻,只有一个主节点起作用,其它定义的主节点,是作为主节点的候选节点存在。当一个主节点故障后,集群会从候选主节点中选举出新的主节点。也就是说,主节点的产生都是由选举产生的。Master节点它仅仅是对索引的管理、集群状态的管理。像其它的对数据的存储、查询都不需要经过这个Master节点。因此在ES集群中。它的压力是比较小的。所以,我们在构建ES的集群当中,Master节点可以不用选择太好的配置,但是我们一定要保证服务器的安全性。因此,必须要保证主节点的稳定性。
  • Data Node:存储数据的节点,数据的读取、写入最终的作用都会落到这个上面。数据的分片、搜索、整合等 这些操作都会在数据节点来完成。因此,数据节点的操作都是比较消耗CPU、内存、I/O资源。所以,我们在选择Data Node数据节点的时候,硬件配置一定要高一些。高的硬件配置可以获得高效的存储和分析能力。因为最终的结果都是需要到这个节点上来。
  • Client Node:可选节点,该节点处理路由请求的分发、汇总等,它也会存储一些元数据信息,但是不会对数据做任何修改,仅仅用来存储。它的好处是可以分担Data Node的一部分压力。因为ES查询是两层汇聚的结果,第一层是在Data Node上做查询结果的汇聚。然后把结果发送到Client Node 上来。Cllient Node收到结果后会再做第二次的结果汇聚。然后Cllient Node会把最终的结果返回给用户。

从上面的结构图可以看到ES集群的工作流程:

  1. 搜索查询,比如Kibana去查询ES的时候,默认走的是Client Node。然后由Client Node将请求转发到Data Node上。Data Node上的结构返回给Client Node.然后再返回给客户端。
  2. 索引查询,比如我们调用API去查询的时候,走的是Master Node,然后由Master Node将请求转发到相应的数据节点上,然后再由Master Node将结果返回。
  3. 最终所有的服务请求都到了Data Node上。所以,它的压力是最大的。

线上集群

节点配置
在生产环境下,如果不修改elasticsearch节点的角色信息,在高数据量,高并发的场景下集群容易出现脑裂等问题。

默认情况下,elasticsearch 集群中每个节点都有成为主节点的资格,也都存储数据,还可以提供查询服务。这些功能是由两个属性控制的:

  1. # 这个属性表示节点是否具有成为主节点的资格。此属性的值为 true,并不意味着这个节点就是主节点。
  2. # 因为真正的主节点,是由多个具有主节点资格的节点进行选举产生的。
  3. # 所以,这个属性只是代表这个节点是不是具有主节点选举资格。
  4. node.master: true
  5. # 这个属性表示节点是否存储数据
  6. node.data: true

组合方式

  1. node.master: true
  2. node.data: true
  3. # 预处理节点
  4. node.ingest: true

这种组合表示这个节点没有成为主节点的资格,也就不参与选举,只会存储数据。这个节点我们称为数据节点。在集群中需要单独设置几个这样的节点负责存储数据。后期提供存储和查询服务。


  1. node.master: true
  2. node.data: false
  3. node.ingest: false

这种组合表示这个节点不会存储数据,有成为主节点的资格,可以参与选举,有可能成为真正的主节点。这个节点我们称为Master Node。


  1. node.master: false
  2. node.data: false
  3. node.ingest: true

这种组合表示这个节点即不会成为主节点,也不会存储数据,这个节点的意义是作为一个Client Node,主要是针对海量请求的时候可以进行负载均衡。
在新版 ElasticSearch5.x 之后该节点称之为 Coordinate Node,其中还增加了一个叫 Ingest Node,用于预处理数据(索引和搜索阶段都可以用到)。当然,作为一般应用是不需要这个预处理节点做什么额外的预处理过程,那么这个节点和我们称之为 Client Node 之间可以看做是等同的。

总结
Master Node:普通服务器即可,因为CPU、内存消耗一般 ;
Data Node:主要消耗磁盘、内存 ;
Client | Ingest Node:普通服务器即可,如果要进行分组聚合操作的话,建议这个节点内存也分配多一点;

分片原理

分片介绍

分片是 Elasticsearch 在集群中分发数据的关键。把分片想象成数据的容器,文档存储在分片中,然后分片分配到集群中的节点上。当集群扩容或缩小,Elasticsearch 将会自动在节点间迁移分片,以使集群保持平衡。

一个分片是一个最小级别 “工作单元”,它只是保存了索引中所有数据的一部分。这类似于 MySql 的分库分表,只不过 Mysql 分库分表需要借助第三方组件,而 ES 内部自身实现了此功能。分片可以是 主分片 或者是 复制分片 。

主分片

在一个多分片的索引中写入数据时,通过路由来确定具体写入哪一个分片中,大致路由过程如下:

  1. # routing 是一个可变值,默认是文档的 _id ,也可以设置成一个自定义的值。
  2. # number_of_primary_shards 表示主分片的数量。
  3. shard = hash(routing) % number_of_primary_shards 3

routing 通过 hash 函数生成一个数字,然后这个数字再除以 number_of_primary_shards后得到余数 。这个在 0 到 number_of_primary_shards 之间的余数,就是所寻求的文档所在分片的位置。

这解释了为什么要在创建索引的时候就确定好主分片的数量并且永远不会改变这个数量:因为如果分片数量变化了,那么所有之前路由的值都会无效,文档也再也找不到了。

索引中的每个文档属于一个单独的主分片,所以主分片的数量决定了索引最多能存储多少数据(实际的数量取决于数据、硬件和应用场景)。


复制分片

复制分片只是主分片的一个副本,它可以防止硬件故障导致的数据丢失,同时可以提供读请求,比如搜索或者从别的 分片中取回文档。

每个主分片都有一个或多个副本分片,当主分片异常时,副本可以提供数据的查询等操作。主分片和对应的副本分片是不会在同一个节点上的,所以副本分片数的最大值为节点数 - 1

当索引创建完成的时候,主分片的数量就固定了,但是复制分片的数量可以随时调整,根据需求扩大或者缩小规模。

分片本身就是一个完整的搜索引擎,它可以使用单一节点的所有资源。主分片或者复制分片都可以处理读请求(搜索或文档检索),所以数据的冗余越多,能处理的搜索吞吐量就越大。

对文档的新建、索引和删除请求都是写操作,必须在主分片上面完成之后才能被复制到相关的副本分片,ES 为了提高写入的能力这个过程是并发写的,同时为了解决并发写的过程中数据冲突的问题,ES通过乐观锁的方式控制,每个文档都有一个 _version 版本号,当文档被修改时版本号递增。一旦所有的副本分片都报告写成功才会向协调节点报告成功,协调节点向客户端报告成功。

分片的存储

写索引过程

ES 集群中每个节点通过路由都知道集群中的文档的存放位置,所以每个节点都有处理读写请求的能力。

在一个写请求被发送到某个节点后,该节点即为协调节点,协调节点会根据路由公式计算出需要写到哪个分片上,再将请求转发到该分片的主分片节点上。假设 shard = hash(routing) % 4 = 0 ,则过程大致如下:
image.png

  1. 客户端向 ES1节点(协调节点)发送写请求,通过路由计算公式得到值为0,则当前数据应被写到主分片S0上。
  2. ES1 节点将请求转发到 S0 主分片所在的节点 ES3,ES3 接受请求并写入到磁盘。
  3. 并发将数据复制到两个副本分片 R0 上,其中通过乐观并发控制数据的冲突。一旦所有的副本分片都报告成功,则节点 ES3 将向协调节点报告成功,协调节点向客户端报告成功。

存储原理

索引的不可变性

写入磁盘的倒排索引是不可变的,优势主要表现在:

  1. 不需要锁。因为如果从来不需要更新一个索引,就不必担心多个程序同时尝试修改,也就不需要锁。
  2. 一旦索引被读入内核的文件系统缓存,便会留在哪里,由于其不变性,只要文件系统缓存中还有足够的空间,那么大部分读请求会直接请求内存,而不会命中磁盘。这提供了很大的性能提升。
  3. 其它缓存(像filter缓存),在索引的生命周期内始终有效。它们不需要在每次数据改变时被重建,因为数据不会变化。
  4. 写入单个大的倒排索引,可以压缩数据,较少磁盘 IO 和需要缓存索引的内存大小。

当然,不可变的索引有它的缺点:

  1. 当对旧数据进行删除时,旧数据不会马上被删除,而是在.del文件中被标记为删除。而旧数据只能等到段更新时才能被移除,这样会造成大量的空间浪费。
  2. 若有一条数据频繁的更新,每次更新都是新增新的标记旧的,则会有大量的空间浪费。
  3. 每次新增数据时都需要新增一个段来存储数据。当段的数量太多时,对服务器的资源例如文件句柄的消耗会非常大。
  4. 在查询的结果中包含所有的结果集,需要排除被标记删除的旧数据,这增加了查询的负担。

段的引入

在全文检索的早些时候,会为整个文档集合建立一个大索引,并且写入磁盘。只有新的索引准备好了,它就会替代旧的索引,最近的修改才可以被检索。这无疑是低效的。

因为索引的不可变性带来的好处,那如何在保持不可变同时更新倒排索引?
答案是,使用多个索引。不是重写整个倒排索引,而是增加额外的索引反映最近的变化。每个倒排索引都可以按顺序查询,从最老的开始,最后把结果聚合。

这就引入了段(segment):

  • 新的文档首先写入内存区的索引缓存,这时不可检索。
  • 时不时(默认 1s 一次),内存区的索引缓存被 refresh 到文件系统缓存(该过程比直接到磁盘代价低很多),成为一个新的段并被打开,这时可以被检索。
  • 新的段提交,写入磁盘,提交后,新的段加入提交点,缓存被清除,等待接收新的文档。

image.png
在底层采用了分段的存储模式,使它在读写时几乎完全避免了锁的出现,大大提升了读写性能。
分片下的索引文件被拆分为多个子文件,每个子文件叫作段, 每一个段本身都是一个倒排索引,并且段具有不变性,一旦索引的数据被写入硬盘,就不可再修改。

段被写入到磁盘后会生成一个提交点,提交点是一个用来记录所有提交后段信息的文件。一个段一旦拥有了提交点,就说明这个段只有读的权限,失去了写的权限。相反,当段在内存中时,就只有写的权限,而不具备读数据的权限,意味着不能被检索。

索引文件分段存储并且不可修改,那么新增、更新和删除如何处理呢?

  • 新增:新增很好处理,由于数据是新的,所以只需要对当前文档新增一个段就可以了。
  • 删除:由于不可修改,所以对于删除操作,不会把文档从旧的段中移除,而是通过新增一个.del文件(每一个提交点都有一个.del文件),包含了段上已经被删除的文档。当一个文档被删除,它实际上只是在.del文件中被标记为删除,依然可以匹配查询,但是最终返回之前会被从结果中删除。
  • 更新:不能修改旧的段来进行反映文档的更新,其实更新相当于是删除和新增这两个动作组成。会将旧的文档在 .del文件中标记删除,然后文档的新版本被索引到一个新的段中。可能两个版本的文档都会被一个查询匹配到,但被删除的那个旧版本文档在结果集返回前就会被移除。

合并段

由于自动刷新流程每秒会创建一个新的段 ,这样会导致短时间内的段数量暴增。而段数目太多会带来较大的麻烦。每一个段都会消耗文件句柄、内存和 cpu 运行周期。更重要的是,每个搜索请求都必须轮流检查每个段然后合并查询结果,所以段越多,搜索也就越慢。

ES 通过后台合并段解决这个问题。小段被合并成大段,再合并成更大的段。这时旧的文档从文件系统删除的时候,旧的段不会再复制到更大的新段中。合并的过程中不会中断索引和搜索。

image.png
段合并在进行索引和搜索时会自动进行,合并进程选择一小部分大小相似的段,并且在后台将它们合并到更大的段中,这些段既可以是未提交的也可以是已提交的。
image.png
合并结束后老的段会被删除,新的段被 flush 到磁盘,同时写入一个包含新段且排除旧的和较小的段的新提交点,新的段被打开可以用来搜索。

合并大的段会消耗很多 IO 和 CPU,如果不检查会影响到搜素性能。默认情况下,ES 会限制合并过程,这样搜索就可以有足够的资源进行。

延迟写策略

ES 是怎么做到近实时全文搜索?
磁盘是瓶颈。提交一个新的段到磁盘需要 fsync 操作,确保段被物理地写入磁盘,即时电源失效也不会丢失数据。但是 fsync 严重影响性能,当写数据量大的时候会造成 ES 停顿卡死,查询也无法做到快速响应。

所以 fsync 不能在每个文档被索引时就触发,需要一种更轻量级的方式使新的文档可以被搜索,这意味移除 fsync 。

为了提升写的性能,ES 没有每新增一条数据就增加一个段到磁盘上,而是采用延迟写的策略。

每当有新增的数据时,就将其先写入到内存中,在内存和磁盘之间是文件系统缓存,当达到默认的时间(1秒钟)或者内存的数据达到一定量时,会触发一次刷新(Refresh),将内存中的数据生成到一个新的段上并缓存到文件缓存系统上,稍后再被刷新到磁盘中并生成提交点。

这里的内存使用的是ES的JVM内存,而文件缓存系统使用的是操作系统的内存。新的数据会继续的被写入内存,但内存中的数据并不是以段的形式存储的,因此不能提供检索功能。由内存刷新到文件缓存系统的时候会生成了新的段,并将段打开以供搜索使用,而不需要等到被刷新到磁盘。

在 Elasticsearch 中,这种写入和打开一个新段的轻量的过程叫做 refresh (即内存刷新到文件缓存系统)。默认情况下每个分片会每秒自动刷新一次。 这就是为什么说 Elasticsearch 是近实时的搜索了:文档的改动不会立即被搜索,但是会在一秒内可见。

尽管刷新是比提交段轻量很多的操作,它还是会有性能开销。当写测试的时候,手动刷新很有用,但是不要在生产环境下每次索引一个文档都去手动刷新。而且并不是所有的情况都需要每秒刷新。在使用 Elasticsearch 索引大量的日志文件,可能想优化索引速度而不是近实时搜索,这时可以在创建索引时在 settings 中通过 refresh_interval=”30s” 的值 , 降低每个索引的刷新频率,设值时需要注意后面带上时间单位,否则默认是毫秒。当 refresh_interval=-1 时表示关闭索引的自动刷新。

持久化

没用 fsync 同步文件系统缓存到磁盘,不能确保电源失效,甚至正常退出应用后,数据的安全。为了ES 的可靠性,需要确保变更持久化到磁盘。

虽然通过定时 Refresh 获得近实时的搜索,但是 Refresh 只是将数据挪到文件缓存系统,文件缓存系统也是内存空间,属于操作系统的内存,只要是内存都存在断电或异常情况下丢失数据的危险。

为了避免丢失数据,Elasticsearch添加了事务日志(Translog),事务日志记录了所有还没有持久化到磁盘的数据。

有了事务日志,过程现在如下:

  1. 当一个文档被索引,它被加入到内存缓存,同时加到事务日志。不断有新的文档被写入到内存,同时也都会记录到事务日志中。这时新数据还不能被检索和查询。

image.png

  1. 当达到默认的刷新时间或内存中的数据达到一定量后,会触发一次 refresh 。

image.png

  1. 将内存中的数据以一个新段形式刷新到文件缓存系统,但没有 fsync 。
  2. 段被打开,使得新的文档可以搜索。
  3. 缓存被清除。
  4. 随着更多的文档加入到缓存区,写入日志,这个过程会继续。

image.png

  1. 随着新文档索引不断被写入,当日志数据大小超过 512M 或者时间超过 30 分钟时,会进行一次全提交。

image.png

  1. 内存缓存区的所有文档会写入到新段中,同时清除缓存。
  2. 文件系统缓存通过 fsync 操作 flush 到硬盘,生成提交点。
  3. 事务日志文件被删除,创建一个空的新日志。

事务日志记录了没有 flush 到硬盘的所有操作。当故障重启后,ES 会用最近一次提交点从硬盘恢复所有已知的段,并且从日志里恢复所有的操作。

在 ES 中,进行一次提交并删除事务日志的操作叫做 flush 。分片每 30 分钟,或事务日志过大会进行一次 flush 操作。也可用来进行一次手动 flush ,通常很少需要手动 flush ,通常自动的就够了。总体的流程大致如下:
image.png

集群故障转移

概念

当Elasticsearch的某个节点出现故障时,集群会进行一系列的操作,用来保证整个集群的稳定性和数据不被丢失。

当集群中只有一个节点在运行时,意味着会有一个单点故障问题 — 没有冗余。 幸运的是,我们只需再启动一个节点(进行数据备份)即可防止数据丢失。

启动第二个节点后,集群状态如图:
image.png
第二个节点已经加入集群,三个复制分片(replica shards)也已经被分配了——分别对应三个主分片,这意味着在丢失任意一个节点的情况下依旧可以保证数据的完整性。

文档的索引将首先被存储在主分片中,然后并发复制到对应的复制节点上。这可以确保我们的数据在主节点和复制节点上都可以被检索。

集群健康状态

Elasticsearch提供API可以查询集群的健康状况:

  1. GET _cluster/health

输出参数:

  1. {
  2. "cluster_name": "elasticsearch",
  3. "status": "green",
  4. "timed_out": false,
  5. "number_of_nodes": 2,
  6. "number_of_data_nodes": 2,
  7. "active_primary_shards": 3,
  8. "active_shards": 6,
  9. "relocating_shards": 0,
  10. "initializing_shards": 0,
  11. "unassigned_shards": 0
  12. }

通过 status 参数的返回值来查看集群的状态:

状态值 说明
green 健康状态,指所有的主副分片都可用
yellow 指所有主分片都正常,但是有副分片不可用
red 有主分片不可用

故障转移过程

集群由3个节点组成,如下所示,此时集群的状态是 green 。
image.png
如果此时,node1所在的机器宕机导致服务终止,此时集群会如何处理呢?

  1. node2和node3发现node1无法响应一段时间后会发起mater选举,比如这里选择node2为master节点,此时由于主分片P0下线,集群的状态会变成red。

image.png

  1. node2发现主分片P0未分配,将R0提升为主分片。此时由于所有主分片都正常分配,集群状态变为Yellow。

image.png

  1. node2为P0和P1生成新的副本,此时集群的状态变为绿色。

image.png

脑裂问题

image.png
解决方案:

  1. 减少误判:discovery.zen.ping_timeout节点状态的响应时间,默认为3s,可以适当调大,如果master在该响应时间的范围内没有做出响应应答,判断该节点已经挂掉了。可适当减少误判。
  2. 选举触发 discovery.zen.minimum_master_nodes: 1 该参数是用于控制选举行为发生的最小集群主节点数量。当备选主节点的个数大于等于该参数的值,且备选主节点中有该参数个节点认为主节点挂了,进行选举。官方建议为(n/2)+1,n为主节点个数(即有资格成为主节点的节点个数)。增大该参数,当该值为2时,我们可以设置master的数量为3,这样,挂掉一台,其他两台都认为主节点挂掉了,才进行主节点选举。
  3. 角色分离:即master节点与data节点分离,限制角色,减少数据丢失的可能性。另外从节点禁用自动发现机制并为其指定主节点。 ```yaml

    主节点配置

    node.master: true node.data: false

从节点配置

node.master: false node.data: true discovery.zen.ping.multicast.enabled: false discovery.zen.ping.unicast.hosts: [“host1”, “host2:port”]

  1. <a name="EPakS"></a>
  2. # 环境搭建
  3. <a name="IZEj6"></a>
  4. ## 单节点
  5. <a name="AtUal"></a>
  6. ### es
  7. - 出于安全考虑 es 默认不允许以 root 账号运行。所以新建一个用户并用 root 账号授权,然后切换新用户
  8. ```bash
  9. # 创建用户
  10. useradd ble
  11. # 设置密码
  12. passwd ble
  13. # 给ble用户相应授权
  14. su root
  15. chown -R ble /usr/local/
  16. # 将ble用户添加到root组
  17. vi /etc/sudoers
  18. +---------------------------------
  19. | # 添加如下内容
  20. | ble ALL=(ALL) ALL
  21. +---------------------------------
  22. # 切换新用户
  23. su ble
  • 下载 es-7.14.0 ,上传到服务器 /usr/local/ 目录中并解压,然后将文件夹重命名

    1. cd /usr/local/
    2. tar -zxvf ./elasticsearch-7.14.0-linux-x86_64.tar.gz
    3. mv ./elasticsearch-7.14.0 es-7.14.0
    4. cd ./es-7.14.0/
  • 修改 jvm.options 文件的配置

    1. vi ./config/jvm.options
    2. +------------------------------
    3. | # 将默认内存配置修改的低一些
    4. | -Xms512m
    5. | -Xmx512m
    6. +------------------------------
  • 修改 elasticsearch.yml 文件的配置 ```bash

    用于存放数据与日志文件

    mkdir data logs

vi ./config/elasticsearch.yml +————————————————————————————————————————————— | # 节点名,es会默认随机指定一个名字,建议指定一个有意义的名称,方便管理 | node.name: node-1 | # 设置索引数据的存储路径,默认是es根目录下的data文件夹,可以设置多个存储路径,用逗号隔开 | path.data: /usr/local/es-7.14.0/data | # 设置日志文件的存储路径,默认是es根目录下的logs文件夹 | path.logs: /usr/local/es-7.14.0/logs | # 允许外网访问 | network.host: 0.0.0.0 | http.port: 9200 | # 自动群集检查 | discovery.seed_hosts: [“192.168.73.141”] | # 手动指定可以成为mater的所有节点的name或者ip,确定在第一次选举中符合主机资格的节点的集合 | cluster.initial_master_nodes: [“node-1”] +—————————————————————————————————————————————

  1. - 配置 JDK 环境变量。es 解压后自带 JDK ,所以只需要配置一下即可
  2. ```bash
  3. vi ~/.bash_profile
  4. +---------------------------------------------
  5. | # 添加如下内容
  6. | JAVA_HOME=/usr/local/es-7.14.0/jdk/
  7. | PATH=$PATH:$JAVA_HOME/bin
  8. +---------------------------------------------
  9. source ~/.bash_profile
  • 运行时常见错误,最好提前修改以下内容 ```bash su root

文件权限问题

vi /etc/security/limits.conf +—————————————— | # 添加如下内容 | soft nofile 65536 | hard nofile 131072 | soft nproc 4096 | hard nproc 4096 +——————————————

线程数据量问题

vi /etc/security/limits.d/20-nproc.conf +———————————- | # 修改为如下内容 | * soft nproc 4096 +———————————-

虚拟内存过低

vi /etc/sysctl.conf +———————————- | # 添加如下内容 | vm.max_map_count=655360 +———————————-

修改后执行命令

sysctl -p

su ble

  1. - 开放防火墙访问权限
  2. ```bash
  3. # 开放防火墙端口,重启,查看开放端口
  4. sudo firewall-cmd --zone=public --add-port=9200/tcp --permanent
  5. sudo firewall-cmd --reload
  6. sudo firewall-cmd --list-ports

查看es进程

jps

  1. <a name="yCEOw"></a>
  2. ### kibana
  3. - 下载 [kibaina-7.14.0](https://artifacts.elastic.co/downloads/kibana/kibana-7.14.0-linux-x86_64.tar.gz) ,上传到服务器 /usr/local/ 目录中并解压,然后将文件夹重命名
  4. ```bash
  5. cd /usr/local/
  6. tar -zxvf ./kibana-7.14.0-linux-x86_64.tar.gz
  7. mv ./kibana-7.14.0-linux-x86_64 kibana-7.14.0
  8. cd ./kibana-7.14.0/
  • 修改 kibana.yml 文件的配置

    1. vi ./config/kibana.yml
    2. +--------------------------------------------------
    3. | # 允许任意ip访问
    4. | server.host: "0.0.0.0"
    5. | # es的集群地址
    6. | elasticsearch.hosts: ["http://192.168.73.141:9200"]
    7. +--------------------------------------------------
  • 开放防火墙访问权限

    1. # 开放防火墙端口,重启,查看开放端口
    2. sudo firewall-cmd --zone=public --add-port=5601/tcp --permanent
    3. sudo firewall-cmd --reload
    4. sudo firewall-cmd --list-ports
  • 启动 kibana ,访问 http://192.168.73.141:5601 ```bash nohup ./bin/kibana > /dev/null 2>&1 &

查看kibana进程

ps -ef | grep node

  1. <a name="R3OZt"></a>
  2. ### ik
  3. - 下载 [ik-7.14.0](https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.14.0/elasticsearch-analysis-ik-7.14.0.zip) ,上传到服务器 /usr/local/ 目录中并解压到指定目录中
  4. ```bash
  5. # 没有就安装
  6. yum install unzip
  7. # 解压到ik文件夹中
  8. unzip ./elasticsearch-analysis-ik-7.14.0.zip -d ./es-7.14.0/plugins/ik
  • 重启 es

    1. ./es-7.14.0/bin/elasticsearch -d
  • 验证 ``` POST _analyze { “analyzer”: “ik_smart”, “text”: “开心最重要” }

POST _analyze { “analyzer”: “ik_max_word”, “text”: “开心最重要” }

  1. <a name="hW4Xr"></a>
  2. ## 多节点
  3. 一主二从<br />主机:192.168.73.141<br />主节点:9200<br />从节点:9201、9202
  4. es本身就是集群的,所以集群非常简单,这里以9200节点为例,另外两个节点均做相应操作即可。
  5. - 下载 [es-7.14.0](https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.14.0-linux-x86_64.tar.gz) ,上传到服务器指定目录中。然后做如下操作。
  6. ```bash
  7. mkdir /usr/local/es-cluster/
  8. cd /usr/local/es-cluster/
  9. tar -zxvf ./elasticsearch-7.14.0-linux-x86_64.tar.gz
  10. mv ./elasticsearch-7.14.0 9200
  11. mkdir ./9200/data
  • 修改 elasticsearch.yml 文件的配置。

    1. vi ./9200/config/elasticsearch.yml
    1. # 集群名称必须一致
    2. cluster.name: es
    3. # 节点名称,每个es节点必须不一样
    4. node.name: node-1
    5. path.data: /usr/local/es-cluster/9200/data
    6. path.logs: /usr/local/es-cluster/9200/logs
    7. network.host: 0.0.0.0
    8. http.port: 9200
    9. discovery.seed_hosts: ["192.168.73.141"]
    10. # 这里让 node-1 有资格成为主节点
    11. cluster.initial_master_nodes: ["node-1"]
    12. # 表示该节点有资格选举为master
    13. node.master: true
    14. # 表示该节点可存储索引数据
    15. node.data: true
    16. # 表示集群最少的master数,如果集群的最少master数据少于指定的数,将无法启动,推荐 N/2+1
    17. discovery.zen.minimum_master_nodes: 2
    18. # 跨域
    19. http.cors.enabled: true
    20. http.cors.allow-origin: "*"
  • 其它两个节点也做相应操作,完成后启动三个es实例节点即可

    1. cd /usr/local/es-cluster/
    2. ./9200/bin/elasticsearch -d
    3. ./9201/bin/elasticsearch -d
    4. ./9202/bin/elasticsearch -d

集群管理工具

  • 拉取并启动镜像

    1. docker pull lmenezes/cerebro:latest
    2. docker run -d -p 9000:9000 --name cerebro lmenezes/cerebro:latest
  • 开放防火墙访问权限

    1. # 开放防火墙端口,重启,查看开放端口
    2. sudo firewall-cmd --zone=public --add-port=9000/tcp --permanent
    3. sudo firewall-cmd --reload
    4. sudo firewall-cmd --list-ports

SpringBoot集成

前提

  • 引入maven依赖
    • spring-boot-starter-data-elasticsearch
  • application.yml配置

    1. spring:
    2. elasticsearch:
    3. rest:
    4. uris: 192.168.73.141:9200
    5. # username:
    6. # password:
  • 使用注解标记生成的索引库及其字段类型。

    • 值得注意的是,设置合适的字段类型非常非常重要,直接影响到查询结果,甚至异常:

      • text 类型不能做聚合操作,但是可以分词;
      • keyword 类型可以做聚合操作,但是不能分词;
      • 做聚合的字段如果需要做数学运算应设置为数字类型,否则无法计算;
      • 日期类型可以设置成 Date 类型,还可以设置成 Long 类型,这样更省空间;
      • 等等诸多注意的地方。 ```java @Document(indexName = “student”) public class Student { @Id private String id;

      @Field(type = FieldType.Keyword) private String studentId;

      @Field(type = FieldType.Keyword) private String studentName;

      @Field(type = FieldType.Integer) private Integer gender;

      @Field(type = FieldType.Date, format = DateFormat.date_time) private Date birthday;

      @Field(type = FieldType.Integer) private Integer studentScore;

      @Field(type = FieldType.Text, analyzer = “ik_max_word”) private String[] hobby;

      @Field(type = FieldType.Text, analyzer = “ik_max_word”) private String desc;

      // setter and getter } ```

  • 定义一个接口,继承ElasticsearchRepository<T, ID>接口,即可在项目启动时自动在es中生成索引库。

    1. // 如果是简单的文档操作,例如: 新增单个文档、删除单个文档、简单查询等可以使用这个接口
    2. public interface TestRepository extends ElasticsearchRepository<Student, String> {}

索引操作

  • 创建索引库

    1. // 指定文档的数据实体类
    2. IndexOperations indexOperations = this.elasticsearchRestTemplate.indexOps(Student.class);
    3. // 创建索引
    4. Map<String, Object> settings = new HashMap<>();
    5. settings.put("number_of_shards", 1);
    6. settings.put("number_of_replicas", 0);
    7. indexOperations.create(settings);
    8. // 创建字段映射
    9. Document mapping = indexOperations.createMapping();
    10. // 给索引设置字段映射
    11. boolean res = indexOperations.putMapping(mapping);
    12. System.out.println(res);
  • 删除索引库

    1. IndexOperations indexOperations = this.elasticsearchRestTemplate.indexOps(Student.class);
    2. boolean res = indexOperations.delete();
    3. System.out.println(res);

文档操作

新增

  • 添加一条文档信息

    1. Student student = new Student();
    2. student.setStudentId("0644181");
    3. student.setStudentName("ble");
    4. student.setGender(1);
    5. student.setBirthday(new Date());
    6. student.setStudentScore(100);
    7. student.setHobby(new String[] {"吃饭", "睡觉", "打豆豆"});
    8. student.setDesc("没什么可说的");
    9. Student res = this.elasticsearchRestTemplate.save(student);
    10. System.out.println(res);
  • 批量添加文档信息

    1. List<Student> students = new ArrayList<>();
    2. for (int i = 1; i < 10; i++) {
    3. Student student = new Student();
    4. student.setStudentId("06441810" + i);
    5. student.setStudentName("ble" + i);
    6. student.setGender(i % 2);
    7. student.setBirthday(new Date());
    8. student.setStudentScore(i * 10);
    9. student.setHobby(new String[] {"吃饭", "睡觉", "打豆豆"});
    10. student.setDesc("没什么可说的");
    11. students.add(student);
    12. }
    13. List<IndexQuery> indexQueryList = new ArrayList<>();
    14. students.forEach(
    15. student -> {
    16. IndexQuery indexQuery = new IndexQuery();
    17. indexQuery.setObject(student);
    18. indexQueryList.add(indexQuery);
    19. });
    20. List<IndexedObjectInformation> res =
    21. this.elasticsearchRestTemplate.bulkIndex(indexQueryList, Student.class);
    22. System.out.println(res);

更新

  • 根据文档Id更新文档信息

    1. String id = "rsEqxnsBcaj-o5mAlYOn";
    2. String indexName = "student";
    3. Document document = Document.create();
    4. document.put("birthday", new Date());
    5. UpdateResponse res =
    6. this.elasticsearchRestTemplate.update(
    7. UpdateQuery.builder(id).withDocument(document).build(),
    8. IndexCoordinates.of(indexName));
    9. System.out.println(res.getResult());
  • 根据文档Id批量更新

    1. String indexName = "student";
    2. List<UpdateQuery> updateQueryList = new ArrayList<>();
    3. for (String id : ids) {
    4. Document document = Document.create();
    5. document.put("birthday", new Date());
    6. UpdateQuery updateQuery = UpdateQuery.builder(id).withDocument(document).build();
    7. updateQueryList.add(updateQuery);
    8. }
    9. this.elasticsearchRestTemplate.bulkUpdate(updateQueryList, IndexCoordinates.of(indexName));

移除

  • 根据文档Id删除文档信息

    1. String id = "rsEqxnsBcaj-o5mAlYOn";
    2. // 返回被删除的文档Id
    3. String res = this.elasticsearchRestTemplate.delete(id, Student.class);
    4. System.out.println(res);
  • 根据条件删除文档信息

    1. String indexName = "student";
    2. NativeSearchQuery nativeSearchQuery =
    3. new NativeSearchQueryBuilder()
    4. .withQuery(QueryBuilders.termQuery("studentName", "ble"))
    5. .build();
    6. ByQueryResponse res =
    7. this.elasticsearchRestTemplate.delete(
    8. nativeSearchQuery, Student.class, IndexCoordinates.of(indexName));
    9. System.out.println(res);

查询

  • 根据文档Id查询

    1. String id = "r8FVxnsBcaj-o5mAAIP0";
    2. Student student = this.elasticsearchRestTemplate.get(id, Student.class);
    3. System.out.println(student);
  • 查询单个文档信息,可组合条件

    1. Criteria criteria = Criteria.where("studentId").is("064418101");
    2. criteria.and(Criteria.where("gender").is(1));
    3. CriteriaQuery criteriaQuery = new CriteriaQuery(criteria);
    4. SearchHit<Student> hit =
    5. this.elasticsearchRestTemplate.searchOne(criteriaQuery, Student.class);
    6. System.out.println(hit != null ? hit.getContent() : null);

精确查询

  • 查询条件不会进行分词,但是查询内容可能会分词,导致查询不到信息
    1. NativeSearchQuery nativeSearchQuery =
    2. new NativeSearchQueryBuilder()
    3. .withQuery(QueryBuilders.termQuery("hobby", "吃饭啦"))
    4. .withQuery(QueryBuilders.termQuery("hobby", "睡觉"))
    5. .build();
    6. SearchHits<Student> hits =
    7. this.elasticsearchRestTemplate.search(nativeSearchQuery, Student.class);
    8. hits.get().forEach(hit -> System.out.println(hit.getContent()));

匹配查询

  • 查询条件会进行分词

    1. NativeSearchQuery nativeSearchQuery =
    2. new NativeSearchQueryBuilder()
    3. .withQuery(QueryBuilders.matchQuery("hobby", "吃饭啦"))
    4. .withQuery(QueryBuilders.matchQuery("hobby", "睡觉啦"))
    5. .build();
    6. SearchHits<Student> hits =
    7. this.elasticsearchRestTemplate.search(nativeSearchQuery, Student.class);
    8. hits.get().forEach(hit -> System.out.println(hit.getContent()));
  • 内容在多字段中进行查询

    1. NativeSearchQuery nativeSearchQuery =
    2. new NativeSearchQueryBuilder()
    3. .withQuery(QueryBuilders.multiMatchQuery("hobby", "吃饭啦", "打豆豆"))
    4. .build();
    5. SearchHits<Student> hits =
    6. this.elasticsearchRestTemplate.search(nativeSearchQuery, Student.class);
    7. hits.get().forEach(hit -> System.out.println(hit.getContent()));

模糊查询

  1. NativeSearchQuery nativeSearchQuery =
  2. new NativeSearchQueryBuilder()
  3. .withQuery(QueryBuilders.fuzzyQuery("studentName", "ble"))
  4. .build();
  5. SearchHits<Student> hits =
  6. this.elasticsearchRestTemplate.search(nativeSearchQuery, Student.class);
  7. hits.get().forEach(hit -> System.out.println(hit.getContent()));

通配符查询

  • *表示0个或多个字符
  • ?表示单个字符
    1. NativeSearchQuery nativeSearchQuery =
    2. new NativeSearchQueryBuilder()
    3. .withQuery(QueryBuilders.wildcardQuery("studentName", "ble*"))
    4. .build();
    5. SearchHits<Student> hits =
    6. this.elasticsearchRestTemplate.search(nativeSearchQuery, Student.class);
    7. hits.get().forEach(hit -> System.out.println(hit.getContent()));

短语匹配

  • 查询条件不会进行分词。例如输入字段值为 没什么 ,不会出现 没xxx什xxx么 这样内容的文档
    1. NativeSearchQuery nativeSearchQuery =
    2. new NativeSearchQueryBuilder()
    3. .withQuery(QueryBuilders.matchPhraseQuery("desc", "没什么"))
    4. .build();
    5. SearchHits<Student> hits =
    6. this.elasticsearchRestTemplate.search(nativeSearchQuery, Student.class);
    7. hits.get().forEach(hit -> System.out.println(hit.getContent().getStudentId()));

分页查询

  1. NativeSearchQuery nativeSearchQuery =
  2. new NativeSearchQueryBuilder()
  3. .withQuery(QueryBuilders.matchAllQuery())
  4. .withPageable(PageRequest.of(1, 2, Sort.by("birthday").ascending()))
  5. .build();
  6. SearchHits<Student> hits =
  7. this.elasticsearchRestTemplate.search(nativeSearchQuery, Student.class);
  8. hits.get().forEach(hit -> System.out.println(hit.getContent()));

聚合操作

  • 聚合分桶

    1. // 根据 gender 字段分组
    2. TermsAggregationBuilder groupByGender =
    3. AggregationBuilders.terms("groupBy_gender").field("gender");
    4. SumAggregationBuilder scoreSum =
    5. AggregationBuilders.sum("totalScore").field("studentScore");
    6. // 分组后求每组中 studentScore 的和
    7. NativeSearchQuery nativeSearchQuery =
    8. new NativeSearchQueryBuilder()
    9. .withQuery(QueryBuilders.matchAllQuery())
    10. .addAggregation(groupByGender.subAggregation(scoreSum))
    11. .build();
    12. Aggregations aggregations =
    13. this.elasticsearchRestTemplate
    14. .search(nativeSearchQuery, Student.class)
    15. .getAggregations();
    16. if (aggregations != null) {
    17. Terms terms = aggregations.get("groupBy_gender");
    18. for (Terms.Bucket bucket : terms.getBuckets()) {
    19. Sum totalScore = (Sum) bucket.getAggregations().asMap().get("totalScore");
    20. // 分组key
    21. Number groupKey = bucket.getKeyAsNumber();
    22. String gender = groupKey.intValue() == 1 ? "男" : "女";
    23. System.out.println(gender);
    24. // 聚合值
    25. System.out.println(totalScore.getValue());
    26. // 每组文档数量
    27. System.out.println(bucket.getDocCount());
    28. }
    29. }
  • 按数值范围聚合

    1. RangeAggregationBuilder scoreRange =
    2. AggregationBuilders.range("scoreRangeCount")
    3. .field("studentScore")
    4. .addUnboundedTo("不及格", 60)
    5. .addRange("良好", 60, 80)
    6. .addUnboundedFrom("优秀", 80);
    7. NativeSearchQuery nativeSearchQuery =
    8. new NativeSearchQueryBuilder()
    9. .withQuery(QueryBuilders.matchAllQuery())
    10. .addAggregation(scoreRange)
    11. .build();
    12. Aggregations aggregations =
    13. this.elasticsearchRestTemplate
    14. .search(nativeSearchQuery, Student.class)
    15. .getAggregations();
    16. if (aggregations != null) {
    17. Range range = aggregations.get("scoreRangeCount");
    18. for (Range.Bucket bucket : range.getBuckets()) {
    19. System.out.println(bucket.getKey());
    20. System.out.println(bucket.getDocCount());
    21. }
    22. }
  • 按日期范围聚合

    1. DateRangeAggregationBuilder birthdayRange =
    2. AggregationBuilders.dateRange("birthdayRangeCount")
    3. .field("birthday")
    4. .format("yyyy")
    5. .addRange("2000-2020", 2000, 2020)
    6. .addRange("2020-2040", 2020, 2040);
    7. NativeSearchQuery nativeSearchQuery =
    8. new NativeSearchQueryBuilder()
    9. .withQuery(QueryBuilders.matchAllQuery())
    10. .addAggregation(birthdayRange)
    11. .build();
    12. Aggregations aggregations =
    13. this.elasticsearchRestTemplate
    14. .search(nativeSearchQuery, Student.class)
    15. .getAggregations();
    16. if (aggregations != null) {
    17. Range range = aggregations.get("birthdayRangeCount");
    18. for (Range.Bucket bucket : range.getBuckets()) {
    19. System.out.println(bucket.getKey());
    20. System.out.println(bucket.getDocCount());
    21. }
    22. }
  • 根据间隔聚合

    1. HistogramAggregationBuilder scoreInterval =
    2. // 这是日期的 dateHistogram
    3. AggregationBuilders.histogram("scoreIntervalCount")
    4. .field("studentScore")
    5. .extendedBounds(60, 100)
    6. .interval(20);
    7. NativeSearchQuery nativeSearchQuery =
    8. new NativeSearchQueryBuilder()
    9. .withQuery(QueryBuilders.matchAllQuery())
    10. .addAggregation(scoreInterval)
    11. .build();
    12. Aggregations aggregations =
    13. this.elasticsearchRestTemplate
    14. .search(nativeSearchQuery, Student.class)
    15. .getAggregations();
    16. if (aggregations != null) {
    17. // DateHistogramInterval
    18. Histogram range = aggregations.get("scoreIntervalCount");
    19. for (Histogram.Bucket bucket : range.getBuckets()) {
    20. System.out.println(bucket.getKey());
    21. System.out.println(bucket.getDocCount());
    22. }
    23. }

Sql方式

这种方式玩玩就好🤐。以下是个简单的例子,可根据需要自行封装:

  • es目前版本已经支持sql查询方式,方式如下: ```http POST /_sql?format=json { “query”: “select gender,sum(studentScore) totalScore from student where studentName like ‘ble%’ group by gender order by totalScore desc limit 2” }

POST /_sql?format=text { “query”: “select gender,sum(studentScore) totalScore from student where studentName like ‘ble%’ group by gender order by totalScore desc limit 2” }

POST /_sql?format=csv { “query”: “select gender,sum(studentScore) totalScore from student where studentName like ‘ble%’ group by gender order by totalScore desc limit 2” }

  1. - Java客户端中,无非就是个Http请求,这里简单封装一下
  2. ```java
  3. public class QueryParam {
  4. private String query;
  5. // setter and getter
  6. }
  1. public String query(String requestUrl, QueryParam queryParam) throws Exception {
  2. String result = "";
  3. URL url = new URL(requestUrl);
  4. HttpURLConnection conn = (HttpURLConnection) url.openConnection();
  5. conn.setRequestMethod("POST");
  6. conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
  7. conn.setDoOutput(true);
  8. byte[] queryParamBytes = JSON.toJSONBytes(queryParam);
  9. conn.setRequestProperty("Content-Length", String.valueOf(queryParamBytes.length));
  10. // 写入数据流
  11. OutputStream outputStream = conn.getOutputStream();
  12. outputStream.write(queryParamBytes);
  13. if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
  14. InputStream inputStream = conn.getInputStream();
  15. byte[] adaptBytes = new byte[1024];
  16. int index = inputStream.read(adaptBytes);
  17. StringBuilder content = new StringBuilder();
  18. while (index != -1) {
  19. content.append(new String(adaptBytes, 0, index, StandardCharsets.UTF_8));
  20. index = inputStream.read(adaptBytes);
  21. }
  22. result = content.toString();
  23. }
  24. return result;
  25. }
  • 调用 query 方法
    1. String requestUrl = "http://192.168.73.141:9200/_sql?format=json";
    2. String sql =
    3. "select gender,sum(studentScore) totalScore from student "
    4. + "where studentName like 'ble%' "
    5. + "group by gender "
    6. + "order by totalScore desc "
    7. + "limit 2";
    8. QueryParam queryParam = new QueryParam();
    9. queryParam.setQuery(sql);
    10. String query = query(requestUrl, queryParam);
    11. System.out.println(query);
    image.png