- Pipeline & Processor Concepts
- Using Pipelines
- Built-in Processors
- Set Processor
- Append Processor
- Convert Processor
- Date Processor
- Date Index Name Processor
- Grok Processor
- Gsub Processor
- Join Processor
- JSON Processor
- KV Processor
- Lowercase Processor
- Uppercase Processor
- Remove Processor
- Rename Processor
- Split Processor
- Sort Processor
- Trim Processor
- Script Processor
- Reindexing Data
- Ingest Node vs. Logstash
The Ingest Node is a node type introduced in Elasticsearch 5.0. With the default configuration, every node is an Ingest Node.
- It can pre-process data by intercepting Index or Bulk API requests (see the sketch below).
- Typical operations: setting a default value for a field, renaming a field, or splitting a field value.
- It supports Painless scripts for more complex data processing.
The transformed data is then handed back to the Index or Bulk API.
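A minimal sketch of attaching a pipeline to a Bulk request via the pipeline URL parameter (it assumes the blog_pipeline stored later in this section; the document id and field values are made up for illustration):
POST tech_blogs/_bulk?pipeline=blog_pipeline
{ "index": { "_id": "3" } }
{ "title": "Introducing stream processing", "tags": "flink,kafka", "content": "You know, for streams" }
Every document in the request is run through the pipeline's processors before it is indexed.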
Pipeline & Processor Concepts
- Pipeline: a pipeline processes the documents passing through it, applying its processors in order.
- Processor: Elasticsearch ships with many built-in Processors; you can also implement your own Processor as a plugin.
Using Pipelines
# Test a pipeline with the _simulate API, without storing it
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "to split blog tags",
"processors": [
{
"split": {
"field": "tags",
"separator": ","
}
},
{
"set":{
"field": "views",
"value": 0
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"title": "Introducing big data......",
"tags": "hadoop,elasticsearch,spark",
"content": "You konw, for big data"
}
},
{
"_index": "index",
"_id": "idxx",
"_source": {
"title": "Introducing cloud computering",
"tags": "openstack,k8s",
"content": "You konw, for cloud"
}
}
]
}
# Store the pipeline as blog_pipeline
PUT _ingest/pipeline/blog_pipeline
{
"description": "a blog pipeline",
"processors": [
{
"split": {
"field": "tags",
"separator": ","
}
},
{
"set":{
"field": "views",
"value": 0
}
}
]
}
# View the stored pipeline
GET _ingest/pipeline/blog_pipeline
# Simulate with the stored pipeline
POST _ingest/pipeline/blog_pipeline/_simulate
{
"docs": [
{
"_source": {
"title": "Introducing cloud computering",
"tags": "openstack,k8s",
"content": "You konw, for cloud"
}
}
]
}
# Index a document through blog_pipeline
PUT tech_blogs/_doc/2?pipeline=blog_pipeline
{
"title": "Introducing cloud computering",
"tags": "openstack,k8s",
"content": "You konw, for cloud"
}
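To verify the result, the document can be fetched back (a quick check, assuming the request above succeeded): tags should now be an array and views should be 0.
GET tech_blogs/_doc/2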
Built-in Processors
Set Processor
{
"set": {
"field": "price_doc",
"value": "价格:{{_source.price}}"
}
}
Append Processor
{
"append": {
"field": "field1",
"value": ["item2", "item3", "item4"]
}
}
Convert Processor
{
"convert": {
"field" : "foo",
"target_field": "foo",
"type": "integer",
"ignore_missing": false
}
}
# field: the field whose value is to be converted.
# target_field: optional; the field to write the converted value to. By default the original field is updated in place.
# type: the type to convert the existing value to: integer, float, string, boolean, or auto.
# ignore_missing: if true and the field does not exist or is null, the processor exits quietly without modifying the document.
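A minimal _simulate sketch of the convert processor (the field name foo and the sample value "123" are made up for illustration):
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "convert": {
          "field": "foo",
          "type": "integer"
        }
      }
    ]
  },
  "docs": [
    { "_source": { "foo": "123" } }
  ]
}
# In the simulated result, foo becomes the number 123 instead of the string "123"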
Date Processor
{
"date" : {
"field" : "initial_date",
"target_field" : "timestamp",
"formats" : ["dd/MM/yyyy hh:mm:ss"],
"timezone" : "Europe/Amsterdam"
}
}
Date Index Name Processor
{
"date_index_name" : {
"field" : "date1",
"index_name_prefix" : "myindex-",
"date_rounding" : "M"
}
}
Grok Processor
{
"grok": {
"field": "message",
"patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"]
}
}
# "message": "55.3.244.1 GET /index.html 15824 0.043"
Gsub Processor
{
"gsub": {
"field": "field1",
"pattern": "\\.",
"replacement": "-"
}
}
Join Processor
{
"join": {
"field": "joined_array_field",
"separator": "-"
}
}
JSON Processor
{
"json" : {
"field" : "string_source",
"target_field" : "json_target"
}
}
KV Processor
{
"kv": {
"field": "message",
"field_split": "&",
"value_split": "="
}
}
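# e.g. a message of "ip=1.2.3.4&error=REFUSED" produces the fields ip: "1.2.3.4" and error: "REFUSED"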
Lowercase Processor
{
"lowercase": {
"field": "field1"
}
}
Uppercase Processor
{
"uppercase": {
"field": "field1"
}
}
Remove Processor
{
"remove": {
"field": "field1"
}
}
Rename Processor
{
"rename": {
"field": "field3",
"target_field": "field4"
}
}
Split Processor
{
"split": {
"field": "tags",
"separator": ","
}
}
Sort Processor
{
"sort": {
"field": "tags",
"order": "desc"
}
}
Trim Processor
{
"trim": {
"field": "foo"
}
}
Script Processor
Context | Syntax |
---|---|
Ingestion | ctx.field_name |
Update | ctx._source.field_name |
Search & Aggregation | doc['field_name'] |
# Add a Script Processor
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "to split blog tags",
"processors": [
{
"split": {
"field": "tags",
"separator": ","
}
},
{
"script": {
"source": """
if (ctx.containsKey("content")) {
  ctx.content_length = ctx.content.length();
} else {
  ctx.content_length = 0;
}
"""
}
},
{
"set":{
"field": "views",
"value": 0
}
}
]
},
"docs": [
{
"_index":"index",
"_id":"id",
"_source":{
"title":"Introducing big data......",
"tags":"hadoop,elasticsearch,spark",
"content":"You konw, for big data"
}
},
{
"_index":"index",
"_id":"idxx",
"_source":{
"title":"Introducing cloud computering",
"tags":"openstack,k8s",
"content":"You konw, for cloud"
}
}
]
}
# Reset the test data and index a document without the pipeline
DELETE tech_blogs
PUT tech_blogs/_doc/1
{
"title":"Introducing big data......",
"tags":"hadoop,elasticsearch,spark",
"content":"You konw, for big data",
"views":0
}
# Increment the views counter with a Painless script
POST tech_blogs/_update/1
{
"script": {
"source": "ctx._source.views += params.new_views",
"params": {
"new_views":100
}
}
}
# Check the views count
POST tech_blogs/_search
{
}
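# The document's views should now be 100 (indexed as 0, then incremented by 100)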
# Store the script in the Cluster State
POST _scripts/update_views
{
"script":{
"lang": "painless",
"source": "ctx._source.views += params.new_views"
}
}
# Update using the stored script
POST tech_blogs/_update/1
{
"script": {
"id": "update_views",
"params": {
"new_views":1000
}
}
}
# Return a computed field rnd_views: views plus a random number
GET tech_blogs/_search
{
"script_fields": {
"rnd_views": {
"script": {
"lang": "painless",
"source": """
java.util.Random rnd = new Random();
doc['views'].value+rnd.nextInt(1000);
"""
}
}
},
"query": {
"match_all": {}
}
}
Reindexing Data
_update_by_query should be given a query condition, so that documents that have already gone through the Pipeline are not processed again, which would otherwise cause data-type errors.
# Run blog_pipeline only on documents that do not yet have a views field
POST tech_blogs/_update_by_query?pipeline=blog_pipeline
{
"query": {
"bool": {
"must_not": {
"exists": {
"field": "views"
}
}
}
}
}
Ingest Node vs. Logstash
| | Logstash | Ingest Node |
|---|---|---|
| Data input & output | Reads from and writes to a variety of data sources | Receives data from the ES REST API and writes only to Elasticsearch |
| Data buffering | Implements a simple data queue and supports re-delivery | No buffering |
| Data processing | Large number of plugins, supports custom development | Built-in processors, extensible via plugins (updating a plugin requires a restart) |
| Configuration & usage | Adds some architectural complexity | No extra deployment needed |