Ingest Node

  • ES5.0后引入的一种新的节点类型。默认配置下,每个节点都是Ingest Node
    • 具有预处理数据的功能,可拦截Index或Bulk API请求
    • 对数据进行转换,并重新返回给Index 或Bulk API
  • 无需Logstash,就可以进行数据的预处理,例如:

    • 为某个字段设置默认值;重命名某个字段的字段名;对字段进行Split操作
    • 支持Painless脚本,对数据进行更复杂的加工

      Pipline & Processor

  • Pipeline - 管道会对通过的数据(文档),按照顺序进行加工

  • Processor - ES对一些加工的行为进行了抽象包装

    • ES提供了很多内置的Processors。也支持 通过插件的方式,实现自己的Processor
      • split - 例如:将给定字段切割成数组
      • remove / rename - 例如:移除/重命名一个字段
      • append - 例如:为商品新增加标签
      • convert - 例如:将商品价格从字符串转换为float
      • Date / JSON - 例如:日期格式转换,字符串转json对象

        Simulate API

  • 模拟Pipeline

    1. post _ingest/pipeline/_simulate
    2. {
    3. "pipeline":{
    4. "description":"切割tags",
    5. "processors":[
    6. {
    7. // 字段切割
    8. "split":{
    9. "field":"tags",
    10. "separator":","
    11. }
    12. },
    13. {
    14. // 增加字段
    15. "set":{
    16. "field":"views",
    17. "value":0
    18. }
    19. }
    20. ]
    21. },
    22. // 测试文档
    23. "docs":[
    24. {
    25. "_index":"index",
    26. "_id":"id",
    27. "_source":{
    28. "title":"Big Data",
    29. "tags":"hadoop,spark,elasticsearch"
    30. }
    31. }
    32. ]
    33. }
    34. // 新增pipeline
    35. put _ingest/pipeline/blog_pipeline -- pipeline 名字
    36. {
    37. "description":"切割tags",
    38. "processors":[
    39. {
    40. // 字段切割
    41. "split":{
    42. "field":"tags",
    43. "separator":","
    44. }
    45. },
    46. {
    47. // 增加字段
    48. "set":{
    49. "field":"views",
    50. "value":0
    51. }
    52. }
    53. ]
    54. }
    55. // 查看pipeline
    56. get _ingest/pipeline/blog_pipeline
    57. // 测试pipeline
    58. post _ingest/pipeline/blog_pipeline
    59. {
    60. "docs":[
    61. {
    62. "_index":"index",
    63. "_id":"id",
    64. "_source":{
    65. "title":"Big Data",
    66. "tags":"hadoop,spark,elasticsearch"
    67. }
    68. }
    69. ]
    70. }
    71. // 使用pipeline插入数据
    72. put tech_blogs/_doc/2?pipeline=blog_pipeline
    73. {
    74. "title":"Big Data",
    75. "tags":"hadoop,spark,elasticsearch"
    76. }
    77. // update_by_query使用pipeline
    78. post tech_blogs/_update_by_query?pipeline=blog_pipeline
    79. {
    80. "query":{
    81. "bool":{
    82. "must_not":{
    83. "exist":{
    84. "field":"views
    85. }
    86. }
    87. }
    88. }
    89. }

    Painless用途

  • 可对文档字段进行加工处理

    • 更新或删除字段,处理数据聚合操作
    • script field:对返回的字段进行提前计算
    • function score:对文档的算分进行处理
  • 在ingest pipeline中执行脚本
  • 在reindex API,update_by_query时,对数据进行计算

    painless脚本访问字段

    | 上下文 | 语法 | | :—-: | :—-: | | ingest | ctx.field_name | | update | ctx.source.field_name | | Search & Aggregation | doc[“field_name”] |

  1. // script
  2. put _ingest/pipeline/blog_pipeline -- pipeline 名字
  3. {
  4. "description":"切割tags",
  5. "processors":[
  6. {
  7. // 字段切割
  8. "split":{
  9. "field":"tags",
  10. "separator":","
  11. }
  12. },
  13. {
  14. "script":{
  15. "source": """
  16. if(ctx.containsKey("content")){
  17. ctx.content_length = ctx.content.length();
  18. }else {
  19. ctx.content_length=0;
  20. }
  21. """
  22. }
  23. },
  24. {
  25. // 增加字段
  26. "set":{
  27. "field":"views",
  28. "value":0
  29. }
  30. }
  31. ]
  32. }
  33. // update script
  34. post tech_blogs/_update/1
  35. {
  36. "script":{
  37. "source":"ctx._source.views += params.new_views",
  38. "params":{
  39. "new_views":100
  40. }
  41. }
  42. }
  43. // 脚本保存到Cluster State
  44. post _scripts/update_views
  45. {
  46. "script":{
  47. "lang":"painless",
  48. "source":"ctx._source.views += params.new_views"
  49. }
  50. }
  51. // 使用保存的脚本
  52. post tech_blogs/_update/1
  53. {
  54. "script":{
  55. "id":"update_views",
  56. "params":{
  57. "new_views":1000
  58. }
  59. }
  60. }
  61. // search增加script字段
  62. get tech_blogs/_search
  63. {
  64. "script_fields":{
  65. "rnd_views:{
  66. "script":{
  67. "lang":"painless",
  68. "source":"""
  69. java.util.Random rnd = new Random;
  70. doc['views'].value + rnd.nextInt(1000);
  71. """
  72. }
  73. }
  74. }
  75. }

脚本缓存

  • 编译开销较大
  • ES会将脚本编译后缓存在cache中
    • Inline scripts 和 Stored Scripts 都会被缓存
    • 默认缓存100个脚本 | 参数 | 说明 | | —- | —- | | script.cache.max_size | 设置最大缓存数 | | script.cache.expire | 设置缓存超时 | | script.max_compilations_rate | 默认5分钟最多75次编译(75/5m) |