聚合

聚合(aggregations)可以让我们极其方便地实现对数据的统计、分析和运算。
常见的聚合有三类:

  • 桶(Bucket)聚合:用来对文档做分组
    • TermAggregation:按照文档字段值分组,例如按照品牌值分组、按照国家分组
    • Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
  • 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
    • Avg:求平均值
    • Max:求最大值
    • Min:求最小值
    • Stats:同时求max、min、avg、sum等
  • 管道(pipeline)聚合:以其它聚合的结果为基础再做聚合

    Bucket聚合语法:

    image.png

    Metric聚合语法:

    例如stats聚合:一次就可以获取min、max、avg、sum等结果。
    image.png

    RestAPI实现聚合

    聚合条件语法
    image.png
    聚合结果解析
    image.png

    1. void testMetric() throws IOException {
    2. //1.创建对应请求
    3. SearchRequest request = new SearchRequest("hotel");
    4. //2.DSL
    5. request.source().size(0);
    6. request.source().aggregation(AggregationBuilders
    7. .terms("brandAgg")
    8. .field("brand")
    9. .size(20)
    10. .order(BucketOrder.aggregation("_count",true))
    11. );
    12. //3.发请求
    13. SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    14. //4.解析结果
    15. Aggregations aggregations = response.getAggregations();
    16. Terms brandAgg = aggregations.get("brandAgg");
    17. List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
    18. for (Terms.Bucket bucket : buckets) {
    19. String key = bucket.getKeyAsString();
    20. System.out.println(key);
    21. }
    22. }

    案例:实现搜索栏条件
    image.png ```java package cn.itcast.hotel.service.impl;

import cn.itcast.hotel.mapper.HotelMapper; import cn.itcast.hotel.pojo.Hotel; import cn.itcast.hotel.pojo.HotelDoc; import cn.itcast.hotel.pojo.PageResult; import cn.itcast.hotel.pojo.RequestParams; import cn.itcast.hotel.service.IHotelService; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.unit.DistanceUnit; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder; import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.search.suggest.SuggestBuilders; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;

import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;

@Service public class HotelService extends ServiceImpl implements IHotelService {

  1. @Autowired
  2. private RestHighLevelClient client;
  3. /**
  4. * 根据关键字搜索酒店信息
  5. *
  6. * @param params 请求参数对象,包含用户输入的关键字
  7. * @return 酒店文档列表
  8. */
  9. @Override
  10. public PageResult search(RequestParams params) {
  11. try {
  12. //1.
  13. SearchRequest request = new SearchRequest("hotel");
  14. //2.准备DSL并添加条件
  15. addQuery(request, params);
  16. //分页
  17. int size = params.getSize();
  18. int begin = (params.getPage() - 1) * size;
  19. request.source().from(begin).size(size);
  20. //距离排序
  21. String location = params.getLocation();
  22. if (location != null && !location.equals("")) {
  23. request.source().sort(SortBuilders
  24. .geoDistanceSort("location", new GeoPoint(location))
  25. .order(SortOrder.ASC)
  26. .unit(DistanceUnit.KILOMETERS)
  27. );
  28. }
  29. //发送请求
  30. SearchResponse response = client.search(request, RequestOptions.DEFAULT);
  31. // 解析响应
  32. return handleResponse(response);
  33. } catch (IOException e) {
  34. throw new RuntimeException(e);
  35. }
  36. }
  37. /**
  38. * 条件查询拼接DSL
  39. *
  40. * @param request
  41. * @param params
  42. * @return
  43. */
  44. private void addQuery(SearchRequest request, RequestParams params) {
  45. //2.1query
  46. //构建boolQuery,多条件使用bool查询
  47. BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
  48. String key = params.getKey();
  49. //判断搜索词
  50. if (key == null || "".equals(key)) {
  51. boolQuery.must(QueryBuilders.matchAllQuery());
  52. } else {
  53. boolQuery.must(QueryBuilders.matchQuery("all", key));
  54. }
  55. // 3.城市条件
  56. if (params.getCity() != null && !params.getCity().equals("")) {
  57. boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
  58. }
  59. // 4.品牌条件
  60. if (params.getBrand() != null && !params.getBrand().equals("")) {
  61. boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
  62. }
  63. // 5.星级条件
  64. if (params.getStarName() != null && !params.getStarName().equals("")) {
  65. boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
  66. }
  67. //6.价格
  68. if (params.getMinPrice() != null && params.getMaxPrice() != null) {
  69. boolQuery.filter(QueryBuilders.rangeQuery("price")
  70. .gte(params.getMinPrice())
  71. .lte(params.getMaxPrice()));
  72. }
  73. //function_score
  74. FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(
  75. // 原始查询,相关性算分的查询
  76. boolQuery,
  77. // function score的数组
  78. new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
  79. // 其中的一个function score 元素
  80. new FunctionScoreQueryBuilder.FilterFunctionBuilder(
  81. //过滤条件
  82. QueryBuilders.termQuery("isAD", true),
  83. //算分函数
  84. ScoreFunctionBuilders.weightFactorFunction(10)
  85. )
  86. }
  87. );
  88. //放入条件
  89. request.source().query(functionScoreQuery);
  90. }
  91. /**
  92. * 搜索栏条件添加
  93. *
  94. * @param params
  95. * @return
  96. */
  97. @Override
  98. public Map<String, List<String>> getFilters(RequestParams params) {
  99. try {
  100. //1.
  101. SearchRequest request = new SearchRequest("hotel");
  102. //2.准备DSL并添加条件
  103. addQuery(request, params);
  104. //2.正常条件
  105. request.source().size(0);
  106. //3.聚合条件
  107. addFilter(request);
  108. //4.发送请求
  109. SearchResponse response = client.search(request, RequestOptions.DEFAULT);
  110. //解析结果
  111. Aggregations aggregations = response.getAggregations();
  112. //放入不同map
  113. Map<String, List<String>> map = new HashMap<>();
  114. List<String> cityList = getAggField(aggregations, "cityAgg");
  115. map.put("city", cityList);
  116. List<String> starList = getAggField(aggregations, "starAgg");
  117. map.put("starName", starList);
  118. List<String> brandList = getAggField(aggregations, "brandAgg");
  119. map.put("brand", brandList);
  120. return map;
  121. } catch (IOException e) {
  122. throw new RuntimeException(e);
  123. }
  124. }
  125. /**
  126. * 自动补全
  127. *
  128. * @param key
  129. * @return
  130. */
  131. @Override
  132. public List<String> getSuggestions(String key) {
  133. try {
  134. //1.请求
  135. SearchRequest request = new SearchRequest("hotel");
  136. //2.DSL的java代码
  137. request.source().suggest(new SuggestBuilder().addSuggestion(
  138. "suggestions",
  139. SuggestBuilders.completionSuggestion("suggestion")
  140. .prefix(key)
  141. .skipDuplicates(true)
  142. .size(10)
  143. ));
  144. //3.发送请求获取参数
  145. SearchResponse response = client.search(request, RequestOptions.DEFAULT);
  146. //4.解析参数
  147. Suggest suggest = response.getSuggest();
  148. // 4.1.根据补全查询名称,获取补全结果
  149. CompletionSuggestion suggestions= suggest.getSuggestion("suggestions");
  150. // 4.2.获取options
  151. List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
  152. List<String>list=new ArrayList<>();
  153. //4.3遍历放入集合
  154. for (CompletionSuggestion.Entry.Option option : options) {
  155. String string = option.getText().string();
  156. list.add(string);
  157. }
  158. return list;
  159. } catch (IOException e) {
  160. throw new RuntimeException(e);
  161. }
  162. }
  163. /**
  164. * 新增、修改
  165. * @param id
  166. */
  167. @Override
  168. public void insertById(Long id) {
  169. try {
  170. //查询对应id的酒店信息
  171. Hotel hotel = getById(id);
  172. HotelDoc hotelDoc=new HotelDoc(hotel);
  173. //请求
  174. IndexRequest request = new IndexRequest("hotel").id(id.toString());
  175. //组装DSL
  176. request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
  177. //发请求
  178. client.index(request,RequestOptions.DEFAULT);
  179. } catch (IOException e) {
  180. throw new RuntimeException(e);
  181. }
  182. }
  183. /**
  184. * 删除
  185. * @param id
  186. */
  187. @Override
  188. public void deleteById(Long id) {
  189. try {
  190. //创建
  191. DeleteRequest request = new DeleteRequest("hotel",id.toString());
  192. client.delete(request,RequestOptions.DEFAULT);
  193. } catch (IOException e) {
  194. throw new RuntimeException(e);
  195. }
  196. }
  197. /**
  198. * 得到对应搜索框信息
  199. *
  200. * @param aggregations
  201. * @param aggName
  202. * @return
  203. */
  204. private List<String> getAggField(Aggregations aggregations, String aggName) {
  205. Terms aggTerms = aggregations.get(aggName);
  206. List<? extends Terms.Bucket> buckets = aggTerms.getBuckets();
  207. List<String> list = new ArrayList<>();
  208. for (Terms.Bucket bucket : buckets) {
  209. String key = bucket.getKeyAsString();
  210. list.add(key);
  211. }
  212. return list;
  213. }
  214. /**
  215. * 聚合条件
  216. *
  217. * @param request
  218. */
  219. private void addFilter(SearchRequest request) {
  220. request.source().aggregation(AggregationBuilders
  221. .terms("brandAgg")
  222. .field("brand")
  223. .size(100));
  224. request.source().aggregation(AggregationBuilders
  225. .terms("starAgg")
  226. .field("starName")
  227. .size(100));
  228. request.source().aggregation(AggregationBuilders
  229. .terms("cityAgg")
  230. .field("city")
  231. .size(100)
  232. );
  233. }
  234. /**
  235. * 结果解析
  236. *
  237. * @param response
  238. * @return
  239. */
  240. private PageResult handleResponse(SearchResponse response) {
  241. //解析结果
  242. SearchHits searchHits = response.getHits();
  243. //总数
  244. long total = searchHits.getTotalHits().value;
  245. //集合
  246. SearchHit[] hits = searchHits.getHits();
  247. List<HotelDoc> hotelDocList = new ArrayList<>();
  248. for (SearchHit hit : hits) {
  249. //酒店对象JSON
  250. String hitSourceAsString = hit.getSourceAsString();
  251. //转换对象hotelDoc
  252. HotelDoc hotelDoc = JSON.parseObject(hitSourceAsString, HotelDoc.class);
  253. //距离
  254. Object[] sortValues = hit.getSortValues();
  255. if (sortValues != null && sortValues.length > 0) {
  256. hotelDoc.setDistance(sortValues[0]);
  257. }
  258. hotelDocList.add(hotelDoc);
  259. }
  260. return new PageResult(total, hotelDocList);
  261. }

}

<a name="oioHr"></a>
### 自动补全
拼音分词器:<br />安装方式与IK分词器一样,分四步:<br /> ①解压<br /> ②上传到虚拟机中elasticsearch的plugins目录<br /> ③重启elasticsearch<br /> ④测试<br />
```
POST /_analyze
{
  "text": "如家酒店还不错",
  "analyzer": "pinyin"
}
```

创建索引库时添加suggestion字段(completion类型)用于自动补全。
该字段依赖settings中定义的自定义分词器,而已存在的索引库不能直接修改分词器和已有字段的映射,所以要在创建索引库时一并添加。

// Hotel index. Custom analyzers defined in settings:
//  - text_analyzer: ik_max_word tokenizer + pinyin filter "py"; used to index "name" and "all"
//  - completion_analyzer: keyword tokenizer + pinyin filter "py"; used by the "suggestion" completion field
// Queries on "name"/"all" use search_analyzer ik_smart, which has no pinyin filter,
// so the query text itself is not converted to pinyin.
PUT /hotel
{
  "settings": {
    "analysis": {
      "analyzer": {
        "text_analyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        },
        "completion_analyzer": {
          "tokenizer": "keyword",
          "filter": "py"
        }
      },
      "filter": {
        "py": {
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "id":{
        "type": "keyword"
      },
      "name":{
        "type": "text",
        "analyzer": "text_analyzer",
        "search_analyzer": "ik_smart",
        "copy_to": "all"
      },
      "address":{
        "type": "keyword",
        "index": false
      },
      "price":{
        "type": "integer"
      },
      "score":{
        "type": "integer"
      },
      "brand":{
        "type": "keyword",
        "copy_to": "all"
      },
      "city":{
        "type": "keyword"
      },
      "starName":{
        "type": "keyword"
      },
      "business":{
        "type": "keyword",
        "copy_to": "all"
      },
      "location":{
        "type": "geo_point"
      },
      "pic":{
        "type": "keyword",
        "index": false
      },
      "all":{
        "type": "text",
        "analyzer": "text_analyzer",
        "search_analyzer": "ik_smart"
      },
      "suggestion":{
          "type": "completion",
          "analyzer": "completion_analyzer"
      }
    }
  }
}

image.png
示例DSL:
image.png
解析:
image.png

/**
 * Autocomplete for the search box: asks the completion suggester on the
 * {@code suggestion} field of the "hotel" index for at most 10 distinct
 * suggestions starting with {@code prefix}.
 *
 * @param prefix text the user has typed so far
 * @return the suggestion texts, in the order Elasticsearch returned them
 * @throws RuntimeException wrapping the {@link IOException} if the request fails
 */
@Override
public List<String> getSuggestions(String prefix) {
    // The suggestion is registered under the name "suggestions" and read back by that name
    SuggestBuilder suggestBuilder = new SuggestBuilder().addSuggestion(
            "suggestions",
            SuggestBuilders.completionSuggestion("suggestion")
                    .prefix(prefix)
                    .skipDuplicates(true)
                    .size(10));
    SearchRequest searchRequest = new SearchRequest("hotel");
    searchRequest.source().suggest(suggestBuilder);

    SearchResponse searchResponse;
    try {
        searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }

    CompletionSuggestion completion = searchResponse.getSuggest().getSuggestion("suggestions");
    List<CompletionSuggestion.Entry.Option> hitOptions = completion.getOptions();
    List<String> texts = new ArrayList<>(hitOptions.size());
    hitOptions.forEach(option -> texts.add(option.getText().toString()));
    return texts;
}

ES索引和数据库的数据同步问题:

三种数据同步方法
方式一:同步调用

  • 优点:实现简单,粗暴
  • 缺点:业务耦合度高

方式二:异步通知

  • 优点:低耦合,实现难度一般
  • 缺点:依赖mq的可靠性

方式三:监听binlog——相对高级

  • 优点:完全解除服务间耦合
  • 缺点:开启binlog增加数据库负担、实现复杂度高

流程:

需要用到MQ的模块(消息发送方和监听方)都需要引入以下依赖:
<!-- Spring Boot AMQP starter (RabbitMQ): provides RabbitTemplate and @RabbitListener support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在yml配置文件中声明RabbitMQ的连接信息(主机、端口、虚拟主机、用户名和密码):
spring:
  rabbitmq:
    # Connection settings. Each value can be overridden by an environment variable so that
    # real hosts/credentials need not be committed; the defaults are the original values.
    host: ${RABBITMQ_HOST:121.36.164.132} # host name / IP
    port: ${RABBITMQ_PORT:5672} # AMQP port
    virtual-host: ${RABBITMQ_VHOST:/} # virtual host
    username: ${RABBITMQ_USERNAME:itcast} # user name
    password: ${RABBITMQ_PASSWORD:123321} # password

交换机、队列及其绑定关系可以用配置类声明,也可以直接用注解声明,注解方式详见上一篇ES笔记。

/**
 * RabbitMQ topology for syncing hotel data to Elasticsearch: one durable topic exchange
 * and two durable queues (insert/update and delete), each bound with its own routing key.
 */
@Configuration
public class MqConfig {

    /** Durable, non-auto-delete topic exchange that hotel change messages are published to. */
    @Bean
    public TopicExchange topicExchange() {
        boolean durable = true;
        boolean autoDelete = false;
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, durable, autoDelete);
    }

    /** Durable queue for hotel insert/update messages. */
    @Bean
    public Queue insertQueue() {
        return durableQueue(MqConstants.HOTEL_INSERT_QUEUE);
    }

    /** Durable queue for hotel delete messages. */
    @Bean
    public Queue deleteQueue() {
        return durableQueue(MqConstants.HOTEL_DELETE_QUEUE);
    }

    /** Routes messages sent with the insert key to the insert queue. */
    @Bean
    public Binding insertQueueBinding() {
        return bindToExchange(insertQueue(), MqConstants.HOTEL_INSERT_KEY);
    }

    /** Routes messages sent with the delete key to the delete queue. */
    @Bean
    public Binding deleteQueueBinding() {
        return bindToExchange(deleteQueue(), MqConstants.HOTEL_DELETE_KEY);
    }

    /** Creates a durable queue with the given name. */
    private static Queue durableQueue(String name) {
        return new Queue(name, true);
    }

    /** Binds {@code queue} to the topic exchange bean with {@code routingKey}. */
    private Binding bindToExchange(Queue queue, String routingKey) {
        return BindingBuilder.bind(queue).to(topicExchange()).with(routingKey);
    }
}
/**
 * Creates a hotel in the database, then publishes its id with the insert routing key
 * so the search index is updated asynchronously.
 * NOTE(review): relies on save() populating the generated id on {@code hotel} — confirm
 * against the MyBatis-Plus id strategy.
 *
 * @param hotel hotel to create
 */
@PostMapping
public void saveHotel(@RequestBody Hotel hotel) {
    hotelService.save(hotel);
    sendHotelInsertOrUpdate(hotel.getId());
}

/**
 * Updates a hotel in the database, then publishes its id with the insert routing key
 * (insert and update share one queue, since both re-index the whole document).
 *
 * @param hotel hotel with the new values; its id must be set
 * @throws InvalidParameterException if the id is missing
 */
@PutMapping()
public void updateById(@RequestBody Hotel hotel) {
    if (hotel.getId() == null) {
        throw new InvalidParameterException("id不能为空");
    }
    hotelService.updateById(hotel);
    sendHotelInsertOrUpdate(hotel.getId());
}

/**
 * Deletes a hotel from the database, then publishes its id with the delete routing key.
 *
 * @param id id of the hotel to delete
 */
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
    hotelService.removeById(id);
    sendHotelDelete(id);
}

/** Publishes a hotel id to the hotel exchange with the insert/update routing key. */
private void sendHotelInsertOrUpdate(Long id) {
    // MqConstants: the same constants class used by MqConfig and HotelListener
    rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, id);
}

/** Publishes a hotel id to the hotel exchange with the delete routing key. */
private void sendHotelDelete(Long id) {
    rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);
}

编写监听器

/**
 * Consumes hotel change messages from RabbitMQ and applies them to the search index
 * through {@link IHotelService}.
 */
@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    /**
     * Handles a message from the insert queue: the hotel (new or modified) is (re)indexed.
     *
     * @param hotelId id of the inserted or updated hotel
     */
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long hotelId) {
        hotelService.insertById(hotelId);
    }

    /**
     * Handles a message from the delete queue: the hotel document is removed.
     *
     * @param hotelId id of the deleted hotel
     */
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long hotelId) {
        hotelService.deleteById(hotelId);
    }
}

面试相关问题:

数据同步的解决方案(面试)

有以下几种解决方案
同步调用:一般出现在传统类型的项目中,耦合度高,速度较慢
异步通知:使用MQ队列来完成异步和一定程度的解耦,依赖MQ的可靠性
监听binlog:可以通过阿里的canal组件监听MySQL的binlog日志来实现,能够完全解耦,但开启binlog会增加数据库负担,实现复杂度也高

ES内部的实现原理

数据插入的分片如何确定

先根据文档的_routing值(默认为文档id)用hash算法算出hash值,再用这个值对主分片数量取余,得到的结果就是文档所在分片的编号:`shard = hash(_routing) % number_of_primary_shards`。也因此,索引库一旦创建,主分片数量就不能修改,否则已有文档就无法再按同样的公式定位到原来的分片。

数据的查询

分为两个阶段:
第一阶段是分散(scatter)阶段:协调节点把请求分发到每一个分片
第二阶段是聚集(gather)阶段:协调节点汇总各分片返回的结果,经过处理后将最终结果返回给用户

脑裂(面试)

脑裂:一个集群中同时出现了不止一个master节点,这就是脑裂
解决:
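配置方面:master候选节点使用奇数个,因为选举master需要获得半数以上候选节点的投票;ES7.0之前需要手动把discovery.zen.minimum_master_nodes设置为(候选节点数/2)+1,ES7.0起该机制由集群自动管理,默认就不易出现脑裂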
增加内存,防止内存不够带来的无法响应
增加响应超时时间:把心跳(故障检测)的超时时间从默认的30s(ES6.x中的discovery.zen.fd.ping_timeout)调大,避免网络波动导致收不到心跳消息
减少访问量:不要在短时间内对master节点发起并发过高的写请求,防止master因高并发而无法及时处理其他请求

ES的集群故障转移

集群的master节点会监控集群中各节点的状态,如果发现有节点宕机,会把该节点上主分片对应的副本分片提升为主分片,并利用存活的分片数据在其他节点上重建缺失的副本分片,确保数据安全,这就是故障转移