1.1 导入es的高阶客户端

    1. <dependency>
    2. <groupId>org.elasticsearch.client</groupId>
    3. <artifactId>elasticsearch-rest-high-level-client</artifactId>
    4. <version>7.4.2</version>
    5. </dependency>

    1.2 给容器中注入RestHighLevelClient

    1. @Configuration
    2. public class ElasticsearchConfig {
    3. /**
    4. * 通用的请求设置项, 类似于请求头信息
    5. */
    6. public static final RequestOptions COMMON_OPTIONS;
    7. static {
    8. RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
    9. COMMON_OPTIONS = builder.build();
    10. }
    11. /**
    12. * 容器注入高阶的rest访问client
    13. */
    14. @Bean
    15. public RestHighLevelClient elasticsearchRestHighLevelClient() {
    16. //final String hostname, final int port, final String scheme
    17. //schemeName 默认是 http, 不传也行
    18. HttpHost httpHost = new HttpHost("192.168.56.10", 9200);
    19. RestClientBuilder builder = RestClient.builder(httpHost);
    20. RestHighLevelClient client = new RestHighLevelClient(builder);
    21. return client;
    22. }
    23. }

    1.3 自动注入高阶Client

    // Field-injected high-level client; the snippets below issue their requests through it.
    1. @Autowired
    2. private RestHighLevelClient client;
    1. es中保存数据

      1. @Test
      2. void indexData() throws IOException {
      3. //1.新建索引
      4. IndexRequest indexRequest = new IndexRequest("users");
      5. //2.数据的id
      6. indexRequest.id("1");
      7. //第一种保存方式, k-v形式
      8. //indexRequest.source("username", "张三", "age", 18, "gender", "男");
      9. //第二种, 保存json字符串对象的形式
      10. User user = new User();
      11. user.setUsername("张三");
      12. user.setAge(18);
      13. user.setGender("男");
      14. String jsonString = JSON.toJSONString(user);
      15. //3.构建数据 XContentType ====> 必须要指定内容类型
      16. indexRequest.source(jsonString, XContentType.JSON);
      17. //4.执行操作
      18. IndexResponse index = client.index(indexRequest, RequestOptions.DEFAULT);
      19. //提取有用的响应数据
      20. System.out.println(index);
      21. }
    2. 测试批量保存数据 ```java @Test public void bulkTest() throws IOException { //1.创建emp集合 List emps = new ArrayList<>(); for (int i = 0; i < 10; i++) {

      1. Emp emp = new Emp();
      2. emp.setId(i);
      3. emp.setName("张益达" + i + "号");
      4. emp.setGender(i % 2 == 0 ? "男" : "女");
      5. emp.setProfession(i % 3 == 0 ? "vue开发工程师" : "安卓开发工程师");
      6. emps.add(emp);

      }

      //2.bulk请求 BulkRequest bulkRequest = new BulkRequest(); for (Emp emp : emps) {

      1. //3.index请求
      2. IndexRequest indexRequest = new IndexRequest("emp");
      3. indexRequest.id(emp.getId().toString());
      4. String jsonString = JSON.toJSONString(emp);
      5. indexRequest.source(jsonString, XContentType.JSON);
      6. //4.将index请求放入bulkRequest中
      7. bulkRequest.add(indexRequest);

      }

      //5.执行 BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);

      BulkItemResponse[] items = bulk.getItems(); List collect = Arrays.stream(items).map(BulkItemResponse::getResponse).collect(Collectors.toList()); System.out.println(“返回的结果集合:” + collect); }

      @Data class Emp { private Integer id; private String name; private String gender; private String profession; }

      4. **测试检索数据 (简单查询)**
      ```java
      3. @Test
      4. void searchData() throws IOException {
      5. //1.创建检索请求
      6. SearchRequest searchRequest = new SearchRequest();
      7. //2.指定索引
      8. searchRequest.indices("bank");
      9. //3.指定DSL检索条件, 构建查询语句
      10. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
      11. MatchQueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("address", "Mill");
      12. sourceBuilder.query(matchQueryBuilder);
      13. sourceBuilder.from(0);
      14. sourceBuilder.size(4);
      15. //按照年龄的值分布进行聚合
      16. TermsAggregationBuilder ageAgg = AggregationBuilders.terms("ageAgg").field("age");
      17. sourceBuilder.aggregation(ageAgg);
      18. //计算平均薪资
      19. AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance");
      20. sourceBuilder.aggregation(balanceAvg);
      21. //4.sourceBuilder放入请求中
      22. searchRequest.source(sourceBuilder);
      23. //5.执行检索, 拿到响应
      24. SearchResponse searchResponse = client.search(searchRequest, ElasticsearchConfig.COMMON_OPTIONS);
      25. System.out.println("检索条件 " + sourceBuilder.toString());
      26. System.out.println(searchResponse.toString());
      27. SearchHits hits = searchResponse.getHits();
      28. //真正命中的所有记录
      29. SearchHit[] searchHits = hits.getHits();
      30. for (SearchHit hit : searchHits) {
      31. /**
      32. * "_index": "bank",
      33. * "_type": "account",
      34. * "_id": "970",
      35. * "_score": 5.4032025,
      36. * "_source":
      37. */
      38. String sourceAsString = hit.getSourceAsString();
      39. Account account = JSON.parseObject(sourceAsString, Account.class);
      40. System.out.println(account);
      41. }
      42. //获取响应结果的聚合
      43. Aggregations aggregations = searchResponse.getAggregations();
      44. Terms ageAggRes = aggregations.get("ageAgg");
      45. for (Terms.Bucket bucket : ageAggRes.getBuckets()) {
      46. String keyAsString = bucket.getKeyAsString();
      47. System.out.println("年龄分布" + keyAsString + "===>有" + bucket.getDocCount() + "人");
      48. }
      49. Avg balanceAvgRes = aggregations.get("balanceAvg");
      50. System.out.println("平均薪资" + balanceAvgRes.getValue());
      51. }
      52. @Data
      53. @ToString
      54. static class Account {
      55. private int account_number;
      56. private int balance;
      57. private String firstname;
      58. private String lastname;
      59. private int age;
      60. private String gender;
      61. private String address;
      62. private String employer;
      63. private String email;
      64. private String city;
      65. private String state;
      66. }
      1. 先按照年龄聚合, 再计算每个年龄段的人的平均薪资 ```java @Test public void searchAgeWithBalance() throws IOException { //1.创建检索请求 SearchRequest searchRequest = new SearchRequest(); //2.指定索引 searchRequest.indices(“bank”); //3.指定DSL检索条件, 构建查询语句 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(QueryBuilders.matchAllQuery()); //查询所有

        //构造age的聚合 TermsAggregationBuilder ageAgg = AggregationBuilders.terms(“ageAgg”).field(“age”).size(20);

      1. /**
      2. * 先按照桶的doc_count排序, doc_count相同, 在根据key排序
      3. */
      4. //方法一: 按照count升序排列
      5. //ageAgg.order(BucketOrder.count(true));
      6. //按照key降序排列
      7. //ageAgg.order(BucketOrder.key(false));
      8. //方法二: 使用BucketOrder.compound方法
      9. List<BucketOrder> orders = new ArrayList<>();
      10. orders.add(BucketOrder.count(true));
      11. orders.add(BucketOrder.key(false));
      12. //ageAgg.order(orders);
      13. BucketOrder compound = BucketOrder.compound(orders);
      14. ageAgg.order(compound);
      15. //构造年龄的薪资子聚合, 即计算相同年龄段的平均薪资
      16. AvgAggregationBuilder subBalanceAvg = AggregationBuilders.avg("subBalanceAvg").field("balance");
      17. ageAgg.subAggregation(subBalanceAvg);
      18. sourceBuilder.aggregation(ageAgg);
      19. //总的平均薪资
      20. AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance");
      21. sourceBuilder.aggregation(balanceAvg);
      22. //4.sourceBuilder放入请求中
      23. searchRequest.source(sourceBuilder);
      24. //5.执行检索, 拿到响应
      25. SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
      26. System.out.println("检索条件 " + sourceBuilder.toString());
      27. System.out.println(searchResponse.toString());
      28. SearchHits hits = searchResponse.getHits();
      29. for (SearchHit hit : hits.getHits()) {
      30. String sourceAsString = hit.getSourceAsString();
      31. Account account = JSON.parseObject(sourceAsString, Account.class);
      32. System.out.println("account===>" + account);
      33. }
      34. System.out.println("---------------------------------");
      35. Aggregations aggregations = searchResponse.getAggregations();
      36. ParsedLongTerms ageAggTerms = aggregations.get("ageAgg");
      37. for (Terms.Bucket bucket : ageAggTerms.getBuckets()) {
      38. String keyAsString = bucket.getKeyAsString();
      39. long docCount = bucket.getDocCount();
      40. Aggregations aggregations1 = bucket.getAggregations();
      41. ParsedAvg subBalanceAvgRes = aggregations1.get("subBalanceAvg");
      42. double value = subBalanceAvgRes.getValue();
      43. System.out.println(keyAsString + "岁年龄人数: " + docCount + " || 平均薪资:" + value);
      44. }
      45. ParsedAvg balanceAvgTerms = aggregations.get("balanceAvg");
      46. double value = balanceAvgTerms.getValue();
      47. System.out.println("平均薪资" + value);

      }

      6. **查出所有年龄分布,以及这些年龄段中【男性】的平均薪资和【女性】的平均薪资**
         **以及这个年龄段的【总体平均薪资】**
      ```java
      4. @Test
      5. public void searchBalanceGroupByGender() throws IOException {
      6. //1.检索请求
      7. SearchRequest searchRequest = new SearchRequest();
      8. //2.索引
      9. searchRequest.indices("bank");
      10. //3.构造检索条件, DSL语句
      11. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
      12. sourceBuilder.query(QueryBuilders.matchAllQuery()); //查询所有
      13. //按照年龄聚合
      14. TermsAggregationBuilder ageAgg = AggregationBuilders.terms("ageAgg").field("age").size(20).order(BucketOrder.key(true));
      15. TermsAggregationBuilder genderAgg = AggregationBuilders.terms("genderAgg").field("gender.keyword").order(BucketOrder.key(true));
      16. AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance");
      17. genderAgg.subAggregation(balanceAvg);
      18. ageAgg.subAggregation(genderAgg);
      19. ageAgg.subAggregation(balanceAvg);
      20. sourceBuilder.aggregation(ageAgg);
      21. //4.sourceBuilder放入请求中
      22. searchRequest.source(sourceBuilder);
      23. //5.执行请求
      24. SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
      25. System.out.println("检索条件 " + sourceBuilder.toString());
      26. Aggregations aggregations = searchResponse.getAggregations();
      27. ParsedLongTerms ageAggTerms = aggregations.get("ageAgg");
      28. for (Terms.Bucket bucket : ageAggTerms.getBuckets()) {
      29. String keyAsString = bucket.getKeyAsString();
      30. long docCount = bucket.getDocCount();
      31. Aggregations bucketAggregations = bucket.getAggregations();
      32. ParsedStringTerms genderAggterms = bucketAggregations.get("genderAgg");
      33. for (Terms.Bucket genderAggtermsBucket : genderAggterms.getBuckets()) {
      34. String genderKey = genderAggtermsBucket.getKeyAsString();
      35. String gender = genderKey.equals("M") ? "男" : "女";
      36. long genderDocCount = genderAggtermsBucket.getDocCount();
      37. Aggregations genderBucketAggregations = genderAggtermsBucket.getAggregations();
      38. ParsedAvg genderBalanceAvg = genderBucketAggregations.get("balanceAvg");
      39. System.out.println(keyAsString + "岁的人数中," +gender+ "性有" + genderDocCount + "人,平均薪资是" + genderBalanceAvg.getValue());
      40. }
      41. ParsedAvg agebalanceAvg = bucketAggregations.get("balanceAvg");
      42. System.out.println(keyAsString + "岁的人数有: " + docCount + " 平均薪资:" + agebalanceAvg.getValue() + "\n");
      43. }
      44. }