聚合
聚合(aggregations)可以让我们极其方便地实现对数据的统计、分析、运算
聚合常见的有三类:
- 桶(Bucket)聚合:用来对文档做分组
- TermAggregation:按照文档字段值分组,例如按照品牌值分组、按照国家分组
- Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
- 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
- Avg:求平均值
- Max:求最大值
- Min:求最小值
- Stats:同时求max、min、avg、sum等
- 管道(Pipeline)聚合:以其它聚合的结果为基础再做聚合
Bucket聚合语法:
Metric聚合语法:
RestAPI实现聚合
聚合条件语法

聚合结果解析
void testMetric() throws IOException {//1.创建对应请求SearchRequest request = new SearchRequest("hotel");//2.DSLrequest.source().size(0);request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(20).order(BucketOrder.aggregation("_count",true)));//3.发请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4.解析结果Aggregations aggregations = response.getAggregations();Terms brandAgg = aggregations.get("brandAgg");List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();for (Terms.Bucket bucket : buckets) {String key = bucket.getKeyAsString();System.out.println(key);}}
案例:实现搜索栏条件
```java
package cn.itcast.hotel.service.impl;
import cn.itcast.hotel.mapper.HotelMapper; import cn.itcast.hotel.pojo.Hotel; import cn.itcast.hotel.pojo.HotelDoc; import cn.itcast.hotel.pojo.PageResult; import cn.itcast.hotel.pojo.RequestParams; import cn.itcast.hotel.service.IHotelService; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.unit.DistanceUnit; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder; import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.search.suggest.SuggestBuilders; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;
import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;
@Service
public class HotelService extends ServiceImpl
@Autowiredprivate RestHighLevelClient client;/*** 根据关键字搜索酒店信息** @param params 请求参数对象,包含用户输入的关键字* @return 酒店文档列表*/@Overridepublic PageResult search(RequestParams params) {try {//1.SearchRequest request = new SearchRequest("hotel");//2.准备DSL并添加条件addQuery(request, params);//分页int size = params.getSize();int begin = (params.getPage() - 1) * size;request.source().from(begin).size(size);//距离排序String location = params.getLocation();if (location != null && !location.equals("")) {request.source().sort(SortBuilders.geoDistanceSort("location", new GeoPoint(location)).order(SortOrder.ASC).unit(DistanceUnit.KILOMETERS));}//发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 解析响应return handleResponse(response);} catch (IOException e) {throw new RuntimeException(e);}}/*** 条件查询拼接DSL** @param request* @param params* @return*/private void addQuery(SearchRequest request, RequestParams params) {//2.1query//构建boolQuery,多条件使用bool查询BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();String key = params.getKey();//判断搜索词if (key == null || "".equals(key)) {boolQuery.must(QueryBuilders.matchAllQuery());} else {boolQuery.must(QueryBuilders.matchQuery("all", key));}// 3.城市条件if (params.getCity() != null && !params.getCity().equals("")) {boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));}// 4.品牌条件if (params.getBrand() != null && !params.getBrand().equals("")) {boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));}// 5.星级条件if (params.getStarName() != null && !params.getStarName().equals("")) {boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));}//6.价格if (params.getMinPrice() != null && params.getMaxPrice() != null) {boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));}//function_scoreFunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(// 原始查询,相关性算分的查询boolQuery,// function score的数组new 
FunctionScoreQueryBuilder.FilterFunctionBuilder[]{// 其中的一个function score 元素new FunctionScoreQueryBuilder.FilterFunctionBuilder(//过滤条件QueryBuilders.termQuery("isAD", true),//算分函数ScoreFunctionBuilders.weightFactorFunction(10))});//放入条件request.source().query(functionScoreQuery);}/*** 搜索栏条件添加** @param params* @return*/@Overridepublic Map<String, List<String>> getFilters(RequestParams params) {try {//1.SearchRequest request = new SearchRequest("hotel");//2.准备DSL并添加条件addQuery(request, params);//2.正常条件request.source().size(0);//3.聚合条件addFilter(request);//4.发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//解析结果Aggregations aggregations = response.getAggregations();//放入不同mapMap<String, List<String>> map = new HashMap<>();List<String> cityList = getAggField(aggregations, "cityAgg");map.put("city", cityList);List<String> starList = getAggField(aggregations, "starAgg");map.put("starName", starList);List<String> brandList = getAggField(aggregations, "brandAgg");map.put("brand", brandList);return map;} catch (IOException e) {throw new RuntimeException(e);}}/*** 自动补全** @param key* @return*/@Overridepublic List<String> getSuggestions(String key) {try {//1.请求SearchRequest request = new SearchRequest("hotel");//2.DSL的java代码request.source().suggest(new SuggestBuilder().addSuggestion("suggestions",SuggestBuilders.completionSuggestion("suggestion").prefix(key).skipDuplicates(true).size(10)));//3.发送请求获取参数SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4.解析参数Suggest suggest = response.getSuggest();// 4.1.根据补全查询名称,获取补全结果CompletionSuggestion suggestions= suggest.getSuggestion("suggestions");// 4.2.获取optionsList<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();List<String>list=new ArrayList<>();//4.3遍历放入集合for (CompletionSuggestion.Entry.Option option : options) {String string = option.getText().string();list.add(string);}return list;} catch (IOException e) {throw new RuntimeException(e);}}/*** 新增、修改* @param 
id*/@Overridepublic void insertById(Long id) {try {//查询对应id的酒店信息Hotel hotel = getById(id);HotelDoc hotelDoc=new HotelDoc(hotel);//请求IndexRequest request = new IndexRequest("hotel").id(id.toString());//组装DSLrequest.source(JSON.toJSONString(hotelDoc), XContentType.JSON);//发请求client.index(request,RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}/*** 删除* @param id*/@Overridepublic void deleteById(Long id) {try {//创建DeleteRequest request = new DeleteRequest("hotel",id.toString());client.delete(request,RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}/*** 得到对应搜索框信息** @param aggregations* @param aggName* @return*/private List<String> getAggField(Aggregations aggregations, String aggName) {Terms aggTerms = aggregations.get(aggName);List<? extends Terms.Bucket> buckets = aggTerms.getBuckets();List<String> list = new ArrayList<>();for (Terms.Bucket bucket : buckets) {String key = bucket.getKeyAsString();list.add(key);}return list;}/*** 聚合条件** @param request*/private void addFilter(SearchRequest request) {request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(100));request.source().aggregation(AggregationBuilders.terms("starAgg").field("starName").size(100));request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(100));}/*** 结果解析** @param response* @return*/private PageResult handleResponse(SearchResponse response) {//解析结果SearchHits searchHits = response.getHits();//总数long total = searchHits.getTotalHits().value;//集合SearchHit[] hits = searchHits.getHits();List<HotelDoc> hotelDocList = new ArrayList<>();for (SearchHit hit : hits) {//酒店对象JSONString hitSourceAsString = hit.getSourceAsString();//转换对象hotelDocHotelDoc hotelDoc = JSON.parseObject(hitSourceAsString, HotelDoc.class);//距离Object[] sortValues = hit.getSortValues();if (sortValues != null && sortValues.length > 0) {hotelDoc.setDistance(sortValues[0]);}hotelDocList.add(hotelDoc);}return new 
PageResult(total, hotelDocList);}
}
```

<a name="oioHr"></a>
### 自动补全

拼音分词器:安装方式与IK分词器一样,步骤如下:

1. 解压
2. 上传到虚拟机中 elasticsearch 的 plugins 目录
3. 重启 elasticsearch
4. 测试

```json
POST /_analyze
{
  "text": "如家酒店还不错",
  "analyzer": "pinyin"
}
```
创建索引库时添加suggestion字段用于自动补全
注意:suggestion字段及其用到的自定义分词器要在创建索引库时一并声明
// 酒店数据索引库
PUT /hotel
{
"settings": {
"analysis": {
"analyzer": {
"text_anlyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
},
"completion_analyzer": {
"tokenizer": "keyword",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"id":{
"type": "keyword"
},
"name":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart",
"copy_to": "all"
},
"address":{
"type": "keyword",
"index": false
},
"price":{
"type": "integer"
},
"score":{
"type": "integer"
},
"brand":{
"type": "keyword",
"copy_to": "all"
},
"city":{
"type": "keyword"
},
"starName":{
"type": "keyword"
},
"business":{
"type": "keyword",
"copy_to": "all"
},
"location":{
"type": "geo_point"
},
"pic":{
"type": "keyword",
"index": false
},
"all":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart"
},
"suggestion":{
"type": "completion",
"analyzer": "completion_analyzer"
}
}
}
}

实例DSL:
解析:
/**
 * Returns completion suggestions for the given prefix from the "hotel" index.
 *
 * @param prefix text typed so far by the user
 * @return suggestion texts (at most 10 are requested, duplicates skipped)
 */
@Override
public List<String> getSuggestions(String prefix) {
    // Completion suggester named "suggestions", backed by the "suggestion" field.
    SuggestBuilder suggestBuilder = new SuggestBuilder().addSuggestion(
            "suggestions",
            SuggestBuilders.completionSuggestion("suggestion")
                    .prefix(prefix)
                    .skipDuplicates(true)
                    .size(10));
    SearchRequest searchRequest = new SearchRequest("hotel");
    searchRequest.source().suggest(suggestBuilder);

    // Only the search call is inside the try block; it is the only statement
    // here expected to throw IOException.
    SearchResponse searchResponse;
    try {
        searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }

    // Look the result up by the suggester name and collect every option's text.
    CompletionSuggestion completion = searchResponse.getSuggest().getSuggestion("suggestions");
    List<CompletionSuggestion.Entry.Option> options = completion.getOptions();
    List<String> texts = new ArrayList<>(options.size());
    options.forEach(option -> texts.add(option.getText().toString()));
    return texts;
}
ES索引和数据库的数据同步问题:
三种数据同步方法
方式一:同步调用
- 优点:实现简单,粗暴
- 缺点:业务耦合度高
方式二:异步通知
- 优点:低耦合,实现难度一般
- 缺点:依赖mq的可靠性
方式三:监听binlog——相对高级
- 优点:完全解除服务间耦合
- 缺点:开启binlog增加数据库负担、实现复杂度高
流程:
需要用到的模块都需要导入
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在yml中声明RabbitMQ的连接信息(主机、端口、虚拟主机、用户名、密码):
# RabbitMQ connection settings; the keys must be nested under spring.rabbitmq.
spring:
  rabbitmq:
    host: 121.36.164.132 # host name
    port: 5672 # port
    virtual-host: / # virtual host
    username: itcast # user name
    password: 123321 # password
可以用配置类声明,也可以直接用注解声明,注解的详细用法见上一篇ES笔记
/**
 * Declares the exchange, the two queues and the two bindings used to
 * propagate hotel changes over RabbitMQ.
 */
@Configuration
public class MqConfig {

    /** Topic exchange for hotel messages, created with arguments {@code (name, true, false)}. */
    @Bean
    public TopicExchange topicExchange() {
        TopicExchange exchange = new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
        return exchange;
    }

    /** Queue for insert/update messages, created with arguments {@code (name, true)}. */
    @Bean
    public Queue insertQueue() {
        Queue queue = new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
        return queue;
    }

    /** Queue for delete messages, created with arguments {@code (name, true)}. */
    @Bean
    public Queue deleteQueue() {
        Queue queue = new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
        return queue;
    }

    /** Binds the insert queue to the topic exchange with the insert routing key. */
    @Bean
    public Binding insertQueueBinding() {
        Queue queue = insertQueue();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with(MqConstants.HOTEL_INSERT_KEY);
    }

    /** Binds the delete queue to the topic exchange with the delete routing key. */
    @Bean
    public Binding deleteQueueBinding() {
        Queue queue = deleteQueue();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with(MqConstants.HOTEL_DELETE_KEY);
    }
}
/**
 * Saves a new hotel, then publishes its id with the insert routing key.
 *
 * @param hotel hotel taken from the request body
 */
@PostMapping
public void saveHotel(@RequestBody Hotel hotel) {
    hotelService.save(hotel);
    // The constants class is MqConstants, as used by MqConfig and HotelListener.
    rabbitTemplate.convertAndSend(
            MqConstants.HOTEL_EXCHANGE,
            MqConstants.HOTEL_INSERT_KEY,
            hotel.getId());
}
/**
 * Updates an existing hotel, then publishes its id with the insert routing key
 * (inserts and updates share the same key and queue).
 *
 * @param hotel hotel taken from the request body; its id must not be null
 * @throws InvalidParameterException if the hotel has no id
 */
@PutMapping()
public void updateById(@RequestBody Hotel hotel) {
    if (hotel.getId() == null) {
        throw new InvalidParameterException("id不能为空");
    }
    hotelService.updateById(hotel);
    // The constants class is MqConstants, as used by MqConfig and HotelListener.
    rabbitTemplate.convertAndSend(
            MqConstants.HOTEL_EXCHANGE,
            MqConstants.HOTEL_INSERT_KEY,
            hotel.getId());
}
/**
 * Removes the hotel with the given id, then publishes the id with the delete routing key.
 *
 * @param id id of the hotel to remove, taken from the request path
 */
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
    hotelService.removeById(id);
    // The constants class is MqConstants, as used by MqConfig and HotelListener.
    rabbitTemplate.convertAndSend(
            MqConstants.HOTEL_EXCHANGE,
            MqConstants.HOTEL_DELETE_KEY,
            id);
}
编写监听器
/**
 * RabbitMQ listener that forwards hotel change messages to the hotel service.
 */
@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    /**
     * Handles messages from the insert queue, which carries both inserts and updates.
     *
     * @param id hotel id carried by the message
     */
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id) {
        this.hotelService.insertById(id);
    }

    /**
     * Handles messages from the delete queue.
     *
     * @param id hotel id carried by the message
     */
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id) {
        this.hotelService.deleteById(id);
    }
}
面试相关问题:
数据同步的解决方案(面试)
有以下几种解决方案
同步调用:一般出现在传统类型的项目中,耦合度高,速度较慢
异步通知:使用MQ队列来完成异步和一定程度的解耦,依赖MQ的可靠性
监听binlog:可以通过阿里巴巴开源的canal组件来监听mysql的binlog日志来实现,可以完全解耦,但开启binlog会增加数据库负担、实现复杂度高
ES内部的实现原理
数据插入的分片如何确定
先通过hash算法算出文档路由值(默认为文档id)的hash值,然后用这个值对分片数量做取余(%)运算,即 shard = hash(_routing) % number_of_shards,得到的结果就是文档要放入的分片编号;
也因此,索引库一旦创建,分片数量就不能修改
数据的查询
分为两个阶段
第一阶段是分散阶段,在这个阶段,协调节点会把请求分发到每一个分片
第二阶段是聚集阶段,这个阶段,协调节点会汇总分片的数据结果,并经过处理后将最终结果返回给用户
脑裂(面试)
数据脑裂:在一个集群中出现了不止一个master,这就是脑裂
解决:
配置方面:使用奇数个节点,因为选取master要满足半数以上原则
增加内存,防止内存不够带来的无法响应
增加响应超时时间,把心跳响应时间从默认的30s调整为更长的时间,来避免网络波动带来的无法接收心跳消息
减少访问数量,不要在短时间内对master发起过高并发的写请求,防止高并发导致master无法处理其他请求信息
ES的集群故障转移
集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其他节点,确保数据安全,这就是故障转移
