1.项目地址

https://github.com/GuardFTC/elasticsearch-test.git

2.创建实体类

  1. import lombok.Data;
  2. import java.util.Date;
  3. /**
  4. * @author: 冯铁城 [17615007230@163.com]
  5. * @date: 2022-08-24 19:42:52
  6. * @describe: 汽车订单
  7. */
  8. @Data
  9. public class CarOrder {
  10. /**
  11. * 订单汽车售价
  12. */
  13. private double price;
  14. /**
  15. * 订单汽车颜色
  16. */
  17. private String color;
  18. /**
  19. * 订单汽车厂商
  20. */
  21. private String make;
  22. /**
  23. * 订单售卖时间
  24. */
  25. private Date soldDate;
  26. }
  1. import lombok.Data;
  2. import java.util.Date;
  3. /**
  4. * @author: 冯铁城 [17615007230@163.com]
  5. * @date: 2022-08-28 18:05:27
  6. * @describe: 网站请求数据
  7. */
  8. @Data
  9. public class WebsiteRequest {
  10. /**
  11. * 延迟 单位ms
  12. */
  13. private long latency;
  14. /**
  15. * 所在地区
  16. */
  17. private String zone;
  18. /**
  19. * 时间戳
  20. */
  21. private Date timestamp;
  22. }

3.聚合API命令整合

terms

在获取terms聚合结果时,客户端会根据聚合字段的类型返回不同的结果类型,例如sterms(字符串)、lterms(长整型)、dterms(双精度浮点型)等。本文中color、make字段均为keyword类型,因此通过sterms()读取聚合结果。

  1. @Test
  2. @SneakyThrows(IOException.class)
  3. void terms() {
  4. //1.根据颜色进行聚合
  5. SearchResponse<CarOrder> search = primaryClient.search(s -> s
  6. .index(INDEX_NAME)
  7. .aggregations("colorCars", a -> a.terms(t -> t
  8. .field("color"))
  9. ), CarOrder.class
  10. );
  11. //2.校验
  12. Aggregate colorCars = search.aggregations().get("colorCars");
  13. Buckets<StringTermsBucket> buckets = colorCars.sterms().buckets();
  14. Assert.isTrue(4 == buckets.array().size());
  15. buckets.array().forEach(b -> {
  16. Console.log(b.key());
  17. Console.log(b.docCount());
  18. });
  19. }
  20. @Test
  21. @SneakyThrows(IOException.class)
  22. void nested() {
  23. //1.同级嵌套,统计每种颜色,每种厂商的汽车销量
  24. SearchResponse<CarOrder> search = primaryClient.search(s -> s
  25. .index(INDEX_NAME)
  26. .aggregations("colorOrders", a -> a
  27. .terms(t -> t.field("color")
  28. ))
  29. .aggregations("makeOrders", a -> a
  30. .terms(t -> t.field("make"))
  31. ), CarOrder.class
  32. );
  33. //2.校验
  34. Aggregate colorOrders = search.aggregations().get("colorOrders");
  35. List<StringTermsBucket> colorBuckets = colorOrders.sterms().buckets().array();
  36. Assert.isTrue(4 == colorBuckets.size());
  37. Aggregate makeOrders = search.aggregations().get("makeOrders");
  38. List<StringTermsBucket> makeBuckets = makeOrders.sterms().buckets().array();
  39. Assert.isTrue(4 == makeBuckets.size());
  40. //3.递归嵌套,统计取不同颜色汽车的销量,同时统计每种颜色汽车中每个制造商的销量以及每个制造商的总销售额
  41. search = primaryClient.search(s -> s
  42. .index(INDEX_NAME)
  43. .aggregations("colorOrders", a -> a
  44. .terms(t -> t.field("color"))
  45. .aggregations("makeOrders", colorA -> colorA.terms(
  46. t -> t.field("make"))
  47. .aggregations("makeTotalPrice", makeA -> makeA.sum(
  48. sum -> sum.field("price")
  49. ))
  50. )
  51. ), CarOrder.class
  52. );
  53. //4.校验
  54. colorOrders = search.aggregations().get("colorOrders");
  55. colorBuckets = colorOrders.sterms().buckets().array();
  56. Assert.isTrue(4 == colorBuckets.size());
  57. Console.log("按照颜色聚合-----");
  58. for (StringTermsBucket colorBucket : colorBuckets) {
  59. Console.log("颜色:" + colorBucket.key() + " 数量:" + colorBucket.docCount());
  60. Console.log("按照厂商聚合-----");
  61. makeOrders = colorBucket.aggregations().get("makeOrders");
  62. makeBuckets = makeOrders.sterms().buckets().array();
  63. for (StringTermsBucket makeBucket : makeBuckets) {
  64. Console.log("厂商:" + makeBucket.key() + " 数量:" + makeBucket.docCount());
  65. Aggregate makeTotalPrice = makeBucket.aggregations().get("makeTotalPrice");
  66. SumAggregate sum = makeTotalPrice.sum();
  67. Console.log("厂商销售总额:" + sum.value());
  68. }
  69. }
  70. }

histogram

  1. @Test
  2. @SneakyThrows(IOException.class)
  3. void histogram() {
  4. //1.按照销售额步长为20000统计不同颜色汽车的销量
  5. SearchResponse<CarOrder> search = primaryClient.search(s -> s
  6. .index(INDEX_NAME)
  7. .aggregations("priceCarOrders", a -> a
  8. .histogram(h -> h
  9. .field("price")
  10. .interval((double) 20000)
  11. )
  12. .aggregations("colorOrders", ha -> ha
  13. .terms(t -> t.field("color"))
  14. )
  15. ), CarOrder.class
  16. );
  17. //2.校验
  18. Aggregate priceCarOrders = search.aggregations().get("priceCarOrders");
  19. List<HistogramBucket> priceCarOrdersBuckets = priceCarOrders.histogram().buckets().array();
  20. Assert.isTrue(7 == priceCarOrdersBuckets.size());
  21. Console.log("按照20000销售额价格聚合-----");
  22. for (HistogramBucket priceCarOrdersBucket : priceCarOrdersBuckets) {
  23. Console.log("[" + priceCarOrdersBucket.key() + "-" + (priceCarOrdersBucket.key() + (double) 20000)
  24. + "]区间内销售量:" + priceCarOrdersBucket.docCount()
  25. );
  26. Aggregate colorOrders = priceCarOrdersBucket.aggregations().get("colorOrders");
  27. List<StringTermsBucket> colorBuckets = colorOrders.sterms().buckets().array();
  28. for (StringTermsBucket colorBucket : colorBuckets) {
  29. Console.log("颜色:" + colorBucket.key() + " 数量:" + colorBucket.docCount());
  30. }
  31. }
  32. }

dateHistogram

  1. @Test
  2. @SneakyThrows(IOException.class)
  3. void DateHistogram() {
  4. //1.每个季度,不同汽车品牌的销售总额
  5. SearchResponse<CarOrder> search = secondaryClient.search(s -> s
  6. .index(INDEX_NAME)
  7. .aggregations("quarterCarOrders", a -> a
  8. .dateHistogram(d -> d
  9. .field("soldDate")
  10. .calendarInterval(CalendarInterval.Quarter)
  11. .format(DatePattern.NORM_DATE_PATTERN)
  12. )
  13. .aggregations("makeOrders", da -> da
  14. .terms(t -> t.field("make"))
  15. .aggregations("totalPrice", ta -> ta
  16. .sum(sum -> sum.field("price"))
  17. )
  18. )
  19. ), CarOrder.class
  20. );
  21. //2.校验
  22. Aggregate quarterCarOrders = search.aggregations().get("quarterCarOrders");
  23. List<DateHistogramBucket> dateHistogramBuckets = quarterCarOrders.dateHistogram().buckets().array();
  24. Assert.isTrue(5 == dateHistogramBuckets.size());
  25. Console.log("按照季度聚合-----");
  26. for (DateHistogramBucket dateHistogramBucket : dateHistogramBuckets) {
  27. Console.log(dateHistogramBucket.keyAsString() + "季度售卖量" + dateHistogramBucket.docCount());
  28. Aggregate makeOrders = dateHistogramBucket.aggregations().get("makeOrders");
  29. List<StringTermsBucket> makeBuckets = makeOrders.sterms().buckets().array();
  30. for (StringTermsBucket makeBucket : makeBuckets) {
  31. Console.log("厂商" + makeBucket.key() + "售卖" + makeBucket.docCount() + "台");
  32. Aggregate totalPrice = makeBucket.aggregations().get("totalPrice");
  33. Console.log("共盈利" + totalPrice.sum().value());
  34. }
  35. }
  36. }

查询结果和聚合结果都过滤

  1. @Test
  2. @SneakyThrows(IOException.class)
  3. void searchAndAggregationsFilter() {
  4. //1.获取福特汽车均价以及所有汽车均价,看看福特汽车的价格是否合理
  5. SearchResponse<CarOrder> search = secondaryClient.search(s -> s
  6. .index(INDEX_NAME)
  7. .query(q -> q.constantScore(c -> c.filter(f -> f.term(t -> t
  8. .field("make")
  9. .value(FieldValue.of("ford"))
  10. ))))
  11. .aggregations("fortAvePrice", a -> a
  12. .avg(avg -> avg.field("price"))
  13. )
  14. .aggregations("totalMake", a -> a
  15. .global(g -> g)
  16. .aggregations("totalAvgPrice", ta -> ta.
  17. avg(avg -> avg.field("price"))
  18. )
  19. ), CarOrder.class
  20. );
  21. //2.校验
  22. Aggregate fortAvePrice = search.aggregations().get("fortAvePrice");
  23. Aggregate totalMake = search.aggregations().get("totalMake");
  24. Assert.isTrue(4 == search.hits().hits().size());
  25. Console.log("福特汽车均价:" + fortAvePrice.avg().value());
  26. Console.log("所有汽车均价:" + totalMake.global().aggregations().get("totalAvgPrice").avg().value());
  27. }

查询结果不过滤,聚合结果过滤

  1. @Test
  2. @SneakyThrows(IOException.class)
  3. void searchNotFilterButAggregationsFilter() {
  4. //1.查询所有福特汽车订单,统计福特汽车中黄色汽车的销售总额
  5. SearchResponse<CarOrder> search = primaryClient.search(s -> s
  6. .index(INDEX_NAME)
  7. .query(q -> q.constantScore(c -> c.filter(f -> f.term(t -> t
  8. .field("make")
  9. .value(FieldValue.of("ford"))
  10. ))))
  11. .aggregations("yellowColorCarOrders", a -> a
  12. .filter(f -> f.constantScore(c -> c.filter(cf -> cf.term(t -> t
  13. .field("color")
  14. .value(FieldValue.of("yellow"))
  15. ))))
  16. .aggregations("totalPrice", ta -> ta.sum(sum -> sum
  17. .field("price")
  18. ))
  19. ), CarOrder.class
  20. );
  21. //2.校验
  22. Assert.isTrue(4 == search.hits().hits().size());
  23. Aggregate yellowColorCarOrders = search.aggregations().get("yellowColorCarOrders");
  24. Assert.isTrue(2 == yellowColorCarOrders.filter().docCount());
  25. Aggregate totalPrice = yellowColorCarOrders.filter().aggregations().get("totalPrice");
  26. Console.log("福特黄色汽车的销售总额:" + totalPrice.sum().value());
  27. }

查询结果过滤,聚合结果不过滤

  1. @Test
  2. @SneakyThrows(IOException.class)
  3. void searchFilterButAggregationsNotFilter() {
  4. //1.查询展示蓝色福特汽车,同时统计不同颜色汽车的销售均价
  5. SearchResponse<CarOrder> search = primaryClient.search(s -> s
  6. .index(INDEX_NAME)
  7. .aggregations("colorCarOrders", a -> a.terms(t -> t
  8. .field("color"))
  9. .aggregations("avgPrice", fa -> fa.avg(avg -> avg
  10. .field("price")
  11. ))
  12. )
  13. .postFilter(pf -> pf.bool(b -> b
  14. .must(m -> m.term(t -> t
  15. .field("color")
  16. .value(FieldValue.of("blue"))
  17. ))
  18. .must(m -> m.term(t -> t
  19. .field("make")
  20. .value(FieldValue.of("ford"))
  21. )))
  22. ), CarOrder.class
  23. );
  24. //2.校验
  25. Assert.isTrue(1 == search.hits().hits().size());
  26. Aggregate colorCarOrders = search.aggregations().get("colorCarOrders");
  27. List<StringTermsBucket> colorBuckets = colorCarOrders.sterms().buckets().array();
  28. Assert.isTrue(4 == colorBuckets.size());
  29. for (StringTermsBucket colorBucket : colorBuckets) {
  30. Aggregate avePrice = colorBucket.aggregations().get("avgPrice");
  31. Console.log(colorBucket.key() + "汽车销售均价" + avePrice.avg().value());
  32. }
  33. }

sort

  1. @Test
  2. @SneakyThrows(IOException.class)
  3. void sort() {
  4. //1.同级排序-统计所有厂商的销售总额,并明确了解谁的销售总额最高
  5. HashMap<String, SortOrder> sortMaps = MapUtil.newHashMap(1);
  6. sortMaps.put("totalPrice", SortOrder.Desc);
  7. SearchResponse<CarOrder> search = primaryClient.search(s -> s
  8. .index(INDEX_NAME)
  9. .aggregations("makeCarOrders", a -> a.terms(t -> t
  10. .field("make")
  11. .order(sortMaps)
  12. )
  13. .aggregations("totalPrice", fa -> fa.sum(sum -> sum
  14. .field("price")
  15. ))
  16. ), CarOrder.class
  17. );
  18. //2.校验
  19. Aggregate makeCarOrders = search.aggregations().get("makeCarOrders");
  20. List<StringTermsBucket> makeBuckets = makeCarOrders.sterms().buckets().array();
  21. for (StringTermsBucket makeBucket : makeBuckets) {
  22. Aggregate totalPrice = makeBucket.aggregations().get("totalPrice");
  23. Console.log(makeBucket.key() + "汽车销售总额" + totalPrice.sum().value());
  24. }
  25. }
  1. @Test
  2. @SneakyThrows(IOException.class)
  3. void sortDeep() {
  4. //1.统计每个季度,每个厂商黄色汽车销售总额,并且直观看到黄色汽车销售总额高的厂商
  5. HashMap<String, SortOrder> sortMaps = MapUtil.newHashMap(1);
  6. sortMaps.put("totalPrice>yellowTotalPrice", SortOrder.Desc);
  7. SearchResponse<CarOrder> search = secondaryClient.search(s -> s
  8. .index(INDEX_NAME)
  9. .aggregations("quarterOrders", a -> a
  10. .dateHistogram(d -> d
  11. .field("soldDate")
  12. .calendarInterval(CalendarInterval.Quarter)
  13. .format(DatePattern.NORM_DATE_PATTERN)
  14. )
  15. .aggregations("makeOrders", da -> da
  16. .terms(t -> t
  17. .field("make")
  18. .order(sortMaps)
  19. )
  20. .aggregations("totalPrice", daa -> daa
  21. .filter(f -> f.term(t -> t
  22. .field("color")
  23. .value(FieldValue.of("yellow"))
  24. ))
  25. .aggregations("yellowTotalPrice", daaa -> daaa.sum(sum -> sum
  26. .field("price")
  27. ))
  28. )
  29. )
  30. ), CarOrder.class
  31. );
  32. //2.校验
  33. Aggregate quarterOrders = search.aggregations().get("quarterOrders");
  34. List<DateHistogramBucket> histogramBuckets = quarterOrders.dateHistogram().buckets().array();
  35. Assert.isTrue(5 == histogramBuckets.size());
  36. for (DateHistogramBucket histogramBucket : histogramBuckets) {
  37. Console.log(histogramBucket.keyAsString() + "-季度共卖出" + histogramBucket.docCount() + "台汽车");
  38. Console.log("其中黄色汽车订单销售额如下:");
  39. Aggregate makeOrders = histogramBucket.aggregations().get("makeOrders");
  40. List<StringTermsBucket> makeBuckets = makeOrders.sterms().buckets().array();
  41. for (StringTermsBucket makeBucket : makeBuckets) {
  42. Aggregate totalPrice = makeBucket.aggregations().get("totalPrice");
  43. Aggregate yellowTotalPrice = totalPrice.filter().aggregations().get("yellowTotalPrice");
  44. Console.log(makeBucket.key() + "共卖出黄色汽车总价:" + yellowTotalPrice.sum().value());
  45. }
  46. }
  47. }

cardinality

  1. @Test
  2. @SneakyThrows(IOException.class)
  3. void cardinality() {
  4. //1.统计共有多少个厂商的汽车订单
  5. SearchResponse<CarOrder> search = primaryClient.search(s -> s
  6. .index(INDEX_NAME)
  7. .aggregations("makeCount", a -> a.cardinality(c -> c
  8. .field("make")
  9. .precisionThreshold(100)
  10. )), CarOrder.class
  11. );
  12. //2.校验
  13. Aggregate makeCount = search.aggregations().get("makeCount");
  14. Assert.isTrue(4 == makeCount.cardinality().value());
  15. }

percentiles

  1. @Test
  2. @SneakyThrows(IOException.class)
  3. void percentiles() {
  4. //1.存储网站请求数据
  5. saveWebSite();
  6. //2.统计不同区域网站请求的平均值,通过百分数进行异常值分析,查看哪个区域网络延迟较高
  7. SearchResponse<WebsiteRequest> search = primaryClient.search(s -> s
  8. .index(WEBSITE_INDEX)
  9. .aggregations("zoneWebRequest", a -> a
  10. .terms(t -> t.field("zone"))
  11. .aggregations("avgLatency", ta -> ta.avg(avg -> avg
  12. .field("latency"))
  13. )
  14. .aggregations("percentilesLatency", ta -> ta.percentiles(p -> p
  15. .field("latency")
  16. .percents((double) 5, (double) 25, (double) 50, (double) 75, (double) 100)
  17. ))
  18. ), WebsiteRequest.class
  19. );
  20. //3.校验
  21. Aggregate zoneWebRequest = search.aggregations().get("zoneWebRequest");
  22. List<StringTermsBucket> zoneBuckets = zoneWebRequest.sterms().buckets().array();
  23. for (StringTermsBucket zoneBucket : zoneBuckets) {
  24. String key = zoneBucket.key();
  25. double avgLatency = zoneBucket.aggregations().get("avgLatency").avg().value();
  26. Console.log(key + "地区 网站平均响应延迟:" + avgLatency);
  27. TDigestPercentilesAggregate percentilesLatency = zoneBucket.aggregations().get("percentilesLatency").tdigestPercentiles();
  28. Percentiles values = percentilesLatency.values();
  29. for (String percentilesItem : values.keyed().keySet()) {
  30. Console.log(key + "地区" + percentilesItem + "%的请求 响应延迟:" + values.keyed().get(percentilesItem));
  31. }
  32. }
  33. }

percentileRanks

  1. @Test
  2. @SneakyThrows(IOException.class)
  3. void percentileRanks() {
  4. //1.存储网站请求数据
  5. saveWebSite();
  6. //2.获取美国地区,网络延迟达到30,80,100的用户百分比
  7. SearchResponse<WebsiteRequest> search = secondaryClient.search(s -> s
  8. .index(WEBSITE_INDEX)
  9. .aggregations("usWebRequests", a -> a
  10. .filter(f -> f.term(t -> t
  11. .field("zone")
  12. .value(FieldValue.of("US"))
  13. ))
  14. .aggregations("usPercentileRanks", fa -> fa.percentileRanks(p -> p
  15. .field("latency")
  16. .values((double) 30, (double) 80, (double) 100)
  17. ))
  18. ), WebsiteRequest.class
  19. );
  20. //3.校验
  21. Aggregate usWebRequests = search.aggregations().get("usWebRequests");
  22. Aggregate usPercentileRanks = usWebRequests.filter().aggregations().get("usPercentileRanks");
  23. TDigestPercentileRanksAggregate tDigestPercentileRanks = usPercentileRanks.tdigestPercentileRanks();
  24. Map<String, String> keyed = tDigestPercentileRanks.values().keyed();
  25. Assert.isTrue(3 == keyed.size());
  26. for (String key : keyed.keySet()) {
  27. Console.log(keyed.get(key) + "%的用户 网络延迟请求达到了" + key + "毫秒");
  28. }
  29. }

4.完整版单元测试

  1. import cn.hutool.core.date.DatePattern;
  2. import cn.hutool.core.lang.Assert;
  3. import cn.hutool.core.lang.Console;
  4. import cn.hutool.core.map.MapUtil;
  5. import cn.hutool.core.util.ObjectUtil;
  6. import cn.hutool.json.JSONUtil;
  7. import co.elastic.clients.elasticsearch.ElasticsearchClient;
  8. import co.elastic.clients.elasticsearch._types.FieldValue;
  9. import co.elastic.clients.elasticsearch._types.Refresh;
  10. import co.elastic.clients.elasticsearch._types.SortOrder;
  11. import co.elastic.clients.elasticsearch._types.aggregations.*;
  12. import co.elastic.clients.elasticsearch.core.BulkRequest;
  13. import co.elastic.clients.elasticsearch.core.BulkResponse;
  14. import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
  15. import co.elastic.clients.elasticsearch.core.SearchResponse;
  16. import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
  17. import co.elastic.clients.transport.endpoints.BooleanResponse;
  18. import com.ftc.elasticsearchtest.entity.CarOrder;
  19. import com.ftc.elasticsearchtest.entity.WebsiteRequest;
  20. import lombok.SneakyThrows;
  21. import org.junit.jupiter.api.BeforeEach;
  22. import org.junit.jupiter.api.Test;
  23. import org.springframework.beans.factory.annotation.Qualifier;
  24. import org.springframework.boot.test.context.SpringBootTest;
  25. import javax.annotation.Resource;
  26. import java.io.IOException;
  27. import java.util.HashMap;
  28. import java.util.List;
  29. import java.util.Map;
  30. /**
  31. * @author: 冯铁城 [17615007230@163.com]
  32. * @date: 2022-08-24 19:35:36
  33. * @describe: ElasticSearch聚合操作Api
  34. */
  35. @SpringBootTest
  36. public class ElasticSearchAggregationTest {
  @Resource
  @Qualifier("primaryElasticsearchClient")
  private ElasticsearchClient primaryClient;

  @Resource
  @Qualifier("secondaryElasticsearchClient")
  private ElasticsearchClient secondaryClient;

  /** Alias of the car order test index; the physical index is created as {@code <alias>_1}. */
  private static final String INDEX_NAME = "test_car_order";

  /** Index holding the website request test data. */
  private static final String WEBSITE_INDEX = "website";

  /** Car orders bulk-inserted into {@link #INDEX_NAME} before every test. */
  private static final List<CarOrder> CAR_ORDER_LIST;

  // Fixture: 9 orders covering 4 colors and 4 makes, all with 2014 sold dates.
  static {
    String testData = "["
        + "{\"price\":10000,\"color\":\"red\",\"make\":\"honda\",\"soldDate\":\"2014-10-28\"},"
        + "{\"price\":20000,\"color\":\"red\",\"make\":\"honda\",\"soldDate\":\"2014-11-05\"},"
        + "{\"price\":30000,\"color\":\"green\",\"make\":\"ford\",\"soldDate\":\"2014-05-18\"},"
        + "{\"price\":25000,\"color\":\"blue\",\"make\":\"ford\",\"soldDate\":\"2014-02-12\"},"
        + "{\"price\":120000,\"color\":\"yellow\",\"make\":\"ford\",\"soldDate\":\"2014-06-11\"},"
        + "{\"price\":90000,\"color\":\"yellow\",\"make\":\"ford\",\"soldDate\":\"2014-03-03\"},"
        + "{\"price\":15000,\"color\":\"blue\",\"make\":\"toyota\",\"soldDate\":\"2014-07-02\"},"
        + "{\"price\":12000,\"color\":\"green\",\"make\":\"toyota\",\"soldDate\":\"2014-08-19\"},"
        + "{\"price\":80000,\"color\":\"red\",\"make\":\"bmw\",\"soldDate\":\"2014-01-01\"}"
        + "]";
    CAR_ORDER_LIST = JSONUtil.toList(JSONUtil.parseArray(testData), CarOrder.class);
  }
  62. @BeforeEach
  63. void saveTestData() throws IOException {
  64. //1.判定索引是否存在
  65. boolean exist = primaryClient.indices().exists(e -> e.index(INDEX_NAME)).value();
  66. //2.不存在创建索引
  67. if (!exist) {
  68. //3.创建索引
  69. CreateIndexResponse createIndexResponse = primaryClient.indices().create(c -> c
  70. .index(INDEX_NAME + "_1")
  71. .aliases(INDEX_NAME, a -> a)
  72. .mappings(m -> m
  73. .properties("price", p -> p.double_(d -> d))
  74. .properties("color", p -> p.keyword(k -> k))
  75. .properties("make", p -> p.keyword(k -> k))
  76. .properties("soldDate", p -> p.date(d -> d))
  77. )
  78. .settings(s -> s
  79. .refreshInterval(r -> r.time("1s"))
  80. .numberOfShards("3")
  81. .numberOfReplicas("1")
  82. )
  83. );
  84. Assert.isTrue(Boolean.TRUE.equals(createIndexResponse.acknowledged()));
  85. }
  86. //4.删除数据
  87. DeleteByQueryResponse deleteByQuery = primaryClient.deleteByQuery(d -> d
  88. .index(INDEX_NAME)
  89. .query(q -> q.matchAll(m -> m))
  90. .refresh(true)
  91. );
  92. Assert.isTrue(ObjectUtil.isNotNull(deleteByQuery.deleted()));
  93. //5.批量存入数据
  94. BulkRequest.Builder builder = new BulkRequest.Builder();
  95. builder.index(INDEX_NAME);
  96. CAR_ORDER_LIST.forEach(car -> builder.operations(o -> o.create(c -> c.document(car))));
  97. builder.refresh(Refresh.True);
  98. BulkRequest build = builder.build();
  99. //6.存储数据
  100. BulkResponse bulk = primaryClient.bulk(build);
  101. Assert.isFalse(bulk.errors());
  102. }
  103. @Test
  104. @SneakyThrows(IOException.class)
  105. void terms() {
  106. //1.根据颜色进行聚合
  107. SearchResponse<CarOrder> search = primaryClient.search(s -> s
  108. .index(INDEX_NAME)
  109. .aggregations("colorCars", a -> a.terms(t -> t
  110. .field("color"))
  111. ), CarOrder.class
  112. );
  113. //2.校验
  114. Aggregate colorCars = search.aggregations().get("colorCars");
  115. Buckets<StringTermsBucket> buckets = colorCars.sterms().buckets();
  116. Assert.isTrue(4 == buckets.array().size());
  117. buckets.array().forEach(b -> {
  118. Console.log(b.key());
  119. Console.log(b.docCount());
  120. });
  121. }
  122. @Test
  123. @SneakyThrows(IOException.class)
  124. void nested() {
  125. //1.同级嵌套,统计每种颜色,每种厂商的汽车销量
  126. SearchResponse<CarOrder> search = primaryClient.search(s -> s
  127. .index(INDEX_NAME)
  128. .aggregations("colorOrders", a -> a
  129. .terms(t -> t.field("color")
  130. ))
  131. .aggregations("makeOrders", a -> a
  132. .terms(t -> t.field("make"))
  133. ), CarOrder.class
  134. );
  135. //2.校验
  136. Aggregate colorOrders = search.aggregations().get("colorOrders");
  137. List<StringTermsBucket> colorBuckets = colorOrders.sterms().buckets().array();
  138. Assert.isTrue(4 == colorBuckets.size());
  139. Aggregate makeOrders = search.aggregations().get("makeOrders");
  140. List<StringTermsBucket> makeBuckets = makeOrders.sterms().buckets().array();
  141. Assert.isTrue(4 == makeBuckets.size());
  142. //3.递归嵌套,统计取不同颜色汽车的销量,同时统计每种颜色汽车中每个制造商的销量以及每个制造商的总销售额
  143. search = primaryClient.search(s -> s
  144. .index(INDEX_NAME)
  145. .aggregations("colorOrders", a -> a
  146. .terms(t -> t.field("color"))
  147. .aggregations("makeOrders", colorA -> colorA.terms(
  148. t -> t.field("make"))
  149. .aggregations("makeTotalPrice", makeA -> makeA.sum(
  150. sum -> sum.field("price")
  151. ))
  152. )
  153. ), CarOrder.class
  154. );
  155. //4.校验
  156. colorOrders = search.aggregations().get("colorOrders");
  157. colorBuckets = colorOrders.sterms().buckets().array();
  158. Assert.isTrue(4 == colorBuckets.size());
  159. Console.log("按照颜色聚合-----");
  160. for (StringTermsBucket colorBucket : colorBuckets) {
  161. Console.log("颜色:" + colorBucket.key() + " 数量:" + colorBucket.docCount());
  162. Console.log("按照厂商聚合-----");
  163. makeOrders = colorBucket.aggregations().get("makeOrders");
  164. makeBuckets = makeOrders.sterms().buckets().array();
  165. for (StringTermsBucket makeBucket : makeBuckets) {
  166. Console.log("厂商:" + makeBucket.key() + " 数量:" + makeBucket.docCount());
  167. Aggregate makeTotalPrice = makeBucket.aggregations().get("makeTotalPrice");
  168. SumAggregate sum = makeTotalPrice.sum();
  169. Console.log("厂商销售总额:" + sum.value());
  170. }
  171. }
  172. }
  173. @Test
  174. @SneakyThrows(IOException.class)
  175. void histogram() {
  176. //1.按照销售额步长为20000统计不同颜色汽车的销量
  177. SearchResponse<CarOrder> search = primaryClient.search(s -> s
  178. .index(INDEX_NAME)
  179. .aggregations("priceCarOrders", a -> a
  180. .histogram(h -> h
  181. .field("price")
  182. .interval((double) 20000)
  183. )
  184. .aggregations("colorOrders", ha -> ha
  185. .terms(t -> t.field("color"))
  186. )
  187. ), CarOrder.class
  188. );
  189. //2.校验
  190. Aggregate priceCarOrders = search.aggregations().get("priceCarOrders");
  191. List<HistogramBucket> priceCarOrdersBuckets = priceCarOrders.histogram().buckets().array();
  192. Assert.isTrue(7 == priceCarOrdersBuckets.size());
  193. Console.log("按照20000销售额价格聚合-----");
  194. for (HistogramBucket priceCarOrdersBucket : priceCarOrdersBuckets) {
  195. Console.log("[" + priceCarOrdersBucket.key() + "-" + (priceCarOrdersBucket.key() + (double) 20000)
  196. + "]区间内销售量:" + priceCarOrdersBucket.docCount()
  197. );
  198. Aggregate colorOrders = priceCarOrdersBucket.aggregations().get("colorOrders");
  199. List<StringTermsBucket> colorBuckets = colorOrders.sterms().buckets().array();
  200. for (StringTermsBucket colorBucket : colorBuckets) {
  201. Console.log("颜色:" + colorBucket.key() + " 数量:" + colorBucket.docCount());
  202. }
  203. }
  204. }
  205. @Test
  206. @SneakyThrows(IOException.class)
  207. void DateHistogram() {
  208. //1.每个季度,不同汽车品牌的销售总额
  209. SearchResponse<CarOrder> search = secondaryClient.search(s -> s
  210. .index(INDEX_NAME)
  211. .aggregations("quarterCarOrders", a -> a
  212. .dateHistogram(d -> d
  213. .field("soldDate")
  214. .calendarInterval(CalendarInterval.Quarter)
  215. .format(DatePattern.NORM_DATE_PATTERN)
  216. )
  217. .aggregations("makeOrders", da -> da
  218. .terms(t -> t.field("make"))
  219. .aggregations("totalPrice", ta -> ta
  220. .sum(sum -> sum.field("price"))
  221. )
  222. )
  223. ), CarOrder.class
  224. );
  225. //2.校验
  226. Aggregate quarterCarOrders = search.aggregations().get("quarterCarOrders");
  227. List<DateHistogramBucket> dateHistogramBuckets = quarterCarOrders.dateHistogram().buckets().array();
  228. Assert.isTrue(5 == dateHistogramBuckets.size());
  229. Console.log("按照季度聚合-----");
  230. for (DateHistogramBucket dateHistogramBucket : dateHistogramBuckets) {
  231. Console.log(dateHistogramBucket.keyAsString() + "季度售卖量" + dateHistogramBucket.docCount());
  232. Aggregate makeOrders = dateHistogramBucket.aggregations().get("makeOrders");
  233. List<StringTermsBucket> makeBuckets = makeOrders.sterms().buckets().array();
  234. for (StringTermsBucket makeBucket : makeBuckets) {
  235. Console.log("厂商" + makeBucket.key() + "售卖" + makeBucket.docCount() + "台");
  236. Aggregate totalPrice = makeBucket.aggregations().get("totalPrice");
  237. Console.log("共盈利" + totalPrice.sum().value());
  238. }
  239. }
  240. }
  241. @Test
  242. @SneakyThrows(IOException.class)
  243. void searchAndAggregationsFilter() {
  244. //1.获取福特汽车均价以及所有汽车均价,看看福特汽车的价格是否合理
  245. SearchResponse<CarOrder> search = secondaryClient.search(s -> s
  246. .index(INDEX_NAME)
  247. .query(q -> q.constantScore(c -> c.filter(f -> f.term(t -> t
  248. .field("make")
  249. .value(FieldValue.of("ford"))
  250. ))))
  251. .aggregations("fortAvePrice", a -> a
  252. .avg(avg -> avg.field("price"))
  253. )
  254. .aggregations("totalMake", a -> a
  255. .global(g -> g)
  256. .aggregations("totalAvgPrice", ta -> ta.
  257. avg(avg -> avg.field("price"))
  258. )
  259. ), CarOrder.class
  260. );
  261. //2.校验
  262. Aggregate fortAvePrice = search.aggregations().get("fortAvePrice");
  263. Aggregate totalMake = search.aggregations().get("totalMake");
  264. Assert.isTrue(4 == search.hits().hits().size());
  265. Console.log("福特汽车均价:" + fortAvePrice.avg().value());
  266. Console.log("所有汽车均价:" + totalMake.global().aggregations().get("totalAvgPrice").avg().value());
  267. }
  268. @Test
  269. @SneakyThrows(IOException.class)
  270. void searchNotFilterButAggregationsFilter() {
  271. //1.查询所有福特汽车订单,统计福特汽车中黄色汽车的销售总额
  272. SearchResponse<CarOrder> search = primaryClient.search(s -> s
  273. .index(INDEX_NAME)
  274. .query(q -> q.constantScore(c -> c.filter(f -> f.term(t -> t
  275. .field("make")
  276. .value(FieldValue.of("ford"))
  277. ))))
  278. .aggregations("yellowColorCarOrders", a -> a
  279. .filter(f -> f.constantScore(c -> c.filter(cf -> cf.term(t -> t
  280. .field("color")
  281. .value(FieldValue.of("yellow"))
  282. ))))
  283. .aggregations("totalPrice", ta -> ta.sum(sum -> sum
  284. .field("price")
  285. ))
  286. ), CarOrder.class
  287. );
  288. //2.校验
  289. Assert.isTrue(4 == search.hits().hits().size());
  290. Aggregate yellowColorCarOrders = search.aggregations().get("yellowColorCarOrders");
  291. Assert.isTrue(2 == yellowColorCarOrders.filter().docCount());
  292. Aggregate totalPrice = yellowColorCarOrders.filter().aggregations().get("totalPrice");
  293. Console.log("福特黄色汽车的销售总额:" + totalPrice.sum().value());
  294. }
  295. @Test
  296. @SneakyThrows(IOException.class)
  297. void searchFilterButAggregationsNotFilter() {
  298. //1.查询展示蓝色福特汽车,同时统计不同颜色汽车的销售均价
  299. SearchResponse<CarOrder> search = primaryClient.search(s -> s
  300. .index(INDEX_NAME)
  301. .aggregations("colorCarOrders", a -> a.terms(t -> t
  302. .field("color"))
  303. .aggregations("avgPrice", fa -> fa.avg(avg -> avg
  304. .field("price")
  305. ))
  306. )
  307. .postFilter(pf -> pf.bool(b -> b
  308. .must(m -> m.term(t -> t
  309. .field("color")
  310. .value(FieldValue.of("blue"))
  311. ))
  312. .must(m -> m.term(t -> t
  313. .field("make")
  314. .value(FieldValue.of("ford"))
  315. )))
  316. ), CarOrder.class
  317. );
  318. //2.校验
  319. Assert.isTrue(1 == search.hits().hits().size());
  320. Aggregate colorCarOrders = search.aggregations().get("colorCarOrders");
  321. List<StringTermsBucket> colorBuckets = colorCarOrders.sterms().buckets().array();
  322. Assert.isTrue(4 == colorBuckets.size());
  323. for (StringTermsBucket colorBucket : colorBuckets) {
  324. Aggregate avePrice = colorBucket.aggregations().get("avgPrice");
  325. Console.log(colorBucket.key() + "汽车销售均价" + avePrice.avg().value());
  326. }
  327. }
  328. @Test
  329. @SneakyThrows(IOException.class)
  330. void sort() {
  331. //1.同级排序-统计所有厂商的销售总额,并明确了解谁的销售总额最高
  332. HashMap<String, SortOrder> sortMaps = MapUtil.newHashMap(1);
  333. sortMaps.put("totalPrice", SortOrder.Desc);
  334. SearchResponse<CarOrder> search = primaryClient.search(s -> s
  335. .index(INDEX_NAME)
  336. .aggregations("makeCarOrders", a -> a.terms(t -> t
  337. .field("make")
  338. .order(sortMaps)
  339. )
  340. .aggregations("totalPrice", fa -> fa.sum(sum -> sum
  341. .field("price")
  342. ))
  343. ), CarOrder.class
  344. );
  345. //2.校验
  346. Aggregate makeCarOrders = search.aggregations().get("makeCarOrders");
  347. List<StringTermsBucket> makeBuckets = makeCarOrders.sterms().buckets().array();
  348. for (StringTermsBucket makeBucket : makeBuckets) {
  349. Aggregate totalPrice = makeBucket.aggregations().get("totalPrice");
  350. Console.log(makeBucket.key() + "汽车销售总额" + totalPrice.sum().value());
  351. }
  352. }
  353. @Test
  354. @SneakyThrows(IOException.class)
  355. void sortDeep() {
  356. //1.统计每个季度,每个厂商黄色汽车销售总额,并且直观看到黄色汽车销售总额高的厂商
  357. HashMap<String, SortOrder> sortMaps = MapUtil.newHashMap(1);
  358. sortMaps.put("totalPrice>yellowTotalPrice", SortOrder.Desc);
  359. SearchResponse<CarOrder> search = secondaryClient.search(s -> s
  360. .index(INDEX_NAME)
  361. .aggregations("quarterOrders", a -> a
  362. .dateHistogram(d -> d
  363. .field("soldDate")
  364. .calendarInterval(CalendarInterval.Quarter)
  365. .format(DatePattern.NORM_DATE_PATTERN)
  366. )
  367. .aggregations("makeOrders", da -> da
  368. .terms(t -> t
  369. .field("make")
  370. .order(sortMaps)
  371. )
  372. .aggregations("totalPrice", daa -> daa
  373. .filter(f -> f.term(t -> t
  374. .field("color")
  375. .value(FieldValue.of("yellow"))
  376. ))
  377. .aggregations("yellowTotalPrice", daaa -> daaa.sum(sum -> sum
  378. .field("price")
  379. ))
  380. )
  381. )
  382. ), CarOrder.class
  383. );
  384. //2.校验
  385. Aggregate quarterOrders = search.aggregations().get("quarterOrders");
  386. List<DateHistogramBucket> histogramBuckets = quarterOrders.dateHistogram().buckets().array();
  387. Assert.isTrue(5 == histogramBuckets.size());
  388. for (DateHistogramBucket histogramBucket : histogramBuckets) {
  389. Console.log(histogramBucket.keyAsString() + "-季度共卖出" + histogramBucket.docCount() + "台汽车");
  390. Console.log("其中黄色汽车订单销售额如下:");
  391. Aggregate makeOrders = histogramBucket.aggregations().get("makeOrders");
  392. List<StringTermsBucket> makeBuckets = makeOrders.sterms().buckets().array();
  393. for (StringTermsBucket makeBucket : makeBuckets) {
  394. Aggregate totalPrice = makeBucket.aggregations().get("totalPrice");
  395. Aggregate yellowTotalPrice = totalPrice.filter().aggregations().get("yellowTotalPrice");
  396. Console.log(makeBucket.key() + "共卖出黄色汽车总价:" + yellowTotalPrice.sum().value());
  397. }
  398. }
  399. }
  400. @Test
  401. @SneakyThrows(IOException.class)
  402. void cardinality() {
  403. //1.统计共有多少个厂商的汽车订单
  404. SearchResponse<CarOrder> search = primaryClient.search(s -> s
  405. .index(INDEX_NAME)
  406. .aggregations("makeCount", a -> a.cardinality(c -> c
  407. .field("make")
  408. .precisionThreshold(100)
  409. )), CarOrder.class
  410. );
  411. //2.校验
  412. Aggregate makeCount = search.aggregations().get("makeCount");
  413. Assert.isTrue(4 == makeCount.cardinality().value());
  414. }
  415. @Test
  416. @SneakyThrows(IOException.class)
  417. void percentiles() {
  418. //1.存储网站请求数据
  419. saveWebSite();
  420. //2.统计不同区域网站请求的平均值,通过百分数进行异常值分析,查看哪个区域网络延迟较高
  421. SearchResponse<WebsiteRequest> search = primaryClient.search(s -> s
  422. .index(WEBSITE_INDEX)
  423. .aggregations("zoneWebRequest", a -> a
  424. .terms(t -> t.field("zone"))
  425. .aggregations("avgLatency", ta -> ta.avg(avg -> avg
  426. .field("latency"))
  427. )
  428. .aggregations("percentilesLatency", ta -> ta.percentiles(p -> p
  429. .field("latency")
  430. .percents((double) 5, (double) 25, (double) 50, (double) 75, (double) 100)
  431. ))
  432. ), WebsiteRequest.class
  433. );
  434. //3.校验
  435. Aggregate zoneWebRequest = search.aggregations().get("zoneWebRequest");
  436. List<StringTermsBucket> zoneBuckets = zoneWebRequest.sterms().buckets().array();
  437. for (StringTermsBucket zoneBucket : zoneBuckets) {
  438. String key = zoneBucket.key();
  439. double avgLatency = zoneBucket.aggregations().get("avgLatency").avg().value();
  440. Console.log(key + "地区 网站平均响应延迟:" + avgLatency);
  441. TDigestPercentilesAggregate percentilesLatency = zoneBucket.aggregations().get("percentilesLatency").tdigestPercentiles();
  442. Percentiles values = percentilesLatency.values();
  443. for (String percentilesItem : values.keyed().keySet()) {
  444. Console.log(key + "地区" + percentilesItem + "%的请求 响应延迟:" + values.keyed().get(percentilesItem));
  445. }
  446. }
  447. }
  448. @Test
  449. @SneakyThrows(IOException.class)
  450. void percentileRanks() {
  451. //1.存储网站请求数据
  452. saveWebSite();
  453. //2.获取美国地区,网络延迟达到30,80,100的用户百分比
  454. SearchResponse<WebsiteRequest> search = secondaryClient.search(s -> s
  455. .index(WEBSITE_INDEX)
  456. .aggregations("usWebRequests", a -> a
  457. .filter(f -> f.term(t -> t
  458. .field("zone")
  459. .value(FieldValue.of("US"))
  460. ))
  461. .aggregations("usPercentileRanks", fa -> fa.percentileRanks(p -> p
  462. .field("latency")
  463. .values((double) 30, (double) 80, (double) 100)
  464. ))
  465. ), WebsiteRequest.class
  466. );
  467. //3.校验
  468. Aggregate usWebRequests = search.aggregations().get("usWebRequests");
  469. Aggregate usPercentileRanks = usWebRequests.filter().aggregations().get("usPercentileRanks");
  470. TDigestPercentileRanksAggregate tDigestPercentileRanks = usPercentileRanks.tdigestPercentileRanks();
  471. Map<String, String> keyed = tDigestPercentileRanks.values().keyed();
  472. Assert.isTrue(3 == keyed.size());
  473. for (String key : keyed.keySet()) {
  474. Console.log(keyed.get(key) + "%的用户 网络延迟请求达到了" + key + "毫秒");
  475. }
  476. }
  477. @SneakyThrows(IOException.class)
  478. void saveWebSite() {
  479. //1.判定索引是否存在
  480. BooleanResponse exists = primaryClient.indices().exists(e -> e.index(WEBSITE_INDEX));
  481. //2.不存在创建索引
  482. if (!exists.value()) {
  483. //3.创建索引
  484. CreateIndexResponse createIndexResponse = primaryClient.indices()
  485. .create(c -> c
  486. .index(WEBSITE_INDEX)
  487. .mappings(m -> m
  488. .properties("latency", p -> p.long_(l -> l))
  489. .properties("timestamp", p -> p.date(d -> d))
  490. .properties("zone", p -> p.keyword(k -> k))
  491. )
  492. );
  493. Assert.isTrue(Boolean.TRUE.equals(createIndexResponse.acknowledged()));
  494. }
  495. //4.清空数据
  496. DeleteByQueryResponse deleteByQueryResponse = primaryClient.deleteByQuery(d -> d
  497. .index(WEBSITE_INDEX)
  498. .query(q -> q.matchAll(m -> m))
  499. .refresh(true)
  500. );
  501. Assert.isTrue(ObjectUtil.isNotNull(deleteByQueryResponse.deleted()));
  502. //5.解析数据
  503. String dataStr = "[{\"latency\":100,\"zone\":\"US\",\"timestamp\":\"2014-10-28\"},{\"latency\":80,\"zone\":\"US\",\"timestamp\":\"2014-10-29\"},{\"latency\":99,\"zone\":\"US\",\"timestamp\":\"2014-10-29\"},{\"latency\":102,\"zone\":\"US\",\"timestamp\":\"2014-10-28\"},{\"latency\":75,\"zone\":\"US\",\"timestamp\":\"2014-10-28\"},{\"latency\":82,\"zone\":\"US\",\"timestamp\":\"2014-10-29\"},{\"latency\":100,\"zone\":\"EU\",\"timestamp\":\"2014-10-28\"},{\"latency\":280,\"zone\":\"EU\",\"timestamp\":\"2014-10-29\"},{\"latency\":155,\"zone\":\"EU\",\"timestamp\":\"2014-10-29\"},{\"latency\":623,\"zone\":\"EU\",\"timestamp\":\"2014-10-28\"},{\"latency\":380,\"zone\":\"EU\",\"timestamp\":\"2014-10-28\"},{\"latency\":319,\"zone\":\"EU\",\"timestamp\":\"2014-10-29\"}]";
  504. List<WebsiteRequest> websiteRequests = JSONUtil.toList(dataStr, WebsiteRequest.class);
  505. //6.封装Request
  506. BulkRequest.Builder builder = new BulkRequest.Builder();
  507. builder.index(WEBSITE_INDEX);
  508. for (WebsiteRequest websiteRequest : websiteRequests) {
  509. builder.operations(o -> o.create(c -> c.document(websiteRequest)));
  510. }
  511. builder.refresh(Refresh.True);
  512. //7.存储数据
  513. BulkResponse bulk = primaryClient.bulk(builder.build());
  514. Assert.isFalse(bulk.errors());
  515. }
  516. }