09、商城业务.pdf
es主要负责的场景有两个:
- 负责商品的全文检索
日志的检索(ELK ES + LogStash + Kibana)
一、es存储分析
es是将数据存储在内存中的,所以在存储时需要分析应该存储哪些内容,同时兼顾到时间与空间的消耗。
1.1 需要存储什么信息到es中
需要保存sku信息
- 当搜索商品名的时候,检索到的是sku的sku_title
- 还会通过sku的销量、标题、价格区间来进行检索
- 需要保存品牌分类等信息
- 点击分类,检索分类下的所有信息
- 点击品牌,检索品牌下的商品信息
- 需要保存spu的规格信息——选择规格的时候会检索共有这些规格的spu信息
1.2 如何设计存储结构 商品Mapping
1)、检索的时候输入名字,是需要按照 sku 的 title 进行全文检索的
2)、检索使用商品规格,规格是 spu 的公共属性,每个 spu 是一样的
3)、按照分类 id 进去的都是直接列出 spu 的,还可以切换。
4)、我们如果将 sku 的全量信息保存到 es 中(包括 spu 属性)就太多量字段了。
5)、我们如果将 spu 以及他包含的 sku 信息保存到 es 中,也可以方便检索。但是 sku 属于
spu 的级联对象,在 es 中需要 nested 模型,这种性能差点。
6)、但是存储与检索我们必须性能折中。
7)、如果我们分拆存储,spu 和 attr 一个索引,sku 单独一个索引可能涉及的问题。 ```json 建一个sku索引,只保存非冗余字段 { skuId:1 spuId:11 xxxx } 再建一个attr索引 { spuId:11 attrs:[{
skuId:1
spuId:11
skuTitle:"华为xx"
price:998
saleCount:99
attrs:[
{尺寸:"5寸"},
{CPU:高通945},
{分辨率:全高清}
]
}
// 优点:方便检索
// 缺点:会产生冗余字段,对于属于同一spu的商品,attrs字段数据会冗余
] } // 优点:空间利用率高 // 缺点:查询attr时,会检索当前所有的spu,耗时很长 假设搜索小米, 粮食、手机、电器等有很多sku都包含有小米 10000个包含小米的sku,涉及到4000个spu,我们会找到4000个spu的attr进行聚合,通过分步查询:{尺寸:"5寸"},
{CPU:高通945},
{分辨率:全高清}
- 查出包含小米的10000个sku,其涉及到4000个spu
- 查出4000个spu对应的所有可能属性 esClient:spuId:[4000个spuId] 40008=32000byte=32kb
32kb10000=320mb; 如果是百万并发会发送32GB的数据
检索商品的名字,如“手机”,对应的 spu 有很多,我们要分析出这些 spu 的所有关联属性,再做一次查询,就必须将所有 spu_id 都发出去。假设有 1 万个数据,数据传输一次就10000*4=4MB;并发情况下假设 1000 检索请求,那就是 4GB 的数据,传输阻塞时间会很长,业务更加无法继续。<br />所以,我们如下设计,这样才是文档区别于关系型数据库的地方,宽表设计,不能去考虑数据库范式。
```json
PUT product
{
"mappings": {
"properties": {
"skuId": { "type": "long" },
"spuId": { "type": "keyword" },
"skuTitle": {
"type": "text",
"analyzer": "ik_smart"
},
"skuPrice": { "type": "keyword" },
"skuImg": {
"type": "keyword",
"index": false,
"doc_values": false # 表示该属性不需要聚合等操作
},
"saleCount":{ "type":"long" },
"hasStock": { "type": "boolean" },
"hotScore": { "type": "long" },
"brandId": { "type": "long" },
"catalogId": { "type": "long" },
"brandName": {
"type": "keyword",
"index": false,
"doc_values": false
},
"brandImg":{ # 品牌图片,只用来查看,不用来检索和聚合
"type": "keyword",
"index": false,
"doc_values": false
},
"catalogName": {
"type": "keyword",
"index": false,
"doc_values": false
},
"attrs": { # 这里是spu的规格属性
"type": "nested", # 嵌入式,内部属性
"properties": {
"attrId": {"type": "long" },
"attrName": {
"type": "keyword",
"index": false,
"doc_values": false
},
"attrValue": { "type": "keyword" }
}
}
}
}
}
1.3 关于nested类型
nested官方文档
- Object 数据类型的数组会被扁平化处理为一个简单的键与值的列表,即对象的相同属性会放到同一个数组中,在检索时会出现错误。 对象类型的数组如何被扁平化处理
- 对于 Object 类型的数组,要使用 nested 字段类型。Using nested fields for arrays of objects
# 这里我们想寸的文档user属性里面包含了两个人:John Smith和Alice White
PUT my_index/_doc/1
{
"group" : "fans",
"user" : [
{
"first" : "John",
"last" : "Smith"
},
{
"first" : "Alice",
"last" : "White"
}
]
}
# 因为没有设置type为nested,es扁平化处理后,把first跟last分别放在一起了
# 这样就产生了笛卡尔积,共有四种组合
{
"group" : "fans",
"user.first" : [ "alice", "john" ],
"user.last" : [ "smith", "white" ]
}
# 如果现在查询first为Alice,last为Smith的就会有对应document,实际上是没有的
GET my_index/_search
{
"query": {
"bool": {
"must": [
{ "match": { "user.first": "Alice" }},
{ "match": { "user.last": "Smith" }}
]
}
}
}
二、商品上架功能
2.1 创建es存储的实体类
package com.atguigu.common.to.es;
import lombok.Data;
import java.math.BigDecimal;
import java.util.List;
/**
* es进行文档存储的实体类
* @author mrlinxi
* @create 2022-03-17 22:42
*/
@Data
public class SkuEsModel {
private Long skuId;
private Long spuId;
private String skuTitle;
private BigDecimal skuPrice;
private String skuImg;
private Long saleCount;
/**
* 是否有库存
*/
private Boolean hasStock;
/**
* 热度
*/
private Long hotScore;
private Long brandId;
private Long catalogId;
private String brandName;
private String brandImg;
private String catalogName;
private List<Attrs> attrs;
@Data
public static class Attrs {
private Long attrId;
private String attrName;
private String attrValue;
}
}
2.2 实现上架业务接口
/**
* 商品上架
*/
// /product/spuinfo/{spuId}/up
@PostMapping("/{spuId}/up")
public R spuUp(@PathVariable("spuId") Long spuId) {
spuInfoService.up(spuId);
return R.ok();
}
2.2.1 获取spuId对应的sku
skuInfoService.getSkusBySpuId(spuId)
/**
* 通过SpuId找出所有对应的sku信息
* @param spuId
* @return
*/
List<SkuInfoEntity> getSkusBySpuId(Long spuId);
/**
* 根据spuid查出所有对应的sku信息
* @param spuId
* @return
*/
@Override
public List<SkuInfoEntity> getSkusBySpuId(Long spuId) {
List<SkuInfoEntity> skus = this.list(new QueryWrapper<SkuInfoEntity>().eq("spu_id", spuId));
return skus;
}
2.2.2 查询当前spu能检索的属性
attrService.selectSearchAttrIds(List<Long> spuAttrIds);
/**
* 在指定属性集合中,找出可检索的属性
*
* @param spuAttrIds
* @return
*/
List<Long> selectSearchAttrIds(List<Long> spuAttrIds);
/**
* 在指定的属性集合中,找出可检索的属性
*
* @param spuAttrIds
* @return
*/
@Override
public List<Long> selectSearchAttrIds(List<Long> spuAttrIds) {
// SELECT `attr_id` FROM `pms_attr` WHERE `attr_id` IN spuAttrIds AND `search_type` = 1;
return baseMapper.selectSearchAttrIds(spuAttrIds);
}
AttrDao中定义接口,写Mapper文件
<select id="selectSearchAttrIds" resultType="java.lang.Long">
SELECT `attr_id` FROM `pms_attr` WHERE `attr_id` IN
<foreach collection="spuAttrIds" item="attrId" separator="," open="(" close=")">
#{attrId}
</foreach>
AND `search_type` = 1
</select>
2.2.3 远程查询库存
我们希望将当前supId对应的所有skuId一次性发送给库存服务,查询其全部的库存信息,这样就不需要分次查询,以减少远程调用的消耗,提升性能降低延迟。
gulimall-ware服务中定义查询库存接口&实现&Mapper映射文件
package com.atguigu.gulimall.gulimallware.vo;
import lombok.Data;
@Data
public class SkuHasStockVo {
private Long skuId;
private Boolean hasStock;
}
package com.atguigu.common.utils;
public class R<T> extends HashMap<String, Object> {
private static final long serialVersionUID = 1L;
private T data;
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
}
/**
* 查询sku是否有库存
*/
@PostMapping("/hasStock")
public R<List<SkuHasStockVo>> getSkuHasStock(@RequestBody List<Long> skuIds) {
// 返回每个sku的id跟是否有库存(hasStock)
List<SkuHasStockVo> vos = wareSkuService.getSkuHasStock(skuIds);
R<List<SkuHasStockVo>> ok = R.ok();
ok.setData(vos);
return ok;
}
/**
* 检查sku是否有库存
* @param skuIds
* @return
*/
@Override
public List<SkuHasStockVo> getSkuHasStock(List<Long> skuIds) {
// SELECT sku_id, SUM(`stock` - `stock_locked`) FROM `wms_ware_sku` GROUP BY sku_id;
List<SkuHasStockVo> collect = skuIds.stream().map(skuId -> {
SkuHasStockVo skuHasStockVo = new SkuHasStockVo();
skuHasStockVo.setSkuId(skuId);
// 查询当前sku的总库存量
// SELECT SUM(`stock` - `stock_locked`) FROM `wms_ware_sku` WHERE `sku_id` = ?;
Long stock = baseMapper.getSkuStock(skuId);
skuHasStockVo.setHasStock(stock > 0);
return skuHasStockVo;
}).collect(Collectors.toList());
return collect;
}
<select id="getSkuStock" resultType="java.lang.Long">
SELECT SUM(`stock` - `stock_locked`) FROM `wms_ware_sku` WHERE `sku_id` = #{skuId}
</select>
gulimall-product调用ware服务
定义ware的远程调用接口,@PostMapping中一定要写完整路径!完整路径!完整路径!
/**
* @author mrlinxi
* @create 2022-03-18 13:50
*/
@FeignClient("gulimall-ware")
public interface WareFeignService {
/**
* 1.R在设计的时候可以加上泛型
* 2.直接返回我们想要的结果
* 3.自己封装解析结果
* @param skuIds
* @return
*/
@PostMapping("/gulimallware/waresku/hasStock")
R<List<SkuHasStockVo>> getSkuHasStock(@RequestBody List<Long> skuIds);
}
因为我们想获取到ware返回的List
2.2.4 将数据发给es保存
调用es保存数据是search服务的功能,我们需要在product服务中调用search服务。需要在search服务中完成存储业务实现。
gulimall-search服务实现索引接口
search服务中新建一个es常量类
package com.atguigu.gulimall.gulimallsearch.constant;
/**
* es常量类
* @author mrlinxi
* @create 2022-03-18 15:55
*/
public class EsConstant {
public static final String PRODUCT_INDEX = "product"; // sku数据在es中的索引
}
@Slf4j
@RestController
@RequestMapping("/gulimallsearch/save")
public class ElasticSaveController {
@Autowired
ProductSaveService productSaveService;
/**
* 上架商品
*/
@PostMapping("/product")
public R productStatusUp(@RequestBody List<SkuEsModel> skuEsModels) {
boolean status = false;
try {
// 如果status为true则表示没有发生错误
status = productSaveService.productStatusUp(skuEsModels);
} catch (IOException e) {
log.error("ElasticSaveController.productStatusUp 商品上架远程调用ware服务发生错误:{}", e);
return R.error(BizCodeEnum.PRODUCT_UP_EXCEPTION.getCode(), BizCodeEnum.PRODUCT_UP_EXCEPTION.getMsg());
}
return status ? R.ok() : R.error(BizCodeEnum.PRODUCT_UP_EXCEPTION.getCode(), BizCodeEnum.PRODUCT_UP_EXCEPTION.getMsg());
}
}
@Service("productSaveService")
@Slf4j
public class ProductSaveServiceImpl implements ProductSaveService {
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
* 将商品信息存储到es中
*
* @param skuEsModels
*/
@Override
public boolean productStatusUp(List<SkuEsModel> skuEsModels) throws IOException {
//将数据保存到es中
// 1.给es中建立一个索引,建立好映射关系
// 2.给es中保存这些数据
// BulkRequest bulkRequest, RequestOptions options
BulkRequest bulkRequest = new BulkRequest();
for (SkuEsModel skuEsModel : skuEsModels) {
// 构造保存请求
IndexRequest indexRequest = new IndexRequest(EsConstant.PRODUCT_INDEX);
indexRequest.id(skuEsModel.getSkuId().toString());
String jsonString = JSON.toJSONString(skuEsModel); // 将对象转为JSON字符串
indexRequest.source(jsonString, XContentType.JSON);
bulkRequest.add(indexRequest);
}
// bulk表示批量操作,类似于jdbc的batch操作
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, GulimallElasticSearchConfig.COMMON_OPTIONS);
// TODO 1.如果发生错误,需要对错误进行处理
// 如果发生错误,那么返回true,否则返回false
boolean hasFailures = bulkResponse.hasFailures();
List<String> collect = Arrays.stream(bulkResponse.getItems()).map(item -> item.getId()).collect(Collectors.toList());
log.info("商品上架成功:{}", collect);
return !hasFailures;
}
}
gulimall-product调用search服务
@FeignClient("gulimall-search")
public interface SearchFeignService {
@PostMapping("//gulimallsearch/save/product")
R productStatusUp(@RequestBody List<SkuEsModel> skuEsModels);
}
同时,当商品上架成功后,需要更新spu的上架信息和更新时间。新建商品上架状态枚举类:
public enum PublishStatusEum {
NEW_SPU(0, "新建"), SPU_UP(1, "商品上架"), SPU_DOWN(2, "商品下架");
private int code;
private String message;
PublishStatusEum(int code, String message) {
this.code = code;
this.message = message;
}
public int getCode() {
return code;
}
public String getMessage() {
return message;
}
}
声明更新spu上架状态的方法baseMapper.updateSpuStatus
,写mapper映射文件。
public interface SpuInfoDao extends BaseMapper<SpuInfoEntity> {
/**
* 更新spu上架状态
* @param spuId
* @param code
*/
void updateSpuStatus(@Param("spuId") Long spuId, @Param("code") int code);
}
<update id="updateSpuStatus">
UPDATE `pms_spu_info` SET `publish_status` = #{code}, `update_time` = NOW() WHERE `id` = #{spuId};
</update>
2.3 测试——必看很重要
WareSkuDao.xml中这句SQL语句,stock_locked字段在数据库中如果不指定默认是null,如果为null那么不管stock是多少,查出来都是null。所以建议将stock_locked设置默认为0。
这里stockMap拿不到R中的data数据,debugWareSkuController里的getSkuHasStock方法的时候发现data并没有放入R。
这是因为R继承了HashMap,而JackSon对于HashMap类型会有特殊的处理方式,具体来说就是会对类进行向上转型为Map,导致子类的私有属性消失。
那么我们对R添加一个setData方法,和getData方法,getData方法中我们通过JSON.parseObject将存进R的data逆转为目标对象。
/**
* 使用alibaba提供的fastjson逆转
* @param typeReference
* @param <T>
* @return
*/
public <T> T getData(TypeReference<T> typeReference) {
Object data = get("data");
T t = JSON.parseObject(JSON.toJSONString(data), typeReference);
return t;
}
public R setData(Object data) {
put("data", data);
return this;
}
需要修改之前用到R泛型的地方
/**
* 查询sku是否有库存
*/
@PostMapping("/hasStock")
public R getSkuHasStock(@RequestBody List<Long> skuIds) {
// 返回每个sku的id跟是否有库存(hasStock)
List<SkuHasStockVo> vos = wareSkuService.getSkuHasStock(skuIds);
return R.ok().setData(vos);
}
@PostMapping("/gulimallware/waresku/hasStock")
R getSkuHasStock(@RequestBody List<Long> skuIds);
商品上架业务最终业务逻辑
@Autowired
BrandService brandService;
@Autowired
CategoryService categoryService;
@Autowired
WareFeignService wareFeignService;
/**
* 商品上架
*
* @param spuId
*/
@Override
public void up(Long spuId) {
// 组装我们需要的数据
// 1.查出当前spuId对应的所有sku信息,品牌的名字等等
List<SkuInfoEntity> skus = skuInfoService.getSkusBySpuId(spuId);
// TODO 3.查询品牌id、品牌名、分类名
// 先查出当前spuId对应的spu信息,里面包含了brandId catalogId
SpuInfoEntity spu = this.getById(spuId);
// 根据brandId 查出 brandName
BrandEntity brand = brandService.getById(spu.getBrandId());
// 查出CatalogName 也就是category的name
CategoryEntity category = categoryService.getById(spu.getCatalogId());
// TODO 4.查询当前spu对应的能被检索的所有属性 pms_product_attr_value pms_attr
List<ProductAttrValueEntity> spuAttrs = attrValueService.baseAttrListForSpu(spuId);
// 获取spu对应的所有属性的attrId
List<Long> spuAttrIds = spuAttrs.stream().map(ProductAttrValueEntity::getAttrId).collect(Collectors.toList());
// 筛选出可以被检索的属性
List<Long> availableAttrIds = attrService.selectSearchAttrIds(spuAttrIds);
// 这里为什么要先变为set,因为set的contains方法的效率要比list高
HashSet<Long> availableIdSet = new HashSet<>(availableAttrIds);
List<SkuEsModel.Attrs> spuSearchAttrs = spuAttrs.stream().filter(attr -> availableIdSet.contains(attr.getAttrId()))
.map(item -> {
SkuEsModel.Attrs attrs = new SkuEsModel.Attrs();
BeanUtils.copyProperties(item, attrs);
return attrs;
}).collect(Collectors.toList());
// TODO 1.发送远程调用,库存系统查询是否有库存
// 我们希望远程调用一次库存服务,直接一次性查所有sku是否有库存,这里直接进行幅值,而不调用远程接口
List<Long> skuIdList = skus.stream().map(SkuInfoEntity::getSkuId).collect(Collectors.toList());
Map<Long, Boolean> stockMap = null;
try {
R r = wareFeignService.getSkuHasStock(skuIdList);
TypeReference<List<SkuHasStockVo>> typeReference = new TypeReference<List<SkuHasStockVo>>() {
};
stockMap = r.getData(typeReference).stream().collect(Collectors.toMap(SkuHasStockVo::getSkuId, item -> item.getHasStock()));
} catch (Exception e) {
log.error("库存服务查询异常:" + e);
}
// 2.封装每个sku的信息
Map<Long, Boolean> finalStockMap = stockMap;
List<SkuEsModel> upProducts = skus.stream().map(sku -> {
SkuEsModel skuEsModel = new SkuEsModel();
BeanUtils.copyProperties(sku, skuEsModel);
// skuPrice skuImg
skuEsModel.setSkuPrice(sku.getPrice());
skuEsModel.setSkuImg(sku.getSkuDefaultImg());
// hasStock
skuEsModel.setHasStock(false);
// TODO 2.热度评分,默认为0 hotScore
skuEsModel.setHotScore(0L);
// 设置库存信息
if (finalStockMap == null) {
skuEsModel.setHasStock(true);
} else {
skuEsModel.setHasStock(finalStockMap.get(sku.getSkuId()));
}
// brandName brandImg catalogName
skuEsModel.setBrandName(brand.getName());
skuEsModel.setBrandImg(brand.getLogo());
skuEsModel.setCatalogName(category.getName());
// 设置检索属性 SkuEsModel:List<Attrs> attrs
skuEsModel.setAttrs(spuSearchAttrs);
return skuEsModel;
}).collect(Collectors.toList());
// TODO 5.将数据发给ES进行保存,通过gulimall-search服务
R r = searchFeignService.productStatusUp(upProducts);
if (r.getCode() == 0) {
// 远程调用成功
// 一旦上架成功,需要修改`pms_spu_info`中的 publish_status为1
baseMapper.updateSpuStatus(spuId, ProductConstant.PublishStatusEum.SPU_UP.getCode());
} else {
// 远程调用失败
// TODO 重复调用的问题?接口幂等性。比如接口调用失败了,需不需要重新调用一次?(重试机制)
// Feign调用流程 SynchronousMethodHandler.java invoke方法
// 1. 构造请求数据,讲对象转为JSON RequestTemplate template = buildTemplateFromArgs.create(argv);
// 2. 发送请求进行执行(执行成功会解码我们的响应数据) executeAndDecode(template, options);
// 3. 执行请求会有我们的重试机制
}
}