Elasticsearch

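An open-source search engine for fast lookups across massive amounts of data.
Together with Kibana, Logstash and Beats it forms the ELK stack (Elastic Stack), which is used for log analysis, real-time monitoring and similar tasks.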

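Inverted index

Document: each record of data is a document
Term: a word that a document's content is split into, by meaning

Forward index vs. inverted index:

  • Forward index: the index is built on the document id. To search, each document is fetched and checked for the term.
  • Inverted index: document content is split into terms and the index is built on the terms, recording which documents each term appears in. To search, the term is used to look up the document ids, and the documents are then fetched by id.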

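The contrast above can be made concrete with a toy in-memory inverted index in plain Java. This is a minimal sketch, not how Lucene stores data: `InvertedIndexSketch` is a hypothetical class, the "analyzer" is an assumed lowercase-and-split-on-whitespace rule (ES would use an analyzer such as IK), and only document ids are recorded, not positions or term frequencies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

final class InvertedIndexSketch {
    // term -> sorted ids of the documents that contain the term
    private final Map<String, SortedSet<Integer>> index = new HashMap<>();

    // Toy analyzer (assumption): lowercase, then split on whitespace.
    private static List<String> analyze(String text) {
        List<String> terms = new ArrayList<>();
        for (String t : text.toLowerCase(Locale.ROOT).split("\\s+")) {
            if (!t.isEmpty()) terms.add(t);
        }
        return terms;
    }

    // Indexing: split the document into terms and record its id under each term.
    void add(int docId, String content) {
        for (String term : analyze(content)) {
            index.computeIfAbsent(term, k -> new TreeSet<>()).add(docId);
        }
    }

    // Searching: split the query, look up each term, and union the id sets (OR semantics).
    SortedSet<Integer> search(String query) {
        SortedSet<Integer> hits = new TreeSet<>();
        for (String term : analyze(query)) {
            hits.addAll(index.getOrDefault(term, Collections.emptySortedSet()));
        }
        return hits;
    }
}
```

For example, after indexing `1: "Xiaomi phone"`, `2: "Huawei phone"` and `3: "Huawei watch"`, searching `"huawei watch"` looks up two terms and returns ids 2 and 3 without scanning any document.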
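Concept comparison

Document
  • A document is stored as JSON; each key of the JSON object is a field.
Index

  • Index: a collection of documents of the same type
  • Mapping: the field constraints of the documents in an index, similar to a table's schema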

Installing and using ES, Kibana and IK

Installation notes: 安装elasticsearch.md

https://www.elastic.co/cn/elasticsearch/
https://www.elastic.co/cn/kibana/
https://github.com/medcl/elasticsearch-analysis-ik/releases
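IK analyzer
What does an analyzer do?

  • When the inverted index is built, it splits documents into terms
  • When a user searches, it splits the input into terms

Which modes does the IK analyzer offer?

  • ik_smart: coarse-grained segmentation (fewer, longer terms)
  • ik_max_word: fine-grained segmentation (as many terms as possible)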

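Index operations

Mapping attributes
A mapping defines the constraints on the documents in an index. Common mapping attributes:

  • type: the field's data type
    • String: text (analyzed full text), keyword (exact value, not analyzed, e.g. brand, country, IP address)
    • Numeric: long, integer, short, byte, double, float
    • Boolean: boolean
    • Date: date
    • Object: object
  • index: whether to index the field, default true
  • analyzer: which analyzer to use
  • properties: the field's sub-fields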

Index CRUD

Create, query, delete, modify

# Create an index
PUT /heima
{
  "mappings": {
    "properties": {
      "info": {
        "type": "text",
        "analyzer": "ik_smart"
      },
      "email": {
        "type": "keyword",
        "index": false
      },
      "name": {
        "type": "object",
        "properties": {
          "firstName": {
            "type": "keyword"
          },
          "lastName": {
            "type": "keyword"
          }
        }
      }
    }
  }
}
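# Query the index
GET /heima
# Delete the index
DELETE /heima

ES does not allow modifying existing fields of an index mapping (changing a field would invalidate its inverted index); you can only add new fields:
PUT /heima/_mapping
{
  "properties": {
    "age": {
      "type": "integer"
    }
  }
}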

Document operations

# Insert a document with id 1
POST /heima/_doc/1
{
  "info": "黑马程序员",
  "email": "zy@itcast.cn",
  "name": {
    "firstName": "云",
    "lastName": "赵"
  }
}

# Get a document
GET /heima/_doc/1

# Delete a document
DELETE /heima/_doc/1
# Full update: replaces the whole document (creates it if the id does not exist)
PUT /heima/_doc/1
{
  "info": "黑马程序员",
  "email": "zhaoyun@itcast.cn",
  "name": {
    "firstName": "云",
    "lastName": "赵"
  }
}

# Partial update of specific fields
POST /heima/_update/1
{
  "doc": {
    "email": "ZhaoYun@itcast.cn"
  }
}

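Managing indices with the RestClient

Elastic provides official clients for many languages. In essence they assemble DSL statements and send them to ES in HTTP requests.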

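To illustrate that a client only wraps DSL in HTTP, the JDK's own `java.net.http` client (Java 11+, unlike the Java 8 project below) can build the same request that `PUT /heima` issues from Kibana. This sketch only builds the request and does not send it; the `localhost:9200` address is an assumption taken from these notes, and `EsHttpSketch` is a hypothetical name.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class EsHttpSketch {
    // Builds the HTTP call behind "PUT /<index>" with a JSON mapping body.
    static HttpRequest createIndexRequest(String baseUrl, String index, String mappingJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/" + index))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(mappingJson))
                .build();
    }
}
```

Sending it would be `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`; the official clients add connection pooling, retries and typed request/response objects on top of this.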
Step 1: Import the demo project from the pre-course materials
Step 2: Analyze the data structure
Hotel mapping

# Mapping for the hotel index
PUT /hotel
{
  "mappings": {
    "properties": {
      "id" : {
        "type": "keyword"
      },
      "name": {
        "type": "text",
        "analyzer": "ik_max_word",
        "copy_to": "all"
      },
      "address": {
        "type": "keyword",
        "index": false
      },
      "price": {
        "type": "integer"
      },
      "score": {
        "type": "integer"
      },
      "brand": {
        "type": "keyword",
        "copy_to": "all"
      },
      "city": {
        "type": "keyword"
      },
      "starName": {
        "type": "keyword"
      },
      "business": {
        "type": "keyword",
        "copy_to": "all"
      },
      "location": {
        "type": "geo_point"
      },
      "pic": {
        "type": "keyword",
        "index": false
      },
      "all": {
        "type": "text",
        "analyzer": "ik_max_word"
      }
    }
  }
}

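Tip: copy_to copies the value of the current field into the named field (here all), so that name, brand and business can be searched together through the single all field.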
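Step 3: Initialize the Java RestClient

  1. Add the RestHighLevelClient dependency for ES
  2. Override the ES version managed by Spring Boot so that it matches the server (7.12.1 here)
  3. Initialize the RestHighLevelClient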
    <properties>
    <java.version>1.8</java.version>
    <elasticsearch.version>7.12.1</elasticsearch.version>
    </properties>
    <dependencies>
    <dependency>
     <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    </dependency>
    </dependencies>
    
Step 4: Create the index. First prepare the DSL as a constant:

```java
package cn.itcast.hotel.constans;

public class HotelConstants {
    // Mapping DSL for the hotel index (same content as the PUT /hotel request above)
    public static final String MAPPING_TEMPLATE = "{\n" +
            "  \"mappings\": {\n" +
            "    \"properties\": {\n" +
            "      \"id\": {\n" +
            "        \"type\": \"keyword\"\n" +
            "      },\n" +
            "      \"name\": {\n" +
            "        \"type\": \"text\",\n" +
            "        \"analyzer\": \"ik_max_word\",\n" +
            "        \"copy_to\": \"all\"\n" +
            "      },\n" +
            "      \"address\": {\n" +
            "        \"type\": \"keyword\",\n" +
            "        \"index\": false\n" +
            "      },\n" +
            "      \"price\": {\n" +
            "        \"type\": \"integer\"\n" +
            "      },\n" +
            "      \"score\": {\n" +
            "        \"type\": \"integer\"\n" +
            "      },\n" +
            "      \"brand\": {\n" +
            "        \"type\": \"keyword\",\n" +
            "        \"copy_to\": \"all\"\n" +
            "      },\n" +
            "      \"city\": {\n" +
            "        \"type\": \"keyword\"\n" +
            "      },\n" +
            "      \"starName\": {\n" +
            "        \"type\": \"keyword\"\n" +
            "      },\n" +
            "      \"business\": {\n" +
            "        \"type\": \"keyword\",\n" +
            "        \"copy_to\": \"all\"\n" +
            "      },\n" +
            "      \"location\": {\n" +
            "        \"type\": \"geo_point\"\n" +
            "      },\n" +
            "      \"pic\": {\n" +
            "        \"type\": \"keyword\",\n" +
            "        \"index\": false\n" +
            "      },\n" +
            "      \"all\": {\n" +
            "        \"type\": \"text\",\n" +
            "        \"analyzer\": \"ik_max_word\"\n" +
            "      }\n" +
            "    }\n" +
            "  }\n" +
            "}";
}
```

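**Initialize the client & create the index**
```java
import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;

import static cn.itcast.hotel.constans.HotelConstants.MAPPING_TEMPLATE;

public class HotelIndexTest {
    private RestHighLevelClient client;

    @Test
    void testInit() {
        System.out.println(client);
    }

    @Test
    void createHotelIndex() throws IOException {
        // 1. Create the request (CreateIndexRequest from org.elasticsearch.client.indices)
        CreateIndexRequest request = new CreateIndexRequest("hotel");
        // 2. Set the request body: the mapping DSL
        request.source(MAPPING_TEMPLATE, XContentType.JSON);
        // 3. Send the request
        client.indices().create(request, RequestOptions.DEFAULT);
    }

    @BeforeEach
    void setUp() {
        this.client = new RestHighLevelClient(RestClient.builder(
                HttpHost.create("http://localhost:9200")
        ));
    }

    @AfterEach
    void tearDown() throws IOException {
        this.client.close();
    }
}
```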

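Delete & check existence

// DeleteIndexRequest: org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
// GetIndexRequest: org.elasticsearch.client.indices.GetIndexRequest
@Test
void testDeleteHotelIndex() throws IOException {
    // 1. Create the request
    DeleteIndexRequest request = new DeleteIndexRequest("hotel");
    // 2. Send the request
    client.indices().delete(request, RequestOptions.DEFAULT);
}
@Test
void testExistsIndex() throws IOException {
    // 1. Create the request
    GetIndexRequest request = new GetIndexRequest("hotel");
    // 2. Send the request
    boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
    // 3. Print the result
    System.err.println(exists ? "Index already exists!" : "Index does not exist!");
}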

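Managing documents with the RestClient

Query data from the database, import it into the hotel index, and implement CRUD on the documents.
Note: first update the url, username and password in application.yaml.
Add a document

@Test
void testAddDocument() throws IOException {
    // Query the hotel by id from the database
    Hotel hotel = hotelService.getById(61083L);
    // Convert it to the document type
    HotelDoc hotelDoc = new HotelDoc(hotel);
    // 1. Prepare the request; the document id is the hotel id
    IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
    // 2. Prepare the JSON document
    request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
    // 3. Send the request
    client.index(request, RequestOptions.DEFAULT);
}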

Get a document

@Test
void testGetDocumentById() throws IOException {
    // 1. Prepare the request
    GetRequest request = new GetRequest("hotel", "61083");
    // 2. Send the request and get the response
    GetResponse response = client.get(request, RequestOptions.DEFAULT);
    // 3. Parse the response: the document JSON is in _source
    String json = response.getSourceAsString();
    HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
    System.out.println(hotelDoc);
}

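Update a document
Option 1: full update. Indexing a document with an existing id deletes the old document and adds the new one (same code as adding a document).
Option 2: partial update, which only changes the given fields. The example below uses option 2.

@Test
void testUpdateDocumentById() throws IOException {
    // 1. Create the request
    UpdateRequest request = new UpdateRequest("hotel", "61083");
    // 2. Fields to change, as key-value pairs (price is an integer field)
    request.doc(
        "price", 999,
        "starName", "四钻"
    );
    // 3. Send the update request
    client.update(request, RequestOptions.DEFAULT);
}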

Delete a document

@Test
void testDeleteDocumentById() throws IOException {
    // 1. Prepare the request
    DeleteRequest request = new DeleteRequest("hotel", "61083");
    // 2. Send the request
    client.delete(request, RequestOptions.DEFAULT);
}

Bulk import

@Test
void testBulkRequest() throws IOException {
    // Query all hotels from the database
    List<Hotel> hotels = hotelService.list();
    // 1. Create the bulk request
    BulkRequest request = new BulkRequest();
    // 2. Add one IndexRequest per hotel, converted to HotelDoc
    for (Hotel hotel : hotels) {
        HotelDoc hotelDoc = new HotelDoc(hotel);
        request.add(new IndexRequest("hotel")
                .id(hotelDoc.getId().toString())
                .source(JSON.toJSONString(hotelDoc), XContentType.JSON));
    }
    // 3. Send everything in one request
    client.bulk(request, RequestOptions.DEFAULT);
}

# Check the imported data
GET /hotel/_search

DSL

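DSL query syntax

Simple queries include:

  • Match all: returns all data, usually for testing (match_all)
  • Full-text search: the input is analyzed into terms. E.g. match, multi_match
  • Exact (term-level) queries: the input is not analyzed. E.g. term, range
  • Geo queries: query by latitude and longitude. E.g. geo_bounding_box, geo_distance

Basic syntax:
GET /index_name/_search
{"query": {"QUERY_TYPE": {"FIELD": "TEXT"}}}
# Full-text search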
GET /hotel/_search
{
  "query": {
    "match": {
      "all": "外滩"
    }
  }
}
# Exact query: term
GET /hotel/_search
{
  "query": {
    "term": {
      "city": {
        "value": "上海"
      }
    }
  }
}
# Exact query: range
GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "gte": 100,
        "lte": 300
      }
    }

  }
}
# Geo query: hotels within 5 km of a point given as a "lat,lon" string
GET /hotel/_search
{
  "query": {
    "geo_distance":{
      "distance": "5km",
      "location": "31.21,121.5"
    }
  }
}

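Compound queries

  • Relevance scoring: for a match query, each result is scored by how relevant it is to the search terms (ES 5.0+ uses the BM25 algorithm by default), and results are returned sorted by score.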

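# function_score query: hotels of brand 如家 get the function score (weight 10) added to their query score (boost_mode sum)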
GET /hotel/_search
{
  "query": {
    "function_score": {
      "query": {
        "match": {
          "all": "外滩"
        }
      },
      "functions": [
        {
          "filter": {
            "term": {
              "brand": "如家"
            }
          },
          "weight": 10
        }
      ],
      "boost_mode": "sum"
    }
  }
}
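To see what `boost_mode` does to the numbers, here is a plain-Java sketch of how the original query score and the function result could be combined under each mode. It assumes a single `weight` function, as in the request above, and assumes that a document matching no function gets a neutral function factor of 1 (my reading of ES's default behavior; worth verifying with `"explain": true`). `FunctionScoreSketch` is a hypothetical name.

```java
final class FunctionScoreSketch {
    enum BoostMode { MULTIPLY, REPLACE, SUM, AVG, MAX, MIN }

    // Result of one weight function with a filter.
    // Assumption: a document matching no function gets a neutral factor of 1.
    static double functionFactor(boolean filterMatches, double weight) {
        return filterMatches ? weight : 1.0;
    }

    // Combines the original query score with the function factor according to boost_mode.
    static double finalScore(double queryScore, boolean filterMatches, double weight, BoostMode mode) {
        double f = functionFactor(filterMatches, weight);
        switch (mode) {
            case MULTIPLY: return queryScore * f;          // the default boost_mode
            case REPLACE:  return f;                       // ignore the query score
            case SUM:      return queryScore + f;          // used in the request above
            case AVG:      return (queryScore + f) / 2;
            case MAX:      return Math.max(queryScore, f);
            case MIN:      return Math.min(queryScore, f);
            default:       throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }
}
```

Under these assumptions, a 如家 hotel with a match score of 2.5 would end up at 12.5 with `sum`, while a hotel of another brand with the same match score would end up at 3.5.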

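Bool query clause types

  • must: every clause must match; contributes to the score
  • should: clauses that may match; matching ones add to the score
  • must_not: must not match; does not take part in scoring
  • filter: must match; does not take part in scoring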

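The four clause types differ in two ways: whether a clause must match, and whether it contributes to the score. A simplified plain-Java model of that, not ES's implementation, is sketched here. `BoolQuerySketch` and its nested `Clause` type are hypothetical, per-clause scores are supplied by the caller, and it assumes ES's default `minimum_should_match` (at least one `should` clause must match only when there are no `must` or `filter` clauses).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.ToDoubleFunction;

final class BoolQuerySketch<D> {
    // A scoring clause: whether it matches, and the score it adds when it does.
    static final class Clause<D> {
        final Predicate<D> matches;
        final ToDoubleFunction<D> score;

        Clause(Predicate<D> matches, ToDoubleFunction<D> score) {
            this.matches = matches;
            this.score = score;
        }
    }

    final List<Clause<D>> must = new ArrayList<>();
    final List<Clause<D>> should = new ArrayList<>();
    final List<Predicate<D>> mustNot = new ArrayList<>(); // no scoring
    final List<Predicate<D>> filter = new ArrayList<>();  // no scoring

    // Returns the document's score, or null when it does not match.
    Double evaluate(D doc) {
        for (Predicate<D> p : filter) if (!p.test(doc)) return null;
        for (Predicate<D> p : mustNot) if (p.test(doc)) return null;
        double total = 0;
        for (Clause<D> c : must) {
            if (!c.matches.test(doc)) return null;
            total += c.score.applyAsDouble(doc);
        }
        int shouldHits = 0;
        for (Clause<D> c : should) {
            if (c.matches.test(doc)) {
                shouldHits++;
                total += c.score.applyAsDouble(doc);
            }
        }
        // Default minimum_should_match: 1 if there are only should clauses, else 0.
        int minShould = (must.isEmpty() && filter.isEmpty() && !should.isEmpty()) ? 1 : 0;
        return shouldHits >= minShould ? total : null;
    }
}
```

In the demo that follows, the name match is a `must` clause, the `price > 400` range is `must_not`, and the 10 km circle is a `filter`, so only the name match influences the score.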
Demo: search for hotels whose name contains "如家", priced no higher than 400, within 10 km of the coordinate 31.21, 121.5

# boolean query
GET /hotel/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "name": "如家"
          }
        }
      ],
      "must_not": [
        {
          "range": {
            "price": {
              "gt": 400
            }
          }
        }
      ],
      "filter": [
        {
          "geo_distance": {
            "distance": "10km",
            "location": {
              "lat": 31.21,
              "lon": 121.5
            }
          }
        }
      ]
    }
  }
}

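Processing search results

Sorting

Results are sorted by relevance score by default; you can specify custom sort fields instead (ES then no longer computes the relevance score).

# Sort by score descending, then by price ascending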
GET /hotel/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "score": "desc"
    },
    {
      "price": "asc"
    }
  ]
}

Find nearby hotels

# Sort by distance from a given point, nearest first, in km
GET /hotel/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "_geo_distance": {
        "location": {
          "lat": 31.034662,
          "lon": 121.612282
        },
        "order": "asc",
        "unit": "km"
      }
    }
  ]
}
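`_geo_distance` sorting orders hits by their distance from the given point, and each hit's sort value is that distance in the requested `unit`. The sketch below approximates it in plain Java with the haversine formula on a sphere of radius 6371 km (an assumption; ES's default `arc` calculation may give slightly different numbers). Points are `{lat, lon}` arrays and `GeoSortSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class GeoSortSketch {
    static final double EARTH_RADIUS_KM = 6371.0; // mean Earth radius (assumption)

    // Great-circle distance in km between two lat/lon points (haversine formula).
    static double distanceKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }

    // Returns a copy of the points sorted nearest first, like "order": "asc".
    static List<double[]> sortByDistance(List<double[]> points, double lat, double lon) {
        List<double[]> sorted = new ArrayList<>(points);
        sorted.sort(Comparator.comparingDouble(p -> distanceKm(lat, lon, p[0], p[1])));
        return sorted;
    }
}
```

One degree of longitude on the equator comes out at about 111.19 km with this radius, which is a quick sanity check for the formula.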

Pagination

# Pagination: skip the first 10 hits (from) and return the next 10 (size)
GET /hotel/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "price": "asc"
    }
  ],
  "from": 10,
  "size": 10
}
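`from` is an offset, not a page number, so a 1-based page number has to be converted with `from = (page - 1) * size`, as the Java examples later in these notes do. A small helper that also rejects requests beyond the default 10,000-hit window (`index.max_result_window`) might look like this; `PageSketch` is a hypothetical name.

```java
final class PageSketch {
    static final int MAX_RESULT_WINDOW = 10_000; // ES default of index.max_result_window

    // Converts a 1-based page number into the "from" offset that ES expects.
    static int from(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        long from = (long) (page - 1) * size; // long avoids int overflow for huge pages
        // ES rejects from + size above the window unless the index setting is raised.
        if (from + size > MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException(
                    "from + size exceeds " + MAX_RESULT_WINDOW + "; consider search_after");
        }
        return (int) from;
    }
}
```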

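Deep pagination
For deep pagination, ES offers two solutions (see the official documentation):

  • search_after: requires sorting; it starts from the sort values of the last hit of the previous page and fetches the next page. This is the officially recommended approach.
  • scroll: takes a snapshot of the sorted document ids and keeps it in memory. No longer recommended officially.

Common pagination approaches, with pros and cons:

  • from + size:
    • Pros: supports jumping to any page
    • Cons: deep pagination problem (every shard must collect from + size hits for the coordinating node to sort); by default from + size is capped at 10000
    • Use case: search with random page access, such as Baidu, JD, Google and Taobao
  • search_after:
    • Pros: no overall limit (a single request's size still cannot exceed 10000)
    • Cons: can only page forward one page at a time; no random page access
    • Use case: search without random page access, e.g. infinite scrolling on a phone
  • scroll:
    • Pros: no overall limit (a single request's size still cannot exceed 10000)
    • Cons: extra memory usage, and the results are not real-time
    • Use case: retrieving and migrating massive amounts of data. Not recommended since ES 7.1; use search_after instead.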

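The core of search_after, resuming strictly after the sort values of the previous page's last hit instead of skipping `from` documents, can be simulated on an in-memory list. This sketch assumes hits sorted by price ascending with the document id as a unique tie-breaker (without one, hits with equal prices could be skipped or repeated); `SearchAfterSketch` and `Hit` are hypothetical. In a real request you pass the last hit's sort values in the `search_after` array.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SearchAfterSketch {
    static final class Hit {
        final int price;
        final String id;

        Hit(int price, String id) {
            this.price = price;
            this.id = id;
        }
    }

    // Sort order: price ascending, then id ascending as a unique tie-breaker.
    static final Comparator<Hit> ORDER =
            Comparator.<Hit>comparingInt(h -> h.price).thenComparing(h -> h.id);

    // Returns up to `size` hits sorting strictly after `after` (null means the first page).
    static List<Hit> nextPage(List<Hit> all, Hit after, int size) {
        List<Hit> sorted = new ArrayList<>(all);
        sorted.sort(ORDER);
        List<Hit> page = new ArrayList<>();
        for (Hit h : sorted) {
            if (after != null && ORDER.compare(h, after) <= 0) continue; // already returned
            page.add(h);
            if (page.size() == size) break;
        }
        return page;
    }
}
```

Because each request only needs the hits after a cursor, the cost does not grow with the page number, but there is no way to jump straight to page 50.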
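Highlighting

# Highlight the name field. By default only fields that match the query are highlighted;
# since the query searches all, require_field_match must be false for name to be highlighted.
GET /hotel/_search
{
  "query": {
    "match": {
      "all": "如家"
    }
  },
  "highlight": {
    "fields": {
      "name": {
        "require_field_match": "false"
      }
    }
  }
}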

Querying documents with the RestClient

Quick start

@Test
void testMatchAll() throws IOException {
    // 1. Prepare the request
    SearchRequest request = new SearchRequest("hotel");
    // 2. Prepare the DSL
    request.source().query(QueryBuilders.matchAllQuery());
    // 3. Send the request
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    // 4. Parse the response
    SearchHits searchHits = response.getHits();
    long total = searchHits.getTotalHits().value;
    System.out.println("Total hits: " + total);
    SearchHit[] hits = searchHits.getHits();
    for (SearchHit hit : hits) {
        // Get the document source
        String json = hit.getSourceAsString();
        // Deserialize
        HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
        System.out.println("hotelDoc=" + hotelDoc);
    }
    System.out.println(response);
}

Extract the response parsing (step 4) into a handleResponse method (IntelliJ: Ctrl+Alt+M); the following examples reuse it.

QueryBuilders

@Test
void testBool() throws IOException {
    // 1. Prepare the request
    SearchRequest request = new SearchRequest("hotel");
    // 2. Prepare the DSL
    // 2.1 Bool query: city must be 上海 (Shanghai), price filtered to <= 250
    BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
    boolQuery.must(QueryBuilders.termQuery("city", "上海"))
            .filter(QueryBuilders.rangeQuery("price").lte(250));
    request.source().query(boolQuery);
    // 3. Send the request
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    handleResponse(response);
    System.out.println(response);
}

Sorting and pagination

@Test
void testPageAndSort() throws IOException {
    // 1. Prepare the request
    SearchRequest request = new SearchRequest("hotel");
    // Page number and page size, as the front end would pass them
    int page = 1, size = 5;
    // 2. Prepare the DSL
    // 2.1 query: match all
    // 2.2 sort by price ascending
    // 2.3 pagination: from = (page - 1) * size
    request.source().query(QueryBuilders.matchAllQuery())
            .sort("price", SortOrder.ASC)
            .from((page - 1) * size).size(size);
    // 3. Send the request
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    handleResponse(response);
    System.out.println(response);
}

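Highlighting

Request side: ask for highlighting on name (requireFieldMatch(false) because the query searches all):

request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));

Response side, inside the loop over hits in handleResponse:

// Get the highlight results
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
// Defensive checks: the map or the field may be missing
if (!CollectionUtils.isEmpty(highlightFields)) {
    HighlightField highlightField = highlightFields.get("name");
    if (highlightField != null) {
        // Take the first highlighted fragment and replace the plain name with it
        String name = highlightField.getFragments()[0].string();
        hotelDoc.setName(name);
    }
}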

Heima Travel (黑马旅游)

Demo 1: implement search with keyword search and pagination

  1. Define entity classes that receive front-end requests and return results
```java
@Data
public class RequestParams {
    private String key;
    private Integer page;
    private Integer size;
    private String sortBy;
}
```
```java
@Data
public class PageResult {
    private Long total;
    private List<HotelDoc> hotels;

    public PageResult(Long total, List<HotelDoc> hotels) {
        this.total = total;
        this.hotels = hotels;
    }

    public PageResult() {
    }
}
```
  2. Define the Controller endpoint: receive the page request and call IHotelService's search method

(IntelliJ: Ctrl+Alt+T surrounds the selected code, e.g. with try/catch for the thrown exception)

@RestController
@RequestMapping("/hotel")
public class HotelController {
    @Autowired
    private IHotelService hotelService;

    @PostMapping("/list")
    public PageResult search(@RequestBody RequestParams params){
        return hotelService.search(params);
    }
}

  3. Implement the search method in HotelService

    @Service
    public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
        @Autowired
        private RestHighLevelClient client;

        @Override
        public PageResult search(RequestParams params) {
            try {
                // 1. Prepare the request
                SearchRequest request = new SearchRequest("hotel");
                // 2. Prepare the DSL
                // 2.1 query: match all when the keyword is empty, otherwise match on the all field
                String key = params.getKey();
                if (key == null || "".equals(key)) {
                    request.source().query(QueryBuilders.matchAllQuery());
                } else {
                    request.source().query(QueryBuilders.matchQuery("all", key));
                }
                // 2.2 Pagination
                int page = params.getPage();
                int size = params.getSize();
                request.source().from((page - 1) * size).size(size);
                // 3. Send the request and get the response
                SearchResponse response = client.search(request, RequestOptions.DEFAULT);
                // 4. Parse the response
                return handleResponse(response);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        private PageResult handleResponse(SearchResponse response) {
            // Total number of hits
            SearchHits searchHits = response.getHits();
            long total = searchHits.getTotalHits().value;
            // Convert each hit's _source into a HotelDoc
            SearchHit[] hits = searchHits.getHits();
            List<HotelDoc> hotels = new ArrayList<>();
            for (SearchHit hit : hits) {
                String json = hit.getSourceAsString();
                HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
                hotels.add(hotelDoc);
            }
            return new PageResult(total, hotels);
        }
    }


Demo 2: add filters for brand, city, star rating and price

  1. Add brand, city, starName, minPrice and maxPrice fields to RequestParams

  2. Update the search method: when a parameter is present, filter on it

    private void buildBasicQuery(RequestParams params, SearchRequest request, BoolQueryBuilder boolQuery) {
        // Keyword search (scored)
        String key = params.getKey();
        if (key == null || "".equals(key)) {
            boolQuery.must(QueryBuilders.matchAllQuery());
        } else {
            boolQuery.must(QueryBuilders.matchQuery("all", key));
        }
        // Exact-value filters (not scored)
        if (params.getCity() != null && !params.getCity().equals("")) {
            boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
        }
        if (params.getBrand() != null && !params.getBrand().equals("")) {
            boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
        }
        if (params.getStarName() != null && !params.getStarName().equals("")) {
            boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
        }
        // Price range filter: only when both bounds are present
        if (params.getMinPrice() != null && params.getMaxPrice() != null) {
            boolQuery.filter(QueryBuilders
                    .rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
        }
        request.source().query(boolQuery);
    }
    

Demo 3: hotels near me

  1. Add a location field to RequestParams

  2. Update the search method: if location has a value, add sorting by geo_distance

    // 2.3 Sorting
    String location = params.getLocation();
    if (location != null && !location.equals("")) {
        request.source().sort(SortBuilders
                .geoDistanceSort("location", new GeoPoint(location))
                .order(SortOrder.ASC)
                .unit(DistanceUnit.KILOMETERS)
        );
    }

    // In handleResponse: each hit's sort value is its distance from location
    Object[] sortValues = hit.getSortValues();
    if (sortValues.length > 0) {
        Object sortValue = sortValues[0];
        hotelDoc.setDistance(sortValue);
    }


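Demo 4: pin selected (advertised) hotels to the top of the search results

  1. Add an isAD field of type Boolean to the Hotel class

  2. Pick a few hotels and set isAD on them with DSL

    POST /hotel/_update/205612683
    {
      "doc": {
        "isAD": true
      }
    }
    POST /hotel/_update/2056105938
    {
      "doc": {
        "isAD": true
      }
    }

  3. Update the search method: add function_score so that hotels with isAD = true get a higher weight

    // Score control
    FunctionScoreQueryBuilder functionScoreQueryBuilder =
            QueryBuilders.functionScoreQuery(
                    // Original query, used for relevance scoring
                    boolQuery,
                    // Array of function score elements
                    new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
                            // One function score element
                            new FunctionScoreQueryBuilder.FilterFunctionBuilder(
                                    // Filter: only documents with isAD = true
                                    QueryBuilders.termQuery("isAD", true),
                                    // Score function: weight 10 (multiplied by the query score under the default boost_mode)
                                    ScoreFunctionBuilders.weightFactorFunction(10)
                            )
                    });
    request.source().query(functionScoreQueryBuilder);
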

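Data aggregation

Aggregations compute statistics, analyses and calculations over document data. There are three common kinds:

  • Bucket aggregations: group documents, e.g. by date or by field value
  • Metric aggregations: compute values, e.g. max, min, avg
  • Pipeline aggregations: aggregate on the results of other aggregations

Fields used in aggregations must not be analyzed (e.g. keyword, numeric, date, boolean).

Bucket aggregation in DSL

# terms aggregation on brand: size 0 returns no documents, only buckets, sorted by doc count ascending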
GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 10,
        "order": {
          "_count": "asc"
        }
      }
    }
  }
}

By default, buckets are sorted by document count (_count) in descending order; order customizes this.

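A `terms` bucket aggregation is essentially a group-by-and-count. The plain-Java sketch below mimics it over a list of field values: it counts documents per value, orders buckets by count (descending by default, ascending when requested) and keeps the first `size` buckets. `TermsAggSketch` is a hypothetical name, and breaking count ties by key is a choice of this sketch, not a statement about ES.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TermsAggSketch {
    // Buckets keyed by field value with their doc counts, ordered by count, at most `size` buckets.
    static LinkedHashMap<String, Long> terms(List<String> fieldValues, int size, boolean ascending) {
        Map<String, Long> counts = new TreeMap<>(); // key order gives deterministic ties
        for (String v : fieldValues) {
            counts.merge(v, 1L, Long::sum);
        }
        Comparator<Map.Entry<String, Long>> byCount = Map.Entry.comparingByValue();
        if (!ascending) byCount = byCount.reversed();
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(byCount); // stable sort: ties keep key order
        LinkedHashMap<String, Long> buckets = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : entries) {
            if (buckets.size() == size) break;
            buckets.put(e.getKey(), e.getValue());
        }
        return buckets;
    }
}
```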
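Metric aggregation in DSL

Nested aggregation: a metric aggregation computed inside each bucket

# Nested aggregation
GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20,
        "order": {
          "scoreAgg.avg": "desc"   # sort buckets by the average score
        }
      },
      "aggs": {
        "scoreAgg": {
          "stats": {
            "field": "score"        # metric aggregation: count, min, max, avg, sum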
          }
        }
      }
    }
  }
}
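The nested request groups hotels by brand, computes `stats` on `score` inside each bucket, and orders buckets by the average. In plain Java the same shape is a grouping collector with `DoubleSummaryStatistics`, which exposes the same five values (count, min, max, average, sum). This is an in-memory sketch; `NestedStatsSketch` and its `Hotel` class are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.DoubleSummaryStatistics;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class NestedStatsSketch {
    static final class Hotel {
        final String brand;
        final int score;

        Hotel(String brand, int score) {
            this.brand = brand;
            this.score = score;
        }
    }

    // Outer bucket per brand, inner stats on score; buckets ordered by avg desc, top `size` kept.
    static LinkedHashMap<String, DoubleSummaryStatistics> brandScoreStats(List<Hotel> hotels, int size) {
        Map<String, DoubleSummaryStatistics> byBrand = hotels.stream()
                .collect(Collectors.groupingBy(h -> h.brand,
                        Collectors.summarizingDouble(h -> h.score)));
        List<Map.Entry<String, DoubleSummaryStatistics>> entries = new ArrayList<>(byBrand.entrySet());
        entries.sort(Comparator.comparingDouble(
                (Map.Entry<String, DoubleSummaryStatistics> e) -> e.getValue().getAverage()).reversed());
        LinkedHashMap<String, DoubleSummaryStatistics> result = new LinkedHashMap<>();
        for (Map.Entry<String, DoubleSummaryStatistics> e : entries) {
            if (result.size() == size) break;
            result.put(e.getKey(), e.getValue());
        }
        return result;
    }
}
```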

Aggregations with the REST API (Java client)

@Test
void testAggregation() throws IOException {
    // 1. Prepare the request
    SearchRequest request = new SearchRequest("hotel");
    // 2. Prepare the DSL
    // 2.1 size 0: return no documents, only aggregation results
    request.source().size(0);
    // 2.2 terms aggregation named brandAgg on the brand field, top 10 buckets
    request.source().aggregation(AggregationBuilders
            .terms("brandAgg")
            .field("brand")
            .size(10)
    );
    // 3. Send the request
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    // 4. Parse the result
    Aggregations aggregations = response.getAggregations();
    // 4.1 Get the aggregation result by its name
    Terms brandTerms = aggregations.get("brandAgg");
    // 4.2 Iterate over the buckets and print each key
    List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        String key = bucket.getKeyAsString();
        System.out.println(key);
    }
    System.out.println(response);
}

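Aggregating on multiple fields

Demo: define a method in IHotelService that aggregates on brand, city and star rating

@Override
public Map<String, List<String>> filters() {
    try {
        // 1. Prepare the request
        SearchRequest request = new SearchRequest("hotel");
        // 2. Prepare the DSL
        // 2.1 size 0: return buckets only, no documents
        request.source().size(0);
        // 2.2 Add the three terms aggregations
        buildAggregation(request);
        // 3. Send the request
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. Parse the result
        Map<String, List<String>> result = new HashMap<>();
        Aggregations aggregations = response.getAggregations();
        // 4.1 Get each aggregation's bucket keys by name
        List<String> brandList = getAggByName(aggregations, "brandAgg");
        List<String> cityList = getAggByName(aggregations, "cityAgg");
        List<String> starList = getAggByName(aggregations, "starAgg");
        // 4.2 Put them into the map (the keys must be the ones the front end expects)
        result.put("brand", brandList);
        result.put("city", cityList);
        result.put("starName", starList);
        System.out.println(response);
        return result;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

private void buildAggregation(SearchRequest request) {
    // Aggregation names must match those read back in filters(); the bucket size of 100 is an assumption
    request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(100));
    request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(100));
    request.source().aggregation(AggregationBuilders.terms("starAgg").field("starName").size(100));
}

private List<String> getAggByName(Aggregations aggregations, String aggName) {
    // 1. Get the aggregation result by name
    Terms terms = aggregations.get(aggName);
    // 2. Get the buckets
    List<? extends Terms.Bucket> buckets = terms.getBuckets();
    // 3. Collect the bucket keys
    List<String> list = new ArrayList<>();
    for (Terms.Bucket bucket : buckets) {
        list.add(bucket.getKeyAsString());
    }
    return list;
}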

Test class:

@SpringBootTest
class HotelDemoApplicationTests {

    @Autowired
    private IHotelService hotelService;

    @Test
    void contextLoads() {
        Map<String, List<String>> filters = hotelService.filters();
        System.out.println(filters);
    }

}

Aggregation with filter conditions

The filter options should follow what is typed in the search box: for example, when the user types 虹桥 (Hongqiao), the city filter should show only 上海 (Shanghai), i.e. only the cities that actually have matching results.
Wiring up the front-end API:

  1. Write the controller endpoint that receives the request
  2. Change IHotelService's filters() method to take a RequestParams parameter
  3. Update the filters() logic: add the same query conditions as the search to the aggregation request

```java
@Override
public Map<String, List<String>> filters(RequestParams params) {
    try {
        // 1. Prepare the request
        SearchRequest request = new SearchRequest("hotel");
        // 2. Prepare the DSL
        // 2.1 query: reuse the search conditions so the buckets only cover matching hotels
        buildBasicQuery(params, request, QueryBuilders.boolQuery());
        // 2.2 size 0: return buckets only
        request.source().size(0);
        // 2.3 aggregations
        buildAggregation(request);
        // 3. Send the request
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. Parse the result
        Map<String, List<String>> result = new HashMap<>();
        Aggregations aggregations = response.getAggregations();
        // 4.1 Get each aggregation's bucket keys by name
        List<String> brandList = getAggByName(aggregations, "brandAgg");
        List<String> cityList = getAggByName(aggregations, "cityAgg");
        List<String> starList = getAggByName(aggregations, "starAgg");
        // 4.2 Put them into the map
        result.put("brand", brandList);
        result.put("city", cityList);
        result.put("starName", starList);
        System.out.println(response);
        return result;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

// getAggByName(...) is the same as in the previous example.
```

Note: the map keys must be brand, city and starName, otherwise the filter area on the front end stays blank. The video gets this wrong!

## Autocomplete
### Installing the pinyin analyzer
[https://github.com/medcl/elasticsearch-analysis-pinyin](https://github.com/medcl/elasticsearch-analysis-pinyin)

1. Download it
2. Unzip it into the ES plugins directory
3. Restart ES and test:

POST /_analyze
{
  "text": ["如家还不错"],
  "analyzer": "pinyin"
}

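Custom analyzers

An analyzer in ES consists of three parts (settings keys char_filter, tokenizer and filter):

  • character filters: process the text before the tokenizer, e.g. delete or replace characters
  • tokenizer: split the text into terms according to rules, e.g. keyword, ik_smart
  • token filter: further process the terms output by the tokenizer, e.g. case conversion, synonyms, pinyin

    # Custom pinyin analyzer: IK tokenizer followed by a pinyin token filter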
    PUT /test
    {
    "settings": {
      "analysis": {
        "analyzer": { 
          "my_analyzer": { 
            "tokenizer": "ik_max_word",
            "filter": "py"
          }
        },
        "filter": {
          "py": { 
            "type": "pinyin",
            "keep_full_pinyin": false,
            "keep_joined_full_pinyin": true,
            "keep_original": true,
            "limit_first_letter_length": 16,
            "remove_duplicated_term": true,
            "none_chinese_pinyin_tokenize": false
          }
        }
      }
    }
    }
    
    POST /test/_analyze
    {
    "text": ["如家还不错"],
    "analyzer": "my_analyzer"
    }
    

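The pinyin analyzer should only be used when building the inverted index, not at search time: if the search input were also converted to pinyin, it would match homophones (different characters with the same pinyin).
So use my_analyzer when indexing (analyzer) and ik_smart when searching (search_analyzer):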

    PUT /test
    {
    "settings": {
      "analysis": {
        "analyzer": { 
          "my_analyzer": { 
            "tokenizer": "ik_max_word",
            "filter": "py"
          }
        },
        "filter": {
          "py": { 
            "type": "pinyin",
            "keep_full_pinyin": false,
            "keep_joined_full_pinyin": true,
            "keep_original": true,
            "limit_first_letter_length": 16,
            "remove_duplicated_term": true,
            "none_chinese_pinyin_tokenize": false
          }
        }
      }
    },
    "mappings": {
      "properties": {
        "name": {
          "type": "text",
          "analyzer": "my_analyzer",
          "search_analyzer": "ik_smart"
        }
      }
    }
    }
    

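Autocomplete with DSL

The completion suggester query has these requirements:

  • The field used for completion must be of type completion

  • The field content is usually an array of the terms to complete on
    # Index for autocomplete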
    PUT test2
    {
    "mappings": {
      "properties": {
        "title":{
          "type": "completion"
        }
      }
    }
    }
    # Sample data
    POST test2/_doc
    {
    "title": ["Sony", "WH-1000XM3"]
    }
    POST test2/_doc
    {
    "title": ["SK-II", "PITERA"]
    }
    POST test2/_doc
    {
    "title": ["Nintendo", "switch"]
    }
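Conceptually, a completion suggester returns the stored inputs that begin with the typed prefix. ES implements this with an in-memory FST and ranks options by weight; the plain-Java sketch below only mimics the prefix lookup with a `TreeMap` range scan and returns options in alphabetical order, which is a simplification. It assumes case-insensitive matching (the completion field's default `simple` analyzer lowercases input), and `CompletionSketch` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

final class CompletionSketch {
    // Lowercased input -> original input, kept sorted for prefix range scans.
    private final TreeMap<String, String> inputs = new TreeMap<>();

    // Adds every element of a completion field array, e.g. ["Sony", "WH-1000XM3"].
    void add(String... titles) {
        for (String title : titles) {
            // Identical inputs collapse into one entry, similar in effect to skip_duplicates.
            inputs.putIfAbsent(title.toLowerCase(Locale.ROOT), title);
        }
    }

    // Returns up to `size` inputs whose lowercased form starts with the prefix.
    List<String> suggest(String prefix, int size) {
        String p = prefix.toLowerCase(Locale.ROOT);
        List<String> options = new ArrayList<>();
        for (Map.Entry<String, String> e : inputs.tailMap(p, true).entrySet()) {
            if (!e.getKey().startsWith(p) || options.size() == size) {
                break; // past the prefix range, or enough options collected
            }
            options.add(e.getValue());
        }
        return options;
    }
}
```

With the three sample documents above, the prefix `s` yields SK-II, Sony and switch, while WH-1000XM3, PITERA and Nintendo are never visited past the range.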
    
# Autocomplete query: suggest up to 10 titles starting with "s", skipping duplicates
GET /test2/_search
{
  "suggest": {
    "titleSuggest": {
      "text": "s",
      "completion": {
        "field": "title",
        "skip_duplicates": true,
        "size": 10
      }
    }
  }
}

### Hotel data autocomplete
**Demo: implement autocomplete and pinyin search for the hotel index**

1. Change the hotel index structure and define the custom pinyin analyzers
2. Change the name and all fields to use the custom analyzer
3. Add a suggestion field of type completion to the index, using the custom analyzer
4. Add a suggestion field to the HotelDoc class containing brand and business
5. Re-import the data into the hotel index
```json
# Hotel index
PUT /hotel
{
  "settings": {
    "analysis": {
      "analyzer": {
        "text_anlyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        },
        "completion_analyzer": {
          "tokenizer": "keyword",
          "filter": "py"
        }
      },
      "filter": {
        "py": {
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "id":{
        "type": "keyword"
      },
      "name":{
        "type": "text",
        "analyzer": "text_anlyzer",
        "search_analyzer": "ik_smart",
        "copy_to": "all"
      },
      "address":{
        "type": "keyword",
        "index": false
      },
      "price":{
        "type": "integer"
      },
      "score":{
        "type": "integer"
      },
      "brand":{
        "type": "keyword",
        "copy_to": "all"
      },
      "city":{
        "type": "keyword"
      },
      "starName":{
        "type": "keyword"
      },
      "business":{
        "type": "keyword",
        "copy_to": "all"
      },
      "location":{
        "type": "geo_point"
      },
      "pic":{
        "type": "keyword",
        "index": false
      },
      "all":{
        "type": "text",
        "analyzer": "text_anlyzer",
        "search_analyzer": "ik_smart"
      },
      "suggestion":{
          "type": "completion",
          "analyzer": "completion_analyzer"
      }
    }
  }
}
```

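Analyzers defined in settings:

  • text_anlyzer (spelled this way in the mapping above): Chinese word segmentation plus pinyin; used for text fields
  • completion_analyzer: keeps the whole input as one term (keyword tokenizer) and adds its pinyin; used for the completion field

In the mappings:

  • analyzer: the analyzer used when building the inverted index
  • search_analyzer: the analyzer used at search time; Chinese segmentation only, no pinyin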

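suggestion

  • The field used for completion. In the HotelDoc constructor it is filled with the brand plus the business area(s); business may hold several areas separated by "/":
    if (this.business.contains("/")) {
        // Several business areas: the brand followed by each area
        String[] arr = this.business.split("/");
        this.suggestion = new ArrayList<>();
        this.suggestion.add(this.brand);
        Collections.addAll(this.suggestion, arr);
    } else {
        this.suggestion = Arrays.asList(this.brand, this.business);
    }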
    
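The constructor fragment above can be pulled into a standalone helper to check its behavior in isolation. This is a sketch with a hypothetical name, `SuggestionSketch.buildSuggestion`; unlike the fragment it also tolerates a null `business` and skips blank parts, which is an added assumption rather than the course's code.

```java
import java.util.ArrayList;
import java.util.List;

final class SuggestionSketch {
    // Completion inputs: the brand first, then each "/"-separated business area.
    static List<String> buildSuggestion(String brand, String business) {
        List<String> suggestion = new ArrayList<>();
        suggestion.add(brand);
        if (business != null) {
            for (String part : business.split("/")) {
                if (!part.trim().isEmpty()) {
                    suggestion.add(part.trim());
                }
            }
        }
        return suggestion;
    }
}
```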
    GET /hotel/_search
    {
    "suggest": {
      "suggestions": {
        "text": "h",
        "completion": {
          "field": "suggestion",
          "skip_duplicates": true,
          "size": 10
        }
      }
    }
    }
    

Autocomplete with the REST API (Java client)

@Test
void testSuggest() throws IOException {
    // 1. Prepare the request
    SearchRequest request = new SearchRequest("hotel");
    // 2. Prepare the DSL: completion suggestion on the suggestion field
    request.source().suggest(new SuggestBuilder().addSuggestion(
            "suggestions",
            SuggestBuilders.completionSuggestion("suggestion")
                    .prefix("h")
                    .skipDuplicates(true)
                    .size(10)
    ));
    // 3. Send the request
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    // 4. Parse the result: get the suggestion by the name used in addSuggestion
    Suggest suggest = response.getSuggest();
    CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
    List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
    for (CompletionSuggestion.Entry.Option option : options) {
        String text = option.getText().toString();
        System.out.println(text);
    }
}

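Search-box autocomplete

Inspect the AJAX request the front-end page sends while the user types, then write the matching endpoint on the server.
(IntelliJ: Ctrl+Alt+B jumps from an interface method to its implementation)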

@GetMapping("suggestion")
public List<String> getSuggestions(@RequestParam("key")String prefix){
  return hotelService.getSuggestions(prefix);
}
@Override
public List<String> getSuggestions(String prefix) {
    try {
        // 1. Prepare the request
        SearchRequest request = new SearchRequest("hotel");
        // 2. Prepare the DSL: completion suggestion on the suggestion field
        request.source().suggest(new SuggestBuilder().addSuggestion(
                "suggestions",
                SuggestBuilders.completionSuggestion("suggestion")
                        .prefix(prefix)
                        .skipDuplicates(true)
                        .size(10)
        ));
        // 3. Send the request
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. Parse the result into a list of suggestion texts
        Suggest suggest = response.getSuggest();
        CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
        List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
        List<String> list = new ArrayList<>(options.size());
        for (CompletionSuggestion.Entry.Option option : options) {
            list.add(option.getText().toString());
        }
        return list;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

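Data synchronization

Options

Option 1: synchronous call. After writing to MySQL, the admin service directly calls an interface of the search service, which updates ES.

Option 2: asynchronous notification. After writing to MySQL, the admin service sends an MQ message; the search service listens for it and updates ES.
Option 3: listen to the binlog. A middleware such as canal watches the MySQL binlog and notifies the search service of changes, so the admin service needs no code changes.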

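Import the hotel admin project

Demo: use MQ to synchronize data between MySQL and ES

  • Import the hotel-admin project and test its CRUD
  • Declare the exchange, queues and RoutingKeys
  • Send messages in hotel-admin's create, update and delete operations
  • Listen for the messages in hotel-demo and update the data in ES
  • Test the two projects together

Other steps:
Install Erlang and RabbitMQ
Update the yml configuration:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /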

Declare the queues and exchange

(IntelliJ: Ctrl+Shift+U toggles upper/lower case, handy when naming constants)
MqConstants

package cn.itcast.hotel.constans;

public class MqConstants {
    /**
     * Exchange
     */
    public final static String HOTEL_EXCHANGE = "hotel.topic";

    /**
     * Queues to listen on
     */
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";

    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";

    /**
     * RoutingKey
     */
    public final static String HOTEL_INSERT_KEY = "hotel.insert";

    public final static String HOTEL_DELETE_KEY = "hotel.delete";
}

MqConfig

package cn.itcast.hotel.config;

import cn.itcast.hotel.constans.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {
    // Topic exchange: durable = true, autoDelete = false
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
    }

    // Durable queue for insert/update messages
    @Bean
    public Queue insertQueue(){
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
    }

    // Durable queue for delete messages
    @Bean
    public Queue deleteQueue(){
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
    }

    // Bind each queue to the exchange with its routing key
    @Bean
    public Binding insertQueueBinding(){
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    }

    @Bean
    public Binding deleteQueueBinding(){
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }
}
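The bindings above use exact routing keys, but a `TopicExchange` also accepts wildcard binding patterns: `*` matches exactly one dot-separated word and `#` matches zero or more words. The matcher below is a hypothetical illustration of those rules, not Spring AMQP or RabbitMQ code; it shows, for example, why a queue bound with `hotel.*` would receive both `hotel.insert` and `hotel.delete`.

```java
// Hypothetical illustration of topic-exchange matching rules.
class TopicMatchSketch {
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' either matches zero words (skip it) or swallows one more word (stay on it)
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // key used up while pattern words remain
        }
        boolean wordOk = p[i].equals("*") || p[i].equals(k[j]);
        return wordOk && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("hotel.insert", "hotel.insert")); // exact binding, as in MqConfig
        System.out.println(matches("hotel.*", "hotel.delete"));      // one-word wildcard
        System.out.println(matches("hotel.#", "hotel.a.b"));         // multi-word wildcard
    }
}
```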

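Sending MQ messages (in hotel-admin, using the MqConstants declared above)

// after saving a new hotel
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());

// after updating a hotel: an ES index request overwrites the whole document, so updates reuse the insert key
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());

// after deleting a hotel
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);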

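Listening for MQ messages

In hotel-demo, a listener on each queue receives the hotel id and calls one of the service methods below: insertById for insert/update messages, deleteById for delete messages.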

// Delete message: remove the document from the "hotel" index
@Override
public void deleteById(Long id) {
    try {
        // 1. Prepare the request
        DeleteRequest request = new DeleteRequest("hotel", id.toString());
        // 2. Send the request
        client.delete(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

// Insert/update message: load the hotel from MySQL and (re)index it
@Override
public void insertById(Long id) {
    try {
        // 0. Query the hotel from the database
        Hotel hotel = getById(id);
        // Convert it to the document type
        HotelDoc hotelDoc = new HotelDoc(hotel);
        // 1. Prepare the IndexRequest
        IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
        // 2. Prepare the JSON document
        request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
        // 3. Send the request
        client.index(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

Testing the sync feature ⭐

(To study: the MQ and Vue debugging material related to this part.)

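Cluster

Cluster architecture

A single-node ES faces two problems: storing massive amounts of data, and being a single point of failure.

  • Massive data: logically split an index into N shards and spread them across multiple nodes
  • Single point of failure: keep backup copies (replicas) of each shard on different nodes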
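A minimal sketch of the idea, assuming a simple round-robin placement with one replica per shard (ES's real allocator also weighs disk usage and shard balance): primary shard i goes on node i mod N and its replica on the next node, so no node ever holds both copies of the same shard. `ShardPlacementSketch` and `place` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical round-robin placement of primaries and replicas (not ES's allocator).
class ShardPlacementSketch {
    // Returns, for each shard i, {primaryNode, replicaNode}.
    static List<int[]> place(int shards, int nodes) {
        if (nodes < 2) {
            throw new IllegalArgumentException("a replica needs a second node");
        }
        List<int[]> placement = new ArrayList<>();
        for (int i = 0; i < shards; i++) {
            int primary = i % nodes;
            int replica = (i + 1) % nodes; // always a different node than the primary
            placement.add(new int[] {primary, replica});
        }
        return placement;
    }

    public static void main(String[] args) {
        List<int[]> p = place(3, 3);
        for (int i = 0; i < p.size(); i++) {
            System.out.printf("shard %d: primary on node %d, replica on node %d%n",
                    i, p.get(i)[0], p.get(i)[1]);
        }
    }
}
```

With 3 shards on 3 nodes each node holds one primary and one replica, so losing any single node still leaves a full copy of every shard.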


Setting up a cluster

Creating the cluster

Installation notes: 安装elasticsearch.md
docker-compose file:

version: '2.2'
services:
  es01:
    image: elasticsearch:7.12.1
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic
  es02:
    image: elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data02:/usr/share/elasticsearch/data
    ports:
      - 9201:9200
    networks:
      - elastic
  es03:
    image: elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic
    ports:
      - 9202:9200
volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge

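Upload the docker-compose file to the server (e.g. with FinalShell).
ES needs a Linux kernel limit raised before it will start: increase vm.max_map_count in /etc/sysctl.conf.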

vi /etc/sysctl.conf

Add the following line:

vm.max_map_count=262144

Then run this command to apply the setting:

sysctl -p

Start the cluster with docker-compose (run it in the directory containing the compose file):

docker-compose up -d

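Monitoring cluster status

cerebro: start it with its .bat launcher (bin/cerebro.bat on Windows)
then open http://localhost:9000 and connect to one of the nodes, e.g. http://<server-ip>:9200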

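Creating an index

Create the index in cerebro's UI or with DSL; either way, the shard and replica counts go into the index settings (number_of_shards, number_of_replicas).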

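Node roles and split brain

Each ES node can take one or more roles. In ES 7.x they are controlled by node.master (master-eligible), node.data (data), and node.ingest (ingest), all true by default; a node with all three set to false acts only as a coordinating node. (From 7.9 on, the node.roles list is the preferred way to set this.)
A typical cluster gives each responsibility to dedicated nodes: master-eligible nodes manage the cluster state and shard allocation, data nodes store data and run searches and aggregations, and coordinating nodes route requests to other nodes and merge the results.
Split brain happens when nodes in the cluster lose contact with each other (e.g. a network partition) and more than one master ends up being elected.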

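The fix is majority voting: a candidate must win more than half of the master-eligible nodes' votes, i.e. at least eligible / 2 + 1 (integer division), to become master, so an odd number of eligible nodes is preferable. Before ES 7.0 this had to be configured with discovery.zen.minimum_master_nodes; since 7.0 that setting is ignored and ES manages the voting quorum automatically, so split brain is generally no longer a problem.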
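The majority rule in numbers, as a small hypothetical helper: with n master-eligible nodes a candidate needs n / 2 + 1 votes. In a 3-node cluster split 2 | 1 only the 2-node side can elect a master; a 4-node cluster split 2 | 2 elects none, so there is no split brain but also no master. Since 3 and 4 nodes both tolerate only one failure, the extra even node buys nothing.

```java
// Hypothetical helper showing majority-quorum arithmetic for master election.
class QuorumSketch {
    // Smallest vote count that is a strict majority of n eligible nodes.
    static int quorum(int eligibleNodes) {
        return eligibleNodes / 2 + 1;
    }

    // A partition can elect a master only if it still holds a quorum.
    static boolean canElect(int nodesInPartition, int eligibleNodes) {
        return nodesInPartition >= quorum(eligibleNodes);
    }

    public static void main(String[] args) {
        for (int n = 3; n <= 6; n++) {
            // failures tolerated = nodes that can be lost while a quorum remains
            System.out.printf("n=%d quorum=%d tolerates %d failure(s)%n",
                    n, quorum(n), n - quorum(n));
        }
    }
}
```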

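Distributed indexing and query flow

Insert three documents through the node on port 9200.
The test shows that the three documents were stored in different shards.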


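How shard routing works

ES picks the shard for a document with:
shard = hash(_routing) % number_of_shards
Notes:

  • _routing defaults to the document id
  • Because the result depends on the number of shards, the number of primary shards cannot be changed once the index is created!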
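A runnable sketch of the formula. ES hashes _routing with Murmur3; this hypothetical `RoutingSketch` uses `String.hashCode()` only as a stand-in so it runs on the plain JDK, and `Math.floorMod` to keep the result non-negative. It shows that the same id always lands on the same shard, and that changing the shard count moves it, which is why existing documents could no longer be found if the count changed.

```java
// Hypothetical routing sketch; String.hashCode() stands in for ES's Murmur3 hash.
class RoutingSketch {
    static int shardFor(String routing, int numberOfShards) {
        // floorMod keeps the result in [0, numberOfShards) even for negative hashes
        return Math.floorMod(routing.hashCode(), numberOfShards);
    }

    public static void main(String[] args) {
        for (String id : new String[] {"1", "2", "3"}) {
            System.out.printf("doc %s -> shard %d of 3, shard %d of 5%n",
                    id, shardFor(id, 3), shardFor(id, 5));
        }
    }
}
```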

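Indexing flow for a new document:

  1. A request to add a document with id=1 arrives at a coordinating node.
  2. The coordinating node hashes the id to pick the shard, say shard-0.
  3. It forwards the request to the node holding the primary copy of shard-0.
  4. The primary shard stores the document.
  5. The primary syncs the document to its replica.
  6. The result goes back to the coordinating node, which returns it to the client.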
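The write path above as an in-memory model, assuming one replica per shard placed on the node after its primary and `String.hashCode()` as a stand-in for Murmur3 routing. `IndexFlowSketch`, `storage`, and `index` are hypothetical names; real ES also waits for replica acknowledgements and handles failures, which this leaves out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the write path: coordinating node -> primary -> replica.
class IndexFlowSketch {
    final int shards;
    final int nodes;
    // storage.get(node).get(shard) = documents held by that shard copy on that node
    final Map<Integer, Map<Integer, Map<String, String>>> storage = new HashMap<>();

    IndexFlowSketch(int shards, int nodes) {
        this.shards = shards;
        this.nodes = nodes;
    }

    int primaryNode(int shard) { return shard % nodes; }
    int replicaNode(int shard) { return (shard + 1) % nodes; } // assumed placement

    private Map<String, String> copy(int node, int shard) {
        return storage.computeIfAbsent(node, n -> new HashMap<>())
                      .computeIfAbsent(shard, s -> new HashMap<>());
    }

    // Coordinating node: route by id, write the primary, then the replica, report the shard.
    int index(String id, String json) {
        int shard = Math.floorMod(id.hashCode(), shards); // step 2: pick the shard
        copy(primaryNode(shard), shard).put(id, json);    // steps 3-4: primary stores it
        copy(replicaNode(shard), shard).put(id, json);    // step 5: sync to the replica
        return shard;                                     // step 6: result back to the client
    }

    public static void main(String[] args) {
        IndexFlowSketch cluster = new IndexFlowSketch(3, 3);
        int shard = cluster.index("1", "{\"name\":\"rujia\"}");
        System.out.println("doc 1 stored in shard " + shard);
    }
}
```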

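Distributed queries

An Elasticsearch query runs in two phases:

  • scatter phase: the coordinating node sends the request to every shard
  • gather phase: the coordinating node collects the data nodes' search results, merges them into the final result set, and returns it to the user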
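A rough model of the two phases, assuming every shard already has scored hits: each shard returns only its local top k, and the coordinating node merges those lists into the global top k. `ScatterGatherSketch` and `Hit` are hypothetical, and the fetch step (loading the full documents for the final ids, part of ES's default query_then_fetch) is left out. It also hints at why deep paging is costly: to serve from + size results, every shard has to return from + size hits.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of scatter (per-shard top k) and gather (global merge).
class ScatterGatherSketch {
    static final class Hit {
        final String id;
        final double score;
        Hit(String id, double score) { this.id = id; this.score = score; }
    }

    private static final Comparator<Hit> BY_SCORE_DESC =
            Comparator.comparingDouble((Hit h) -> h.score).reversed();

    // Scatter: each shard ranks only its own hits.
    static List<Hit> localTopK(List<Hit> shardHits, int k) {
        List<Hit> sorted = new ArrayList<>(shardHits);
        sorted.sort(BY_SCORE_DESC);
        return sorted.subList(0, Math.min(k, sorted.size()));
    }

    // Gather: merge every shard's top k and keep the best k ids overall.
    static List<String> search(List<List<Hit>> shards, int k) {
        List<Hit> merged = new ArrayList<>();
        for (List<Hit> shard : shards) {
            merged.addAll(localTopK(shard, k));
        }
        merged.sort(BY_SCORE_DESC);
        List<String> ids = new ArrayList<>();
        for (Hit h : merged.subList(0, Math.min(k, merged.size()))) {
            ids.add(h.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        List<List<Hit>> shards = List.of(
                List.of(new Hit("a", 0.9), new Hit("b", 0.2)),
                List.of(new Hit("c", 0.7), new Hit("d", 0.8)),
                List.of(new Hit("e", 0.1)));
        System.out.println(search(shards, 2));
    }
}
```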


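Failover

When a node goes down, the cluster recovers in two steps:

  1. Re-elect the master: if the failed node was the master, the remaining master-eligible nodes elect a new one.
  2. Migrate data: for shards that lost their primary, a replica on another node is promoted to primary, and the missing replicas are rebuilt on the surviving nodes so every shard is backed up again.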
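Step 2 as a small model, assuming one replica per shard and that a master is already in place (the re-election in step 1 is not modeled). `FailoverSketch` and `failNode` are hypothetical names; the real master also considers disk usage and balance when choosing where to rebuild replicas.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical failover model: promote replicas of lost primaries and
// re-create missing replicas on another surviving node.
class FailoverSketch {
    // placement: shard -> {primaryNode, replicaNode}
    static Map<Integer, int[]> failNode(Map<Integer, int[]> placement, int deadNode, int nodes) {
        Map<Integer, int[]> result = new HashMap<>();
        for (Map.Entry<Integer, int[]> e : placement.entrySet()) {
            int primary = e.getValue()[0];
            int replica = e.getValue()[1];
            if (primary == deadNode) {
                primary = replica; // promote the surviving replica to primary
                replica = -1;      // the shard is now missing its replica
            } else if (replica == deadNode) {
                replica = -1;
            }
            if (replica == -1) {
                // rebuild on any live node that does not already hold this shard;
                // stays -1 (unassigned, cluster yellow) if no such node exists
                for (int n = 0; n < nodes; n++) {
                    if (n != deadNode && n != primary) {
                        replica = n;
                        break;
                    }
                }
            }
            result.put(e.getKey(), new int[] {primary, replica});
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, int[]> placement = new HashMap<>();
        placement.put(0, new int[] {0, 1});
        placement.put(1, new int[] {1, 2});
        placement.put(2, new int[] {2, 0});
        Map<Integer, int[]> after = failNode(placement, 0, 3);
        after.forEach((shard, p) -> System.out.printf(
                "shard %d: primary on node %d, replica on node %d%n", shard, p[0], p[1]));
    }
}
```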