聚合
聚合(aggregations)可以让我们极其方便的实现对数据的统计、分析、运算
聚合常见的有三类:
- 桶(Bucket)聚合:用来对文档做分组
  - TermAggregation:按照文档字段值分组,例如按照品牌值分组、按照国家分组
  - Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
- 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
  - Avg:求平均值
  - Max:求最大值
  - Min:求最小值
  - Stats:同时求max、min、avg、sum等
- 管道(Pipeline)聚合:以其它聚合的结果为基础再做聚合
Bucket聚合语法:
Metric聚合语法:
RestAPI实现聚合
聚合条件语法

聚合结果解析
void testMetric() throws IOException {//1.创建对应请求SearchRequest request = new SearchRequest("hotel");//2.DSLrequest.source().size(0);request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(20).order(BucketOrder.aggregation("_count",true)));//3.发请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4.解析结果Aggregations aggregations = response.getAggregations();Terms brandAgg = aggregations.get("brandAgg");List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();for (Terms.Bucket bucket : buckets) {String key = bucket.getKeyAsString();System.out.println(key);}}
案例:实现搜索栏条件
```java
package cn.itcast.hotel.service.impl; 
import cn.itcast.hotel.mapper.HotelMapper; import cn.itcast.hotel.pojo.Hotel; import cn.itcast.hotel.pojo.HotelDoc; import cn.itcast.hotel.pojo.PageResult; import cn.itcast.hotel.pojo.RequestParams; import cn.itcast.hotel.service.IHotelService; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.unit.DistanceUnit; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder; import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.search.suggest.SuggestBuilders; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;
import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;
@Service
public class HotelService extends ServiceImpl
@Autowiredprivate RestHighLevelClient client;/*** 根据关键字搜索酒店信息** @param params 请求参数对象,包含用户输入的关键字* @return 酒店文档列表*/@Overridepublic PageResult search(RequestParams params) {try {//1.SearchRequest request = new SearchRequest("hotel");//2.准备DSL并添加条件addQuery(request, params);//分页int size = params.getSize();int begin = (params.getPage() - 1) * size;request.source().from(begin).size(size);//距离排序String location = params.getLocation();if (location != null && !location.equals("")) {request.source().sort(SortBuilders.geoDistanceSort("location", new GeoPoint(location)).order(SortOrder.ASC).unit(DistanceUnit.KILOMETERS));}//发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 解析响应return handleResponse(response);} catch (IOException e) {throw new RuntimeException(e);}}/*** 条件查询拼接DSL** @param request* @param params* @return*/private void addQuery(SearchRequest request, RequestParams params) {//2.1query//构建boolQuery,多条件使用bool查询BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();String key = params.getKey();//判断搜索词if (key == null || "".equals(key)) {boolQuery.must(QueryBuilders.matchAllQuery());} else {boolQuery.must(QueryBuilders.matchQuery("all", key));}// 3.城市条件if (params.getCity() != null && !params.getCity().equals("")) {boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));}// 4.品牌条件if (params.getBrand() != null && !params.getBrand().equals("")) {boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));}// 5.星级条件if (params.getStarName() != null && !params.getStarName().equals("")) {boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));}//6.价格if (params.getMinPrice() != null && params.getMaxPrice() != null) {boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));}//function_scoreFunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(// 原始查询,相关性算分的查询boolQuery,// function score的数组new 
FunctionScoreQueryBuilder.FilterFunctionBuilder[]{// 其中的一个function score 元素new FunctionScoreQueryBuilder.FilterFunctionBuilder(//过滤条件QueryBuilders.termQuery("isAD", true),//算分函数ScoreFunctionBuilders.weightFactorFunction(10))});//放入条件request.source().query(functionScoreQuery);}/*** 搜索栏条件添加** @param params* @return*/@Overridepublic Map<String, List<String>> getFilters(RequestParams params) {try {//1.SearchRequest request = new SearchRequest("hotel");//2.准备DSL并添加条件addQuery(request, params);//2.正常条件request.source().size(0);//3.聚合条件addFilter(request);//4.发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//解析结果Aggregations aggregations = response.getAggregations();//放入不同mapMap<String, List<String>> map = new HashMap<>();List<String> cityList = getAggField(aggregations, "cityAgg");map.put("city", cityList);List<String> starList = getAggField(aggregations, "starAgg");map.put("starName", starList);List<String> brandList = getAggField(aggregations, "brandAgg");map.put("brand", brandList);return map;} catch (IOException e) {throw new RuntimeException(e);}}/*** 自动补全** @param key* @return*/@Overridepublic List<String> getSuggestions(String key) {try {//1.请求SearchRequest request = new SearchRequest("hotel");//2.DSL的java代码request.source().suggest(new SuggestBuilder().addSuggestion("suggestions",SuggestBuilders.completionSuggestion("suggestion").prefix(key).skipDuplicates(true).size(10)));//3.发送请求获取参数SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4.解析参数Suggest suggest = response.getSuggest();// 4.1.根据补全查询名称,获取补全结果CompletionSuggestion suggestions= suggest.getSuggestion("suggestions");// 4.2.获取optionsList<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();List<String>list=new ArrayList<>();//4.3遍历放入集合for (CompletionSuggestion.Entry.Option option : options) {String string = option.getText().string();list.add(string);}return list;} catch (IOException e) {throw new RuntimeException(e);}}/*** 新增、修改* @param 
id*/@Overridepublic void insertById(Long id) {try {//查询对应id的酒店信息Hotel hotel = getById(id);HotelDoc hotelDoc=new HotelDoc(hotel);//请求IndexRequest request = new IndexRequest("hotel").id(id.toString());//组装DSLrequest.source(JSON.toJSONString(hotelDoc), XContentType.JSON);//发请求client.index(request,RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}/*** 删除* @param id*/@Overridepublic void deleteById(Long id) {try {//创建DeleteRequest request = new DeleteRequest("hotel",id.toString());client.delete(request,RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}/*** 得到对应搜索框信息** @param aggregations* @param aggName* @return*/private List<String> getAggField(Aggregations aggregations, String aggName) {Terms aggTerms = aggregations.get(aggName);List<? extends Terms.Bucket> buckets = aggTerms.getBuckets();List<String> list = new ArrayList<>();for (Terms.Bucket bucket : buckets) {String key = bucket.getKeyAsString();list.add(key);}return list;}/*** 聚合条件** @param request*/private void addFilter(SearchRequest request) {request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(100));request.source().aggregation(AggregationBuilders.terms("starAgg").field("starName").size(100));request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(100));}/*** 结果解析** @param response* @return*/private PageResult handleResponse(SearchResponse response) {//解析结果SearchHits searchHits = response.getHits();//总数long total = searchHits.getTotalHits().value;//集合SearchHit[] hits = searchHits.getHits();List<HotelDoc> hotelDocList = new ArrayList<>();for (SearchHit hit : hits) {//酒店对象JSONString hitSourceAsString = hit.getSourceAsString();//转换对象hotelDocHotelDoc hotelDoc = JSON.parseObject(hitSourceAsString, HotelDoc.class);//距离Object[] sortValues = hit.getSortValues();if (sortValues != null && sortValues.length > 0) {hotelDoc.setDistance(sortValues[0]);}hotelDocList.add(hotelDoc);}return new 
PageResult(total, hotelDocList);}
}
```

<a name="oioHr"></a>
### 自动补全
拼音分词器:安装方式与IK分词器一样,分三步,然后测试:
1. 解压
2. 上传到虚拟机中elasticsearch的plugins目录
3. 重启elasticsearch
4. 测试:

```
POST /_analyze
{
  "text": "如家酒店还不错",
  "analyzer": "pinyin"
}
```
创建索引库时添加suggestion字段(completion类型),用于自动补全
注意:下面的自定义分词器一般需要在创建索引库时定义
// Hotel data index.
// Custom analyzers:
//   text_analyzer       - ik_max_word tokenizer + "py" pinyin filter; index-time analyzer of "name" and "all".
//   completion_analyzer - keyword tokenizer + "py" pinyin filter; analyzer of the "suggestion" completion field.
// "name" and "all" are searched with ik_smart (search_analyzer), i.e. without the pinyin filter.
PUT /hotel
{
  "settings": {
    "analysis": {
      "analyzer": {
        "text_analyzer": {
          "tokenizer": "ik_max_word",
          "filter": ["py"]
        },
        "completion_analyzer": {
          "tokenizer": "keyword",
          "filter": ["py"]
        }
      },
      "filter": {
        "py": {
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "name": {
        "type": "text",
        "analyzer": "text_analyzer",
        "search_analyzer": "ik_smart",
        "copy_to": "all"
      },
      "address": {
        "type": "keyword",
        "index": false
      },
      "price": {
        "type": "integer"
      },
      "score": {
        "type": "integer"
      },
      "brand": {
        "type": "keyword",
        "copy_to": "all"
      },
      "city": {
        "type": "keyword"
      },
      "starName": {
        "type": "keyword"
      },
      "business": {
        "type": "keyword",
        "copy_to": "all"
      },
      "location": {
        "type": "geo_point"
      },
      "pic": {
        "type": "keyword",
        "index": false
      },
      "all": {
        "type": "text",
        "analyzer": "text_analyzer",
        "search_analyzer": "ik_smart"
      },
      "suggestion": {
        "type": "completion",
        "analyzer": "completion_analyzer"
      }
    }
  }
}

示例DSL:
解析:
/**
 * Returns up to 10 de-duplicated auto-complete suggestions for {@code prefix} from the
 * {@code suggestion} completion field of the {@code hotel} index.
 *
 * @param prefix the text the user has typed so far; {@code null} or blank yields an empty list
 * @return the suggested texts, in the order returned by Elasticsearch
 */
@Override
public List<String> getSuggestions(String prefix) {
    // Nothing typed: skip the round trip instead of sending a suggestion without a prefix.
    if (prefix == null || prefix.trim().isEmpty()) {
        return new ArrayList<>();
    }
    try {
        // 1. Prepare the request.
        SearchRequest request = new SearchRequest("hotel");
        // 2. Build the DSL: a completion suggestion named "suggestions".
        request.source().suggest(new SuggestBuilder().addSuggestion(
            "suggestions",
            SuggestBuilders.completionSuggestion("suggestion")
            .prefix(prefix)
            .skipDuplicates(true)
            .size(10)
        ));
        // 3. Send the request.
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. Parse the result.
        Suggest suggest = response.getSuggest();
        // 4.1 Look the completion result up by the suggestion name used above.
        CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
        // 4.2 Get the options.
        List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
        // 4.3 Collect the option texts.
        List<String> list = new ArrayList<>(options.size());
        for (CompletionSuggestion.Entry.Option option : options) {
            list.add(option.getText().toString());
        }
        return list;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
ES索引和数据库的数据同步问题:
三种数据同步方法
方式一:同步调用
- 优点:实现简单,粗暴
 - 缺点:业务耦合度高
 
方式二:异步通知
- 优点:低耦合,实现难度一般
 - 缺点:依赖mq的可靠性
 
方式三:监听binlog——相对高级
- 优点:完全解除服务间耦合
 - 缺点:开启binlog增加数据库负担、实现复杂度高
 
流程:
所有需要收发MQ消息的模块都需要导入amqp依赖:
<!--amqp-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.yml中配置RabbitMQ的连接信息:
# RabbitMQ connection settings.
spring:
  rabbitmq:
    host: 121.36.164.132 # host name
    port: 5672 # port
    virtual-host: / # virtual host
    username: itcast # user name
    password: 123321 # password
    # NOTE(review): host and credentials are hard-coded here; consider externalizing them
    # (e.g. environment variables) before committing this file.
交换机、队列和绑定关系既可以用配置类声明,也可以直接用注解声明,注解方式的详细说明见上一篇ES笔记
/**
 * Declares the RabbitMQ topology for hotel change messages: one topic exchange, two
 * durable queues (insert/update and delete), and a binding of each queue to the
 * exchange under its own routing key.
 */
@Configuration
public class MqConfig {

    /** Durable, non-auto-delete topic exchange that hotel change messages are sent to. */
    @Bean
    public TopicExchange topicExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, durable, autoDelete);
    }

    /** Durable queue for hotel insert/update messages. */
    @Bean
    public Queue insertQueue() {
        final boolean durable = true;
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE, durable);
    }

    /** Durable queue for hotel delete messages. */
    @Bean
    public Queue deleteQueue() {
        final boolean durable = true;
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE, durable);
    }

    /** Routes insert/update messages from the exchange to the insert queue. */
    @Bean
    public Binding insertQueueBinding() {
        return bindToHotelExchange(insertQueue(), MqConstants.HOTEL_INSERT_KEY);
    }

    /** Routes delete messages from the exchange to the delete queue. */
    @Bean
    public Binding deleteQueueBinding() {
        return bindToHotelExchange(deleteQueue(), MqConstants.HOTEL_DELETE_KEY);
    }

    /** Binds {@code queue} to the hotel topic exchange under {@code routingKey}. */
    private Binding bindToHotelExchange(Queue queue, String routingKey) {
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }
}
/**
 * Creates a hotel in the database, then publishes its id on the insert routing key so the
 * search index is updated asynchronously.
 *
 * <p>The message payload is {@code hotel.getId()} read after saving, so this relies on
 * {@code save} populating the id. Nothing is published when {@code save} reports failure;
 * the message could otherwise carry a missing id.
 *
 * @param hotel the hotel to create
 */
@PostMapping
public void saveHotel(@RequestBody Hotel hotel) {
    if (!hotelService.save(hotel)) {
        return;
    }
    rabbitTemplate.convertAndSend(MQConstants.HOTEL_EXCHANGE,
            MQConstants.HOTEL_INSERT_KEY,
            hotel.getId());
}
    /**
     * Updates an existing hotel. If a row was actually updated, publishes its id on the
     * insert routing key so the search index is refreshed asynchronously; the same key
     * covers both inserts and updates.
     *
     * @param hotel the hotel to update; its id must be set
     * @throws InvalidParameterException if the hotel id is missing
     */
    @PutMapping()
    public void updateById(@RequestBody Hotel hotel) {
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        // No row matched the id: don't ask the listener to index a hotel that doesn't exist.
        if (!hotelService.updateById(hotel)) {
            return;
        }
        rabbitTemplate.convertAndSend(MQConstants.HOTEL_EXCHANGE,
                MQConstants.HOTEL_INSERT_KEY,
                hotel.getId());
    }
    /**
     * Deletes a hotel from the database. It then publishes the id on the delete routing key,
     * so the hotel's search-index document is removed asynchronously.
     *
     * @param id the hotel id taken from the request path
     */
    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);
        String exchange = MQConstants.HOTEL_EXCHANGE;
        String routingKey = MQConstants.HOTEL_DELETE_KEY;
        rabbitTemplate.convertAndSend(exchange, routingKey, id);
    }
编写监听器
/**
 * Consumes hotel change messages from RabbitMQ and delegates them to
 * {@link IHotelService#insertById} / {@link IHotelService#deleteById}.
 */
@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    /**
     * Handles a message from the delete queue.
     *
     * @param id id of the deleted hotel
     */
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id) {
        hotelService.deleteById(id);
    }

    /**
     * Handles a message from the insert queue (sent for both new and updated hotels).
     *
     * @param id id of the created or updated hotel
     */
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id) {
        hotelService.insertById(id);
    }
}
面试相关问题:
数据同步的解决方案(面试)
有以下几种解决方案
同步调用:一般出现在传统类型的项目中,耦合度高,速度较慢
异步通知:使用MQ队列来完成异步和一定程度的解耦,依赖MQ的可靠性
监听binlog:可以通过阿里的canal组件监听MySQL的binlog日志来实现,可以完全解耦,但开启binlog会增加数据库负担,实现复杂度也高
ES内部的实现原理
数据插入的分片如何确定
ES会根据文档的路由值(_routing,默认为文档id),通过hash算法算出hash值。然后用这个值对主分片数量取模(%),得到的结果就是文档存放的分片编号,即 shard = hash(_routing) % number_of_primary_shards。
也因此,索引库一旦创建,主分片数量就不能修改
数据的查询
分为两个阶段
第一阶段是分散阶段(scatter phase):在这个阶段,协调节点会把请求分发到每一个分片(每个分片的主分片或其某个副本)
第二阶段是聚集阶段(gather phase):在这个阶段,协调节点会汇总各分片返回的数据结果,经过合并、排序等处理后,将最终结果返回给用户
脑裂(面试)
脑裂(split brain):一个集群中同时出现了不止一个master节点,这就是脑裂
解决:
配置方面:使用奇数个候选主节点,并要求master选举必须获得半数以上的投票。ES 7之前需将discovery.zen.minimum_master_nodes设为"候选主节点数/2+1";ES 7及以后由集群自动维护这个法定票数
增加内存,防止内存不够带来的无法响应
增加响应超时时间,把心跳响应时间从默认的30s调整为更长的时间,来避免网络波动带来的无法接受心跳消息
减少访问压力:不要在短时间内对master发起过高并发的写请求,防止高并发导致master无法及时处理其他请求
ES的集群故障转移
集群的master节点会监控集群中各节点的状态。一旦发现有节点宕机,就会把该节点上主分片对应的副本分片提升为主分片,并在其他节点上重新分配缺失的副本分片,以确保数据安全,这就是故障转移
