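Reference: the official documentation.

Parameter notes:

- `processors`: the list of built-in processors that a pipeline applies to transform incoming data
- `set`: a processor that adds a field with the given value, or overwrites the field if it already exists

### Creating a first pipeline

Example: adding province, city, and district data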
```json
PUT _ingest/pipeline/mypipline-001
{
  "description": "Add province, city, and district data",
  "processors": [
    {
      "set": {
        "field": "pro",
        "value": "HN"
      }
    }
  ]
}

GET _ingest/pipeline/mypipline-001

POST ckiss-company-003/_doc?pipeline=mypipline-001
{
  "companyName": "ckiss Company"
}

GET ckiss-company-003/_mapping
```
**Response**

```json
{
  "ckiss-company-003" : {
    "mappings" : {
      "properties" : {
        "companyName" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "pro" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}
```
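The request above selects the pipeline with the `?pipeline=` query parameter. As a minimal sketch, assuming the index `ckiss-company-003` already exists and an Elasticsearch version that supports the `index.default_pipeline` setting, the pipeline can instead be attached to the index so that every indexing request uses it:

```json
# Assumption: mypipline-001 has been created and ckiss-company-003 exists
PUT ckiss-company-003/_settings
{
  "index.default_pipeline": "mypipline-001"
}

# No ?pipeline= parameter here; the index default is applied
POST ckiss-company-003/_doc
{
  "companyName": "ckiss Company"
}
```

A `?pipeline=` parameter on an individual request still takes precedence over the index default.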
### Object-type fields
```json
PUT _ingest/pipeline/mypipline-002
{
  "description": "Add province, city, and country data",
  "processors": [
    {
      "set": {
        "field": "area.pro",
        "value": "HN"
      }
    },
    {
      "set": {
        "field": "area.city",
        "value": "CS"
      }
    },
    {
      "set": {
        "field": "area.country",
        "value": "CN"
      }
    }
  ]
}

POST ckiss-company-004/_doc?pipeline=mypipline-002
{
  "companyName": "ckiss Company"
}

GET ckiss-company-004/_mapping

GET ckiss-company-004/_search
```
**Response**

```json
"_source" : {
  "area" : {
    "country" : "CN",
    "city" : "CS",
    "pro" : "HN"
  },
  "companyName" : "ckiss Company"
}
```
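When a document passes through a pipeline, the pipeline has the final say: a `set` processor overwrites whatever value the incoming document supplied for the same field.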
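If the incoming value should win instead, the `set` processor accepts an `override` option. The sketch below uses an inline pipeline with the simulate API, so nothing is stored or indexed; the sample documents are made up for illustration.

```json
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "field": "area.pro",
          "value": "HN",
          "override": false
        }
      }
    ]
  },
  "docs": [
    { "_source": { "area": { "pro": "JS" } } },
    { "_source": { "companyName": "ckiss Company" } }
  ]
}
```

With `override` set to `false`, the first document should keep `JS`, while the second, which has no `area.pro`, should receive `HN`.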
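Malformed input, where `area.pro` and `area.city` are sent as literal dotted keys rather than as a nested `area` object:

```json
POST ckiss-company-003/_doc?pipeline=mypipline-002
{
  "companyName": "ckiss International Group",
  "area.pro": "JS",
  "area.city": "SZ"
}
```

**Response**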
Correct format, with `area` sent as a nested object:

```json
POST ckiss-company-003/_doc?pipeline=mypipline-002
{
  "companyName": "ckiss International Group",
  "area": {
    "pro": "JS",
    "city": "SZ"
  }
}
```
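When the client cannot be changed to send nested objects, one option is the `dot_expander` processor, which turns a dotted key into an object field so that later processors address the same path. This is a sketch using an inline pipeline with the simulate API; the processor order and sample document are assumptions, not part of the original pipelines.

```json
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      { "dot_expander": { "field": "area.pro" } },
      { "dot_expander": { "field": "area.city" } },
      { "set": { "field": "area.pro", "value": "HN" } }
    ]
  },
  "docs": [
    {
      "_source": {
        "companyName": "ckiss International Group",
        "area.pro": "JS",
        "area.city": "SZ"
      }
    }
  ]
}
```

Comparing the simulated `_source` with and without the two `dot_expander` entries shows whether the dotted keys and the nested `area` object end up as one structure or two.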
### Testing with `_simulate`
```json
POST _ingest/pipeline/mypipline-002/_simulate
{
  "docs": [
    {
      "_source": {
        "area": {
          "pro": "SH"
        }
      }
    }
  ]
}

POST _ingest/pipeline/mypipline-002/_simulate
{
  "docs": [
    {
      "_source": {}
    }
  ]
}

# Both requests return the same result, because the pipeline overwrites area.pro
```
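To see what each processor contributes, the simulate API also accepts a `verbose` query parameter. A minimal sketch, assuming `mypipline-002` is still the three-`set` pipeline defined earlier:

```json
POST _ingest/pipeline/mypipline-002/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "area": {
          "pro": "SH"
        }
      }
    }
  ]
}
```

In verbose mode the response lists the document state after every processor under `processor_results`, which makes it easier to spot the step where `SH` is replaced.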
### Accessing `_source` fields
```json
DELETE _ingest/pipeline/mypipline-002

PUT _ingest/pipeline/mypipline-002
{
  "description": "Source data context",
  "processors": [
    {
      "set": {
        "field": "company.companyId",
        "value": "{{_source.companyId}}"
      }
    },
    {
      "set": {
        "field": "company.companyName",
        "value": "{{_source.companyName}}"
      }
    },
    {
      "remove": {
        "field": [
          "companyId",
          "companyName"
        ]
      }
    }
  ]
}

DELETE ckiss-company-001

PUT ckiss-company-001/_doc/1?pipeline=mypipline-002
{
  "companyId": 1,
  "companyName": "ckiss International Group",
  "city": "WH"
}

GET ckiss-company-001/_search
```
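Template snippets can reference a source field with or without the `_source.` prefix, and several fields can be combined in one value. The sketch below uses an inline pipeline with the simulate API; the `company.label` field is a hypothetical name introduced only for this example.

```json
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "field": "company.companyId",
          "value": "{{companyId}}"
        }
      },
      {
        "set": {
          "field": "company.label",
          "value": "{{_source.companyName}} ({{_source.city}})"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "companyId": 1,
        "companyName": "ckiss International Group",
        "city": "WH"
      }
    }
  ]
}
```

Because the value is rendered through a template, `company.companyId` comes out as a string even though the source `companyId` is a number.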
### Accessing document metadata

- Metadata fields available in the pipeline context: `_index`, `_type`, `_id`, `_routing`
- Note: if `_id` is auto-generated, it cannot be read in the pipeline
- Access syntax: `{{_index}}`

```json
DELETE _ingest/pipeline/mypipline-002

PUT _ingest/pipeline/mypipline-002
{
  "description": "Metadata _id",
  "processors": [
    {
      "set": {
        "field": "company.companyId",
        "value": "{{_id}}"
      }
    },
    {
      "remove": {
        "field": [
          "companyId",
          "companyName"
        ]
      }
    }
  ]
}

GET _ingest/pipeline/mypipline-002

DELETE ckiss-company-001

PUT ckiss-company-001/_doc/122?pipeline=mypipline-002
{
  "companyId": 1233,
  "companyName": "ckiss International Group",
  "city": "WH"
}

GET ckiss-company-001/_search
```
**Response**

```json
{
  "_index" : "ckiss-company-001",
  "_type" : "_doc",
  "_id" : "122",
  "_score" : 1.0,
  "_source" : {
    "city" : "WH",
    "company" : {
      "companyId" : "122"
    }
  }
}
```
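The same metadata behavior can be checked without indexing anything, since each simulated document may carry its own `_index` and `_id`. A minimal sketch, assuming `mypipline-002` is currently the `{{_id}}` version defined in this section:

```json
POST _ingest/pipeline/mypipline-002/_simulate
{
  "docs": [
    {
      "_index": "ckiss-company-001",
      "_id": "122",
      "_source": {
        "companyId": 1233,
        "companyName": "ckiss International Group",
        "city": "WH"
      }
    }
  ]
}
```

Changing `_id` in the request and rerunning it is a quick way to confirm that `company.companyId` follows the document ID rather than the original `companyId` field.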
### Accessing ingest metadata

- Ingest metadata field: `_ingest.timestamp`
- Access syntax: `{{_ingest.timestamp}}`

```json
DELETE _ingest/pipeline/mypipline-002

PUT _ingest/pipeline/mypipline-002
{
  "processors": [
    {
      "set": {
        "field": "@timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
```
### Multiple pipelines
```json
PUT _ingest/pipeline/company-area-001
{
  "description": "Area information",
  "processors": [
    {
      "set": {
        "field": "area.pro",
        "value": "HN"
      }
    },
    {
      "set": {
        "field": "area.city",
        "value": "CS"
      }
    },
    {
      "set": {
        "field": "area.country",
        "value": "CN"
      }
    }
  ]
}

PUT _ingest/pipeline/company-info-001
{
  "description": "Basic company information",
  "processors": [
    {
      "set": {
        "field": "companyName",
        "value": "International Group"
      }
    }
  ]
}

PUT _ingest/pipeline/company-remove-001
{
  "description": "Add a timestamp and remove fields",
  "processors": [
    {
      "set": {
        "field": "@timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "remove": {
        "field": [
          "companyId"
        ]
      }
    }
  ]
}

PUT _ingest/pipeline/company_pipeline_001
{
  "description": "Pipeline that chains other pipelines",
  "processors": [
    {
      "pipeline": {
        "name": "company-area-001"
      }
    },
    {
      "pipeline": {
        "name": "company-info-001"
      }
    },
    {
      "pipeline": {
        "name": "company-remove-001"
      }
    }
  ]
}
```
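A chained pipeline fails as a whole if one step fails, and `remove` raises an error when the field is absent unless `ignore_missing` is set. The sketch below assumes `company-area-001` and `company-info-001` have been created as above, and inlines a tolerant `remove` step instead of calling `company-remove-001`; the two sample documents are illustrative.

```json
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      { "pipeline": { "name": "company-area-001" } },
      { "pipeline": { "name": "company-info-001" } },
      {
        "remove": {
          "field": "companyId",
          "ignore_missing": true
        }
      }
    ]
  },
  "docs": [
    { "_source": { "companyId": 1 } },
    { "_source": { "city": "WH" } }
  ]
}
```

The second document has no `companyId`, so it is the one that would be expected to fail if `ignore_missing` were left at its default.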
### Script processor
```json
PUT _ingest/pipeline/company_script_001
{
  "description": "Script processor",
  "processors": [
    {
      "script": {
        "source": """
          ctx.companyNameId = ctx.companyName + "_" + ctx.companyId;
        """,
        "lang": "painless",
        "params": {}
      }
    }
  ]
}

GET _ingest/pipeline/company_script_001

DELETE ckiss-company-001

PUT ckiss-company-001/_doc/12?pipeline=company_script_001
{
  "companyId": 2,
  "userId": 123,
  "userName": "test data",
  "regDate": "2022-04-10",
  "companyName": "ckiss International Group"
}

GET ckiss-company-001/_search
```
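The empty `params` object in the pipeline above is where constants can be passed into the script instead of being hard-coded. A minimal sketch using an inline pipeline with the simulate API; the parameter name `separator` is an assumption chosen for this example.

```json
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "script": {
          "lang": "painless",
          "source": "ctx.companyNameId = ctx.companyName + params.separator + ctx.companyId;",
          "params": {
            "separator": "_"
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "companyId": 2,
        "companyName": "ckiss International Group"
      }
    }
  ]
}
```

The simulated document should gain a `companyNameId` field built from the company name, the separator, and the ID. Keeping the separator in `params` means the script text stays identical when only the constant changes.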
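### Conditional logic with `if`

- Almost every processor accepts an `if` condition; the condition itself is written as a script

Example: setting company fields based on the company ID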
```json
DELETE _ingest/pipeline/company_info_003

# companyId arrives as a JSON number, so the second condition compares against the number 5
PUT _ingest/pipeline/company_info_003
{
  "description": "Reorganize company information",
  "processors": [
    {
      "set": {
        "if": "ctx._id == '1'",
        "field": "companyId",
        "value": "{{_id}}"
      }
    },
    {
      "set": {
        "if": "ctx.companyId == 5",
        "field": "companyName",
        "value": "{{_source.userName}}"
      }
    }
  ]
}

DELETE ckiss-company-001

PUT ckiss-company-001/_doc/5?pipeline=company_info_003
{
  "companyId": 5,
  "city": "wh",
  "userId": 123,
  "userName": "test data",
  "regDate": "2022-04-10",
  "companyName": "ckiss International Group"
}

GET ckiss-company-001/_search
```
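Conditions that reach into nested fields need to cope with documents where the parent object is missing. One way, sketched here with an inline pipeline and the simulate API, is the Painless null-safe operator `?.`; the field values and sample documents are assumptions for illustration.

```json
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "if": "ctx.area?.pro == 'HN'",
          "field": "area.city",
          "value": "CS"
        }
      }
    ]
  },
  "docs": [
    { "_source": { "area": { "pro": "HN" } } },
    { "_source": { "companyName": "ckiss Company" } }
  ]
}
```

The second document has no `area` object; with `?.` the condition evaluates to false for it instead of raising a null pointer error.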
### Using a pipeline during reindex
```json
POST _reindex
{
  "conflicts": "proceed",
  "source": {
    "index": "source-index"
  },
  "dest": {
    "index": "dest-index",
    "pipeline": "pipeline"
  }
}
```
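Reindexing writes the processed documents into a second index. When the goal is to run a pipeline over documents in place, the update-by-query API also accepts a `pipeline` parameter. A minimal sketch, assuming `source-index` exists and reusing `mypipline-001` from earlier as the pipeline name:

```json
POST source-index/_update_by_query?pipeline=mypipline-001&conflicts=proceed
```

Without a request body this matches every document in the index, so it is worth trying on a small test index first.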