简介

管道聚合处理来自其他聚合而不是文档集的输出,将信息添加到输出树中。

Pipeline的分析结果会输出到原结果中,根据输出位置的不同,分为以下两类

  • Parent结果内嵌到现有的聚合分析结果中
    • Derivative
    • Moving Average
    • Cumulative Sum
  • Sibling结果与现有聚合分析结果同级

    • Max/Min/Avg/Sum Bucket
    • Stats/Extended Stats Bucket
    • Percentiles bucket

      语法简介

  • 嵌套的bucket聚合:聚合出按月价格的直方图

  • Metic聚合:对上面的聚合再求平均值。

字段类型

  • buckets_path:指定聚合的名称,支持多级嵌套聚合。
  • gap_policy 当管道聚合遇到不存在的值,有点类似于term等聚合的(missing)时所采取的策略,可选择值为:skip、insert_zeros。
  • skip:此选项将丢失的数据视为bucket不存在。它将跳过桶并使用下一个可用值继续计算。
  • format 用于格式化聚合桶的输出(key)。
    1. POST _search
    2. {
    3. "size": 0,
    4. "aggs": {
    5. "sales_per_month": {
    6. "date_histogram": {
    7. "field": "date",
    8. "calendar_interval": "month"
    9. },
    10. "aggs": {
    11. "sales": {
    12. "sum": {
    13. "field": "price"
    14. }
    15. }
    16. }
    17. },
    18. "avg_monthly_sales": {
    19. // tag::avg-bucket-agg-syntax[]
    20. "avg_bucket": {
    21. "buckets_path": "sales_per_month>sales",
    22. "gap_policy": "skip",
    23. "format": "#,##0.00;(#,##0.00)"
    24. }
    25. // end::avg-bucket-agg-syntax[]
    26. }
    27. }
    28. }
    输出
    1. {
    2. "took": 11,
    3. "timed_out": false,
    4. "_shards": ...,
    5. "hits": ...,
    6. "aggregations": {
    7. "sales_per_month": {
    8. "buckets": [
    9. {
    10. "key_as_string": "2015/01/01 00:00:00",
    11. "key": 1420070400000,
    12. "doc_count": 3,
    13. "sales": {
    14. "value": 550.0
    15. }
    16. },
    17. {
    18. "key_as_string": "2015/02/01 00:00:00",
    19. "key": 1422748800000,
    20. "doc_count": 2,
    21. "sales": {
    22. "value": 60.0
    23. }
    24. },
    25. {
    26. "key_as_string": "2015/03/01 00:00:00",
    27. "key": 1425168000000,
    28. "doc_count": 2,
    29. "sales": {
    30. "value": 375.0
    31. }
    32. }
    33. ]
    34. },
    35. "avg_monthly_sales": {
    36. "value": 328.33333333333333,
    37. "value_as_string": "328.33"
    38. }
    39. }
    40. }

    Min Bucket Aggregation

    找出所有bucket中值最小的bucket名称和值 ```java

    平均工资最低的工作类型

    POST employees/_search { “size”: 0, “aggs”: { “jobs”: {
    "terms": {
      "field": "job.keyword",
      "size": 10
    },
    "aggs": {
      "avg_salary": {
        "avg": {
          "field": "salary"
        }
      }
    }
    
    }, “min_salary_by_job”:{
    "min_bucket": {
      "buckets_path": "jobs>avg_salary"
    }
    
    } } }

输出 { “took” : 63, “timed_out” : false, “_shards” : { “total” : 1, “successful” : 1, “skipped” : 0, “failed” : 0 }, “hits” : { “total” : { “value” : 19, “relation” : “eq” }, “max_score” : null, “hits” : [ ] }, “aggregations” : { “jobs” : { “doc_count_error_upper_bound” : 0, “sum_other_doc_count” : 0, “buckets” : [ { “key” : “Java Programmer”, “doc_count” : 7, “avg_salary” : { “value” : 25571.428571428572 } }, { “key” : “Javascript Programmer”, “doc_count” : 4, “avg_salary” : { “value” : 19250.0 } }, { “key” : “QA”, “doc_count” : 3, “avg_salary” : { “value” : 21000.0 } }, { “key” : “Web Designer”, “doc_count” : 2, “avg_salary” : { “value” : 20000.0 } }, { “key” : “DBA”, “doc_count” : 1, “avg_salary” : { “value” : 30000.0 } }, { “key” : “Dev Manager”, “doc_count” : 1, “avg_salary” : { “value” : 50000.0 } }, { “key” : “Product Manager”, “doc_count” : 1, “avg_salary” : { “value” : 35000.0 } } ] }, “min_salary_by_job” : { //上面最小的 “value” : 19250.0, “keys” : [ “Javascript Programmer” ] } } }

<a name="LKbc8"></a>
## Max Bucket Aggregation
```java
# 平均工资最高的工作类型
POST employees/_search
{
  "size": 0,
  "aggs": {
    "jobs": {
      "terms": {
        "field": "job.keyword",
        "size": 10
      },
      "aggs": {
        "avg_salary": {
          "avg": {
            "field": "salary"
          }
        }
      }
    },
    "max_salary_by_job":{
      "max_bucket": {
        "buckets_path": "jobs>avg_salary"
      }
    }
  }

Sum Bucket Aggregation

image.png

Stats Bucket Aggregation

对前面输出的所有结果做个stats分析
image.png

Avg Bucket Aggregation

同级管道聚合,它计算同级聚合中指定度量的平均值。同级聚合必须是多桶聚合,针对的是度量聚合(metric Aggregation)。
示例如下:

{
    "avg_bucket": {
        "buckets_path": "the_sum"  // @1
    }
}

buckets_path:指定聚合的名称,支持多级嵌套聚合。
其他参数:

  • gap_policy

当管道聚合遇到不存在的值,有点类似于term等聚合的(missing)时所采取的策略,可选择值为:skip、insert_zeros。

  • skip:此选项将丢失的数据视为bucket不存在。它将跳过桶并使用下一个可用值继续计算。

  • insert_zeros:默认使用0代替。

  • format

用于格式化聚合桶的输出(key)。

POST /_search
{
  "size": 0,
  "aggs": {
    "sales_per_month": {                  // @1
           "date_histogram": {
            "field": "date",
            "interval": "month"
      },
      "aggs": {                                    // @2
        "sales": {
          "sum": {
            "field": "price"
          }
        }
      }
    },
    "avg_monthly_sales": {             // @3
      "avg_bucket": {
        "buckets_path": "sales_per_month>sales" 
      }
    }
  }
}

代码@1:首先定义第一级聚合(按月)直方图聚合。
代码@2:定义第二级聚合,在按月聚合的基础上,对每个月的文档求sum。
代码@3:对上面的聚合求平均值。
其返回结果如下:

{
    ... // 省略
   "aggregations": {
      "sales_per_month": {
         "buckets": [
            {
               "key_as_string": "2015/01/01 00:00:00",
               "key": 1420070400000,
               "doc_count": 3,
               "sales": {
                  "value": 550.0
               }
            },
            {
               "key_as_string": "2015/02/01 00:00:00",
               "key": 1422748800000,
               "doc_count": 2,
               "sales": {
                  "value": 60.0
               }
            }
         ]
      },
      "avg_monthly_sales": {   // 这是对二级聚合的结果再进行一次求平均值聚合。
          "value": 328.33333333333333
      }
   }
}

对应的JAVA示例如下:

public static void test_pipeline_avg_buncket_aggregation() {
        RestHighLevelClient client = EsClient.getClient();
        try {
            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices("aggregations_index02");
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            AggregationBuilder aggregationBuild = AggregationBuilders.terms("seller_agg")
                                                        .field("sellerId")
                                                        .subAggregation(AggregationBuilders.sum("seller_num_agg")
                                                                            .field("num")
                                                        )
                                                  ;
            sourceBuilder.aggregation(aggregationBuild);

            // 添加 avg bucket pipeline
            sourceBuilder.aggregation(new AvgBucketPipelineAggregationBuilder("seller_num_agg_av", "seller_agg>seller_num_agg"));
            sourceBuilder.size(0);

            searchRequest.source(sourceBuilder);
            SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
            System.out.println(result);
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            EsClient.close(client);
        }
    }

Percentiles Bucket Aggregation

同级管道聚合,百分位管道聚合。其JAVA示例如下

public static void test_Percentiles_buncket_aggregation() {
        RestHighLevelClient client = EsClient.getClient();
        try {
            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices("aggregations_index02");
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            AggregationBuilder aggregationBuild = AggregationBuilders.terms("seller_agg")
                                                        .field("sellerId")
                                                        .subAggregation(AggregationBuilders.sum("seller_num_agg")
                                                                            .field("num")
                                                        )
                                                  ;
            sourceBuilder.aggregation(aggregationBuild);

            // 添加 avg bucket pipeline
            sourceBuilder.aggregation(new PercentilesBucketPipelineAggregationBuilder("seller_num_agg_av", "seller_agg>seller_num_agg"));
            sourceBuilder.size(0);

            searchRequest.source(sourceBuilder);
            SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
            System.out.println(result);
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            EsClient.close(client);
        }
    }

其返回值如下:

{
  ...  // 省略其他属性
    "aggregations":{
        "lterms#seller_agg":{
            "doc_count_error_upper_bound":0,
            "sum_other_doc_count":12,
            "buckets":[
                {
                    "key":45,
                    "doc_count":567,
                    "sum#seller_num_agg":{
                        "value":911
                    }
                },
                {
                    "key":31,
                    "doc_count":324,
                    "sum#seller_num_agg":{
                        "value":353
                    }
                } // 省略其他桶的显示
            ]
        },
        "percentiles_bucket#seller_num_agg_av":{
            "values":{
                "1.0":5,
                "5.0":5,
                "25.0":10,
                "50.0":20,
                "75.0":290,
                "95.0":911,
                "99.0":911
            }
        }
    }
}

Cumulative Sum Aggregation

累积管道聚合,就是就是依次将每个管道的sum聚合进行累加。
其语法(restfull)如下:

{
    "cumulative_sum": {
        "buckets_path": "the_sum"
    }
}

支持的参数说明:

  • buckets_path
    桶聚合名称,作为管道聚合的输入信息。
  • format
    格式化key。

使用示例如下:

POST /sales/_search
{
    "size": 0,
    "aggs" : {
        "sales_per_month" : {
            "date_histogram" : {
                "field" : "date",
                "interval" : "month"
            },
            "aggs": {
                "sales": {
                    "sum": {
                        "field": "price"
                    }
                },
                "cumulative_sales": {
                    "cumulative_sum": {
                        "buckets_path": "sales" 
                    }
                }
            }
        }
    }
}

其返回结果如下:

{
   "took": 11,
   "timed_out": false,
   "_shards": ...,
   "hits": ...,
   "aggregations": {
      "sales_per_month": {
         "buckets": [
            {
               "key_as_string": "2015/01/01 00:00:00",
               "key": 1420070400000,
               "doc_count": 3,
               "sales": {
                  "value": 550.0
               },
               "cumulative_sales": {
                  "value": 550.0
               }
            },
            {
               "key_as_string": "2015/02/01 00:00:00",
               "key": 1422748800000,
               "doc_count": 2,
               "sales": {
                  "value": 60.0
               },
               "cumulative_sales": {
                  "value": 610.0
               }
            },
            {
               "key_as_string": "2015/03/01 00:00:00",
               "key": 1425168000000,
               "doc_count": 2,
               "sales": {
                  "value": 375.0
               },
               "cumulative_sales": {
                  "value": 985.0
               }
            }
         ]
      }
   }
}

从结果可知,cumulative_sales的值等于上一个cumulative_sales + 当前桶的sum聚合。

Bucket Sort Aggregation

一种父管道聚合,它对其父多桶聚合的桶进行排序。并可以指定多个排序字段。每个bucket可以根据它的_key、_count或子聚合进行排序。此外,可以设置from和size的参数,以便截断结果桶。
使用语法如下:

{
    "bucket_sort": {
        "sort": [
            {"sort_field_1": {"order": "asc"}},
            {"sort_field_2": {"order": "desc"}},
            "sort_field_3"
        ],
        "from": 1,
        "size": 3
    }
}

支持的参数说明如下:

  • sort

定义排序结构。

  • from

用与对父聚合的桶进行截取,该值之前的所有桶将忽略,也就是不参与排序,默认为0。

  • size

返回的桶数。默认为父聚合的所有桶。

  • gap_policy

当管道聚合遇到不存在的值,有点类似于term等聚合的(missing)时所采取的策略,可选择值为:skip、insert_zeros。

  • skip:此选项将丢失的数据视为bucket不存在。它将跳过桶并使用下一个可用值继续计算。
  • insert_zeros:默认使用0代替。

官方示例如下:

POST /sales/_search
{
    "size": 0,
    "aggs" : {
        "sales_per_month" : {
            "date_histogram" : {
                "field" : "date",
                "interval" : "month"
            },
            "aggs": {
                "total_sales": {
                    "sum": {
                        "field": "price"
                    }
                },
                "sales_bucket_sort": {
                    "bucket_sort": {
                        "sort": [
                          {"total_sales": {"order": "desc"}}
                        ],
                        "size": 3
                    }
                }
            }
        }
    }
}

对应的JAVA示例如下:

public static void test_bucket_sort_Aggregation() {
        RestHighLevelClient client = EsClient.getClient();
        try {

            //构建日期直方图聚合  时间间隔,示例中按月统计
            DateHistogramInterval interval = new DateHistogramInterval("1M"); 
            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices("aggregations_index02");
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            AggregationBuilder aggregationBuild = AggregationBuilders.dateHistogram("createTime_histogram")
                                                                        .field("createTime")
                                                                        .dateHistogramInterval(interval)
                                                                        .keyed(true)
                                                                        .subAggregation(AggregationBuilders.sum("seller_num_agg")
                                                                                .field("num")
                                                                        )
                                                                        .subAggregation(new  BucketSortPipelineAggregationBuilder("seller_num_agg_sort", Arrays.asList(
                                                                                new FieldSortBuilder("seller_num_agg").order(SortOrder.ASC)))
                                                                                .from(0)
                                                                                .size(3))
                                                                        //  BucketSortPipelineAggregationBuilder(String name, List<FieldSortBuilder> sorts)
                                                                        .subAggregation(new CumulativeSumPipelineAggregationBuilder("Cumulative_Seller_num_agg", "seller_num_agg"))
                                                                    //    .format("yyyy-MM-dd") // 对key的格式化
                                                  ;
            sourceBuilder.aggregation(aggregationBuild);
            sourceBuilder.size(0);
            sourceBuilder.query(
                    QueryBuilders.termQuery("sellerId", 24)
            );
            searchRequest.source(sourceBuilder);
            SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
            System.out.println(result);
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            EsClient.close(client);
        }
    }

返回值:

{
    "aggregations":{
        "date_histogram#createTime_histogram":{
            "buckets":{
                "2016-04-01 00:00:00":{
                    "key_as_string":"2016-04-01 00:00:00",
                    "key":1459468800000,
                    "doc_count":2,
                    "sum#seller_num_agg":{
                        "value":2
                    },
                    "simple_value#Cumulative_Seller_num_agg":{
                        "value":2
                    }
                },
                "2017-05-01 00:00:00":{
                    "key_as_string":"2017-05-01 00:00:00",
                    "key":1493596800000,
                    "doc_count":3,
                    "sum#seller_num_agg":{
                        "value":3
                    },
                    "simple_value#Cumulative_Seller_num_agg":{
                        "value":5
                    }
                },
                "2017-02-01 00:00:00":{
                    "key_as_string":"2017-02-01 00:00:00",
                    "key":1485907200000,
                    "doc_count":4,
                    "sum#seller_num_agg":{
                        "value":4
                    },
                    "simple_value#Cumulative_Seller_num_agg":{
                        "value":9
                    }
                }
            }
        }
    }

其他

5、Max Bucket Aggregation
与 avg类似。

6、Min Bucket Aggregation
与 avg类似。

7、Sum Bucket Aggregation
与 avg类似。

8、Stats Bucket Aggregation
与 avg类似。