# 准备环境

spring-data-elasticsearch 版本:3.2.12.RELEASE

```
# 订单索引,一个订单下有多个商品
PUT order_test
{
  "mappings": {
    "properties": {
      // 订单状态 0未付款 1未发货 2运输中 3待签收 4已签收
      "status": {
        "type": "integer"
      },
      // 订单编号
      "no": {
        "type": "keyword"
      },
      // 下单时间
      "create_time": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      // 订单金额
      "amount": {
        "type": "double"
      },
      // 创建人
      "creator": {
        "type": "keyword"
      },
      // 商品信息
      "product": {
        "type": "nested",
        "properties": {
          // 商品ID
          "id": {
            "type": "keyword"
          },
          // 商品名称
          "name": {
            "type": "keyword"
          },
          // 商品价格
          "price": {
            "type": "double"
          },
          // 商品数量
          "quantity": {
            "type": "integer"
          }
        }
      }
    }
  }
}
```

测试数据

```
POST order_test/_bulk
{"index":{}}
{"status":0,"no":"DD202205280001","create_time":"2022-05-01 12:00:00","amount":100.0,"creator":"张三","product":[{"id":"1","name":"苹果","price":20.0,"quantity":5}]}
{"index":{}}
{"status":0,"no":"DD202205280002","create_time":"2022-05-01 12:00:00","amount":100.0,"creator":"李四","product":[{"id":"2","name":"香蕉","price":20.0,"quantity":5}]}
{"index":{}}
{"status":1,"no":"DD202205280003","create_time":"2022-05-02 12:00:00","amount":100.0,"creator":"张三","product":[{"id":"2","name":"香蕉","price":20.0,"quantity":5}]}
{"index":{}}
{"status":2,"no":"DD202205280004","create_time":"2022-05-01 12:00:00","amount":150.0,"creator":"王二","product":[{"id":"1","name":"苹果","price":30.0,"quantity":5}]}
{"index":{}}
{"status":2,"no":"DD202205280005","create_time":"2022-05-03 12:00:00","amount":100.0,"creator":"55555","product":[{"id":"2","name":"香蕉","price":20.0,"quantity":5}]}
{"index":{}}
{"status":3,"no":"DD202205280006","create_time":"2022-05-04 12:00:00","amount":150.0,"creator":"李四","product":[{"id":"3","name":"榴莲","price":150.0,"quantity":1}]}
{"index":{}}
{"status":4,"no":"DD202205280007","create_time":"2022-05-04 12:00:00","amount":100.0,"creator":"张三","product":[{"id":"2","name":"香蕉","price":20.0,"quantity":5}]}
{"index":{}}
{"status":3,"no":"DD202205280008","create_time":"2022-05-01 12:00:00","amount":200.0,"creator":"王二","product":[{"id":"1","name":"苹果","price":40.0,"quantity":5}]}
{"index":{}}
{"status":4,"no":"DD202205280009","create_time":"2022-05-03 12:00:00","amount":100.0,"creator":"55555","product":[{"id":"2","name":"香蕉","price":20.0,"quantity":5}]}
```

# 分桶聚合 Bucket aggregations

## Terms aggregation

分桶聚合中最常用的就是terms聚合,它可以按照指定字段将数据分组聚合,类似于MySQL中的group by

  • 案例

要求统计各种状态的订单数量

  • DSL

    GET order_test/_search
    {
      "size": 0,
      "aggs": {
        "status_bucket": {
          "terms": {
            "field": "status"
          }
        }
      }
    }

- java
```java
/**
 * Counts orders per {@code status} value with a terms aggregation (comparable to a SQL
 * {@code GROUP BY status}) and prints a map of status key to document count.
 */
public void termsAgg(){
    String statusAggName = "status_bucket";
    // Only the aggregation is read from the response. PageRequest requires a page size of at
    // least 1, so one hit is requested instead of the DSL's "size": 0.
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder()
            .withPageable(PageRequest.of(0, 1))
            .addAggregation(AggregationBuilders.terms(statusAggName).field("status"));
    Aggregations aggregations = restTemplate.query(queryBuilder.build(), SearchResponse::getAggregations);
    Terms statusTerms = aggregations.get(statusAggName);
    HashMap<String,Long> statusRes = new HashMap<>();
    for (Terms.Bucket bucket : statusTerms.getBuckets()) {
        statusRes.put(bucket.getKeyAsString(), bucket.getDocCount());
    }
    System.out.println("---聚合结果---");
    System.out.println(statusRes);
}

## Date histogram aggregation

日期分组聚合可以按照日期进行分组,常用到一些日期趋势统计中

  • 案例

统计每天的下单量

  • DSL

    GET order_test/_search
    {
      "size": 0,
      "aggs": {
        "date": {
          "date_histogram": {
            "field": "create_time",
            "calendar_interval": "day",
            "format": "yyyy-MM-dd"
          }
        }
      }
    }

- java
```java
/**
 * Counts orders per calendar day of {@code create_time} with a date histogram aggregation
 * and prints a map of day ("yyyy-MM-dd") to order count.
 *
 * <p>Date histogram buckets are returned in ascending date order. They are collected into a
 * LinkedHashMap so the printed result keeps that chronological order. A HashMap would print
 * the days in hash order, which defeats the purpose of a daily trend statistic.
 */
public void dateHistogramAgg(){
    String aggName = "date";
    DateHistogramAggregationBuilder dateHistogramAggregation = AggregationBuilders.dateHistogram(aggName)
            .field("create_time")
            .calendarInterval(DateHistogramInterval.days(1))
            .format("yyyy-MM-dd");
    // Only the aggregation is read; PageRequest requires a page size of at least 1.
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
    queryBuilder.withPageable(PageRequest.of(0,1)).addAggregation(dateHistogramAggregation);
    Aggregations aggregations = restTemplate.query(queryBuilder.build(), SearchResponse::getAggregations);
    ParsedDateHistogram histogram = aggregations.get(aggName);
    List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
    // Insertion-ordered map: days are printed in bucket (ascending date) order.
    java.util.LinkedHashMap<String,Long> resultMap = new java.util.LinkedHashMap<>();
    buckets.forEach(bucket -> resultMap.put(bucket.getKeyAsString(), bucket.getDocCount()));
    System.out.println("---聚合结果---");
    System.out.println(resultMap);
}

拓展:

这里大家会发现,日期聚合使用的是ParsedDateHistogram来接收结果,与上面terms聚合中使用的Terms不一致。那么我们怎么知道什么时候该用哪个类型呢?实际上可以通过断点来判断:
把断点打在restTemplate.query执行之后,查看返回的aggregations,会发现其中的元素已经标明了实际类型为ParsedDateHistogram,照着使用即可。另外,Terms本身是一个接口(status为integer字段时,实际返回的是ParsedLongTerms);同理,日期聚合和范围聚合的结果也可以分别用Histogram、Range接口来接收。
image.png

## Range aggregation

范围分组聚合可以帮助我们按照指定的数值范围进行分组

  • 案例

统计订单金额在0~100、100~200、200以上这几个区间的订单数量(每个区间包含from、不包含to,即金额恰好为100的订单计入100~200区间)

  • DSL

    GET order_test/_search
    {
      "size": 0,
      "aggs": {
        "amount_range": {
          "range": {
            "field": "amount",
            "ranges": [
              {
                "to": 100
              },
              {
                "from": 100,
                "to": 200
              },
              {
                "from": 200
              }
            ]
          }
        }
      }
    }

- java
```java
/**
 * Counts orders per {@code amount} range (below 100, 100 to 200, 200 and above) with a range
 * aggregation and prints a map of range key to order count.
 *
 * <p>Each range includes its {@code from} bound and excludes its {@code to} bound, so an
 * amount of exactly 100 lands in the 100-200 bucket. The buckets come back ordered by range
 * and are collected into a LinkedHashMap so the printed output keeps that order (a HashMap
 * would not).
 */
public void rangeAgg(){
    String aggName = "range";
    RangeAggregationBuilder agg = AggregationBuilders.range(aggName).field("amount")
            .addUnboundedTo(100)
            .addRange(100, 200)
            .addUnboundedFrom(200);
    // Only the aggregation is read; PageRequest requires a page size of at least 1.
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
    queryBuilder.withPageable(PageRequest.of(0,1)).addAggregation(agg);
    Aggregations aggregations = restTemplate.query(queryBuilder.build(), SearchResponse::getAggregations);
    ParsedRange ranges = aggregations.get(aggName);
    List<? extends Range.Bucket> buckets = ranges.getBuckets();
    // Insertion-ordered map: ranges are printed in bucket order.
    java.util.LinkedHashMap<String,Long> resultMap = new java.util.LinkedHashMap<>();
    buckets.forEach(bucket -> resultMap.put(bucket.getKeyAsString(), bucket.getDocCount()));
    System.out.println("---聚合结果---");
    System.out.println(resultMap);
}

## Nested aggregation

nested聚合专门用于对nested类型的子对象进行聚合。比如上述案例中product是nested类型的对象数组,当我们想按商品中的属性来聚合统计时,就需要用到nested聚合;如果直接使用product.name来聚合,结果不会是我们预期的。这与es对nested对象的存储方式有关:每个nested对象都会被索引为一个独立的隐藏文档,普通聚合作用在父文档(订单)上,访问不到这些嵌套字段。

  • 案例

统计每种货物的订单数

  • DSL

    GET order_test/_search
    {
    "size": 0,
    "aggs": {
      "product_nested": {
        "nested": {
          "path": "product"
        },
        "aggs": {
          "name_bucket": {
            "terms": {
              "field": "product.name"
            }
          }
        }
      }
    }
    }
    
  • java:这里我们涉及到要设置一个嵌套聚合,可以通过subAggregation方法来定义子聚合 ```java public void nestedAgg(){ String aggName = “product_nested”; String termsAggName = “name_bucket”; NestedAggregationBuilder aggregationBuilder = AggregationBuilders.nested(aggName, “product”).subAggregation(AggregationBuilders.terms(termsAggName).field(“product.name”)); NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder()

      .withPageable(PageRequest.of(0,1))
      .addAggregation(aggregationBuilder);
    

    Aggregations aggregations = restTemplate.query(queryBuilder.build(), SearchResponse::getAggregations); ParsedNested nestedRes = aggregations.get(aggName); Terms terms = nestedRes.getAggregations().get(termsAggName); List<? extends Terms.Bucket> buckets = terms.getBuckets(); HashMap resMap = new HashMap<>(); buckets.forEach(bucket -> {

      resMap.put(bucket.getKeyAsString(),bucket.getDocCount());
    

    }); System.out.println(“—-聚合结果—-“); System.out.println(resMap); }

<a name="jekKa"></a>
# 数值聚合 Metrics aggregations
<a name="JuIYX"></a>
## Sum aggregation
求和聚合是常用的聚合之一,经常与分组聚合配合使用,用来统计出各组下的合计

- 案例

求5月1日销售总额

- DSL:这里我们添加了一个query语句,用来限定聚合范围是5.1日的订单
```java
GET order_test/_search
{
  "query": {
    "range": {
      "create_time": {
        "format": "yyyy-MM-dd",
        "from": "2022-05-01",
        "to": "2022-05-01"
      }
    }
  }, 
  "size": 0,
  "aggs": {
    "sum_amount": {
      "sum": {
        "field": "amount"
      }
    }
  }
}
  • java ```java public void sumAgg(){
      String aggName = "sumAmount";
      SumAggregationBuilder agg = AggregationBuilders.sum(aggName).field("amount");
      NativeSearchQueryBuilder queryBuilder =  new NativeSearchQueryBuilder()
              .withPageable(PageRequest.of(0,1))
              .withQuery(QueryBuilders.rangeQuery("create_time").format("yyyy-MM-dd").from("2022-05-01").to("2022-05-01"))
              .addAggregation(agg);
      Aggregations aggregations = restTemplate.query(queryBuilder.build(), SearchResponse::getAggregations);
      ParsedSum metric = aggregations.get(aggName);
      double value = metric.getValue();
      System.out.println("---聚合结果---");
      System.out.println(value);
    
    }
<a name="sSn1c"></a>
## Script aggregation
脚本聚合允许我们通过脚本来自定义参与聚合的数值,es中脚本的默认语言为painless。需要注意的是,脚本对性能影响较大,一般应尽量避免使用。此外es还提供了专门的[脚本数值聚合 scripted metric aggregation](https://www.elastic.co/guide/en/elasticsearch/reference/7.13/search-aggregations-metrics-scripted-metric-aggregation.html),但因为不太常用,这里以更常用的"在普通metric聚合中使用script"的方式来讲解

- 案例

求所有货物平均单价

- DSL:注意这里不能直接用product.price。因为product是数组,里面可能包含多种货物,所以应该用订单总金额除以所有订单的货物数量
```java
GET order_test/_search
{
  "size": 0,
  "aggs": {
    "total_amount":{
      "sum": {
        "field": "amount"
      }
    },
    "total_quantity":{
      "sum": {
        "script": {
          "source": """
            int total = 0;
            for(int i=0; i<params._source['product'].size(); i++){
              if(params._source['product'][i]['quantity'] != null){
                total += params._source['product'][i]['quantity'];
              }
            }
            return total;
          """
        }
      }
    }
  }
}

可以看到,原本sum聚合中的field属性在这里改成了script脚本,用来动态计算每个文档参与聚合的值,从而实现聚合。同理,script不仅可以用在sum聚合中,也可以用在其他metric聚合中。
total_amount除以total_quantity即可得到平均价格。

  • java ```java public void scriptAgg(){ String totalAmountAggName = “total_amount”; String totalQuantityAggName = “total_quantity”; SumAggregationBuilder amountAgg = AggregationBuilders.sum(totalAmountAggName).field(“amount”); SumAggregationBuilder quantityAgg = AggregationBuilders.sum(totalQuantityAggName).script(

      new Script("int total = 0;\n" +
                 "            for(int i=0; i<params._source['product'].size(); i++){\n" +
                 "              if(params._source['product'][i]['quantity'] != null){\n" +
                 "                total += params._source['product'][i]['quantity'];\n" +
                 "              }\n" +
                 "            }\n" +
                 "            return total;"));
    

    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder()

      .withPageable(PageRequest.of(0,1))
      .addAggregation(amountAgg).addAggregation(quantityAgg);
    

    Aggregations aggregations = restTemplate.query(queryBuilder.build(), SearchResponse::getAggregations); ParsedSum amountRes = aggregations.get(totalAmountAggName); ParsedSum quantityRes = aggregations.get(totalQuantityAggName); double avgPrice = amountRes.getValue()/quantityRes.getValue(); System.out.println(“—-聚合结果—-“); System.out.println(avgPrice); }

<a name="KkmKi"></a>
# 管道聚合 Pipeline aggregations
首先要理解管道聚合的概念,与其他聚合不同,管道聚合是在其他聚合的结果下进行聚合操作的,所以管道聚合是配合其他聚合来工作的,而不是像其他聚合那样直接操作文档数据。
<a name="hXtSR"></a>
## Bucket script aggregation
分桶脚本聚合(bucket_script)用于对多分桶聚合中每个桶的指标进行二次计算,下面通过案例来具体体会它的用法。

- 案例

求每个订单的货物平均单价

- DSL:我们上述求解过所有订单的货物平均单价,这里需要求解每个订单的货物平均单价,那么就需要先对数据按照订单进行分桶
```java
GET order_test/_search
{
  "size": 0,
  "aggs": {
    "order_bucket": {
      "terms": {
        "field": "no"
      },
      "aggs": {
        "total_amount": {
          "sum": {
            "field": "amount"
          }
        },
        "total_quantity": {
          "sum": {
            "script": {
              "source": """
            int total = 0;
            for(int i=0; i<params._source['product'].size(); i++){
              if(params._source['product'][i]['quantity'] != null){
                total += params._source['product'][i]['quantity'];
              }
            }
            return total;
            """
            }
          }
        },
        "avg_price": {
          "bucket_script": {
            "buckets_path": {
              "amount": "total_amount",
              "quantity": "total_quantity"
            },
            "script": "params.amount / params.quantity"
          }
        }
      }
    }
  }
}
  • java:需要注意的是管道聚合使用的聚合生成器就不再是AggregationBuilders类,而是PipelineAggregatorBuilders,其余的用法类似 ```java public void bucketScriptAgg(){

      String aggName = "order_bucket";
      String totalAmountAggName = "total_amount";
      String totalQuantityAggName = "total_quantity";
      String avgPriceAggName = "avg_price";
    
      HashMap<String,String> bucketsPath = new HashMap<>();
      bucketsPath.put("amount","total_amount");
      bucketsPath.put("quantity","total_quantity");
      TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(aggName).field("no")
              .subAggregation(AggregationBuilders.sum(totalAmountAggName).field("amount"))
              .subAggregation(AggregationBuilders.sum(totalQuantityAggName).script(
                      new Script("int total = 0;\n" +
                              "            for(int i=0; i<params._source['product'].size(); i++){\n" +
                              "              if(params._source['product'][i]['quantity'] != null){\n" +
                              "                total += params._source['product'][i]['quantity'];\n" +
                              "              }\n" +
                              "            }\n" +
                              "            return total;")
              ))
              .subAggregation(PipelineAggregatorBuilders.bucketScript(avgPriceAggName, bucketsPath,
                      new Script("params.amount / params.quantity")));
      NativeSearchQueryBuilder queryBuilder =  new NativeSearchQueryBuilder()
              .withPageable(PageRequest.of(0,1))
              .addAggregation(aggregationBuilder);
      Aggregations aggregations = restTemplate.query(queryBuilder.build(), SearchResponse::getAggregations);
      HashMap<String,Double> resultMap = new HashMap<>();
      Terms terms = aggregations.get(aggName);
      List<? extends Terms.Bucket> buckets = terms.getBuckets();
      buckets.forEach(bucket -> {
          ParsedSimpleValue avgRes = bucket.getAggregations().get(avgPriceAggName);
          resultMap.put(bucket.getKeyAsString(),Double.parseDouble(avgRes.getValueAsString()));
      });
      System.out.println("---聚合结果---");
      System.out.println(resultMap);
    

    }

<a name="GIPZi"></a>
## Bucket sort aggregation
bucket_sort可以对父级多分桶聚合的结果进行自定义排序和分页,在需要对桶进行排序时很常用。参考:[https://blog.csdn.net/qq_24950043/article/details/124535801](https://blog.csdn.net/qq_24950043/article/details/124535801)

- 案例

求订单货物平均单价TOP5的订单

- DSL:上述题目不仅要求我们订单的货物平均单价,还要根据平均单价排序,且分页取前5
```java
GET order_test/_search
{
  "size": 0,
  "aggs": {
    "order_bucket": {
      "terms": {
        "field": "no"
      },
      "aggs": {
        "total_amount": {
          "sum": {
            "field": "amount"
          }
        },
        "total_quantity": {
          "sum": {
            "script": {
              "source": """
            int total = 0;
            for(int i=0; i<params._source['product'].size(); i++){
              if(params._source['product'][i]['quantity'] != null){
                total += params._source['product'][i]['quantity'];
              }
            }
            return total;
            """
            }
          }
        },
        "avg_price": {
          "bucket_script": {
            "buckets_path": {
              "amount": "total_amount",
              "quantity": "total_quantity"
            },
            "script": "params.amount / params.quantity"
          }
        },
        "avg_price_sort": {
          "bucket_sort": {
            "sort": [
              {"avg_price":{"order":"desc"}}
            ],
            "from": 0,
            "size": 5
          }
        }
      }
    }
  }
}
  • java:同上使用的聚合生成器就不再是AggregationBuilders类,而是PipelineAggregatorBuilders,其余的用法类似 ```java public void bucketSortAgg(){

      String aggName = "order_bucket";
      String totalAmountAggName = "total_amount";
      String totalQuantityAggName = "total_quantity";
      String avgPriceAggName = "avg_price";
      String bucketSortAggName = "avg_price_sort";
      HashMap<String,String> bucketsPath = new HashMap<>();
      bucketsPath.put("amount","total_amount");
      bucketsPath.put("quantity","total_quantity");
      List<FieldSortBuilder> sortList = new ArrayList<>();
      FieldSortBuilder fieldSortBuilder = new FieldSortBuilder("avg_price").order(SortOrder.DESC);
      sortList.add(fieldSortBuilder);
      TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(aggName).field("no")
              .subAggregation(AggregationBuilders.sum(totalAmountAggName).field("amount"))
              .subAggregation(AggregationBuilders.sum(totalQuantityAggName).script(
                      new Script("int total = 0;\n" +
                              "            for(int i=0; i<params._source['product'].size(); i++){\n" +
                              "              if(params._source['product'][i]['quantity'] != null){\n" +
                              "                total += params._source['product'][i]['quantity'];\n" +
                              "              }\n" +
                              "            }\n" +
                              "            return total;")
              ))
              .subAggregation(PipelineAggregatorBuilders.bucketScript(avgPriceAggName, bucketsPath,
                      new Script("params.amount / params.quantity")))
              .subAggregation(PipelineAggregatorBuilders.bucketSort(bucketSortAggName,sortList).from(0).size(5));
      NativeSearchQueryBuilder queryBuilder =  new NativeSearchQueryBuilder()
              .withPageable(PageRequest.of(0,1))
              .addAggregation(aggregationBuilder);
      Aggregations aggregations = restTemplate.query(queryBuilder.build(), SearchResponse::getAggregations);
      // 因为要求按序输出,所以这里使用LinkedHashMap,HashMap不会按照顺序显示
      LinkedHashMap<String,Double> resultMap = new LinkedHashMap<>();
      Terms terms = aggregations.get(aggName);
      List<? extends Terms.Bucket> buckets = terms.getBuckets();
      buckets.forEach(bucket -> {
          ParsedSimpleValue avgRes = bucket.getAggregations().get(avgPriceAggName);
    
          resultMap.put(bucket.getKeyAsString(),Double.parseDouble(avgRes.getValueAsString()));
    
      });
      System.out.println("---聚合结果---");
      System.out.println(resultMap);
    

    }

```

# 版本对应

需要注意的是,并不是所有的DSL都可以在spring-data-elasticsearch中实现:某些操作在kibana中可以执行,但在spring-data-elasticsearch中却无法执行。此外还要注意es版本与spring-data-elasticsearch版本的对应关系,官方对应表见:
https://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/#preface.requirements

| Spring Data Release Train | Spring Data Elasticsearch | Elasticsearch | Spring Framework | Spring Boot |
| --- | --- | --- | --- | --- |
| 2021.2 (Raj) | 4.4.x | 7.17.3 | 5.3.x | 2.7.x |
| 2021.1 (Q) | 4.3.x | 7.15.2 | 5.3.x | 2.6.x |
| 2021.0 (Pascal) | 4.2.x | 7.12.0 | 5.3.x | 2.5.x |
| 2020.0 (Ockham)[1] | 4.1.x[1] | 7.9.3 | 5.3.2 | 2.4.x |
| Neumann[1] | 4.0.x[1] | 7.6.2 | 5.2.12 | 2.3.x |
| Moore[1] | 3.2.x[1] | 6.8.12 | 5.2.12 | 2.2.x |
| Lovelace[1] | 3.1.x[1] | 6.2.2 | 5.1.19 | 2.1.x |
| Kay[1] | 3.0.x[1] | 5.5.0 | 5.0.13 | 2.0.x |
| Ingalls[1] | 2.1.x[1] | 2.4.0 | 4.3.25 | 1.5.x |

[1] 已停止维护(out of support)