简介
管道聚合处理来自其他聚合而不是文档集的输出,将信息添加到输出树中。
Pipeline的分析结果会输出到原结果中,根据输出位置的不同,分为以下两类
- Parent结果内嵌到现有的聚合分析结果中
- Derivative
- Moving Average
- Cumulative Sum
Sibling结果与现有聚合分析结果同级
嵌套的bucket聚合:聚合出按月价格的直方图
- Metic聚合:对上面的聚合再求平均值。
字段类型:
- buckets_path:指定聚合的名称,支持多级嵌套聚合。
- gap_policy 当管道聚合遇到不存在的值,有点类似于term等聚合的(missing)时所采取的策略,可选择值为:skip、insert_zeros。
- skip:此选项将丢失的数据视为bucket不存在。它将跳过桶并使用下一个可用值继续计算。
- format 用于格式化聚合桶的输出(key)。
输出POST _search
{
"size": 0,
"aggs": {
"sales_per_month": {
"date_histogram": {
"field": "date",
"calendar_interval": "month"
},
"aggs": {
"sales": {
"sum": {
"field": "price"
}
}
}
},
"avg_monthly_sales": {
// tag::avg-bucket-agg-syntax[]
"avg_bucket": {
"buckets_path": "sales_per_month>sales",
"gap_policy": "skip",
"format": "#,##0.00;(#,##0.00)"
}
// end::avg-bucket-agg-syntax[]
}
}
}
{
"took": 11,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"sales": {
"value": 550.0
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"sales": {
"value": 60.0
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"sales": {
"value": 375.0
}
}
]
},
"avg_monthly_sales": {
"value": 328.33333333333333,
"value_as_string": "328.33"
}
}
}
Min Bucket Aggregation
找出所有bucket中值最小的bucket名称和值 ```java平均工资最低的工作类型
POST employees/_search { “size”: 0, “aggs”: { “jobs”: {
}, “min_salary_by_job”:{"terms": { "field": "job.keyword", "size": 10 }, "aggs": { "avg_salary": { "avg": { "field": "salary" } } }
} } }"min_bucket": { "buckets_path": "jobs>avg_salary" }
输出 { “took” : 63, “timed_out” : false, “_shards” : { “total” : 1, “successful” : 1, “skipped” : 0, “failed” : 0 }, “hits” : { “total” : { “value” : 19, “relation” : “eq” }, “max_score” : null, “hits” : [ ] }, “aggregations” : { “jobs” : { “doc_count_error_upper_bound” : 0, “sum_other_doc_count” : 0, “buckets” : [ { “key” : “Java Programmer”, “doc_count” : 7, “avg_salary” : { “value” : 25571.428571428572 } }, { “key” : “Javascript Programmer”, “doc_count” : 4, “avg_salary” : { “value” : 19250.0 } }, { “key” : “QA”, “doc_count” : 3, “avg_salary” : { “value” : 21000.0 } }, { “key” : “Web Designer”, “doc_count” : 2, “avg_salary” : { “value” : 20000.0 } }, { “key” : “DBA”, “doc_count” : 1, “avg_salary” : { “value” : 30000.0 } }, { “key” : “Dev Manager”, “doc_count” : 1, “avg_salary” : { “value” : 50000.0 } }, { “key” : “Product Manager”, “doc_count” : 1, “avg_salary” : { “value” : 35000.0 } } ] }, “min_salary_by_job” : { //上面最小的 “value” : 19250.0, “keys” : [ “Javascript Programmer” ] } } }
<a name="LKbc8"></a>
## Max Bucket Aggregation
```java
# 平均工资最高的工作类型
POST employees/_search
{
"size": 0,
"aggs": {
"jobs": {
"terms": {
"field": "job.keyword",
"size": 10
},
"aggs": {
"avg_salary": {
"avg": {
"field": "salary"
}
}
}
},
"max_salary_by_job":{
"max_bucket": {
"buckets_path": "jobs>avg_salary"
}
}
}
Sum Bucket Aggregation
Stats Bucket Aggregation
Avg Bucket Aggregation
同级管道聚合,它计算同级聚合中指定度量的平均值。同级聚合必须是多桶聚合,针对的是度量聚合(metric Aggregation)。
示例如下:
{
"avg_bucket": {
"buckets_path": "the_sum" // @1
}
}
buckets_path:指定聚合的名称,支持多级嵌套聚合。
其他参数:
- gap_policy
当管道聚合遇到不存在的值,有点类似于term等聚合的(missing)时所采取的策略,可选择值为:skip、insert_zeros。
skip:此选项将丢失的数据视为bucket不存在。它将跳过桶并使用下一个可用值继续计算。
insert_zeros:默认使用0代替。
format
用于格式化聚合桶的输出(key)。
POST /_search
{
"size": 0,
"aggs": {
"sales_per_month": { // @1
"date_histogram": {
"field": "date",
"interval": "month"
},
"aggs": { // @2
"sales": {
"sum": {
"field": "price"
}
}
}
},
"avg_monthly_sales": { // @3
"avg_bucket": {
"buckets_path": "sales_per_month>sales"
}
}
}
}
代码@1:首先定义第一级聚合(按月)直方图聚合。
代码@2:定义第二级聚合,在按月聚合的基础上,对每个月的文档求sum。
代码@3:对上面的聚合求平均值。
其返回结果如下:
{
... // 省略
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"sales": {
"value": 550.0
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"sales": {
"value": 60.0
}
}
]
},
"avg_monthly_sales": { // 这是对二级聚合的结果再进行一次求平均值聚合。
"value": 328.33333333333333
}
}
}
对应的JAVA示例如下:
public static void test_pipeline_avg_buncket_aggregation() {
RestHighLevelClient client = EsClient.getClient();
try {
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("aggregations_index02");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
AggregationBuilder aggregationBuild = AggregationBuilders.terms("seller_agg")
.field("sellerId")
.subAggregation(AggregationBuilders.sum("seller_num_agg")
.field("num")
)
;
sourceBuilder.aggregation(aggregationBuild);
// 添加 avg bucket pipeline
sourceBuilder.aggregation(new AvgBucketPipelineAggregationBuilder("seller_num_agg_av", "seller_agg>seller_num_agg"));
sourceBuilder.size(0);
searchRequest.source(sourceBuilder);
SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(result);
} catch (Throwable e) {
e.printStackTrace();
} finally {
EsClient.close(client);
}
}
Percentiles Bucket Aggregation
同级管道聚合,百分位管道聚合。其JAVA示例如下
public static void test_Percentiles_buncket_aggregation() {
RestHighLevelClient client = EsClient.getClient();
try {
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("aggregations_index02");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
AggregationBuilder aggregationBuild = AggregationBuilders.terms("seller_agg")
.field("sellerId")
.subAggregation(AggregationBuilders.sum("seller_num_agg")
.field("num")
)
;
sourceBuilder.aggregation(aggregationBuild);
// 添加 avg bucket pipeline
sourceBuilder.aggregation(new PercentilesBucketPipelineAggregationBuilder("seller_num_agg_av", "seller_agg>seller_num_agg"));
sourceBuilder.size(0);
searchRequest.source(sourceBuilder);
SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(result);
} catch (Throwable e) {
e.printStackTrace();
} finally {
EsClient.close(client);
}
}
其返回值如下:
{
... // 省略其他属性
"aggregations":{
"lterms#seller_agg":{
"doc_count_error_upper_bound":0,
"sum_other_doc_count":12,
"buckets":[
{
"key":45,
"doc_count":567,
"sum#seller_num_agg":{
"value":911
}
},
{
"key":31,
"doc_count":324,
"sum#seller_num_agg":{
"value":353
}
} // 省略其他桶的显示
]
},
"percentiles_bucket#seller_num_agg_av":{
"values":{
"1.0":5,
"5.0":5,
"25.0":10,
"50.0":20,
"75.0":290,
"95.0":911,
"99.0":911
}
}
}
}
Cumulative Sum Aggregation
累积管道聚合,就是就是依次将每个管道的sum聚合进行累加。
其语法(restfull)如下:
{
"cumulative_sum": {
"buckets_path": "the_sum"
}
}
支持的参数说明:
- buckets_path
桶聚合名称,作为管道聚合的输入信息。 - format
格式化key。
使用示例如下:
POST /sales/_search
{
"size": 0,
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"interval" : "month"
},
"aggs": {
"sales": {
"sum": {
"field": "price"
}
},
"cumulative_sales": {
"cumulative_sum": {
"buckets_path": "sales"
}
}
}
}
}
}
其返回结果如下:
{
"took": 11,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"sales": {
"value": 550.0
},
"cumulative_sales": {
"value": 550.0
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"sales": {
"value": 60.0
},
"cumulative_sales": {
"value": 610.0
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"sales": {
"value": 375.0
},
"cumulative_sales": {
"value": 985.0
}
}
]
}
}
}
从结果可知,cumulative_sales的值等于上一个cumulative_sales + 当前桶的sum聚合。
Bucket Sort Aggregation
一种父管道聚合,它对其父多桶聚合的桶进行排序。并可以指定多个排序字段。每个bucket可以根据它的_key、_count或子聚合进行排序。此外,可以设置from和size的参数,以便截断结果桶。
使用语法如下:
{
"bucket_sort": {
"sort": [
{"sort_field_1": {"order": "asc"}},
{"sort_field_2": {"order": "desc"}},
"sort_field_3"
],
"from": 1,
"size": 3
}
}
支持的参数说明如下:
- sort
定义排序结构。
- from
用与对父聚合的桶进行截取,该值之前的所有桶将忽略,也就是不参与排序,默认为0。
- size
返回的桶数。默认为父聚合的所有桶。
- gap_policy
当管道聚合遇到不存在的值,有点类似于term等聚合的(missing)时所采取的策略,可选择值为:skip、insert_zeros。
- skip:此选项将丢失的数据视为bucket不存在。它将跳过桶并使用下一个可用值继续计算。
- insert_zeros:默认使用0代替。
官方示例如下:
POST /sales/_search
{
"size": 0,
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"interval" : "month"
},
"aggs": {
"total_sales": {
"sum": {
"field": "price"
}
},
"sales_bucket_sort": {
"bucket_sort": {
"sort": [
{"total_sales": {"order": "desc"}}
],
"size": 3
}
}
}
}
}
}
对应的JAVA示例如下:
public static void test_bucket_sort_Aggregation() {
RestHighLevelClient client = EsClient.getClient();
try {
//构建日期直方图聚合 时间间隔,示例中按月统计
DateHistogramInterval interval = new DateHistogramInterval("1M");
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("aggregations_index02");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
AggregationBuilder aggregationBuild = AggregationBuilders.dateHistogram("createTime_histogram")
.field("createTime")
.dateHistogramInterval(interval)
.keyed(true)
.subAggregation(AggregationBuilders.sum("seller_num_agg")
.field("num")
)
.subAggregation(new BucketSortPipelineAggregationBuilder("seller_num_agg_sort", Arrays.asList(
new FieldSortBuilder("seller_num_agg").order(SortOrder.ASC)))
.from(0)
.size(3))
// BucketSortPipelineAggregationBuilder(String name, List<FieldSortBuilder> sorts)
.subAggregation(new CumulativeSumPipelineAggregationBuilder("Cumulative_Seller_num_agg", "seller_num_agg"))
// .format("yyyy-MM-dd") // 对key的格式化
;
sourceBuilder.aggregation(aggregationBuild);
sourceBuilder.size(0);
sourceBuilder.query(
QueryBuilders.termQuery("sellerId", 24)
);
searchRequest.source(sourceBuilder);
SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(result);
} catch (Throwable e) {
e.printStackTrace();
} finally {
EsClient.close(client);
}
}
返回值:
{
"aggregations":{
"date_histogram#createTime_histogram":{
"buckets":{
"2016-04-01 00:00:00":{
"key_as_string":"2016-04-01 00:00:00",
"key":1459468800000,
"doc_count":2,
"sum#seller_num_agg":{
"value":2
},
"simple_value#Cumulative_Seller_num_agg":{
"value":2
}
},
"2017-05-01 00:00:00":{
"key_as_string":"2017-05-01 00:00:00",
"key":1493596800000,
"doc_count":3,
"sum#seller_num_agg":{
"value":3
},
"simple_value#Cumulative_Seller_num_agg":{
"value":5
}
},
"2017-02-01 00:00:00":{
"key_as_string":"2017-02-01 00:00:00",
"key":1485907200000,
"doc_count":4,
"sum#seller_num_agg":{
"value":4
},
"simple_value#Cumulative_Seller_num_agg":{
"value":9
}
}
}
}
}
其他
5、Max Bucket Aggregation
与 avg类似。
6、Min Bucket Aggregation
与 avg类似。
7、Sum Bucket Aggregation
与 avg类似。
8、Stats Bucket Aggregation
与 avg类似。