课程大纲

    1. SearchResponse sr = node.client().prepareSearch()
    2. .addAggregation(
    3. AggregationBuilders.terms("by_country").field("country")
    4. .subAggregation(AggregationBuilders.dateHistogram("by_year")
    5. .field("dateOfBirth")
    6. .dateHistogramInterval(DateHistogramInterval.YEAR)
    7. .subAggregation(AggregationBuilders.avg("avg_children").field("children"))
    8. )
    9. )
    10. .execute().actionGet();

    我们先给个需求:
    (1)首先按照country国家来进行分组
    (2)然后在每个country分组内,再按照入职年份(join_date,按年划分)进行分组
    (3)最后计算每个分组内的平均薪资

    PUT /company
    {
      "mappings": {
          "employee": {
            "properties": {
              "age": {
                "type": "long"
              },
              "country": {
                "type": "text",
                "fields": {
                  "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                  }
                },
                "fielddata": true
              },
              "join_date": {
                "type": "date"
              },
              "name": {
                "type": "text",
                "fields": {
                  "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                  }
                }
              },
              "position": {
                "type": "text",
                "fields": {
                  "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                  }
                }
              },
              "salary": {
                "type": "long"
              }
            }
          }
        }
    }
    
    GET /company/employee/_search
    {
      "size": 0,
      "aggs": {
        "group_by_country": {
          "terms": {
            "field": "country"
          },
          "aggs": {
            "group_by_join_date": {
              "date_histogram": {
                "field": "join_date",
                "interval": "year"
              },
              "aggs": {
                "avg_salary": {
                  "avg": {
                    "field": "salary"
                  }
                }
              }
            }
          }
        }
      }
    }
    
    // Excerpt: reads the nested aggregation results out of the search response and prints them.
    // Top level: look the aggregations up by the names used in the request.
    Map<String, Aggregation> aggrMap = searchResponse.getAggregations().asMap();
            // "group_by_country" is the terms aggregation; each bucket is one country value.
            StringTerms groupByCountry = (StringTerms) aggrMap.get("group_by_country");
            Iterator<Bucket> groupByCountryBucketIterator = groupByCountry.getBuckets().iterator();
    
            while(groupByCountryBucketIterator.hasNext()) {
                Bucket groupByCountryBucket = groupByCountryBucketIterator.next();
    
                // Print the country key and the number of documents in that bucket.
                System.out.println(groupByCountryBucket.getKey() + "\t" + groupByCountryBucket.getDocCount()); 
    
                // Second level: the date histogram nested inside the current country bucket.
                Histogram groupByJoinDate = (Histogram) groupByCountryBucket.getAggregations().asMap().get("group_by_join_date"); 
                // Fully qualified because the imported Bucket is Terms.Bucket, not Histogram.Bucket.
                Iterator<org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket> groupByJoinDateBucketIterator = groupByJoinDate.getBuckets().iterator();
    
                while(groupByJoinDateBucketIterator.hasNext()) {
                    org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket groupByJoinDateBucket = groupByJoinDateBucketIterator.next();
    
                    // Print the histogram bucket key and its document count.
                    System.out.println(groupByJoinDateBucket.getKey() + "\t" + groupByJoinDateBucket.getDocCount()); 
    
                    // Third level: the "avg_salary" metric computed for this bucket.
                    Avg avgSalary = (Avg) groupByJoinDateBucket.getAggregations().asMap().get("avg_salary");
                    System.out.println(avgSalary.getValue()); 
                }
            }
    
            // Release the client once all results have been printed.
            client.close();
        }
    

    代码如下:

    import java.net.InetAddress;
    import java.util.Iterator;
    import java.util.Map;
    
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.search.aggregations.Aggregation;
    import org.elasticsearch.search.aggregations.AggregationBuilders;
    import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
    import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
    import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
    import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
    import org.elasticsearch.search.aggregations.metrics.avg.Avg;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;
    
    /**
     * 员工聚合分析应用程序
     * @author Administrator
     *
     */
    public class EmployeeAggrApp {
    
        @SuppressWarnings({ "unchecked", "resource" })
        public static void main(String[] args) throws Exception {
            Settings settings = Settings.builder()
                    .put("cluster.name", "elasticsearch")
                    .build();
    
            TransportClient client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300)); 
    
            SearchResponse searchResponse = client.prepareSearch("company") 
                    .addAggregation(AggregationBuilders.terms("group_by_country").field("country")
                            .subAggregation(AggregationBuilders
                                    .dateHistogram("group_by_join_date")
                                    .field("join_date")
                                    .dateHistogramInterval(DateHistogramInterval.YEAR)
                                    .subAggregation(AggregationBuilders.avg("avg_salary").field("salary")))
                    )
                    .execute().actionGet();
    
            Map<String, Aggregation> aggrMap = searchResponse.getAggregations().asMap();
    
            StringTerms groupByCountry = (StringTerms) aggrMap.get("group_by_country");
            Iterator<Bucket> groupByCountryBucketIterator = groupByCountry.getBuckets().iterator();
            while(groupByCountryBucketIterator.hasNext()) {
                Bucket groupByCountryBucket = groupByCountryBucketIterator.next();
                System.out.println(groupByCountryBucket.getKey() + ":" + groupByCountryBucket.getDocCount()); 
    
                Histogram groupByJoinDate = (Histogram) groupByCountryBucket.getAggregations().asMap().get("group_by_join_date");
                Iterator<org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket> groupByJoinDateBucketIterator = groupByJoinDate.getBuckets().iterator();
                while(groupByJoinDateBucketIterator.hasNext()) {
                    org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket groupByJoinDateBucket = groupByJoinDateBucketIterator.next();
                    System.out.println(groupByJoinDateBucket.getKey() + ":" +groupByJoinDateBucket.getDocCount()); 
    
                    Avg avg = (Avg) groupByJoinDateBucket.getAggregations().asMap().get("avg_salary"); 
                    System.out.println(avg.getValue()); 
                }
            }
    
            client.close();
        }
    
    }