Ingest Node
- A new node type introduced in ES 5.0. With the default configuration, every node is an Ingest Node
- Can preprocess data by intercepting Index or Bulk API requests
- Transforms the data, then hands it back to the Index or Bulk API
Data can be preprocessed without Logstash. Two core concepts:
Pipeline - a pipeline processes the documents passing through it, applying each step in order
Processor - Elasticsearch's abstraction of a single processing step; a pipeline is an ordered list of processors
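The pipeline/processor model can be sketched in plain Python (illustrative only, not Elasticsearch code): a pipeline is just an ordered list of processors, each transforming the document in turn.

```python
# Sketch of the pipeline/processor model: each processor is a
# function from document to document, applied in declared order.

def split_processor(field, separator):
    """Mimics the 'split' processor: string field -> list."""
    def apply(doc):
        doc[field] = doc[field].split(separator)
        return doc
    return apply

def set_processor(field, value):
    """Mimics the 'set' processor: adds/overwrites a field."""
    def apply(doc):
        doc[field] = value
        return doc
    return apply

def run_pipeline(processors, doc):
    # Processors run strictly in the order they are declared
    for processor in processors:
        doc = processor(doc)
    return doc

pipeline = [split_processor("tags", ","), set_processor("views", 0)]
result = run_pipeline(pipeline, {
    "title": "Big Data",
    "tags": "hadoop,spark,elasticsearch",
})
print(result)
# {'title': 'Big Data', 'tags': ['hadoop', 'spark', 'elasticsearch'], 'views': 0}
```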
Simulating a Pipeline
```
// Simulate a pipeline without storing it
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "split tags",
    "processors": [
      {
        // split a string field into an array
        "split": {
          "field": "tags",
          "separator": ","
        }
      },
      {
        // add a field
        "set": {
          "field": "views",
          "value": 0
        }
      }
    ]
  },
  // test documents
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Big Data",
        "tags": "hadoop,spark,elasticsearch"
      }
    }
  ]
}

// Create a pipeline; blog_pipeline is the pipeline name
PUT _ingest/pipeline/blog_pipeline
{
  "description": "split tags",
  "processors": [
    {
      "split": {
        "field": "tags",
        "separator": ","
      }
    },
    {
      "set": {
        "field": "views",
        "value": 0
      }
    }
  ]
}

// View the pipeline
GET _ingest/pipeline/blog_pipeline

// Test the stored pipeline
POST _ingest/pipeline/blog_pipeline/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Big Data",
        "tags": "hadoop,spark,elasticsearch"
      }
    }
  ]
}

// Index a document through the pipeline
PUT tech_blogs/_doc/2?pipeline=blog_pipeline
{
  "title": "Big Data",
  "tags": "hadoop,spark,elasticsearch"
}

// Run update_by_query with the pipeline, only on docs missing the views field
POST tech_blogs/_update_by_query?pipeline=blog_pipeline
{
  "query": {
    "bool": {
      "must_not": {
        "exists": {
          "field": "views"
        }
      }
    }
  }
}
```
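The update_by_query call above reprocesses only documents that lack a views field, so already-processed documents are skipped. A minimal Python sketch of that selection logic, over plain dicts rather than a real index:

```python
# Sketch of update_by_query?pipeline=... semantics:
# the bool/must_not/exists query selects docs WITHOUT "views",
# and only those are run through the pipeline again.

docs = [
    {"title": "old post", "tags": "hadoop,spark"},        # no views -> reprocessed
    {"title": "new post", "tags": ["spark"], "views": 0}, # has views -> skipped
]

def matches_query(doc):
    # must_not + exists on "views": match docs missing the field
    return "views" not in doc

for doc in docs:
    if matches_query(doc):
        doc["tags"] = doc["tags"].split(",")  # split processor
        doc["views"] = 0                      # set processor

print(docs[0])
# {'title': 'old post', 'tags': ['hadoop', 'spark'], 'views': 0}
```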
Uses of Painless
Painless can process and transform document fields:
- Update or delete fields; process data for aggregations
- script field: compute extra fields for the returned documents
- function score: customize how documents are scored
- Run scripts inside an ingest pipeline
- Compute on data during reindex and update_by_query
Accessing fields from a Painless script
| Context | Syntax |
| :----: | :----: |
| Ingest | ctx.field_name |
| Update | ctx._source.field_name |
| Search & Aggregation | doc["field_name"] |
```
// Pipeline with a script processor
PUT _ingest/pipeline/blog_pipeline
{
  "description": "split tags",
  "processors": [
    {
      // split a string field into an array
      "split": {
        "field": "tags",
        "separator": ","
      }
    },
    {
      // compute content_length from the content field
      "script": {
        "source": """
          if (ctx.containsKey("content")) {
            ctx.content_length = ctx.content.length();
          } else {
            ctx.content_length = 0;
          }
        """
      }
    },
    {
      // add a field
      "set": {
        "field": "views",
        "value": 0
      }
    }
  ]
}

// Update a document with a script
POST tech_blogs/_update/1
{
  "script": {
    "source": "ctx._source.views += params.new_views",
    "params": {
      "new_views": 100
    }
  }
}

// Save a script into the Cluster State
POST _scripts/update_views
{
  "script": {
    "lang": "painless",
    "source": "ctx._source.views += params.new_views"
  }
}

// Use the stored script
POST tech_blogs/_update/1
{
  "script": {
    "id": "update_views",
    "params": {
      "new_views": 1000
    }
  }
}

// Add a script field to a search
GET tech_blogs/_search
{
  "script_fields": {
    "rnd_views": {
      "script": {
        "lang": "painless",
        "source": """
          java.util.Random rnd = new Random();
          doc['views'].value + rnd.nextInt(1000);
        """
      }
    }
  }
}
```
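The content_length script processor's Painless logic translates almost line for line into Python, which can help when reasoning about edge cases such as a missing field (a sketch, not Elasticsearch code):

```python
# Python equivalent of the Painless script processor:
#   if (ctx.containsKey("content")) { ctx.content_length = ctx.content.length(); }
#   else { ctx.content_length = 0; }
# In the ingest context, ctx behaves like a mutable map of the document source.

def content_length_script(ctx):
    if "content" in ctx:               # ctx.containsKey("content")
        ctx["content_length"] = len(ctx["content"])
    else:                              # field missing: default to 0
        ctx["content_length"] = 0
    return ctx

with_content = content_length_script({"title": "Big Data", "content": "hello"})
without_content = content_length_script({"title": "Big Data"})
print(with_content["content_length"], without_content["content_length"])
# 5 0
```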
Script caching
- Compiling a script is relatively expensive
- ES caches compiled scripts
- Both inline scripts and stored scripts are cached
- 100 scripts are cached by default

| Parameter | Description |
| ---- | ---- |
| script.cache.max_size | Maximum number of cached scripts |
| script.cache.expire | Cache expiry time |
| script.max_compilations_rate | Compilation rate limit; by default at most 75 compilations per 5 minutes (75/5m) |
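The caching behavior can be pictured as a bounded map from script source to compiled script, so repeated use of the same script skips recompilation. The sketch below is a deliberate simplification (ES's real eviction and rate-limiting internals differ), and uses Python's built-in compile purely as a stand-in for Painless compilation:

```python
# Simplified sketch of a bounded script cache (NOT ES internals):
# the same source compiles once; distinct sources past max_size evict old entries.

class ScriptCache:
    def __init__(self, max_size=100):       # 100 mirrors the ES default
        self.max_size = max_size
        self._cache = {}                    # source -> compiled object
        self.compilations = 0               # counts actual compile calls

    def get(self, source):
        if source not in self._cache:
            if len(self._cache) >= self.max_size:
                # Evict the oldest inserted entry (real eviction policy may differ)
                self._cache.pop(next(iter(self._cache)))
            self.compilations += 1
            # Stand-in for Painless compilation
            self._cache[source] = compile(source, "<script>", "eval")
        return self._cache[source]

cache = ScriptCache(max_size=2)
cache.get("1 + 1")
cache.get("1 + 1")      # cache hit: no second compilation
print(cache.compilations)
# 1
```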
