聚合
聚合(aggregations)可以让我们极其方便地实现对数据的统计、分析、运算。
聚合常见的有三类:
- 桶(Bucket)聚合:用来对文档做分组
  - TermAggregation:按照文档字段值分组,例如按照品牌值分组、按照国家分组
  - Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
- 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
  - Avg:求平均值
  - Max:求最大值
  - Min:求最小值
  - Stats:同时求max、min、avg、sum等
- 管道(Pipeline)聚合:以其它聚合的结果为基础再做聚合
Bucket聚合语法:
Metric聚合语法:
RestAPI实现聚合
聚合条件语法
聚合结果解析void testMetric() throws IOException {
//1.创建对应请求
SearchRequest request = new SearchRequest("hotel");
//2.DSL
request.source().size(0);
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(20)
.order(BucketOrder.aggregation("_count",true))
);
//3.发请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4.解析结果
Aggregations aggregations = response.getAggregations();
Terms brandAgg = aggregations.get("brandAgg");
List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
for (Terms.Bucket bucket : buckets) {
String key = bucket.getKeyAsString();
System.out.println(key);
}
}
案例:实现搜索栏条件
```java
package cn.itcast.hotel.service.impl;
import cn.itcast.hotel.mapper.HotelMapper; import cn.itcast.hotel.pojo.Hotel; import cn.itcast.hotel.pojo.HotelDoc; import cn.itcast.hotel.pojo.PageResult; import cn.itcast.hotel.pojo.RequestParams; import cn.itcast.hotel.service.IHotelService; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.unit.DistanceUnit; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder; import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.search.suggest.SuggestBuilders; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;
import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;
@Service
public class HotelService extends ServiceImpl
@Autowired
private RestHighLevelClient client;
/**
* 根据关键字搜索酒店信息
*
* @param params 请求参数对象,包含用户输入的关键字
* @return 酒店文档列表
*/
@Override
public PageResult search(RequestParams params) {
try {
//1.
SearchRequest request = new SearchRequest("hotel");
//2.准备DSL并添加条件
addQuery(request, params);
//分页
int size = params.getSize();
int begin = (params.getPage() - 1) * size;
request.source().from(begin).size(size);
//距离排序
String location = params.getLocation();
if (location != null && !location.equals("")) {
request.source().sort(SortBuilders
.geoDistanceSort("location", new GeoPoint(location))
.order(SortOrder.ASC)
.unit(DistanceUnit.KILOMETERS)
);
}
//发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 解析响应
return handleResponse(response);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 条件查询拼接DSL
*
* @param request
* @param params
* @return
*/
private void addQuery(SearchRequest request, RequestParams params) {
//2.1query
//构建boolQuery,多条件使用bool查询
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
String key = params.getKey();
//判断搜索词
if (key == null || "".equals(key)) {
boolQuery.must(QueryBuilders.matchAllQuery());
} else {
boolQuery.must(QueryBuilders.matchQuery("all", key));
}
// 3.城市条件
if (params.getCity() != null && !params.getCity().equals("")) {
boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
}
// 4.品牌条件
if (params.getBrand() != null && !params.getBrand().equals("")) {
boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
}
// 5.星级条件
if (params.getStarName() != null && !params.getStarName().equals("")) {
boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
}
//6.价格
if (params.getMinPrice() != null && params.getMaxPrice() != null) {
boolQuery.filter(QueryBuilders.rangeQuery("price")
.gte(params.getMinPrice())
.lte(params.getMaxPrice()));
}
//function_score
FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(
// 原始查询,相关性算分的查询
boolQuery,
// function score的数组
new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
// 其中的一个function score 元素
new FunctionScoreQueryBuilder.FilterFunctionBuilder(
//过滤条件
QueryBuilders.termQuery("isAD", true),
//算分函数
ScoreFunctionBuilders.weightFactorFunction(10)
)
}
);
//放入条件
request.source().query(functionScoreQuery);
}
/**
* 搜索栏条件添加
*
* @param params
* @return
*/
@Override
public Map<String, List<String>> getFilters(RequestParams params) {
try {
//1.
SearchRequest request = new SearchRequest("hotel");
//2.准备DSL并添加条件
addQuery(request, params);
//2.正常条件
request.source().size(0);
//3.聚合条件
addFilter(request);
//4.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//解析结果
Aggregations aggregations = response.getAggregations();
//放入不同map
Map<String, List<String>> map = new HashMap<>();
List<String> cityList = getAggField(aggregations, "cityAgg");
map.put("city", cityList);
List<String> starList = getAggField(aggregations, "starAgg");
map.put("starName", starList);
List<String> brandList = getAggField(aggregations, "brandAgg");
map.put("brand", brandList);
return map;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 自动补全
*
* @param key
* @return
*/
@Override
public List<String> getSuggestions(String key) {
try {
//1.请求
SearchRequest request = new SearchRequest("hotel");
//2.DSL的java代码
request.source().suggest(new SuggestBuilder().addSuggestion(
"suggestions",
SuggestBuilders.completionSuggestion("suggestion")
.prefix(key)
.skipDuplicates(true)
.size(10)
));
//3.发送请求获取参数
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4.解析参数
Suggest suggest = response.getSuggest();
// 4.1.根据补全查询名称,获取补全结果
CompletionSuggestion suggestions= suggest.getSuggestion("suggestions");
// 4.2.获取options
List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
List<String>list=new ArrayList<>();
//4.3遍历放入集合
for (CompletionSuggestion.Entry.Option option : options) {
String string = option.getText().string();
list.add(string);
}
return list;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 新增、修改
* @param id
*/
@Override
public void insertById(Long id) {
try {
//查询对应id的酒店信息
Hotel hotel = getById(id);
HotelDoc hotelDoc=new HotelDoc(hotel);
//请求
IndexRequest request = new IndexRequest("hotel").id(id.toString());
//组装DSL
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
//发请求
client.index(request,RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 删除
* @param id
*/
@Override
public void deleteById(Long id) {
try {
//创建
DeleteRequest request = new DeleteRequest("hotel",id.toString());
client.delete(request,RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 得到对应搜索框信息
*
* @param aggregations
* @param aggName
* @return
*/
private List<String> getAggField(Aggregations aggregations, String aggName) {
Terms aggTerms = aggregations.get(aggName);
List<? extends Terms.Bucket> buckets = aggTerms.getBuckets();
List<String> list = new ArrayList<>();
for (Terms.Bucket bucket : buckets) {
String key = bucket.getKeyAsString();
list.add(key);
}
return list;
}
/**
* 聚合条件
*
* @param request
*/
private void addFilter(SearchRequest request) {
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(100));
request.source().aggregation(AggregationBuilders
.terms("starAgg")
.field("starName")
.size(100));
request.source().aggregation(AggregationBuilders
.terms("cityAgg")
.field("city")
.size(100)
);
}
/**
* 结果解析
*
* @param response
* @return
*/
private PageResult handleResponse(SearchResponse response) {
//解析结果
SearchHits searchHits = response.getHits();
//总数
long total = searchHits.getTotalHits().value;
//集合
SearchHit[] hits = searchHits.getHits();
List<HotelDoc> hotelDocList = new ArrayList<>();
for (SearchHit hit : hits) {
//酒店对象JSON
String hitSourceAsString = hit.getSourceAsString();
//转换对象hotelDoc
HotelDoc hotelDoc = JSON.parseObject(hitSourceAsString, HotelDoc.class);
//距离
Object[] sortValues = hit.getSortValues();
if (sortValues != null && sortValues.length > 0) {
hotelDoc.setDistance(sortValues[0]);
}
hotelDocList.add(hotelDoc);
}
return new PageResult(total, hotelDocList);
}
}
```
<a name="oioHr"></a>
### 自动补全
拼音分词器:<br />安装方式与IK分词器一样,分四步:<br /> ①解压<br /> ②上传到虚拟机中elasticsearch的plugins目录<br /> ③重启elasticsearch<br /> ④测试<br />
```json
POST /_analyze
{
  "text": "如家酒店还不错",
  "analyzer": "pinyin"
}
```
创建索引库时添加suggestion字段用于自动补全
该字段需要在创建索引库(定义mapping)的时候就添加
// Hotel data index.
// Analyzers defined in settings:
//   - text_anlyzer: ik_max_word tokenizer followed by the "py" (pinyin) filter;
//     used at index time by the "name" and "all" text fields, which search with ik_smart.
//   - completion_analyzer: keyword tokenizer followed by the "py" filter;
//     used by the "suggestion" completion field.
// "name", "brand" and "business" are copied into the "all" field.
PUT /hotel
{
"settings": {
"analysis": {
"analyzer": {
"text_anlyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
},
"completion_analyzer": {
"tokenizer": "keyword",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"id":{
"type": "keyword"
},
"name":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart",
"copy_to": "all"
},
"address":{
"type": "keyword",
"index": false
},
"price":{
"type": "integer"
},
"score":{
"type": "integer"
},
"brand":{
"type": "keyword",
"copy_to": "all"
},
"city":{
"type": "keyword"
},
"starName":{
"type": "keyword"
},
"business":{
"type": "keyword",
"copy_to": "all"
},
"location":{
"type": "geo_point"
},
"pic":{
"type": "keyword",
"index": false
},
"all":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart"
},
"suggestion":{
"type": "completion",
"analyzer": "completion_analyzer"
}
}
}
}
实例DSL:
解析:
/**
 * Returns up to 10 de-duplicated completion suggestions for the given prefix.
 *
 * @param prefix text typed by the user
 * @return suggestion texts; empty when {@code prefix} is null/empty or the response
 *         carries no result for the suggestion
 * @throws RuntimeException if the search request fails with an {@link IOException}
 */
@Override
public List<String> getSuggestions(String prefix) {
    // Nothing to complete: do not send a request with a null/empty prefix
    if (prefix == null || prefix.isEmpty()) {
        return new ArrayList<>();
    }
    try {
        // 1. Prepare the request
        SearchRequest request = new SearchRequest("hotel");
        // 2. Prepare the DSL: completion suggestion on the "suggestion" field
        request.source().suggest(new SuggestBuilder().addSuggestion(
                "suggestions",
                SuggestBuilders.completionSuggestion("suggestion")
                        .prefix(prefix)
                        .skipDuplicates(true)
                        .size(10)
        ));
        // 3. Send the request
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. Parse the result, looked up by the name used in the request.
        //    Guard against a missing suggest section instead of failing with an NPE.
        Suggest suggest = response.getSuggest();
        if (suggest == null) {
            return new ArrayList<>();
        }
        CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
        if (suggestions == null) {
            return new ArrayList<>();
        }
        List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
        List<String> list = new ArrayList<>(options.size());
        for (CompletionSuggestion.Entry.Option option : options) {
            list.add(option.getText().toString());
        }
        return list;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
ES索引和数据库的数据同步问题:
三种数据同步方法
方式一:同步调用
- 优点:实现简单,粗暴
- 缺点:业务耦合度高
方式二:异步通知
- 优点:低耦合,实现难度一般
- 缺点:依赖mq的可靠性
方式三:监听binlog——相对高级
- 优点:完全解除服务间耦合
- 缺点:开启binlog增加数据库负担、实现复杂度高
流程:
需要用到的模块都需要导入
<!-- Spring Boot AMQP starter; presumably what provides the RabbitMQ integration used by the sync code in these notes -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在yml中声明RabbitMQ的连接信息(主机、端口、虚拟主机、用户名、密码):
# RabbitMQ connection settings (keys nested under spring.rabbitmq).
# NOTE(review): host and credentials are written in plain text here; consider
# supplying them via environment variables or an external config source.
spring:
  rabbitmq:
    host: 121.36.164.132 # host name / address
    port: 5672 # port
    virtual-host: / # virtual host
    username: itcast # user name
    password: 123321 # password
可以用配置类声明,也可以直接用注解;注解的详细用法见上一篇ES笔记
/**
 * Declares the messaging topology used for hotel data synchronisation: one topic
 * exchange, one queue for insert/update events, one queue for delete events, and
 * the bindings that route each key to its queue.
 */
@Configuration
public class MqConfig {

    /** Topic exchange named by {@code MqConstants.HOTEL_EXCHANGE}: durable, not auto-deleted. */
    @Bean
    public TopicExchange topicExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, durable, autoDelete);
    }

    /** Durable queue for hotel insert/update messages. */
    @Bean
    public Queue insertQueue() {
        final boolean durable = true;
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE, durable);
    }

    /** Durable queue for hotel delete messages. */
    @Bean
    public Queue deleteQueue() {
        final boolean durable = true;
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE, durable);
    }

    /** Binds the insert queue to the exchange with the insert routing key. */
    @Bean
    public Binding insertQueueBinding() {
        Queue queue = insertQueue();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with(MqConstants.HOTEL_INSERT_KEY);
    }

    /** Binds the delete queue to the exchange with the delete routing key. */
    @Bean
    public Binding deleteQueueBinding() {
        Queue queue = deleteQueue();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with(MqConstants.HOTEL_DELETE_KEY);
    }
}
/**
 * Saves a new hotel and then publishes its id with the insert routing key.
 *
 * @param hotel hotel taken from the request body
 */
@PostMapping
public void saveHotel(@RequestBody Hotel hotel) {
    hotelService.save(hotel);

    // NOTE(review): other snippets in these notes spell this class MqConstants — confirm
    // which constants class this controller's module actually declares.
    String exchange = MQConstants.HOTEL_EXCHANGE;
    String routingKey = MQConstants.HOTEL_INSERT_KEY;
    rabbitTemplate.convertAndSend(exchange, routingKey, hotel.getId());
}
/**
 * Updates an existing hotel and then publishes its id with the insert routing key
 * (the same key that is used for newly saved hotels).
 *
 * @param hotel hotel taken from the request body; its id must not be null
 * @throws InvalidParameterException if the hotel carries no id
 */
@PutMapping()
public void updateById(@RequestBody Hotel hotel) {
    boolean idMissing = hotel.getId() == null;
    if (idMissing) {
        throw new InvalidParameterException("id不能为空");
    }

    hotelService.updateById(hotel);

    // NOTE(review): other snippets in these notes spell this class MqConstants — confirm
    // which constants class this controller's module actually declares.
    String exchange = MQConstants.HOTEL_EXCHANGE;
    String routingKey = MQConstants.HOTEL_INSERT_KEY;
    rabbitTemplate.convertAndSend(exchange, routingKey, hotel.getId());
}
/**
 * Removes the hotel with the given id and then publishes the id with the delete
 * routing key.
 *
 * @param id hotel id taken from the request path
 */
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
    hotelService.removeById(id);

    // NOTE(review): other snippets in these notes spell this class MqConstants — confirm
    // which constants class this controller's module actually declares.
    String exchange = MQConstants.HOTEL_EXCHANGE;
    String routingKey = MQConstants.HOTEL_DELETE_KEY;
    rabbitTemplate.convertAndSend(exchange, routingKey, id);
}
编写监听器
/**
 * Consumes hotel change messages and forwards each received id to the hotel service.
 */
@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    /**
     * Handles messages from the hotel insert queue (used for both new and updated
     * hotels) by delegating to {@code insertById}.
     *
     * @param id hotel id carried by the message
     */
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id) {
        this.hotelService.insertById(id);
    }

    /**
     * Handles messages from the hotel delete queue by delegating to {@code deleteById}.
     *
     * @param id hotel id carried by the message
     */
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id) {
        this.hotelService.deleteById(id);
    }
}
面试相关问题:
数据同步的解决方案(面试)
有以下几种解决方案
同步调用:一般出现在传统类型的项目中,耦合度高,速度较慢
异步通知:使用MQ队列来完成异步和一定程度的解耦,依赖MQ的可靠性
监听binlog:可以通过阿里巴巴开源的canal组件来监听mysql的binlog日志来实现,可以完全解耦,但开启binlog会增加数据库负担、实现复杂度高
ES内部的实现原理
数据插入的分片如何确定
先通过hash算法算出数据的hash值,然后用这个值去和分片的数量进行%运算,得到的值就是放入第几个分片,即 shard = hash(_routing) % number_of_shards(_routing 默认是文档的id)。
也因此,索引库一旦创建,分片数量就不能修改
数据的查询
分为两个阶段
第一阶段是分散阶段,在这个阶段,协调节点会把请求分发到每一个分片
第二阶段是聚集阶段,这个阶段,协调节点会汇总分片的数据结果,并经过处理后将最终结果返回给用户
脑裂(面试)
数据脑裂:在一个集群中出现了不止一个master,这就是脑裂
解决:
配置方面:使用奇数个节点,因为选取master要满足半数以上原则
增加内存,防止内存不够带来的无法响应
增加响应超时时间,把心跳响应时间从默认的30s调整为更长的时间,来避免网络波动带来的无法接收心跳消息
减少访问数量,不要在短时间内对master进行太高并发的写请求,防止高并发带来的无法处理其他请求信息
ES的集群故障转移
集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其他节点,确保数据安全,这就是故障转移