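Reference: official documentation (link)

Parameters:
  • processors: the list of built-in data-processing processors that make up a pipeline
  • set: sets a field to the given value, adding the field if it does not exist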

Creating your first pipeline

Example: adding province/city/district data

```json
# Check whether the pipeline already exists
GET _ingest/pipeline/mypipline-001

PUT _ingest/pipeline/mypipline-001
{
  "description": "Add province/city/district data",
  "processors": [
    {
      "set": {
        "field": "pro",
        "value": "HN"
      }
    }
  ]
}

POST ckiss-company-003/_doc?pipeline=mypipline-001
{
  "companyName": "ckiss Company"
}

GET ckiss-company-003/_mapping
```

Response of GET ckiss-company-003/_mapping. The pro field added by the pipeline is mapped dynamically like any other field, as text with a keyword subfield:

```json
{
  "ckiss-company-003" : {
    "mappings" : {
      "properties" : {
        "companyName" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "pro" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}
```
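It is easy to forget the `?pipeline=` parameter on an individual request. The following sketch assumes Elasticsearch 6.5 or later. It uses the dynamic index setting `index.default_pipeline` to attach `mypipline-001` to the index. Index requests that omit the parameter should then still pass through the pipeline:

```json
# Attach mypipline-001 as the default pipeline of ckiss-company-003
PUT ckiss-company-003/_settings
{
  "index.default_pipeline": "mypipline-001"
}

# No ?pipeline= parameter here, but the default pipeline still adds "pro"
POST ckiss-company-003/_doc
{
  "companyName": "ckiss Company"
}
```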

Object support: a dotted field path such as area.pro creates nested objects

```json
PUT _ingest/pipeline/mypipline-002
{
  "description": "Add province/city/district data",
  "processors": [
    {
      "set": {
        "field": "area.pro",
        "value": "HN"
      }
    },
    {
      "set": {
        "field": "area.city",
        "value": "CS"
      }
    },
    {
      "set": {
        "field": "area.country",
        "value": "CN"
      }
    }
  ]
}

POST ckiss-company-004/_doc?pipeline=mypipline-002
{
  "companyName": "ckiss Company"
}

GET ckiss-company-004/_mapping
GET ckiss-company-004/_search
```

Response (the _source of the search hit)

```json
"_source" : {
  "area" : {
    "country" : "CN",
    "city" : "CS",
    "pro" : "HN"
  },
  "companyName" : "ckiss Company"
}
```

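When a document goes through a pipeline, the pipeline has the final say: by default, set overwrites any value the document already has for the same field.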
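Sometimes the value sent by the client should win instead. The `set` processor's `override` parameter (default `true`) handles this: when it is `false`, the field is only written if it is missing or null. A sketch using the hypothetical pipeline name `mypipline-keep`:

```json
PUT _ingest/pipeline/mypipline-keep
{
  "description": "Set area.pro only when the document does not already have it",
  "processors": [
    {
      "set": {
        "field": "area.pro",
        "value": "HN",
        "override": false
      }
    }
  ]
}

POST _ingest/pipeline/mypipline-keep/_simulate
{
  "docs": [
    { "_source": { "area": { "pro": "SH" } } },
    { "_source": {} }
  ]
}
```

With `override: false`, the first simulated document should keep `SH` and the second should receive `HN`.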

Non-standard data format: dotted field names

```json
POST ckiss-company-003/_doc?pipeline=mypipline-002
{
  "companyName": "ckiss国际集团",
  "area.pro": "JS",
  "area.city": "SZ"
}
```

Response

[Screenshot: incorrectly ingested data]

Correct format: send area as an object instead of dotted keys

```json
POST ckiss-company-003/_doc?pipeline=mypipline-002
{
  "companyName": "ckiss国际集团",
  "area": {
    "pro": "JS",
    "city": "SZ"
  }
}
```
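The pipeline can also normalize dotted keys itself, so clients do not all have to send nested objects. The `dot_expander` processor turns a field name such as `area.pro` into an object path, which later processors can then work with. A minimal sketch, with the hypothetical pipeline name `mypipline-dot`:

```json
PUT _ingest/pipeline/mypipline-dot
{
  "description": "Expand dotted keys into the area object",
  "processors": [
    { "dot_expander": { "field": "area.pro" } },
    { "dot_expander": { "field": "area.city" } }
  ]
}

POST _ingest/pipeline/mypipline-dot/_simulate
{
  "docs": [
    {
      "_source": {
        "companyName": "ckiss国际集团",
        "area.pro": "JS",
        "area.city": "SZ"
      }
    }
  ]
}
```

In the simulated result, `pro` and `city` should both end up inside a single `area` object, the same shape as the correct format.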

Testing with _simulate

```json
POST _ingest/pipeline/mypipline-002/_simulate
{
  "docs": [
    {
      "_source": {
        "area": {
          "pro": "SH"
        }
      }
    }
  ]
}

POST _ingest/pipeline/mypipline-002/_simulate
{
  "docs": [
    {
      "_source": {}
    }
  ]
}
# Both requests return the same _source: the pipeline overwrites area.pro and adds area.city and area.country
```
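`_simulate` can also run a pipeline definition that has not been saved yet. The `verbose` parameter returns the document after each processor, which makes it easier to see where a value was overwritten. A sketch that reuses two of the `area` processors inline:

```json
POST _ingest/pipeline/_simulate?verbose=true
{
  "pipeline": {
    "processors": [
      { "set": { "field": "area.pro", "value": "HN" } },
      { "set": { "field": "area.city", "value": "CS" } }
    ]
  },
  "docs": [
    { "_source": { "area": { "pro": "SH" } } }
  ]
}
```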

Accessing the _source data

```json
DELETE _ingest/pipeline/mypipline-002

PUT _ingest/pipeline/mypipline-002
{
  "description": "Access the _source context",
  "processors": [
    {
      "set": {
        "field": "company.companyId",
        "value": "{{_source.companyId}}"
      }
    },
    {
      "set": {
        "field": "company.companyName",
        "value": "{{_source.companyName}}"
      }
    },
    {
      "remove": {
        "field": [
          "companyId",
          "companyName"
        ]
      }
    }
  ]
}

DELETE ckiss-company-001

PUT ckiss-company-001/_doc/1?pipeline=mypipline-002
{
  "companyId": 1,
  "companyName": "ckiss国际集团",
  "city": "WH"
}

GET ckiss-company-001/_search
```
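A template such as `{{_source.companyId}}` renders to a string, so `company.companyId` is stored as `"1"` rather than the number `1`. To keep the original type, the `rename` processor can be used instead. It moves a field without going through a template, so the separate `remove` step is no longer needed. A sketch with the hypothetical pipeline name `mypipline-rename`:

```json
PUT _ingest/pipeline/mypipline-rename
{
  "description": "Move companyId/companyName under company, keeping their types",
  "processors": [
    { "rename": { "field": "companyId", "target_field": "company.companyId" } },
    { "rename": { "field": "companyName", "target_field": "company.companyName" } }
  ]
}

POST _ingest/pipeline/mypipline-rename/_simulate
{
  "docs": [
    { "_source": { "companyId": 1, "companyName": "ckiss国际集团", "city": "WH" } }
  ]
}
```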

Accessing metadata

  • Metadata fields available in the context: _index, _type, _id, _routing
  • Note: if _id is auto-generated, it cannot be read in the pipeline, because Elasticsearch assigns auto-generated IDs after ingest
  • Access syntax: {{_index}}, {{_id}}, and so on

```json
DELETE _ingest/pipeline/mypipline-002

PUT _ingest/pipeline/mypipline-002
{
  "description": "meta: _id",
  "processors": [
    {
      "set": {
        "field": "company.companyId",
        "value": "{{_id}}"
      }
    },
    {
      "remove": {
        "field": [
          "companyId",
          "companyName"
        ]
      }
    }
  ]
}

GET _ingest/pipeline/mypipline-002

DELETE ckiss-company-001

PUT ckiss-company-001/_doc/122?pipeline=mypipline-002
{
  "companyId": 1233,
  "companyName": "ckiss国际集团",
  "city": "WH"
}

GET ckiss-company-001/_search
```

Response (search hit)

```json
{
  "_index" : "ckiss-company-001",
  "_type" : "_doc",
  "_id" : "122",
  "_score" : 1.0,
  "_source" : {
    "city" : "WH",
    "company" : {
      "companyId" : "122"
    }
  }
}
```
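The other metadata fields are read the same way. In `_simulate`, each entry in `docs` can carry `_index` and `_id` next to `_source`. This lets a metadata-based pipeline be checked without indexing anything. A sketch with the hypothetical pipeline name `mypipline-meta`:

```json
PUT _ingest/pipeline/mypipline-meta
{
  "processors": [
    { "set": { "field": "meta.index", "value": "{{_index}}" } },
    { "set": { "field": "meta.id", "value": "{{_id}}" } }
  ]
}

POST _ingest/pipeline/mypipline-meta/_simulate
{
  "docs": [
    {
      "_index": "ckiss-company-001",
      "_id": "122",
      "_source": { "city": "WH" }
    }
  ]
}
```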

Accessing ingest metadata

  • Ingest metadata field: _ingest.timestamp
  • Access syntax: {{_ingest.timestamp}}

```json
DELETE _ingest/pipeline/mypipline-002

PUT _ingest/pipeline/mypipline-002
{
  "processors": [
    {
      "set": {
        "field": "@timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
```
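The value that `_ingest.timestamp` produces can be inspected without indexing anything. The sketch below runs the timestamp pipeline through `_simulate`. In the result, `@timestamp` should contain the ingest time rendered as a string by the template:

```json
POST _ingest/pipeline/mypipline-002/_simulate
{
  "docs": [
    { "_source": { "companyName": "ckiss国际集团" } }
  ]
}
```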

Multiple pipelines

A pipeline processor runs another pipeline by name, so one pipeline can be composed from several smaller ones.

```json
PUT _ingest/pipeline/company-area-001
{
  "description": "Area information",
  "processors": [
    {
      "set": {
        "field": "area.pro",
        "value": "HN"
      }
    },
    {
      "set": {
        "field": "area.city",
        "value": "CS"
      }
    },
    {
      "set": {
        "field": "area.country",
        "value": "CN"
      }
    }
  ]
}

PUT _ingest/pipeline/company-info-001
{
  "description": "Basic company information",
  "processors": [
    {
      "set": {
        "field": "companyName",
        "value": "国际集团"
      }
    }
  ]
}

PUT _ingest/pipeline/company-remove-001
{
  "description": "Add a timestamp and remove fields",
  "processors": [
    {
      "set": {
        "field": "@timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "remove": {
        "field": [
          "companyId"
        ]
      }
    }
  ]
}

PUT _ingest/pipeline/company_pipeline_001
{
  "description": "Multi-pipeline processor",
  "processors": [
    {
      "pipeline": {
        "name": "company-area-001"
      }
    },
    {
      "pipeline": {
        "name": "company-info-001"
      }
    },
    {
      "pipeline": {
        "name": "company-remove-001"
      }
    }
  ]
}
```
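At index time, only the composed pipeline `company_pipeline_001` is referenced, and it calls the three sub-pipelines in order. A sketch; the document ID `1` is arbitrary. The document includes `companyId` because, by default, the `remove` processor in `company-remove-001` fails when that field is missing:

```json
PUT ckiss-company-001/_doc/1?pipeline=company_pipeline_001
{
  "companyId": 1,
  "city": "WH"
}

GET ckiss-company-001/_search
```

The stored `_source` should then contain `city`, the `area` object, `companyName` and `@timestamp`, with `companyId` removed.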

Script processing

```json
PUT _ingest/pipeline/company_script_001
{
  "description": "Script processor",
  "processors": [
    {
      "script": {
        "source": """
          ctx.companyNameId = ctx.companyName + "_" + ctx.companyId;
        """,
        "lang": "painless",
        "params": {}
      }
    }
  ]
}

GET _ingest/pipeline/company_script_001

DELETE ckiss-company-001

PUT ckiss-company-001/_doc/12?pipeline=company_script_001
{
  "companyId": 2,
  "userId": 123,
  "userName": "test data",
  "regDate": "2022-04-10",
  "companyName": "ckiss国际集团"
}

GET ckiss-company-001/_search
```
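Values that may change are usually passed through `params` rather than hard-coded in the script source. The sketch below reads the separator from `params` and skips the assignment when either field is missing. The param name `separator` and the pipeline name `company_script_002` are hypothetical:

```json
PUT _ingest/pipeline/company_script_002
{
  "description": "Script processor with params",
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": """
          if (ctx.companyName != null && ctx.companyId != null) {
            ctx.companyNameId = ctx.companyName + params.separator + ctx.companyId;
          }
        """,
        "params": {
          "separator": "_"
        }
      }
    }
  ]
}
```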

Conditional logic with if

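  • Every processor accepts an optional if condition. Its value is a Painless script, and the processor runs only when the script returns true.

    Example: set company information based on the company ID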

```json
DELETE _ingest/pipeline/company_info_003

# companyId may be a number (from the request) or a string (from the {{_id}} template),
# so the second condition compares its string form
PUT _ingest/pipeline/company_info_003
{
  "description": "Reorganize company information",
  "processors": [
    {
      "set": {
        "if": "ctx._id == '1'",
        "field": "companyId",
        "value": "{{_id}}"
      }
    },
    {
      "set": {
        "if": "ctx.companyId != null && ctx.companyId.toString() == '5'",
        "field": "companyName",
        "value": "{{_source.userName}}"
      }
    }
  ]
}

DELETE ckiss-company-001

PUT ckiss-company-001/_doc/5?pipeline=company_info_003
{
  "companyId": 5,
  "city": "wh",
  "userId": 123,
  "userName": "test data",
  "regDate": "2022-04-10",
  "companyName": "ckiss国际集团"
}

GET ckiss-company-001/_search
```
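The if script runs for every document, so it should not fail when a field is missing. For nested fields, the Painless null-safe operator `?.` helps. The sketch below only tags documents whose `area.city` is `CS`. The pipeline name `company_info_004` and the `tags` value are hypothetical:

```json
PUT _ingest/pipeline/company_info_004
{
  "processors": [
    {
      "set": {
        "if": "ctx.area?.city == 'CS'",
        "field": "tags",
        "value": "changsha"
      }
    }
  ]
}
```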

Using a pipeline when reindexing

```json
# source-index, dest-index and pipeline are placeholders for real names
POST _reindex
{
  "conflicts": "proceed",
  "source": {
    "index": "source-index"
  },
  "dest": {
    "index": "dest-index",
    "pipeline": "pipeline"
  }
}
```
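To reprocess documents in place rather than copy them to another index, use `_update_by_query`, which accepts the same `pipeline` parameter. The sketch below reruns `company_script_001` over every document already in ckiss-company-001:

```json
POST ckiss-company-001/_update_by_query?pipeline=company_script_001
{
  "query": {
    "match_all": {}
  }
}
```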