Ingest 节点

ingest 节点可以看作是数据前置处理转换的节点,支持 pipeline 管道 设置,可以使用 ingest 对数据进行过滤、转换等操作,类似于 logstash 中 filter 的作用,功能相当强大。
我把 Ingest 节点的功能抽象为:大数据处理环节的 “ETL”——抽取、转换、加载。

Ingest节点基本概念
在实际文档索引发生之前,使用Ingest节点预处理文档。Ingest节点拦截批量和索引请求,它应用转换,然后将文档传递回索引或Bulk API。

强调一下: Ingest节点处理时机——在数据被索引之前,通过预定义好的处理管道对数据进行预处理。

默认情况下,所有节点都启用Ingest,因此任何节点都可以处理Ingest任务。我们也可以创建专用的Ingest节点。

要禁用节点的Ingest功能,需要在elasticsearch.yml 设置如下:

  1. node.ingestfalse

Ingest 节点能解决什么问题?

上面的 Ingest 节点介绍太官方,看不大懂怎么办?来个实战场景例子吧。
思考问题 1:线上写入数据改字段需求
如何在数据写入阶段修改字段名(不是修改字段值)?

思考问题 2:线上业务数据添加特定字段需求
如何在批量写入数据的时候,每条 document 插入实时时间戳?
这时,脑海里开始对已有的知识点进行搜索。
针对思考问题 1:字段值的修改无非:update, update_by_query?但是字段名呢?貌似没有相关接口或实现。
针对思考问题 2:插入的时候,业务层面处理,读取当前时间并写入貌似可以,有没有不动业务层面的字段的方法呢?
答案是有的,这就是 Ingest 节点的妙处。

Ingest 实践

写入数据改字段需求

  1. PUT _ingest/pipeline/rename_hostname
  2. {
  3. "description" : "这是测试的管道内容",
  4. "processors": [
  5. {
  6. "field": "hostname",
  7. "target_field": "host",
  8. "ignore_missing": true
  9. }
  10. }
  11. ]
  12. }
  1. PUT server
  2. POST server/values/?pipeline=rename_hostname
  3. {
  4. "hostname": "myserver"
  5. }

如上,借助 Ingest 节点的 rename_hostname 管道的预处理功能,实现了字段名称的变更:由 hostname 改成 host。

添加特定字段需求

通过indexed_at管道的set处理器与ms-test的索引层面关联操作,
ms-test索引每插入一篇document,都会自动添加一个字段index_at=最新时间戳。

  1. PUT _ingest/pipeline/indexed_at
  2. {
  3. "description": "Adds indexed_at timestamp to documents",
  4. "processors": [
  5. {
  6. "set": {
  7. "field": "_source.indexed_at",
  8. "value": "{{_ingest.timestamp}}"
  9. }
  10. }
  11. ]
  12. }
  13. PUT ms-test
  14. {
  15. "settings": {
  16. "index.default_pipeline": "indexed_at"
  17. }
  18. }
  19. POST ms-test/_doc/1
  20. {"title":"just testing"}

这里就涉及几个知识点:
1、预处理 pre-process
要在数据索引化 (indexing) 之前预处理文档。
2、管道 pipeline
每个预处理过程可以指定包含一个或多个处理器的管道。
管道的实际组成:

  1. {
  2. "description" : "...",
  3. "processors" : [ ... ]
  4. }

description:管道功能描述。
processors:注意是数组,可以指定 1 个或多个处理器。
3、处理器 processors
每个处理器以某种特定方式转换文档。
例如,管道可能有一个从文档中删除字段的处理器,然后是另一个重命名字段的处理器。

为管道设置版本号

这里需要注意,根据官方文档的介绍,此版本号没有实际作用,仅仅方便使用者根据其业务环境进行外部的版本管理

  1. PUT _ingest/pipeline/test_pipeline
  2. {
  3. "description" : "这是测试的管道内容",
  4. "version": 100,
  5. "processors" : [
  6. {
  7. "set" : {
  8. "field": "des",
  9. "value": "这是管道默认数据"
  10. }
  11. }
  12. ]
  13. }

查询管道

查询索引中的管道
通过GET请求我们可以查询出,当前管道的配置信息

  1. GET _ingest/pipeline/test_pipeline
  2. 返回
  3. {
  4. "test_pipeline" : {
  5. "description" : "这是测试的管道内容",
  6. "version" : 100,
  7. "processors" : [
  8. {
  9. "set" : {
  10. "field" : "des",
  11. "value" : "这是管道默认数据"
  12. }
  13. }
  14. ]
  15. }
  16. }

查询管道版本

  1. GET _ingest/pipeline/test_pipeline?filter_path=*.version
  2. 返回
  3. {
  4. "test_pipeline" : {
  5. "version" : 100
  6. }
  7. }

删除管道

  1. DELETE _ingest/pipeline/test_pipeline

删除所有管道
和文档操作类似,你可以使用*符号匹配所有的管道进行清除。

  1. DELETE _ingest/pipeline/*

使用管道

现在我们存在一个这样的索引

  1. "test_field2": {
  2. "aliases": {},
  3. "mappings": {
  4. "properties": {
  5. "channel": {
  6. "type": "nested"
  7. },
  8. "des": {
  9. "type": "text"
  10. },
  11. "name": {
  12. "type": "keyword"
  13. }
  14. }
  15. },
  16. "settings": {
  17. "index": {
  18. "creation_date": "1574774756443",
  19. "number_of_shards": "1",
  20. "number_of_replicas": "1",
  21. "uuid": "RCF17AZOR1GPs84LKw88lA",
  22. "version": {
  23. "created": "7020099"
  24. },
  25. "provided_name": "test_field2"
  26. }
  27. }
  28. }

现在我们尝试向索引中插入数据。
使用已经存在管道

  1. PUT test_field2/_doc/1?pipeline=test_pipeline
  2. {
  3. "name": "内容1",
  4. "desc": "描述1",
  5. "channel": [
  6. {
  7. "name": "one",
  8. "num": 33
  9. },
  10. {
  11. "name": "two",
  12. "num": 44
  13. }
  14. ]
  15. }

然后查询ID为1的数据会得到下面内容

  1. {
  2. "_index" : "test_field2",
  3. "_type" : "_doc",
  4. "_id" : "1",
  5. "_version" : 1,
  6. "_seq_no" : 0,
  7. "_primary_term" : 1,
  8. "found" : true,
  9. "_source" : {
  10. "des" : "这是管道默认数据",
  11. "name" : "内容1",
  12. "channel" : [
  13. {
  14. "num" : 33,
  15. "name" : "one"
  16. },
  17. {
  18. "num" : 44,
  19. "name" : "two"
  20. }
  21. ],
  22. "desc" : "描述1"
  23. }
  24. }

会发现其des内容并非我们插入的数据,而是管道设置的参数。

从上面的内容可以发现,管道其实很像是我们平时使用的拦截器操作,会拦截一些操作然后对其进行修改。

请求数据的时候定义管道
除了上面的使用方式,我们也可以在请求数据的时候使用管道

  1. PUT test_field2/_doc/5
  2. {
  3. "pipeline": {
  4. "description": "这是自定义的管道内容",
  5. "processors": [
  6. {
  7. "set": {
  8. "field": "des",
  9. "value": "这是管道默认数据"
  10. }
  11. }
  12. ]
  13. },
  14. "docs": [
  15. {
  16. "_source": {
  17. "name": "内容1",
  18. "desc": "描述1",
  19. "channel": [
  20. {
  21. "name": "one",
  22. "num": 33
  23. },
  24. {
  25. "name": "two",
  26. "num": 44
  27. }
  28. ]
  29. }
  30. }
  31. ]
  32. }

此时我们尝试查询数据会发现数据使用的是自定义的管道参数

  1. {
  2. "_index" : "test_field2",
  3. "_type" : "_doc",
  4. "_id" : "5",
  5. "_version" : 1,
  6. "_seq_no" : 2,
  7. "_primary_term" : 1,
  8. "found" : true,
  9. "_source" : {
  10. "pipeline" : {
  11. "description" : "这是自定义的管道内容",
  12. "processors" : [
  13. {
  14. "set" : {
  15. "field" : "des",
  16. "value" : "这是管道默认数据"
  17. }
  18. }
  19. ]
  20. },
  21. "docs" : [
  22. {
  23. "_source" : {
  24. "name" : "内容1",
  25. "desc" : "描述1",
  26. "channel" : [
  27. {
  28. "name" : "one",
  29. "num" : 33
  30. },
  31. {
  32. "name" : "two",
  33. "num" : 44
  34. }
  35. ]
  36. }
  37. }
  38. ]
  39. }
  40. }

模拟管道

当然官方也提供了_simulate 方法让你模拟数据在经过了管道的处理后所得到的结果

POST _ingest/pipeline/_simulate
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}

返回结果

{
   "docs": [
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
               "field2": "_value",
               "foo": "bar"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.187Z"
            }
         }
      },
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
               "field2": "_value",
               "foo": "rab"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.188Z"
            }
         }
      }
   ]
}

查看管道对文件的影响

POST _ingest/pipeline/_simulate?verbose
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value2"
        }
      },
      {
        "set" : {
          "field" : "field3",
          "value" : "_value3"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}


返回内容

{
   "docs": [
      {
         "processor_results": [
            {
               "doc": {
                  "_id": "id",
                  "_index": "index",
                  "_type": "_doc",
                  "_source": {
                     "field2": "_value2",
                     "foo": "bar"
                  },
                  "_ingest": {
                     "timestamp": "2017-05-04T22:46:09.674Z"
                  }
               }
            },
            {
               "doc": {
                  "_id": "id",
                  "_index": "index",
                  "_type": "_doc",
                  "_source": {
                     "field3": "_value3",
                     "field2": "_value2",
                     "foo": "bar"
                  },
                  "_ingest": {
                     "timestamp": "2017-05-04T22:46:09.675Z"
                  }
               }
            }
         ]
      },
      {
         "processor_results": [
            {
               "doc": {
                  "_id": "id",
                  "_index": "index",
                  "_type": "_doc",
                  "_source": {
                     "field2": "_value2",
                     "foo": "rab"
                  },
                  "_ingest": {
                     "timestamp": "2017-05-04T22:46:09.676Z"
                  }
               }
            },
            {
               "doc": {
                  "_id": "id",
                  "_index": "index",
                  "_type": "_doc",
                  "_source": {
                     "field3": "_value3",
                     "field2": "_value2",
                     "foo": "rab"
                  },
                  "_ingest": {
                     "timestamp": "2017-05-04T22:46:09.677Z"
                  }
               }
            }
         ]
      }
   ]
}

Ingest API

Ingest API 共分为 4 种操作,分别对应:

PUT(新增) 
GET(获取)
DELETE(删除)
Simulate (仿真模拟)

模拟管道 AP Simulate 针对请求正文中提供的文档集执行特定管道。
除此之外,高阶操作包括:
1、支持复杂条件的 Nested 类型的操作;
2、限定条件的管道操作;
3、限定条件的正则操作等。
详细内容,参见官网即可。
常见的处理器有如下 28 种,举例:Í
append处理器:添加1个或1组字段值; convert处理器:支持类型转换。
image.png

选型小结


1、两种方式各有利弊,建议小数据规模,建议使用 Ingest 节点。原因:架构模型简单,不需要额外的硬件设备支撑。
2、数据规模大之后,除了建议独立 Ingest 节点,同时建议架构中使用 Logstash 结合消息队列如 Kafka 的架构选型。
3、将 Logstash 和 Ingest 节点结合,也是架构选型参考方案之一。