The Ingest Node is a node type introduced in Elasticsearch 5.0. In the default configuration, every node is an Ingest Node.

  • Can pre-process data by intercepting Index or Bulk API requests.
    • Set a default value for a field; rename a field; split a field value into an array.
    • Supports Painless scripts for more complex transformations of the data.
  • Transforms the data and hands it back to the Index or Bulk API.
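The flow above can be sketched in a few lines of Python. This is only an illustration of the pipeline idea (the function names are made up for the example), not how Elasticsearch implements it:

```python
# Each processor is a function that takes a document (a dict) and returns
# the transformed document; a pipeline just applies them in order.
def split_processor(field, separator):
    def process(doc):
        doc[field] = doc[field].split(separator)
        return doc
    return process

def set_processor(field, value):
    def process(doc):
        doc[field] = value
        return doc
    return process

def run_pipeline(processors, doc):
    for process in processors:  # processors run strictly in order
        doc = process(doc)
    return doc

pipeline = [split_processor("tags", ","), set_processor("views", 0)]
print(run_pipeline(pipeline, {"tags": "hadoop,elasticsearch,spark"}))
# → {'tags': ['hadoop', 'elasticsearch', 'spark'], 'views': 0}
```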

Pipeline & Processor Concepts

  • Pipeline: a pipeline processes the documents that pass through it, applying its processors in order.

  • Processor: Elasticsearch ships with many built-in Processors; you can also implement your own Processor as a plugin.


Using Pipelines

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      },
      {
        "set": {
          "field": "views",
          "value": 0
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Introducing big data......",
        "tags": "hadoop,elasticsearch,spark",
        "content": "You know, for big data"
      }
    },
    {
      "_index": "index",
      "_id": "idxx",
      "_source": {
        "title": "Introducing cloud computing",
        "tags": "openstack,k8s",
        "content": "You know, for cloud"
      }
    }
  ]
}
PUT _ingest/pipeline/blog_pipeline
{
  "description": "a blog pipeline",
  "processors": [
    {
      "split": {
        "field": "tags",
        "separator": ","
      }
    },
    {
      "set": {
        "field": "views",
        "value": 0
      }
    }
  ]
}
GET _ingest/pipeline/blog_pipeline

POST _ingest/pipeline/blog_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "title": "Introducing cloud computing",
        "tags": "openstack,k8s",
        "content": "You know, for cloud"
      }
    }
  ]
}
PUT tech_blogs/_doc/2?pipeline=blog_pipeline
{
  "title": "Introducing cloud computing",
  "tags": "openstack,k8s",
  "content": "You know, for cloud"
}

Built-in Processors

Set Processor

{
  "set": {
    "field": "price_doc",
    "value": "Price: {{_source.price}}"
  }
}
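The {{_source.price}} placeholder is a Mustache-style template filled in from the document being processed. A rough Python sketch of that substitution (render_template is a hypothetical helper for illustration, not part of any Elasticsearch client):

```python
import re

def render_template(template, source):
    # Replace {{_source.xxx}} placeholders with values looked up in the
    # document -- a simplified model of the set processor's template support.
    def lookup(match):
        path = match.group(1).split(".")
        keys = path[1:] if path[0] == "_source" else path
        value = source
        for key in keys:
            value = value[key]
        return str(value)
    return re.sub(r"\{\{([\w.]+)\}\}", lookup, template)

print(render_template("Price: {{_source.price}}", {"price": 42}))
# → Price: 42
```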

Append Processor

{
  "append": {
    "field": "field1",
    "value": ["item2", "item3", "item4"]
  }
}

Convert Processor

{
  "convert": {
    "field": "foo",
    "target_field": "foo",
    "type": "integer",
    "ignore_missing": false
  }
}
# field: the field whose value is to be converted.
# target_field: optional; the field to write the converted value to. By default the original field is updated in place.
# type: the type to convert the existing value to: integer, float, string, boolean, or auto.
# ignore_missing: if true and the field does not exist or is null, the processor exits quietly without modifying the document.
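These options map to straightforward logic. A simplified Python sketch of the convert processor's behavior (the convert function is illustrative only):

```python
def convert(doc, field, type_, target_field=None, ignore_missing=False):
    # Illustrative model of the convert processor, not a real API.
    casts = {"integer": int, "float": float, "string": str,
             "boolean": lambda v: str(v).lower() == "true"}
    if field not in doc or doc[field] is None:
        if ignore_missing:
            return doc  # exit quietly, leaving the document unmodified
        raise KeyError(field)
    doc[target_field or field] = casts[type_](doc[field])
    return doc

print(convert({"foo": "123"}, "foo", "integer"))
# → {'foo': 123}
print(convert({}, "foo", "integer", ignore_missing=True))
# → {}
```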

Date Processor

{
  "date": {
    "field": "initial_date",
    "target_field": "timestamp",
    "formats": ["dd/MM/yyyy hh:mm:ss"],
    "timezone": "Europe/Amsterdam"
  }
}
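In Python terms, this roughly corresponds to parsing the field with the given format and attaching the configured timezone. A sketch only (Java's hh is strictly a 12-hour field; %H is used here for simplicity):

```python
from datetime import datetime
from zoneinfo import ZoneInfo  # Python 3.9+

# Parse initial_date with the configured format and timezone, then
# write the result to the target_field (here: timestamp).
doc = {"initial_date": "25/04/2016 12:02:01"}
parsed = datetime.strptime(doc["initial_date"], "%d/%m/%Y %H:%M:%S")
doc["timestamp"] = parsed.replace(tzinfo=ZoneInfo("Europe/Amsterdam")).isoformat()
print(doc["timestamp"])
# → 2016-04-25T12:02:01+02:00
```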

Date Index Name Processor

{
  "date_index_name": {
    "field": "date1",
    "index_name_prefix": "myindex-",
    "date_rounding": "M"
  }
}
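The processor rounds the date down (here to the month, "M") and routes the document to an index named accordingly. A sketch of the naming logic (date_index_name is a made-up helper; the default index-name date format is yyyy-MM-dd):

```python
from datetime import datetime

def date_index_name(date_value, index_name_prefix, date_rounding):
    # Round the date down per date_rounding, then build the index name.
    d = datetime.fromisoformat(date_value)
    if date_rounding == "M":    # round down to the month
        d = d.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    elif date_rounding == "d":  # round down to the day
        d = d.replace(hour=0, minute=0, second=0, microsecond=0)
    return f"{index_name_prefix}{d.date()}"  # yyyy-MM-dd

print(date_index_name("2016-04-25T12:02:01", "myindex-", "M"))
# → myindex-2016-04-01
```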

Grok Processor

{
  "grok": {
    "field": "message",
    "patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"]
  }
}
# "message": "55.3.244.1 GET /index.html 15824 0.043"
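Grok patterns are named building blocks that expand to regular expressions. A roughly equivalent Python regex for the pattern above (simplified; the IP part handles IPv4 only):

```python
import re

# Approximate expansion of the grok pattern into named capture groups.
pattern = re.compile(
    r"(?P<client>\d{1,3}(?:\.\d{1,3}){3}) "  # %{IP:client} (IPv4 only)
    r"(?P<method>\w+) "                      # %{WORD:method}
    r"(?P<request>\S+) "                     # %{URIPATHPARAM:request}
    r"(?P<bytes>\d+) "                       # %{NUMBER:bytes}
    r"(?P<duration>[\d.]+)"                  # %{NUMBER:duration}
)
m = pattern.match("55.3.244.1 GET /index.html 15824 0.043")
print(m.groupdict())
# → {'client': '55.3.244.1', 'method': 'GET', 'request': '/index.html', 'bytes': '15824', 'duration': '0.043'}
```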

Gsub Processor

{
  "gsub": {
    "field": "field1",
    "pattern": "\\.",
    "replacement": "-"
  }
}
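The gsub processor runs a regex find-and-replace on a string field; the JSON pattern "\\." is the escaped regex \., i.e. a literal dot. The Python equivalent:

```python
import re

# Replace every literal dot in the field with a dash.
doc = {"field1": "192.168.1.1"}
doc["field1"] = re.sub(r"\.", "-", doc["field1"])
print(doc["field1"])
# → 192-168-1-1
```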

Join Processor

{
  "join": {
    "field": "joined_array_field",
    "separator": "-"
  }
}

JSON Processor

{
  "json": {
    "field": "string_source",
    "target_field": "json_target"
  }
}

KV Processor

{
  "kv": {
    "field": "message",
    "field_split": "&",
    "value_split": "="
  }
}
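In effect, the kv processor splits the string on field_split, then splits each pair on value_split. A minimal Python sketch (kv is an illustrative helper):

```python
def kv(message, field_split, value_split):
    # Split into key=value pairs, then into keys and values.
    return dict(pair.split(value_split, 1) for pair in message.split(field_split))

print(kv("ip=1.2.3.4&error=REFUSED", "&", "="))
# → {'ip': '1.2.3.4', 'error': 'REFUSED'}
```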

Lowercase Processor

{
  "lowercase": {
    "field": "field1"
  }
}

Uppercase Processor

{
  "uppercase": {
    "field": "field1"
  }
}

Remove Processor

{
  "remove": {
    "field": "field1"
  }
}

Rename Processor

{
  "rename": {
    "field": "field3",
    "target_field": "field4"
  }
}

Split Processor

{
  "split": {
    "field": "tags",
    "separator": ","
  }
}

Sort Processor

{
  "sort": {
    "field": "tags",
    "order": "desc"
  }
}

Trim Processor

{
  "trim": {
    "field": "foo"
  }
}

Script Processor

Context                 Syntax
Ingestion               ctx.field_name
Update                  ctx._source.field_name
Search & Aggregation    doc['field_name']
# Add a Script Processor
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      },
      {
        "script": {
          "source": """
            if (ctx.containsKey("content")) {
              ctx.content_length = ctx.content.length();
            } else {
              ctx.content_length = 0;
            }
          """
        }
      },
      {
        "set": {
          "field": "views",
          "value": 0
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Introducing big data......",
        "tags": "hadoop,elasticsearch,spark",
        "content": "You know, for big data"
      }
    },
    {
      "_index": "index",
      "_id": "idxx",
      "_source": {
        "title": "Introducing cloud computing",
        "tags": "openstack,k8s",
        "content": "You know, for cloud"
      }
    }
  ]
}

DELETE tech_blogs

PUT tech_blogs/_doc/1
{
  "title": "Introducing big data......",
  "tags": "hadoop,elasticsearch,spark",
  "content": "You know, for big data",
  "views": 0
}

POST tech_blogs/_update/1
{
  "script": {
    "source": "ctx._source.views += params.new_views",
    "params": {
      "new_views": 100
    }
  }
}

# Check the views count
POST tech_blogs/_search
{
}

# Store the script in the Cluster State
POST _scripts/update_views
{
  "script": {
    "lang": "painless",
    "source": "ctx._source.views += params.new_views"
  }
}

POST tech_blogs/_update/1
{
  "script": {
    "id": "update_views",
    "params": {
      "new_views": 1000
    }
  }
}

GET tech_blogs/_search
{
  "script_fields": {
    "rnd_views": {
      "script": {
        "lang": "painless",
        "source": """
          java.util.Random rnd = new Random();
          doc['views'].value + rnd.nextInt(1000);
        """
      }
    }
  },
  "query": {
    "match_all": {}
  }
}
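The Painless script in the simulate request above checks for a content field and records its length; in an ingest context, ctx exposes the document's fields directly. The same logic in Python, for comparison:

```python
def add_content_length(ctx):
    # Mirrors the Painless script: if the document has a "content" field,
    # store its length in content_length, otherwise store 0.
    if "content" in ctx:
        ctx["content_length"] = len(ctx["content"])
    else:
        ctx["content_length"] = 0
    return ctx

print(add_content_length({"content": "You know, for big data"}))
# → {'content': 'You know, for big data', 'content_length': 22}
```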

Reindexing Data

_update_by_query must include a query condition, to avoid re-running the pipeline on documents that have already been processed, which would otherwise fail with data-type errors.

POST tech_blogs/_update_by_query?pipeline=blog_pipeline
{
  "query": {
    "bool": {
      "must_not": {
        "exists": {
          "field": "views"
        }
      }
    }
  }
}

Ingest Node vs. Logstash

|                       | Logstash                                                  | Ingest Node                                                                  |
| --------------------- | --------------------------------------------------------- | ---------------------------------------------------------------------------- |
| Data input & output   | Reads from and writes to many different data sources      | Receives data via the ES REST API and writes to Elasticsearch                |
| Data buffering        | Implements a simple data queue with persistence support   | No buffering                                                                 |
| Data processing       | Large number of plugins; also supports custom development | Built-in processors; extensible via plugins (a plugin update needs a restart) |
| Configuration & usage | Adds some architectural complexity                        | No extra deployment needed                                                   |