Ingest 节点
ingest 节点可以看作是数据前置处理转换的节点,支持 pipeline 管道 设置,可以使用 ingest 对数据进行过滤、转换等操作,类似于 logstash 中 filter 的作用,功能相当强大。
我把 Ingest 节点的功能抽象为:大数据处理环节的 “ETL”——抽取、转换、加载。
Ingest节点基本概念
在实际文档索引发生之前,使用Ingest节点预处理文档。Ingest节点拦截批量和索引请求,它应用转换,然后将文档传递回索引或Bulk API。
强调一下: Ingest节点处理时机——在数据被索引之前,通过预定义好的处理管道对数据进行预处理。
默认情况下,所有节点都启用Ingest,因此任何节点都可以处理Ingest任务。我们也可以创建专用的Ingest节点。
要禁用节点的Ingest功能,需要在elasticsearch.yml 设置如下:
node.ingest:false
Ingest 节点能解决什么问题?
上面的 Ingest 节点介绍太官方,看不大懂怎么办?来个实战场景例子吧。
思考问题 1:线上写入数据改字段需求
如何在数据写入阶段修改字段名(不是修改字段值)?
思考问题 2:线上业务数据添加特定字段需求
如何在批量写入数据的时候,每条 document 插入实时时间戳?
这时,脑海里开始对已有的知识点进行搜索。
针对思考问题 1:字段值的修改无非:update, update_by_query?但是字段名呢?貌似没有相关接口或实现。
针对思考问题 2:插入的时候,业务层面处理,读取当前时间并写入貌似可以,有没有不动业务层面的字段的方法呢?
答案是有的,这就是 Ingest 节点的妙处。
Ingest 实践
写入数据改字段需求
PUT _ingest/pipeline/rename_hostname
{
"description" : "这是测试的管道内容",
"processors": [
{
"field": "hostname",
"target_field": "host",
"ignore_missing": true
}
}
]
}
PUT server
POST server/values/?pipeline=rename_hostname
{
"hostname": "myserver"
}
如上,借助 Ingest 节点的 rename_hostname 管道的预处理功能,实现了字段名称的变更:由 hostname 改成 host。
添加特定字段需求
通过indexed_at管道的set处理器与ms-test的索引层面关联操作,
ms-test索引每插入一篇document,都会自动添加一个字段index_at=最新时间戳。
PUT _ingest/pipeline/indexed_at
{
"description": "Adds indexed_at timestamp to documents",
"processors": [
{
"set": {
"field": "_source.indexed_at",
"value": "{{_ingest.timestamp}}"
}
}
]
}
PUT ms-test
{
"settings": {
"index.default_pipeline": "indexed_at"
}
}
POST ms-test/_doc/1
{"title":"just testing"}
这里就涉及几个知识点:
1、预处理 pre-process
要在数据索引化 (indexing) 之前预处理文档。
2、管道 pipeline
每个预处理过程可以指定包含一个或多个处理器的管道。
管道的实际组成:
{
"description" : "...",
"processors" : [ ... ]
}
description:管道功能描述。
processors:注意是数组,可以指定 1 个或多个处理器。
3、处理器 processors
每个处理器以某种特定方式转换文档。
例如,管道可能有一个从文档中删除字段的处理器,然后是另一个重命名字段的处理器。
为管道设置版本号
这里需要注意,根据官方文档的介绍,此版本号没有实际作用,仅仅方便使用者根据其业务环境进行外部的版本管理
PUT _ingest/pipeline/test_pipeline
{
"description" : "这是测试的管道内容",
"version": 100,
"processors" : [
{
"set" : {
"field": "des",
"value": "这是管道默认数据"
}
}
]
}
查询管道
查询索引中的管道
通过GET请求我们可以查询出,当前管道的配置信息
GET _ingest/pipeline/test_pipeline
返回
{
"test_pipeline" : {
"description" : "这是测试的管道内容",
"version" : 100,
"processors" : [
{
"set" : {
"field" : "des",
"value" : "这是管道默认数据"
}
}
]
}
}
查询管道版本
GET _ingest/pipeline/test_pipeline?filter_path=*.version
返回
{
"test_pipeline" : {
"version" : 100
}
}
删除管道
DELETE _ingest/pipeline/test_pipeline
删除所有管道
和文档操作类似,你可以使用*符号匹配所有的管道进行清除。
DELETE _ingest/pipeline/*
使用管道
现在我们存在一个这样的索引
"test_field2": {
"aliases": {},
"mappings": {
"properties": {
"channel": {
"type": "nested"
},
"des": {
"type": "text"
},
"name": {
"type": "keyword"
}
}
},
"settings": {
"index": {
"creation_date": "1574774756443",
"number_of_shards": "1",
"number_of_replicas": "1",
"uuid": "RCF17AZOR1GPs84LKw88lA",
"version": {
"created": "7020099"
},
"provided_name": "test_field2"
}
}
}
现在我们尝试向索引中插入数据。
使用已经存在管道
PUT test_field2/_doc/1?pipeline=test_pipeline
{
"name": "内容1",
"desc": "描述1",
"channel": [
{
"name": "one",
"num": 33
},
{
"name": "two",
"num": 44
}
]
}
然后查询ID为1的数据会得到下面内容
{
"_index" : "test_field2",
"_type" : "_doc",
"_id" : "1",
"_version" : 1,
"_seq_no" : 0,
"_primary_term" : 1,
"found" : true,
"_source" : {
"des" : "这是管道默认数据",
"name" : "内容1",
"channel" : [
{
"num" : 33,
"name" : "one"
},
{
"num" : 44,
"name" : "two"
}
],
"desc" : "描述1"
}
}
会发现其des内容并非我们插入的数据,而是管道设置的参数。
从上面的内容可以发现,管道其实很像是我们平时使用的拦截器操作,会拦截一些操作然后对其进行修改。
请求数据的时候定义管道
除了上面的使用方式,我们也可以在请求数据的时候使用管道
PUT test_field2/_doc/5
{
"pipeline": {
"description": "这是自定义的管道内容",
"processors": [
{
"set": {
"field": "des",
"value": "这是管道默认数据"
}
}
]
},
"docs": [
{
"_source": {
"name": "内容1",
"desc": "描述1",
"channel": [
{
"name": "one",
"num": 33
},
{
"name": "two",
"num": 44
}
]
}
}
]
}
此时我们尝试查询数据会发现数据使用的是自定义的管道参数
{
"_index" : "test_field2",
"_type" : "_doc",
"_id" : "5",
"_version" : 1,
"_seq_no" : 2,
"_primary_term" : 1,
"found" : true,
"_source" : {
"pipeline" : {
"description" : "这是自定义的管道内容",
"processors" : [
{
"set" : {
"field" : "des",
"value" : "这是管道默认数据"
}
}
]
},
"docs" : [
{
"_source" : {
"name" : "内容1",
"desc" : "描述1",
"channel" : [
{
"name" : "one",
"num" : 33
},
{
"name" : "two",
"num" : 44
}
]
}
}
]
}
}
模拟管道
当然官方也提供了_simulate 方法让你模拟数据在经过了管道的处理后所得到的结果
POST _ingest/pipeline/_simulate
{
"pipeline" :
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value" : "_value"
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar"
}
},
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "rab"
}
}
]
}
返回结果
{
"docs": [
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field2": "_value",
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:30:03.187Z"
}
}
},
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field2": "_value",
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:30:03.188Z"
}
}
}
]
}
查看管道对文件的影响
POST _ingest/pipeline/_simulate?verbose
{
"pipeline" :
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value" : "_value2"
}
},
{
"set" : {
"field" : "field3",
"value" : "_value3"
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar"
}
},
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "rab"
}
}
]
}
返回内容
{
"docs": [
{
"processor_results": [
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field2": "_value2",
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.674Z"
}
}
},
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field3": "_value3",
"field2": "_value2",
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.675Z"
}
}
}
]
},
{
"processor_results": [
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field2": "_value2",
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.676Z"
}
}
},
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field3": "_value3",
"field2": "_value2",
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.677Z"
}
}
}
]
}
]
}
Ingest API
Ingest API 共分为 4 种操作,分别对应:
PUT(新增)
GET(获取)
DELETE(删除)
Simulate (仿真模拟)
模拟管道 AP Simulate 针对请求正文中提供的文档集执行特定管道。
除此之外,高阶操作包括:
1、支持复杂条件的 Nested 类型的操作;
2、限定条件的管道操作;
3、限定条件的正则操作等。
详细内容,参见官网即可。
常见的处理器有如下 28 种,举例:Í
append处理器:添加1个或1组字段值; convert处理器:支持类型转换。
选型小结
1、两种方式各有利弊,建议小数据规模,建议使用 Ingest 节点。原因:架构模型简单,不需要额外的硬件设备支撑。
2、数据规模大之后,除了建议独立 Ingest 节点,同时建议架构中使用 Logstash 结合消息队列如 Kafka 的架构选型。
3、将 Logstash 和 Ingest 节点结合,也是架构选型参考方案之一。