https://www.elastic.co/guide/en/elasticsearch/reference/7.10/docs.html

新增(覆盖)记录到索引

语法:

  1. PUT /<target>/_doc/<_id>
  2. {
  3. ...<body>...
  4. }
  • <target>,可以是index也可以是data stream
  • <_id>,是我们自己定义的一个ID
  • <body>,要新增的记录,JSON格式

支持的查询的参数:

  • if_seq_no,并发控制用。类似CAS,给出现在的文档_seq_no值(查询出来的)。
  • if_primary_term,并发控制用。类似CAS,给出现在文档所在分片的_primary_term值(查询出来的)。
  • op_type,主要用来判断某个文档/索引是否存在,有两个固定值可以选择:
    • index,判断索引。目前不知道这个干嘛用的,裂开。
    • create,判断文档记录。如果已经存在了会返回 409 错误码并报version conflict, document already exists
  • version_type,版本类型。有三种:
    • internalES内部维护的版本号
    • external,开发人员自定义的版本号,只要大于原来的版本号就能生效
    • external_gte,这个不太清楚
  • wait_for_active_shards,要求此次操作执行前,判断活跃的副本分片数量必须满足这个值才能进行,否则等待。
  • timeout,指定等待时间

请求:

  1. PUT /product/_doc/1
  2. {
  3. "name" : "xiaomi phone",
  4. "desc" : "shouji zhong de zhandouji",
  5. "price" : 3999,
  6. "tags": [ "xingjiabi", "fashao", "buka" ]
  7. }

响应:

  1. {
  2. "_index" : "product",
  3. "_type" : "_doc",
  4. "_id" : "1",
  5. "_version" : 1,
  6. "result" : "created",
  7. "_shards" : {
  8. "total" : 2,
  9. "successful" : 2,
  10. "failed" : 0
  11. },
  12. "_seq_no" : 0,
  13. "_primary_term" : 1
  14. }
  • _index,被新增数据的索引名称
  • _type,文档类型,目前支持单一文档,_doc
  • _id,新增的这条记录的唯一标识符
  • _version,整个文档的版本号。跟着_id走的,哪怕发生了覆盖。每次发生更新时,该值都会发生递增。(7.x后基本不采用这个方式来控制并发)
  • _seq_no,此次操作的编号。与_primary_term共同定位此次更新发生的位置与顺序。
  • _primary_term此次操作发生在的主分片序号上。这玩意会发生变化的,难道这个文档所在的主分片也会变更序号吗?其实这货记录了 当前文档所在分片发生更新的次数,保证新分片的操作不会被旧分片覆盖。这货和_seq_no一起保证,文档操作的最新,分片操作的最新。(大伙可以重启master节点,当发生选举后,你们可以看分片的_primary_term发生递增)
  • result,此次操作的结果,是发生了更新还是发生了插入

    在ES7.10里,通过internal version来控制并发旧保存了: Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use if_seq_no and if_primary_term instead;

其他的创建方法

  • POST /<target>/_doc/
  • PUT /<target>/_create/<_id>
  • POST /<target>/_create/<_id>

仅在不存在时创建

通过使用PUT /<target>/_create/<_id>请求,在文档不存在时创建记录

自动创建ID

当使用POST /<target>/_doc/请求,默认op_type=create,索引会为这个文档自动创建一个ID。

PUT /<target>/_doc/ 请求无法实现,会报405错误:Incorrect HTTP method for uri [/product/_doc/?pretty=true] and method [PUT], allowed: [POST]

控制版本并发

通过指定if_seq_noif_primary_term可以类似CAS方式操作文档。如果检测到不匹配,会报409: VersionConflictException

路由

一个索引被存储于多个分片中,那么一个记录存入时究竟存入哪个分片呢?绝大多数情况,这是由我们的输入值来确定的:<TargetShardId> = hashFunction(<input>)。一般情况下,我们都是用id作为 <input>。所以如果指定了id属性,那么我们就无法确保存入哪个分片;如果想确定存入哪个分片,咱们就不能指定id。所以,若想固定某些文档存入某个分片中,需要在?routing=<input>查询参数中给一个固定值,ES拿这个固定值去获取<TargetShardId>
<TargetShardId> = hashFunction(<我们给定的固定值>)
咱们这里通过 查看分片大小是否变动 来判断 记录是否插入某一个分片中

步骤1

  1. GET _cat/shards/product?v
  1. index shard prirep state docs store ip node
  2. product 1 r STARTED 0 208b 172.18.0.4 es01
  3. product 1 r STARTED 0 208b 172.18.0.3 es03
  4. product 1 p STARTED 0 208b 172.18.0.2 es02
  5. product 2 r STARTED 0 208b 172.18.0.4 es01
  6. product 2 p STARTED 0 208b 172.18.0.3 es03
  7. product 2 r STARTED 0 208b 172.18.0.2 es02
  8. product 0 p STARTED 0 208b 172.18.0.4 es01
  9. product 0 r STARTED 0 208b 172.18.0.3 es03
  10. product 0 r STARTED 0 208b 172.18.0.2 es02

此时,我们创建一条新记录:

  1. POST /product/_doc/111?routing=CoDeleven
  2. {
  3. "name": "xiaomi phone",
  4. "desc": "shouji zhong de zhandouji",
  5. "price": 13999,
  6. "tags": [
  7. "xingjiabi",
  8. "fashao",
  9. "buka"
  10. ]
  11. }

然后查看分片大小变化。如果发现新增了,但是数据没发生变化的,请执行Flush,可能还没有落盘。笔者是通过Elasticsearch-Header去直接刷新的:

  1. GET _cat/shards/product?v
  1. index shard prirep state docs store ip node
  2. product 1 r STARTED 1 5.9kb 172.18.0.4 es01
  3. product 1 r STARTED 1 5.9kb 172.18.0.3 es03
  4. product 1 p STARTED 1 5.9kb 172.18.0.2 es02
  5. product 2 r STARTED 0 208b 172.18.0.4 es01
  6. product 2 p STARTED 0 208b 172.18.0.3 es03
  7. product 2 r STARTED 0 208b 172.18.0.2 es02
  8. product 0 p STARTED 0 208b 172.18.0.4 es01
  9. product 0 r STARTED 0 208b 172.18.0.3 es03
  10. product 0 r STARTED 0 208b 172.18.0.2 es02

步骤2

此时我们再通过routing=CoDeleven创建一条记录,看看新记录会不会也放在分片1上:

  1. POST /product/_doc/222?routing=CoDeleven
  2. {
  3. "name": "xiaomi nfc phone",
  4. "desc": "zhichi quangongneng nfc,shouji zhong de jianjiji",
  5. "price": 4999,
  6. "tags": [
  7. "xingjiabi",
  8. "fashao",
  9. "gongjiaoka"
  10. ]
  11. }

然后查看分片大小情况,如果没有变化的,请使用POST /<index_name>/_flush将数据落盘:

  1. index shard prirep state docs store ip node
  2. product 1 r STARTED 2 11.8kb 172.18.0.4 es01
  3. product 1 r STARTED 2 11.8kb 172.18.0.3 es03
  4. product 1 p STARTED 2 11.8kb 172.18.0.2 es02
  5. product 2 r STARTED 0 208b 172.18.0.4 es01
  6. product 2 p STARTED 0 208b 172.18.0.3 es03
  7. product 2 r STARTED 0 208b 172.18.0.2 es02
  8. product 0 p STARTED 0 208b 172.18.0.4 es01
  9. product 0 r STARTED 0 208b 172.18.0.3 es03
  10. product 0 r STARTED 0 208b 172.18.0.2 es02

咱们看到文档222也被放到了分片1中。所以通过routing确实能将文档放入到固定的某个分片中去。

路由拓展思考

笔者发现有时候不加?routing=...会发现查询不到参数,这是为什么呢?我在官网上看到这样一句话,给我了一些想法:

The default value used for _routing is the document’s _id.

也就是说,如果我们不加_routing这个查询参数时,所有的增删改查都是根据Document ID来定位分片的。加了_routing,那么所有的增删改查都是根据_routing来定位分片的;咱们可以试一下,这里的111Document ID

  1. GET /product/_search_shards?routing=111
  1. {
  2. ...
  3. "shards" : [
  4. [
  5. {
  6. "state" : "STARTED",
  7. "primary" : false,
  8. "node" : "9Mlx22sfQ1OsM_e5Wbnr2Q",
  9. "relocating_node" : null,
  10. "shard" : 1,
  11. "index" : "product",
  12. "allocation_id" : {
  13. "id" : "1Zrtv9mvRvi9mdA35qXEag"
  14. }
  15. },
  16. {
  17. "state" : "STARTED",
  18. "primary" : true,
  19. "node" : "6nRL1BqLS-CqTbe8afqJWA",
  20. "relocating_node" : null,
  21. "shard" : 1,
  22. "index" : "product",
  23. "allocation_id" : {
  24. "id" : "oXonANDpRiOJ2MTj9GfJzg"
  25. }
  26. },
  27. {
  28. "state" : "STARTED",
  29. "primary" : false,
  30. "node" : "L_nIj1u_RISetoN0BR80jw",
  31. "relocating_node" : null,
  32. "shard" : 1,
  33. "index" : "product",
  34. "allocation_id" : {
  35. "id" : "NLTfZ45DQrmysZ5KixEkYw"
  36. }
  37. }
  38. ]
  39. ]
  40. }

此时hashFunction(DocumentId=111) == hashFunction("CoDeleven"),都在分片1中。所以GET /product/_doc/111可以正常查询到数据;
倘若此时用DocumentId=222来查询:

  1. GET /product/_search_shards?routing=222
  1. {
  2. ...
  3. "shards" : [
  4. [
  5. {
  6. "state" : "STARTED",
  7. "primary" : false,
  8. "node" : "6nRL1BqLS-CqTbe8afqJWA",
  9. "relocating_node" : null,
  10. "shard" : 2,
  11. "index" : "product",
  12. "allocation_id" : {
  13. "id" : "PfnnpTMdRbOK34RTij5pNw"
  14. }
  15. },
  16. {
  17. "state" : "STARTED",
  18. "primary" : false,
  19. "node" : "L_nIj1u_RISetoN0BR80jw",
  20. "relocating_node" : null,
  21. "shard" : 2,
  22. "index" : "product",
  23. "allocation_id" : {
  24. "id" : "_7X474xiRIm3eDalRL_tXg"
  25. }
  26. },
  27. {
  28. "state" : "STARTED",
  29. "primary" : true,
  30. "node" : "9Mlx22sfQ1OsM_e5Wbnr2Q",
  31. "relocating_node" : null,
  32. "shard" : 2,
  33. "index" : "product",
  34. "allocation_id" : {
  35. "id" : "mNpL3TZgQoKZE3Z_AmpJEA"
  36. }
  37. }
  38. ]
  39. ]
  40. }

发现如果仅有DocumentId=222理论上是会被放到 分片2 中去的。我们进一步来论证我们的想法:

  1. GET /product/_doc/222
  1. {
  2. "_index" : "product",
  3. "_type" : "_doc",
  4. "_id" : "222",
  5. "found" : false
  6. }

确实。如果仅通过DocumentId=222来查询,是查询不到的,因为默认是通过DocumentId=222来计算分片文职的,而之前DocumentId=222的记录在routing查询参数的“加持”下被放到了 分片1 里去,所以此时在分片2中查询不到记录。如果加上routing=CoDeleven,那么就又能查到了。所以可以给出一个结论,如果增删改查时使用了routing参数,那么所有操作发生的位置都是由routing决定的;如果没有使用,则由ID决定。

分布式

一个操作请求是先分发到主分片上进行操作,更新完毕后再分发给下面的副分片从而实现数据统一性。

活跃分片数要求

刷新

https://www.elastic.co/guide/en/elasticsearch/reference/7.10/docs-refresh.html

无变更更新

使用API更新文档时,即使文档内容没发生变化,也会递增版本号。使用 _update API 并 带上 detect_noop参数。没有硬性规定什么时候要拒绝这样 文档内容没有变化 的更新,有多种因素与之相关:

  1. 比如更新的频率
  2. 接收更新请求的节点上 每秒要执行多少次查询

超时设置

如果没有发生什么错误或者异常,默认是1分钟。这有个例子设置timeout参数:

  1. PUT my-index-000001/_doc/1?timeout=5m
  2. {
  3. "@timestamp": "2099-11-15T13:12:00",
  4. "message": "GET /search HTTP/1.1 200 1070000",
  5. "user": {
  6. "id": "kimchy"
  7. }
  8. }

版本号

版本号分为两个类型:

  1. 内部版本号:使用内部版本控制,从 1 开始并随着每次更新而递增,包括删除
  2. 外部版本号:设置version_type=external 并且 version的值要大于等于 文档存储的版本号记录

    A nice side effect is that there is no need to maintain strict ordering of async indexing operations executed as a result of changes to a source database, as long as version numbers from the source database are used. Even the simple case of updating the Elasticsearch index using data from a database is simplified if external versioning is used, as only the latest version will be used if the index operations arrive out of order for whatever reason. 没看懂,备注一下

这里的version_type有三个选项:

  1. internal,内部的版本号,默认就是这个。
  2. external,外部的版本号,要求版本号得大于当前文档里记录得版本号。因为这个要求大于,所以版本号会变动。
  3. external_gte,也是外部得版本号,外部版本号大于等于当前文档里记录得版本号就能更新。注意!在版本号相同的情况下,使用此方法,不会改变当前版本号!!。(已测试,ES7.10版本)

    使用外部的版本号后,文档的版本后由请求的那个版本号决定。所以如果请求的版本号与文档的版本号相同,那么就不会发生改变!

查询记录

语法:

  1. GET <index>/_doc/<_id>
  2. HEAD <index>/_doc/<_id> // 查询数据+文档属性
  3. GET <index>/_source/<_id> // 仅查询数据
  4. HEAD <index>/_source/<_id>

查询参数:

  • preference,指示当前查询在哪个分片上执行,有两个值:
    • [local](https://stackoverflow.com/questions/44109931/using-preference-local-in-elasticsearch),直接在当前查询的节点上查询,如果查询到了直接返回,查询不到就再分发下去给副节点处理。
    • 自定义值,通过一个固定的自定义值,固定查询某一个节点。
  • refresh,如果设置为true,会在查询之前把数据落盘;设置这个选项时需要考虑会降低索引速度的情况。
  • realtime,默认为true,在查询一个已经更新了但是还没refresh的记录时,会去内存/更新事务中获取最新的 stored_fields ;反之则从最近一次落盘的数据里获取记录。
  • stored_fields,大伙直接看文档
  • _source_excludes,查询源数据时,排除一些字段
  • _source_includes,查询源数据时,仅包含这些字段。
  • _source,布尔值,为true查询数据记录;为false,不显示数据记录。

参考文档:https://www.elastic.co/guide/en/elasticsearch/reference/7.10/docs-get.html

关于 realtimerefresh 的区别: 前者可能会导致数据不一致性,比如数据还未落盘,我先按照实时数据查询出来并使用了;如果此时 ES 宕机了,那么就会导致这个数据消失。而 refresh 会保证在查询前将数据落盘,所有查到的数据都是可靠的。

删除记录

参考文档:https://www.elastic.co/guide/en/elasticsearch/reference/7.10/docs-delete.html

更新记录

参考文档:https://www.elastic.co/guide/en/elasticsearch/reference/7.10/docs-update.html

无论是 partial update 还是覆盖形式的 put ,它们都需要先查询出原来的文档,然后修改,ES里删除原文档,最后提交新修改的文档。

不过一种 partial update 是在ES内部执行的,覆盖形式的 put 是由Java应用来执行的。

parital update: 所有的查询、修改、回写操作都发生在shard内部,避免了所有的网络请求。 减少了查询和修改的事件间隔,减少了并发冲突的情况

批量查询

https://www.elastic.co/guide/en/elasticsearch/reference/7.10/docs-multi-get.html

Bulk