Ingest Node
- ES5.0后引入的一种新的节点类型。默认配置下,每个节点都是Ingest Node
- 具有预处理数据的功能,可拦截Index或Bulk API请求
- 对数据进行转换,并重新返回给Index 或Bulk API
无需Logstash,就可以进行数据的预处理,例如:
Pipeline - 管道会对通过的数据(文档),按照顺序进行加工
Processor - ES对一些加工的行为进行了抽象包装
模拟Pipeline
post _ingest/pipeline/_simulate
{
"pipeline":{
"description":"切割tags",
"processors":[
{
// 字段切割
"split":{
"field":"tags",
"separator":","
}
},
{
// 增加字段
"set":{
"field":"views",
"value":0
}
}
]
},
// 测试文档
"docs":[
{
"_index":"index",
"_id":"id",
"_source":{
"title":"Big Data",
"tags":"hadoop,spark,elasticsearch"
}
}
]
}
// 新增pipeline
put _ingest/pipeline/blog_pipeline -- pipeline 名字
{
"description":"切割tags",
"processors":[
{
// 字段切割
"split":{
"field":"tags",
"separator":","
}
},
{
// 增加字段
"set":{
"field":"views",
"value":0
}
}
]
}
// 查看pipeline
get _ingest/pipeline/blog_pipeline
// 测试pipeline
post _ingest/pipeline/blog_pipeline
{
"docs":[
{
"_index":"index",
"_id":"id",
"_source":{
"title":"Big Data",
"tags":"hadoop,spark,elasticsearch"
}
}
]
}
// 使用pipeline插入数据
put tech_blogs/_doc/2?pipeline=blog_pipeline
{
"title":"Big Data",
"tags":"hadoop,spark,elasticsearch"
}
// update_by_query使用pipeline
post tech_blogs/_update_by_query?pipeline=blog_pipeline
{
"query":{
"bool":{
"must_not":{
"exist":{
"field":"views
}
}
}
}
}
Painless用途
可对文档字段进行加工处理
- 更新或删除字段,处理数据聚合操作
- script field:对返回的字段进行提前计算
- function score:对文档的算分进行处理
- 在ingest pipeline中执行脚本
- 在reindex API,update_by_query时,对数据进行计算
painless脚本访问字段
| 上下文 | 语法 | | :—-: | :—-: | | ingest | ctx.field_name | | update | ctx.source.field_name | | Search & Aggregation | doc[“field_name”] |
// script
put _ingest/pipeline/blog_pipeline -- pipeline 名字
{
"description":"切割tags",
"processors":[
{
// 字段切割
"split":{
"field":"tags",
"separator":","
}
},
{
"script":{
"source": """
if(ctx.containsKey("content")){
ctx.content_length = ctx.content.length();
}else {
ctx.content_length=0;
}
"""
}
},
{
// 增加字段
"set":{
"field":"views",
"value":0
}
}
]
}
// update script
post tech_blogs/_update/1
{
"script":{
"source":"ctx._source.views += params.new_views",
"params":{
"new_views":100
}
}
}
// 脚本保存到Cluster State
post _scripts/update_views
{
"script":{
"lang":"painless",
"source":"ctx._source.views += params.new_views"
}
}
// 使用保存的脚本
post tech_blogs/_update/1
{
"script":{
"id":"update_views",
"params":{
"new_views":1000
}
}
}
// search增加script字段
get tech_blogs/_search
{
"script_fields":{
"rnd_views:{
"script":{
"lang":"painless",
"source":"""
java.util.Random rnd = new Random;
doc['views'].value + rnd.nextInt(1000);
"""
}
}
}
}
脚本缓存
- 编译开销较大
- ES会将脚本编译后缓存在cache中
- Inline scripts 和 Stored Scripts 都会被缓存
- 默认缓存100个脚本 | 参数 | 说明 | | —- | —- | | script.cache.max_size | 设置最大缓存数 | | script.cache.expire | 设置缓存超时 | | script.max_compilations_rate | 默认5分钟最多75次编译(75/5m) |