学习目标:

1.数据聚合

2.自动补全

3.数据同步

4.es集群搭建

一.数据聚合

1.语法

条件:
image.png
结果:
image.png

2.业务需求

image.png

3.业务实现

service层:

  1. @Override
  2. public Map<String, List<String>> getFilters(RequestParams params) {
  3. try {
  4. //创建请求
  5. SearchRequest request = new SearchRequest("hotel");
  6. SearchSourceBuilder source = request.source();
  7. //2.设置搜索条件
  8. buildBasicQuery(params, source); //构建搜索条件 起到过滤作用
  9. //3.设置size=0
  10. source.size(0);
  11. //4.设置聚合请求条件 brand分组 city分组,starName分组
  12. buildAggregations(source);
  13. //5.执行聚合搜索请求
  14. SearchResponse response = client.search(request, RequestOptions.DEFAULT);
  15. //解析结果
  16. //6.1获取聚合结果
  17. Aggregations aggregations = response.getAggregations();
  18. List<String> brandList = getAggByName(aggregations, "brandAgg");
  19. List<String> cityList = getAggByName(aggregations, "cityAgg");
  20. List<String> starList = getAggByName(aggregations, "starNameAgg");
  21. //将结果封装成Map<String, List<String>>
  22. HashMap<String, List<String>> map = new HashMap<>();
  23. map.put("brand", brandList);
  24. map.put("city", cityList);
  25. map.put("starName", starList);
  26. return map;
  27. } catch (IOException e) {
  28. e.printStackTrace();
  29. throw new RuntimeException("搜索出错");
  30. }
  31. }
  32. /**
  33. * @param aggregations 聚合结果对象
  34. * @param aggName 自定义的聚合名称
  35. * @return
  36. */
  37. private List<String> getAggByName(Aggregations aggregations, String aggName) {
  38. //6.2获取指定聚合结果
  39. Terms aggResult = aggregations.get(aggName);
  40. //6.3获取结果集合中的分组信息
  41. List<? extends Terms.Bucket> buckets = aggResult.getBuckets();
  42. //6.4遍历分组,得到分组信息
  43. ArrayList<String> resultList = new ArrayList<>();
  44. for (Terms.Bucket bucket : buckets) {
  45. //获取每个分组的key,加入到集合
  46. resultList.add(bucket.getKeyAsString());
  47. }
  48. //讲集合返回
  49. return resultList;
  50. }
  51. private void buildAggregations(SearchSourceBuilder source) {
  52. source.aggregation(
  53. AggregationBuilders.terms("brandAgg")
  54. .field("brand")
  55. .size(20)
  56. );
  57. source.aggregation(
  58. AggregationBuilders.terms("cityAgg")
  59. .field("city")
  60. .size(20)
  61. );
  62. source.aggregation(
  63. AggregationBuilders.terms("starNameAgg")
  64. .field("starName")
  65. .size(20)
  66. );
  67. }

二.自动补全

image.png

1.自动补全查询的JavaAPI

image.png
响应结果:
image.png

2.代码实现

 @Override
    public List<String> getSuggestions(String key) {
        try {
            //1.创建请求
            SearchRequest request = new SearchRequest("hotel");
            SearchSourceBuilder source = request.source();
            //2.添加Suggestions搜索条件
            source.suggest(
                    //3.搜索索引库中suggestions字段  以key为前缀的所有词条 (只显示10个)
                    new SuggestBuilder().addSuggestion(
                            "mySuggestion",  //自定义查询名称
                            SuggestBuilders.completionSuggestion("suggestion")  //建议查询的构建  参数:查询字段
                                    .prefix(key) //前缀匹配
                                    .skipDuplicates(true) //去重
                                    .size(10)  //保留10条

                    )
            );
            //4.执行请求
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            //5.解析响应结果
            //查询suggest结果
            Suggest suggest = response.getSuggest();
            //根据自定义名称,找到具体的suggest结果
            CompletionSuggestion suggestion = suggest.getSuggestion("mySuggestion");
            //获取options  遍历options
            List<CompletionSuggestion.Entry.Option> options = suggestion.getOptions();
            List<String> suggestionList = new ArrayList<>();
            for (CompletionSuggestion.Entry.Option option : options) {
                suggestionList.add(option.getText().toString());
            }
            return suggestionList;
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException("搜索中断");
        }
    }

三.数据同步

1.声明交换机、队列

1)引入依赖

<!--amqp-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2)声明队列交换机名称

package cn.itcast.hotel.constatnts;

    public class MqConstants {
    /**
     * 交换机
     */
    public final static String HOTEL_EXCHANGE = "hotel.topic";
    /**
     * 监听新增和修改的队列
     */
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
    /**
     * 监听删除的队列
     */
    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
    /**
     * 新增或修改的RoutingKey
     */
    public final static String HOTEL_INSERT_KEY = "hotel.insert";
    /**
     * 删除的RoutingKey
     */
    public final static String HOTEL_DELETE_KEY = "hotel.delete";
}

3)绑定队列交换机

package cn.itcast.hotel.config;

import cn.itcast.hotel.constants.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
    }

    @Bean
    public Queue insertQueue(){
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
    }

    @Bean
    public Queue deleteQueue(){
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
    }

    @Bean
    public Binding insertQueueBinding(){
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    }

    @Bean
    public Binding deleteQueueBinding(){
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }
}

4) 配置参数

spring:
  rabbitmq:
    virtual-host: /
    port: 5672
    host: 192.168.200.130
    username: itcast
    password: 123321

2.发送MQ消息

3.接收MQ消息

编写监听器

package cn.itcast.hotel.mq;

import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    /**
     * 监听酒店新增或修改的业务
     * @param id 酒店id
     */
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id){
        hotelService.insertById(id);
    }

    /**
     * 监听酒店删除的业务
     * @param id 酒店id
     */
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id){
        hotelService.deleteById(id);
    }
}

四.集群

面试:

项目中具体使用了几台es ? (如何保证es高可用?)
我们在Linux服务器上搭建了es集群,具体几台不清楚,软件的部署安装都是运维负责的
不过es集群原理我知道些
    es可以通过多台es节点,组成高可用集群,一个集群会有一个master节点,负责管理集群分片,如索引库分片
    搭建集群后,我们在创建索引库时可以设置分片的数量(分片把一个索引库的数据分成若干份,每一份就是一个分片)
    设置副本数量 (每一个主分片 都可以设置副本 保证分片安全 可用)

    比如:我们搭建一个 3节点es集群   主分片: 3   副本分片: 1   一共6份分片数据,
    master节点可以将6份分片数据分配到不同的clusterNode中, 
    并且保证 对应的 主分片 和 副本分片不在一起 这样分片越多能存储的数据量越大, 因为有副本 数据也是高可用的



自动选举:
如果master节点挂掉了,其它节点可以感应到 (集群节点会相互通信) 默认 : 超过半数节点认为master挂掉了  会投票选举新的master节点


故障恢复:   (green   yellow  red)
如果任意节点挂掉,分片信号丢失此时集群状态为 yellow es会触发故障恢复,

                 如果主分片信号丢失   ,会基于副本 将丢失的分片数据恢复 

                 如果副本分片信号丢失  , 会基于主分片 生成副本数据