一个最简单的示例就是在索引中的每个文档上执行更新而不改变 source。这对于选择新属性,或者在线映射修改是很有用的:

  1. curl -X POST "localhost:9200/twitter/_update_by_query?conflicts=proceed&pretty"

返回示例:

  1. {
  2. "took" : 147,
  3. "timed_out": false,
  4. "updated": 120,
  5. "deleted": 0,
  6. "batches": 1,
  7. "version_conflicts": 0,
  8. "noops": 0,
  9. "retries": {
  10. "bulk": 0,
  11. "search": 0
  12. },
  13. "throttled_millis": 0,
  14. "requests_per_second": -1.0,
  15. "throttled_until_millis": 0,
  16. "total": 120,
  17. "failures" : [ ]
  18. }

_update_by_query 在它执行时获取索引的快照,并且索引通过内部版本找到的数据。在快照生成与请求执行期间,如果有文档更新,那么将会得到一个版本冲突。当更新的文档版本匹配时,版本数就会增加。

因为内部版本不支持使用 0 作为有效版本值,因此无法使用 _update_by_query更新版本为 0 的文档,请求将会失败。

所有的更新和查询失败会导致_update_by_query执行终止,并返回失败的响应。已经执行的更新依然保持原样。换句话来说就是执行不会回滚,只会终止。当第一个失败导致终止,失败的批量请求返回的所有错误,都将在 failure 元素中返回;因此,可能会有更多条失败记录。

如果仅仅想统计版本冲突,而不终止整个请求,你可以在 url 中使用conflicts=proceed或者在 请求体中使用 “conflicts”: “proceed”。第一个例子中这样做,是因为它仅仅试图获取最新映射的更改,版本冲突仅仅意味着在执行 _update_by_query 开始到该冲突文档试图更新期间,有其他进程更新了该文档。这样很好,那次更新也获取到了最新映射的更新。

回到 API 格式,这将更新 twitter 索引中的 tweets:

  1. curl -X POST "localhost:9200/twitter/_doc/_update_by_query?conflicts=proceed&pretty"

也可以限制 _update_by_query 使用 查询 DSL。以下操作将会更新 tweeter 索引中的所有用户为 kimchy 的文档:

  1. curl -X POST "localhost:9200/twitter/_update_by_query?conflicts=proceed&pretty" -H 'Content-Type: application/json' -d'
  2. {
  3. "query": {
  4. "term": {
  5. "user": "kimchy"
  6. }
  7. }
  8. }
  9. '

查询必须作为 query键的值,像 search api 的方式一样。

目前为止,我们仅仅更新了文档,但是并没有改变数据源。这种对获取属性这样的操作是真正有用的,但这只是乐趣的一半。_update_by_query支持已脚本方式更新文档。以下操作将会更新 kimchy 的所有 tweet,并为 like 字段加一:

  1. curl -X POST "localhost:9200/twitter/_update_by_query?pretty" -H 'Content-Type: application/json' -d'
  2. {
  3. "script": {
  4. "source": "ctx._source.likes++",
  5. "lang": "painless"
  6. },
  7. "query": {
  8. "term": {
  9. "user": "kimchy"
  10. }
  11. }
  12. }
  13. '

就像在 update api 中一样,可以设置 ctx.op来更改执行的操作。

noop

如果脚本决定什么都不做,可以设置 ctx.op 的值为 noop。这会导致 update_by_query 忽略文档的更新。在响应体的 noop 计数器中会报告忽略数。

delete

设置 ctx.op=delete 可以使脚本删除文档。在响应体的 deleted 计数器中会报告删除数。

设置 ctx.op 为任何其他值都是错误。设置 ctx 的其他字段也是错误的。

注意,这里没有指定 confilcts=processd。这个例子中,我们希望版本冲突终止进程,以便于处理错误。

该 API 不允许移动命中的文档,只能更改他们的源属性。这是故意的!对于将文档从原始位置删除,我们没有做任何规定。

就像 search API 一样,也可以一次对多个索引,多个类型执行全部操作:

  1. curl -X POST "localhost:9200/twitter,blog/_doc,post/_update_by_query?pretty"

routing

如果你提供了 routing,那么 routing 的值会拷贝到 scroll 查询上,限制该操作到与路由值匹配的分片上:

  1. curl -X POST "localhost:9200/twitter/_update_by_query?routing=1&pretty"

scroll_size

默认情况下,_update_by_query 使用滚动方式的批次是 1000。可以通过 scroll_size URL 参数改变批次的大小:

  1. curl -X POST "localhost:9200/twitter/_update_by_query?scroll_size=100&pretty"

_update_by_query 可以通过指定 pipeline 使用 Ingest Node 的特性:

  1. curl -X PUT "localhost:9200/_ingest/pipeline/set-foo?pretty" -H 'Content-Type: application/json' -d'
  2. {
  3. "description" : "sets foo",
  4. "processors" : [ {
  5. "set" : {
  6. "field": "foo",
  7. "value": "bar"
  8. }
  9. } ]
  10. }
  11. '
  12. curl -X POST "localhost:9200/twitter/_update_by_query?pipeline=set-foo&pretty"

在 ingest 节点增加一个处理器(更新 foo 的值为 bar)
执行更新,并指定处理器的 id

URL 参数

除了标准参数,像 pretty这样的,该 API 也支持refresh,wait_for_completion,wait_for_active_shards,timeout 以及 scroll

发送 refresh 参数,会在请求完成后,更新受影响的索引中所有的分片。 index API 中的 refresh 跟这个不太一样,它只会索引有新数据的分片。

如果请求包含wait_for_completion=false,Elasticsearch 会执行一些预处理检查,启动请求,然后返回一个任务task ,该任务可以使用 Tasks APIs 取消,或者获取任务状态。Elasticsearch 也会在 .tasks/task/${taskId} 下创建一条记录作为文档。保留还是删除,一切都看你,合适就行。当完成时,删除它,elasticsearch 就可以回收它使用的空间。

在处理请求前,wait_for_active_shards 控制着一个分片的复制集需要有多个保持活跃。timeout 控制着每个写请求等待不可用分片变成可用分片的时间。这两者的工作方式与在 Bulk API 中的工作方式是一样的。因为 _update_by_query 使用滚动查询,所以也可以指明一些 scroll 参数来控制保持“搜索上下文”存活的时间(如使用 scroll=10m,默认情况下是 5min)。

requests_per_second 可以设置为任意的正数数字(1.4,6,1000 等等),节流率是 requests_per_second 在索引操作批处理之间填充一个等待时间。可以通过设置 requests_per_second-1 来禁用节流。

节流是通过批之间等待来完成的,这样_update_by_query 内部使用的 scroll 可以有一个考虑到填充时间的超时。填充时间是批处理大小除以requests_per_second 后与花费的写入时间之间的差值。默认批处理大小是 1000,如果 requests_per_second 设置为 500 那么:

  1. target_time = 1000 / 500 per second = 2 seconds
  2. wait_time = target_time - delete_time = 2 seconds - .5 seconds = 1.5 seconds

因为每一批操作都是单个 _bulk 请求,因此较大的请求会造成 elasticsearch 创建大量请求,然后等待一段时间,然后再开始下一组请求。这个阵发性的,而不是平滑的。默认值为 -1。

响应体

  1. {
  2. "took" : 147,
  3. "timed_out": false,
  4. "total": 5,
  5. "updated": 5,
  6. "deleted": 0,
  7. "batches": 1,
  8. "version_conflicts": 0,
  9. "noops": 0,
  10. "retries": {
  11. "bulk": 0,
  12. "search": 0
  13. },
  14. "throttled_millis": 0,
  15. "requests_per_second": -1.0,
  16. "throttled_until_millis": 0,
  17. "failures" : [ ]
  18. }

took
整个操作从开始到结束的毫秒数;
time_out
该值设置为 true 表明任何请求在更新期间执行超时;
total
成功执行的文档数;
updated
成功更新的文档数;
deleted
成功删除的文档数;
batches
更新操作拉回来的 scroll 响应次数;
version_conflicts
更新操作遇到的版本冲突次数;
noops
被忽略的文档数,因为脚本中使用了 ctx.op 而返回 noop 值;
retries
更新操作的重试次数。bulk是批量操作重试次数,search是查询操作的重试次数;
throttled_millis
请求符合requests_per_second设置的睡眠毫秒数;
requests_per_second
在操作期间每秒有效执行次数;
throttled_until_millis
在按查询删除操作中,该字段因始终为零。仅在使用 Task API 中有意义,在满足 requests_per_second时,它指示节流请求下次执行的时间(毫秒时间戳);
failures
记录在执行过程中不可恢复的错误数组。如果这个值非空,那么请求会因为这些错误中止。Update-by-query 通过批处理实施,任何错误都会导致整个进程中止,但是当前批处理的所有错误都会记录到数组中。可以使用 conflicts 来阻止重建索引时因为版本冲突而中止。

与 Task API 配合使用

可以通过 Task API 获取所有正在执行的 update-by-query 请求的状态:

  1. curl -X GET "localhost:9200/_tasks?detailed=true&actions=*byquery&pretty"

相应结果示例

  1. {
  2. "nodes" : {
  3. "r1A2WoRbTwKZ516z6NEs5A" : {
  4. "name" : "r1A2WoR",
  5. "transport_address" : "127.0.0.1:9300",
  6. "host" : "127.0.0.1",
  7. "ip" : "127.0.0.1:9300",
  8. "attributes" : {
  9. "testattr" : "test",
  10. "portsfile" : "true"
  11. },
  12. "tasks" : {
  13. "r1A2WoRbTwKZ516z6NEs5A:36619" : {
  14. "node" : "r1A2WoRbTwKZ516z6NEs5A",
  15. "id" : 36619,
  16. "type" : "transport",
  17. "action" : "indices:data/write/update/byquery",
  18. "status" : {
  19. "total" : 6154,
  20. "updated" : 3500,
  21. "created" : 0,
  22. "deleted" : 0,
  23. "batches" : 4,
  24. "version_conflicts" : 0,
  25. "noops" : 0,
  26. "retries": {
  27. "bulk": 0,
  28. "search": 0
  29. },
  30. "throttled_millis": 0
  31. },
  32. "description" : ""
  33. }
  34. }
  35. }
  36. }
  37. }

该对象包含实际状态。类似于带重要附加 total 字段的响应 json。total 指的是重新索引期望执行的操作总数。你可以通过额外的 createdupdateddeleted 来估算进度。当总执行次数等于 total 的值时请求完成。

使用任务 Id,你可以直接查看任务。以下的例子获取了任务 ID(r1A2WoRbTwKZ516z6NEs5A:36619) 的信息:

  1. curl -X GET "localhost:9200/_tasks/r1A2WoRbTwKZ516z6NEs5A:36619?pretty"

该 API 的优势在于,它集成了 wait_for_completion=false,以透明的方式返回了完成任务的状态。如果任务已经完成,并且在其上设置了wait_for_completion=false 那么将会返回 result 或者 error 字段。这个特性的代价是 wait_for_completion=false.tasks/task/${taskId} 创建的文档。是否删除该文档取决于你。

与 Cancel Task API 一起使用
任何 update-by-query 操作都可以使用 Task Cancel API 取消:

  1. curl -X POST "localhost:9200/_tasks/r1A2WoRbTwKZ516z6NEs5A:36619/_cancel?pretty"

任务 ID 可以通过使用 Task API 找到。

取消应该很快发生,但可能需要几秒。上面的任务状态 API 会继续列任务,直到被唤醒,并取消自身。

重新调节(rethrottling)

一个正在执行的 update-by-query 操作中,requests_per_second 的值也可以修改,需要用到 _rethrottle API:

  1. curl -X POST "localhost:9200/_update_by_query/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1&pretty"

任务 ID 可以通过使用 Task API 找到。

就像在设置 _update_by_query API 中的一样,requests_per_second 也可以设置为 -1 来禁用限流,或者任何正数,像 1.7,12 这样,就限流到那个层级了。重新调整已加快速度会立即生效,但是调整为减慢速度,那么查询会在完成当前批次后才生效。这样可以防止 scroll 超时。

切片(Slicing)

Update-by-query 支持 sliced scroll 来并行化更新过程。该并行化可以提高效率,并且提供了一种便利的方式将请求分解为较小部分。

手动分片

在每次请求中提供一个分片 id,分片总数来实现手动分片 Update-by-query:

  1. curl -X POST "localhost:9200/twitter/_update_by_query?pretty" -H 'Content-Type: application/json' -d'
  2. {
  3. "slice": {
  4. "id": 0,
  5. "max": 2
  6. },
  7. "script": {
  8. "source": "ctx._source[\u0027extra\u0027] = \u0027test\u0027"
  9. }
  10. }
  11. '
  12. curl -X POST "localhost:9200/twitter/_update_by_query?pretty" -H 'Content-Type: application/json' -d'
  13. {
  14. "slice": {
  15. "id": 1,
  16. "max": 2
  17. },
  18. "script": {
  19. "source": "ctx._source[\u0027extra\u0027] = \u0027test\u0027"
  20. }
  21. }
  22. '

可以通过以下方式验证:

  1. curl -X GET "localhost:9200/_refresh?pretty"
  2. curl -X POST "localhost:9200/twitter/_search?size=0&q=extra:test&filter_path=hits.total&pretty"

结果中合乎情理的 total 像这样:

  1. {
  2. "hits": {
  3. "total": 120
  4. }
  5. }

自动分片

使用 Sliced scroll 分片 _uid 可以让 update-by-query 自动并行化 。使用 sclices 指明分片的总数:

  1. curl -X POST "localhost:9200/twitter/_update_by_query?refresh&slices=5&pretty" -H 'Content-Type: application/json' -d'
  2. {
  3. "script": {
  4. "source": "ctx._source[\u0027extra\u0027] = \u0027test\u0027"
  5. }
  6. }
  7. '

验证:

  1. curl -X POST "localhost:9200/twitter/_search?size=0&q=extra:test&filter_path=hits.total&pretty"

结果:

  1. {
  2. "hits": {
  3. "total": 120
  4. }
  5. }

设置 slice 为 auto,Elasticsearch 会自动选择使用分片数。该设置每个 shard 使用一个 slice,达到某个极限。如果这里有多个源索引,就会选择拥有最下分片数的索引作为 slice 的数量。

update_by_query 上添加 slices 只会自动执行上一节中的手动过程,创建子请求,这意味着会有一些特殊点(怪癖):

  • 你可以在 Tasks API 中看到这些请求。这些子请求是带 slice 请求的任务的 “chilld”任务。
  • 获取带 slices 请求的任务状态时,只会获取到已经完成的分片状态。
  • 这些子请求可以分别用于取消和重新调节等操作。
  • 使用了 slices 参数的重新节流请求,会按比例调整重新调整未完成的子请求。
  • 使用了 slices 参数的取消请求会取消每一个子请求。
  • 由于 slices 的性质,每一个子查询获取的文档都不会完全均匀。所有的文件都会被处理,但是有些 slices 会比较大些。期望更大的切片具有更均匀的分布。
  • slices 请求中的参数,如requests_per_secondsize,也会按比例分配给每个子请求。结合以上分配不均匀的观点,你应该会得出,将 sizeslices 一起使用可能不会导致 _update_by_query 处理的准确文档数量。
  • 每个子请求获取到了一个略微不同的源索引的快照,尽管这些快照大约是同一时间产生的。

挑选切片数

如果自动分片,对于大部分索引,设置 slicesauto 会选择一个合理的分片。如果要手动切片,或者修改自动切片,请遵循以下准则。

当索引中分片的数值与 slices 数值相同时,查询的性能更高效。如果那个数值很大(比如 500),选择一个小点的数值,因为太大也会降低性能。设置 slices 的值高于索引的分片数,通常不会提高消息,反而增加开销。

更新性能随着可用资源的 slices 数线性扩展。

无论是查询性能还是更新性能主导运行时,都取决于重新索引的文档数和集群资源。

挑选新属性

假设创建了一个没有动态映射的索引,将其填充了数据,然后添加了映射,并从数据中获取更多的字段:

  1. curl -X PUT "localhost:9200/test?pretty" -H 'Content-Type: application/json' -d'
  2. {
  3. "mappings": {
  4. "_doc": {
  5. "dynamic": false,
  6. "properties": {
  7. "text": {"type": "text"}
  8. }
  9. }
  10. }
  11. }
  12. '
  13. curl -X POST "localhost:9200/test/_doc?refresh&pretty" -H 'Content-Type: application/json' -d'
  14. {
  15. "text": "words words",
  16. "flag": "bar"
  17. }
  18. '
  19. curl -X POST "localhost:9200/test/_doc?refresh&pretty" -H 'Content-Type: application/json' -d'
  20. {
  21. "text": "words words",
  22. "flag": "foo"
  23. }
  24. '
  25. curl -X PUT "localhost:9200/test/_mapping/_doc?pretty" -H 'Content-Type: application/json' -d'
  26. {
  27. "properties": {
  28. "text": {"type": "text"},
  29. "flag": {"type": "text", "analyzer": "keyword"}
  30. }
  31. }
  32. '
  1. 这意味着新字段不会被索引,仅会存在 _source 中;
    2.更新映射,新增加字段 flag。为了选择新字段,必须使用它重新索引所有文档。

查询数据不会找到任何文档:

  1. curl -X POST "localhost:9200/test/_search?filter_path=hits.total&pretty" -H 'Content-Type: application/json' -d'
  2. {
  3. "query": {
  4. "match": {
  5. "flag": "foo"
  6. }
  7. }
  8. }
  9. '
  10. {
  11. "hits" : {
  12. "total" : 0
  13. }
  14. }

但是可以通过发起一个 update_by_query 请求来获取新的映射:

  1. curl -X POST "localhost:9200/test/_update_by_query?refresh&conflicts=proceed&pretty"
  2. curl -X POST "localhost:9200/test/_search?filter_path=hits.total&pretty" -H 'Content-Type: application/json' -d'
  3. {
  4. "query": {
  5. "match": {
  6. "flag": "foo"
  7. }
  8. }
  9. }
  10. '
  11. {
  12. "hits" : {
  13. "total" : 1
  14. }
  15. }

将字段添加到多字段时,可以执行完全相同的操作。