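Reference: official documentation.

Parameters:

- `processors`: the list of built-in processors the pipeline applies to each document
- `set`: a processor that adds a field or overwrites its value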
### Creating the first pipeline

Example: adding province, city and district fields

```json
GET _ingest/pipeline/mypipline-001

PUT _ingest/pipeline/mypipline-001
{
  "description": "Add province/city/district data",
  "processors": [
    {
      "set": {
        "field": "pro",
        "value": "HN"
      }
    }
  ]
}

POST ckiss-company-003/_doc?pipeline=mypipline-001
{
  "companyName": "ckiss Company"
}

GET ckiss-company-003/_mapping
```

The mapping now contains the `pro` field added by the pipeline:

```json
{
  "ckiss-company-003" : {
    "mappings" : {
      "properties" : {
        "companyName" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "pro" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}
```
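The console requests above can also be issued from a script. The following is a minimal sketch using only the Python standard library. It builds the requests without sending them, and the base URL `http://localhost:9200` and the helper name `es_request` are assumptions for illustration, not part of the original notes.

```python
import json
import urllib.request

BASE_URL = "http://localhost:9200"  # assumption: a local, unsecured test cluster


def es_request(method, path, body=None):
    """Build (but do not send) an HTTP request for the Elasticsearch REST API."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    return urllib.request.Request(
        f"{BASE_URL}/{path}",
        data=data,
        headers={"Content-Type": "application/json"},
        method=method,
    )


# Same pipeline definition as in the console example.
put_pipeline = es_request(
    "PUT",
    "_ingest/pipeline/mypipline-001",
    {
        "description": "Add province/city/district data",
        "processors": [{"set": {"field": "pro", "value": "HN"}}],
    },
)

# Index one document through the pipeline.
index_doc = es_request(
    "POST",
    "ckiss-company-003/_doc?pipeline=mypipline-001",
    {"companyName": "ckiss Company"},
)

print(put_pipeline.get_method(), put_pipeline.full_url)
print(index_doc.get_method(), index_doc.full_url)
```

To actually send a request, pass it to `urllib.request.urlopen`, for example `urllib.request.urlopen(put_pipeline)`. A secured cluster would additionally need credentials and TLS settings.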
### Object-type fields

```json
PUT _ingest/pipeline/mypipline-002
{
  "description": "Add province/city/district data",
  "processors": [
    {
      "set": {
        "field": "area.pro",
        "value": "HN"
      }
    },
    {
      "set": {
        "field": "area.city",
        "value": "CS"
      }
    },
    {
      "set": {
        "field": "area.country",
        "value": "CN"
      }
    }
  ]
}

POST ckiss-company-004/_doc?pipeline=mypipline-002
{
  "companyName": "ckiss Company"
}

GET ckiss-company-004/_mapping

GET ckiss-company-004/_search
```

**Response**

```json
"_source" : {
  "area" : {
    "country" : "CN",
    "city" : "CS",
    "pro" : "HN"
  },
  "companyName" : "ckiss Company"
}
```
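### The pipeline has the final say

Non-standard data format (dotted keys instead of a nested object):

```json
POST ckiss-company-003/_doc?pipeline=mypipline-002
{
  "companyName": "ckiss国际集团",
  "area.pro": "JS",
  "area.city": "SZ"
}
```

**Response**

Correct format:

```json
POST ckiss-company-003/_doc?pipeline=mypipline-002
{
  "companyName": "ckiss国际集团",
  "area": {
    "pro": "JS",
    "city": "SZ"
  }
}
```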
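The overwrite behaviour can be illustrated with a toy model in plain Python. This is only a sketch of the idea, not Elasticsearch code: `apply_set` is a hypothetical helper, and its `override` argument mirrors the `set` processor's `override` option, which defaults to `true` and, when set to `false`, leaves an existing non-null value untouched.

```python
import copy


def apply_set(doc, field, value, override=True):
    """Toy model of the `set` processor: write `value` at a dotted field path."""
    result = copy.deepcopy(doc)
    *parents, leaf = field.split(".")
    node = result
    for key in parents:
        # Create intermediate objects as needed, e.g. "area" for "area.pro".
        node = node.setdefault(key, {})
    if override or node.get(leaf) is None:
        node[leaf] = value
    return result


incoming = {"companyName": "ckiss国际集团", "area": {"pro": "JS", "city": "SZ"}}

# Default behaviour: the pipeline's value replaces the one sent by the client.
overwritten = apply_set(incoming, "area.pro", "HN")

# With override disabled, the client's existing value is kept.
preserved = apply_set(incoming, "area.pro", "HN", override=False)

print(overwritten["area"]["pro"])  # HN
print(preserved["area"]["pro"])    # JS
```

If client-supplied values should win, adding `"override": false` to each `set` processor is one way to get that, at the cost of the pipeline no longer guaranteeing a uniform value.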
### Testing with _simulate

```json
POST _ingest/pipeline/mypipline-002/_simulate
{
  "docs": [
    {
      "_source": {
        "area": {
          "pro": "SH"
        }
      }
    }
  ]
}

POST _ingest/pipeline/mypipline-002/_simulate
{
  "docs": [
    {
      "_source": {}
    }
  ]
}

# Both requests return the same result
```
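The simulate API also accepts a pipeline definition inline, via `POST _ingest/pipeline/_simulate` with a `pipeline` object next to `docs`, so a draft can be tried before it is stored. The sketch below only assembles such a request body in Python and prints it. The variable names are illustrative, and both sample documents are sent in a single request.

```python
import json

# Pipeline under test, defined inline instead of being stored first.
pipeline = {
    "processors": [
        {"set": {"field": "area.pro", "value": "HN"}},
        {"set": {"field": "area.city", "value": "CS"}},
        {"set": {"field": "area.country", "value": "CN"}},
    ]
}

# The two sample documents from the example, combined into one request.
docs = [
    {"_source": {"area": {"pro": "SH"}}},
    {"_source": {}},
]

simulate_path = "_ingest/pipeline/_simulate"
simulate_body = {"pipeline": pipeline, "docs": docs}

print("POST", simulate_path)
print(json.dumps(simulate_body, indent=2))
```

Both documents end up with the same `area` object because every field the first document supplies is overwritten by a `set` processor. Appending `?verbose=true` to the path should show the document after each processor, which helps when a longer pipeline misbehaves.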
### Accessing _source data

```json
DELETE _ingest/pipeline/mypipline-002

PUT _ingest/pipeline/mypipline-002
{
  "description": "_source data context",
  "processors": [
    {
      "set": {
        "field": "company.companyId",
        "value": "{{_source.companyId}}"
      }
    },
    {
      "set": {
        "field": "company.companyName",
        "value": "{{_source.companyName}}"
      }
    },
    {
      "remove": {
        "field": [
          "companyId",
          "companyName"
        ]
      }
    }
  ]
}

DELETE ckiss-company-001

PUT ckiss-company-001/_doc/1?pipeline=mypipline-002
{
  "companyId": 1,
  "companyName": "ckiss国际集团",
  "city": "WH"
}

GET ckiss-company-001/_search
```
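To show what the `{{_source.…}}` templates and the `remove` processor are expected to do to the sample document, here is a toy Python stand-in. It is not how Elasticsearch renders templates internally: `render` and `lookup` are hypothetical helpers, and the sketch assumes that a rendered template always yields text, so the numeric `companyId` comes out as the string `"1"`.

```python
import re


def lookup(doc, path):
    """Resolve a dotted path such as 'companyName' inside a nested dict."""
    node = doc
    for key in path.split("."):
        node = node[key]
    return node


def render(template, source):
    """Toy template renderer: replace {{_source.x}} with str(value)."""
    def substitute(match):
        return str(lookup(source, match.group(1)))
    return re.sub(r"\{\{\s*_source\.([\w.]+)\s*\}\}", substitute, template)


source = {"companyId": 1, "companyName": "ckiss国际集团", "city": "WH"}

# The two `set` processors build the nested `company` object.
company = {
    "companyId": render("{{_source.companyId}}", source),
    "companyName": render("{{_source.companyName}}", source),
}

# The `remove` processor drops the original top-level fields.
result = {k: v for k, v in source.items() if k not in ("companyId", "companyName")}
result["company"] = company

print(result)
# {'city': 'WH', 'company': {'companyId': '1', 'companyName': 'ckiss国际集团'}}
```

If the original type has to survive, the `set` processor's `copy_from` option (available in more recent Elasticsearch versions) may be a better fit than a template.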
### Accessing meta metadata

- Metadata fields available in the context: `_index`, `_type`, `_id`, `_routing`
- Note: an auto-generated `_id` cannot be read inside the pipeline
- Access syntax: `{{_index}}`

```json
DELETE _ingest/pipeline/mypipline-002

PUT _ingest/pipeline/mypipline-002
{
  "description": "meta metadata: _id",
  "processors": [
    {
      "set": {
        "field": "company.companyId",
        "value": "{{_id}}"
      }
    },
    {
      "remove": {
        "field": [
          "companyId",
          "companyName"
        ]
      }
    }
  ]
}

GET _ingest/pipeline/mypipline-002

DELETE ckiss-company-001

PUT ckiss-company-001/_doc/122?pipeline=mypipline-002
{
  "companyId": 1233,
  "companyName": "ckiss国际集团",
  "city": "WH"
}

GET ckiss-company-001/_search
```

**Response**

```json
{
  "_index" : "ckiss-company-001",
  "_type" : "_doc",
  "_id" : "122",
  "_score" : 1.0,
  "_source" : {
    "city" : "WH",
    "company" : {
      "companyId" : "122"
    }
  }
}
```
### Accessing ingest metadata

- Ingest metadata field: `_ingest.timestamp`
- Access syntax: `{{_ingest.timestamp}}`

```json
DELETE _ingest/pipeline/mypipline-002

PUT _ingest/pipeline/mypipline-002
{
  "processors": [
    {
      "set": {
        "field": "@timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
```

### Multiple pipelines

```json
PUT _ingest/pipeline/company-area-001
{
  "description": "Area information",
  "processors": [
    {
      "set": {
        "field": "area.pro",
        "value": "HN"
      }
    },
    {
      "set": {
        "field": "area.city",
        "value": "CS"
      }
    },
    {
      "set": {
        "field": "area.country",
        "value": "CN"
      }
    }
  ]
}

PUT _ingest/pipeline/company-info-001
{
  "description": "Basic company information",
  "processors": [
    {
      "set": {
        "field": "companyName",
        "value": "国际集团"
      }
    }
  ]
}

PUT _ingest/pipeline/company-remove-001
{
  "description": "Remove fields",
  "processors": [
    {
      "set": {
        "field": "@timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "remove": {
        "field": [
          "companyId"
        ]
      }
    }
  ]
}

PUT _ingest/pipeline/company_pipeline_001
{
  "description": "Multi-pipeline processor",
  "processors": [
    {
      "pipeline": {
        "name": "company-area-001"
      }
    },
    {
      "pipeline": {
        "name": "company-info-001"
      }
    },
    {
      "pipeline": {
        "name": "company-remove-001"
      }
    }
  ]
}
```
### Script processing

```json
PUT _ingest/pipeline/company_script_001
{
  "description": "Script processor",
  "processors": [
    {
      "script": {
        "source": """
          ctx.companyNameId = ctx.companyName + "_" + ctx.companyId;
        """,
        "lang": "painless",
        "params": {}
      }
    }
  ]
}

GET _ingest/pipeline/company_script_001

DELETE ckiss-company-001

PUT ckiss-company-001/_doc/12?pipeline=company_script_001
{
  "companyId": 2,
  "userId": 123,
  "userName": "测试数据",
  "regDate": "2022-04-10",
  "companyName": "ckiss国际集团"
}

GET ckiss-company-001/_search
```
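For readers less familiar with Painless, the one-line script can be mirrored in Python to show the field it is expected to add. This is only a hand translation for illustration. `add_company_name_id` is a hypothetical name, and `ctx` is modelled as a plain dict.

```python
def add_company_name_id(ctx):
    """Python rendering of the Painless one-liner: join name and id with '_'."""
    ctx["companyNameId"] = f"{ctx['companyName']}_{ctx['companyId']}"
    return ctx


doc = {
    "companyId": 2,
    "userId": 123,
    "userName": "测试数据",
    "regDate": "2022-04-10",
    "companyName": "ckiss国际集团",
}

print(add_company_name_id(doc)["companyNameId"])  # ckiss国际集团_2
```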
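### Conditional logic with if

- Almost every processor supports a conditional expression; the value of `if` is a script.

Choosing company details by company ID. `companyId` is sent as a JSON number, so the second condition compares against `5` rather than the string `'5'`.

```json
DELETE _ingest/pipeline/company_info_003

PUT _ingest/pipeline/company_info_003
{
  "description": "Reorganize company information",
  "processors": [
    {
      "set": {
        "if": "ctx._id == '1'",
        "field": "companyId",
        "value": "{{_id}}"
      }
    },
    {
      "set": {
        "if": "ctx.companyId == 5",
        "field": "companyName",
        "value": "{{_source.userName}}"
      }
    }
  ]
}

DELETE ckiss-company-001

PUT ckiss-company-001/_doc/5?pipeline=company_info_003
{
  "companyId": 5,
  "city": "wh",
  "userId": 123,
  "userName": "测试数据",
  "regDate": "2022-04-10",
  "companyName": "ckiss国际集团"
}

GET ckiss-company-001/_search
```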
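To make the effect of the two conditions concrete, here is a toy Python model of the pipeline applied to the sample document. It is a sketch, not Elasticsearch code, and `run_pipeline` is a hypothetical name. Values keep their JSON types inside `ctx`, so the sketch compares `companyId` with the number 5. As far as I know, a comparison against the string `'5'` would not match a numeric field in Painless either.

```python
def run_pipeline(doc_id, source):
    """Toy model of two conditional `set` processors."""
    ctx = dict(source)
    # First processor: only fires for the document whose _id is "1".
    if doc_id == "1":
        ctx["companyId"] = doc_id
    # Second processor: numeric comparison, because companyId arrives as a number.
    if ctx.get("companyId") == 5:
        ctx["companyName"] = str(ctx["userName"])
    return ctx


source = {
    "companyId": 5,
    "city": "wh",
    "userId": 123,
    "userName": "测试数据",
    "regDate": "2022-04-10",
    "companyName": "ckiss国际集团",
}

result = run_pipeline("5", source)
print(result["companyName"])  # 测试数据
print(5 == "5")               # False: a number never equals its string form
```

Running the pipeline through `_simulate` with a few representative documents is a cheap way to confirm that each condition fires when intended.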
### Reindexing through a pipeline

```json
POST _reindex
{
  "conflicts": "proceed",
  "source": {
    "index": "source-index"
  },
  "dest": {
    "index": "desc-index",
    "pipeline": "pipeline"
  }
}
```