- Pipeline & Processor Concepts
- Using a Pipeline
- Built-in Processors
- Set Processor
- Append Processor
- Convert Processor
- Date Processor
- Date Index Name Processor
- Grok Processor
- Gsub Processor
- Join Processor
- JSON Processor
- KV Processor
- Lowercase Processor
- Uppercase Processor
- Remove Processor
- Rename Processor
- Split Processor
- Sort Processor
- Trim Processor
- Script Processor
- Reindexing Data
- Ingest Node vs. Logstash
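Ingest Node is a node type introduced in Elasticsearch 5.0. With the default configuration, every node is an ingest node.
- It can preprocess data by intercepting Index and Bulk API requests.
- For example, it can set a default value for a field, rename a field, or split a field's value.
- It supports Painless scripts for more complex processing.
It transforms the data and then hands it back to the Index or Bulk API.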
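Pipeline & Processor Concepts
- Pipeline: a pipeline processes the data (documents) that pass through it, applying its processors in order.
- Processor: Elasticsearch ships with many built-in processors, and you can also implement your own as a plugin.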
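The "processors applied in order" idea can be modeled in a few lines of Python. This is only an illustrative sketch, not Elasticsearch code: `split_processor`, `set_processor`, and `run_pipeline` are hypothetical names, and each processor is modeled as a plain function that mutates a dict standing in for the document.

```python
from typing import Any, Callable, Dict, List

Doc = Dict[str, Any]
Processor = Callable[[Doc], None]


def split_processor(field: str, separator: str) -> Processor:
    """Hypothetical stand-in for a split processor: turns a string into a list.

    Assumption: the separator is treated as a literal string here.
    """
    def apply(doc: Doc) -> None:
        doc[field] = doc[field].split(separator)
    return apply


def set_processor(field: str, value: Any) -> Processor:
    """Hypothetical stand-in for a set processor: assigns a fixed value."""
    def apply(doc: Doc) -> None:
        doc[field] = value
    return apply


def run_pipeline(processors: List[Processor], doc: Doc) -> Doc:
    """Apply each processor in order to a shallow copy of the document."""
    result = dict(doc)
    for processor in processors:
        processor(result)
    return result


blog_pipeline = [split_processor("tags", ","), set_processor("views", 0)]
doc = {"title": "Introducing cloud computing", "tags": "openstack,k8s"}
processed = run_pipeline(blog_pipeline, doc)
print(processed)
```

Because the processors run in list order, swapping or adding steps changes what later steps see, which is the property a real pipeline definition relies on.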
Using a Pipeline
# Simulate an inline pipeline against sample documents (nothing is indexed)
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {"split": {"field": "tags", "separator": ","}},
      {"set": {"field": "views", "value": 0}}
    ]
  },
  "docs": [
    {"_index": "index", "_id": "id", "_source": {"title": "Introducing big data......", "tags": "hadoop,elasticsearch,spark", "content": "You know, for big data"}},
    {"_index": "index", "_id": "idxx", "_source": {"title": "Introducing cloud computing", "tags": "openstack,k8s", "content": "You know, for cloud"}}
  ]
}
# Store the pipeline under the name blog_pipeline
PUT _ingest/pipeline/blog_pipeline
{
  "description": "a blog pipeline",
  "processors": [
    {"split": {"field": "tags", "separator": ","}},
    {"set": {"field": "views", "value": 0}}
  ]
}
# Fetch the stored pipeline
GET _ingest/pipeline/blog_pipeline
# Simulate the stored pipeline
POST _ingest/pipeline/blog_pipeline/_simulate
{
  "docs": [
    {"_source": {"title": "Introducing cloud computing", "tags": "openstack,k8s", "content": "You know, for cloud"}}
  ]
}
# Index a document through the pipeline
PUT tech_blogs/_doc/2?pipeline=blog_pipeline
{"title": "Introducing cloud computing", "tags": "openstack,k8s", "content": "You know, for cloud"}
Built-in Processors
Set Processor
{"set": {"field": "price_doc", "value": "Price: {{_source.price}}"}}
Append Processor
{"append": {"field": "field1", "value": ["item2", "item3", "item4"]}}
Convert Processor
{"convert": {"field": "foo", "target_field": "foo", "type": "integer", "ignore_missing": false}}
# field: the field whose value is to be converted.
# target_field: optional; the field that receives the converted value. By default the original field is updated in place.
# type: the type to convert the existing value to: integer, float, string, boolean, or auto.
# ignore_missing: if true and the field does not exist or is null, the processor exits quietly without modifying the document.
Date Processor
{"date": {"field": "initial_date", "target_field": "timestamp", "formats": ["dd/MM/yyyy hh:mm:ss"], "timezone": "Europe/Amsterdam"}}
Date Index Name Processor
{"date_index_name": {"field": "date1", "index_name_prefix": "myindex-", "date_rounding": "M"}}
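As a rough illustration of what this configuration computes, the Python sketch below rounds a date down to the start of its month and appends it to the prefix. It assumes the processor's default `index_name_format` of `yyyy-MM-dd` and uses a sample `date1` value that does not come from the original text; `month_index_name` is a hypothetical helper, not an Elasticsearch API.

```python
from datetime import datetime


def month_index_name(prefix: str, date_value: str) -> str:
    """Sketch of date_rounding "M": round down to the first day of the month."""
    parsed = datetime.strptime(date_value, "%Y-%m-%dT%H:%M:%S.%fZ")
    rounded = parsed.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    # Assumption: the default yyyy-MM-dd format is used for the date suffix.
    return prefix + rounded.strftime("%Y-%m-%d")


index_name = month_index_name("myindex-", "2016-04-25T12:02:01.789Z")
print(index_name)  # myindex-2016-04-01
```

Under these assumptions, every document dated in April 2016 would land in the same monthly index.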
Grok Processor
{"grok": {"field": "message", "patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"]}}
# Sample input: "message": "55.3.244.1 GET /index.html 15824 0.043"
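To make the extraction concrete, here is a hedged Python approximation of what the pattern does to the sample message. The named groups mirror the grok field names, but the regular expressions are simplified stand-ins chosen for this example, not the real definitions of `IP`, `WORD`, `URIPATHPARAM`, or `NUMBER`.

```python
import re

# Simplified stand-ins for the grok patterns (assumptions, not the real definitions).
LINE = re.compile(
    r"(?P<client>\d{1,3}(?:\.\d{1,3}){3}) "
    r"(?P<method>\w+) "
    r"(?P<request>\S+) "
    r"(?P<bytes>\d+(?:\.\d+)?) "
    r"(?P<duration>\d+(?:\.\d+)?)"
)

message = "55.3.244.1 GET /index.html 15824 0.043"
match = LINE.match(message)
# Captured values stay strings here, as they do in grok unless a type is requested.
fields = match.groupdict() if match else {}
print(fields)
```

Each `%{PATTERN:name}` pair in the grok expression corresponds to one named group, so the result is a flat set of fields added to the document.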
Gsub Processor
{"gsub": {"field": "field1", "pattern": "\\.", "replacement": "-"}}
Join Processor
{"join": {"field": "joined_array_field", "separator": "-"}}
JSON Processor
{"json": {"field": "string_source", "target_field": "json_target"}}
KV Processor
{"kv": {"field": "message", "field_split": "&", "value_split": "="}}
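The effect of `field_split` and `value_split` can be sketched in Python as follows. The sample message is made up for illustration, `kv_processor` is a hypothetical helper, and the sketch assumes no `target_field` is set, so the parsed keys are written next to the original field. The separators are treated as literal strings here.

```python
from typing import Any, Dict


def kv_processor(doc: Dict[str, Any], field: str,
                 field_split: str, value_split: str) -> Dict[str, Any]:
    """Split `field` into pairs, then split each pair into a key and a value."""
    result = dict(doc)
    for pair in result[field].split(field_split):
        key, value = pair.split(value_split, 1)
        result[key] = value
    return result


doc = {"message": "ip=1.2.3.4&error=REFUSED"}  # hypothetical sample input
parsed = kv_processor(doc, "message", "&", "=")
print(parsed)
```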
Lowercase Processor
{"lowercase": {"field": "field1"}}
Uppercase Processor
{"uppercase": {"field": "field1"}}
Remove Processor
{"remove": {"field": "field1"}}
Rename Processor
{"rename": {"field": "field3", "target_field": "field4"}}
Split Processor
{"split": {"field": "tags", "separator": ","}}
Sort Processor
{"sort": {"field": "tags", "order": "desc"}}
Trim Processor
{"trim": {"field": "foo"}}
Script Processor
How a Painless script accesses a field depends on the context it runs in:
| Context | Syntax |
|---|---|
| Ingestion | ctx.field_name |
| Update | ctx._source.field_name |
| Search & Aggregation | doc['field_name'] |
# Add a Script Processor
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {"split": {"field": "tags", "separator": ","}},
      {"script": {"source": """
        if (ctx.containsKey("content")) {
          ctx.content_length = ctx.content.length();
        } else {
          ctx.content_length = 0;
        }
      """}},
      {"set": {"field": "views", "value": 0}}
    ]
  },
  "docs": [
    {"_index": "index", "_id": "id", "_source": {"title": "Introducing big data......", "tags": "hadoop,elasticsearch,spark", "content": "You know, for big data"}},
    {"_index": "index", "_id": "idxx", "_source": {"title": "Introducing cloud computing", "tags": "openstack,k8s", "content": "You know, for cloud"}}
  ]
}

DELETE tech_blogs

PUT tech_blogs/_doc/1
{"title": "Introducing big data......", "tags": "hadoop,elasticsearch,spark", "content": "You know, for big data", "views": 0}

# Update context: fields are reached through ctx._source
POST tech_blogs/_update/1
{"script": {"source": "ctx._source.views += params.new_views", "params": {"new_views": 100}}}

# Check the views count
POST tech_blogs/_search
{}

# Store the script in the cluster state
POST _scripts/update_views
{"script": {"lang": "painless", "source": "ctx._source.views += params.new_views"}}

# Call the stored script by id
POST tech_blogs/_update/1
{"script": {"id": "update_views", "params": {"new_views": 1000}}}

# Search context: fields are reached through doc['field_name']
GET tech_blogs/_search
{
  "script_fields": {
    "rnd_views": {
      "script": {
        "lang": "painless",
        "source": """java.util.Random rnd = new Random();doc['views'].value+rnd.nextInt(1000);"""
      }
    }
  },
  "query": {"match_all": {}}
}
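Reindexing Data
When using _update_by_query, specify a query condition so that documents that have already passed through the pipeline are not processed again. Otherwise reprocessing them raises data type errors (for example, split would run on a tags field that is already an array).
POST tech_blogs/_update_by_query?pipeline=blog_pipeline
{"query": {"bool": {"must_not": {"exists": {"field": "views"}}}}}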
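The reason for the `must_not` / `exists` filter can be shown with a small Python model. This is a sketch under stated assumptions, not Elasticsearch behavior verbatim: `needs_pipeline` and `blog_pipeline` are hypothetical helpers, and the `TypeError` stands in for the type error Elasticsearch would report when split receives a non-string value.

```python
from typing import Any, Dict, List

Doc = Dict[str, Any]


def needs_pipeline(doc: Doc) -> bool:
    """Mirror the query: select only documents without a views field."""
    return "views" not in doc


def blog_pipeline(doc: Doc) -> Doc:
    """Model of the split + set pipeline; fails on already-split tags."""
    result = dict(doc)
    if not isinstance(result["tags"], str):
        raise TypeError("split expects a string, got a list")
    result["tags"] = result["tags"].split(",")
    result["views"] = 0
    return result


docs: List[Doc] = [
    {"tags": ["hadoop", "spark"], "views": 0},  # already went through the pipeline
    {"tags": "openstack,k8s"},                  # indexed without the pipeline
]

# Only unprocessed documents are sent through the pipeline.
updated = [blog_pipeline(d) if needs_pipeline(d) else d for d in docs]
print(updated)
```

Without the filter, the first document would reach the split step a second time and fail, which is the error the query condition is there to prevent.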
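Ingest Node vs. Logstash
| | Logstash | Ingest Node |
|---|---|---|
| Data input and output | Reads from many different data sources and writes to many different destinations | Receives data through the Elasticsearch REST API and writes it to Elasticsearch |
| Data buffering | Implements a simple data queue and supports rewriting | No buffering |
| Data processing | Supports a large number of plugins as well as custom development | Built-in processors; can be extended by developing a plugin (updating a plugin requires a restart) |
| Configuration and use | Adds some architectural complexity | No additional deployment needed |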
