1. day06 监听数据库更新广告缓存
2. canal
canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。
canal是应对阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。
阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。
- canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
- mysql master收到dump请求,开始推送binary log给slave(也就是canal)
- canal解析binary log对象(原始为byte流)
2.1. 环境部署
2.1.1. mysql开启binlog模式
SHOW VARIABLES LIKE '%log_bin%'; -- 查看当前mysql是否开启binlog模式
如果log_bin的值为OFF是未开启,为ON是已开启。
修改/etc/my.cnf 需要开启binlog模式。
vim /etc/my.cnf
[mysqld]log-bin=mysql-binbinlog-format=ROWserver_id=1
给root用户授权
create user canal@'%' IDENTIFIED by 'canal';GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';FLUSH PRIVILEGES;
2.1.2. canal服务端安装配置
https://github.com/alibaba/canal/
上传到/usr/local/canal中
修改配置文件
vi conf/example/instance.properties

修改指定读取位置
show master status; -- 查询file和Position的值
如果file中的binlog文件不为mysql-bin.000001 可以重置mysql
reset master;
修改meta.data配置文件
vim /usr/local/canal/conf/example/meta.dat#找到以下字段并修改"journalName":"mysql-bin.000001","position":120,"
启动服务
cd /usr/local/canal./bin/startup.shcat /usr/local/canal/logs/canal/canal.log #如果显示server is running now 则启动成功
2.2. 数据监控微服务
当用户执行数据库的操作的时候,binlog 日志会被canal捕获到,并解析出数据。我们就可以将解析出来的数据进行相应的逻辑处理。
我们这里使用的一个开源的项目,它实现了springboot与canal的集成。比原生的canal更加优雅。
https://github.com/chenqian56131/spring-boot-starter-canal
2.2.1. 微服务搭建
创建changgou_canal项目 导入依赖sa
<dependency><groupId>com.xpand</groupId><artifactId>starter-canal</artifactId><version>0.0.1-SNAPSHOT</version></dependency>
启动类
package com.itheima.canal;import com.xpand.starter.canal.annotation.EnableCanalClient;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication@EnableCanalClient //声明当前服务是canal客户端public class CanalApplication {public static void main(String[] args) {SpringApplication.run(CanalApplication.class, args);}}
application
canal.client.instances.example.host=192.168.130.128canal.client.instances.example.port=11111canal.client.instances.example.batchSize=1000
Canal监听类
package com.itheima.canal.listener;import com.alibaba.otter.canal.protocol.CanalEntry;import com.xpand.starter.canal.annotation.CanalEventListener;import com.xpand.starter.canal.annotation.ListenPoint;@CanalEventListener //声明当前类为canal的监听类public class BusinessListener {/*** @param entryType 当前操作数据库的类型* @param rowData 当前操作数据库的数据*/@ListenPoint(schema = "changgou_business", table = "tb_ad") //监听哪一个数据库 哪张表 当表中发生变化 执行此方法public void adUpdate(CanalEntry.EntryType entryType, CanalEntry.RowData rowData) {System.out.println("广告表数据发生变化");//获取改变之前的数据rowData.getBeforeColumnsList().forEach((c) -> System.out.println("改变前的数据" + c.getName() + ":" + c.getValue()));//获取改变之后的数据rowData.getAfterColumnsList().forEach((c) -> System.out.println("改变后的数据"+c.getName()+":"+c.getValue()));}}
去数据库中更改tb_ad 表中任意一行数据的字段 查看控制台是否有对应内容输出
3. 首页广告缓存更新
当tb_ad(广告)表的数据发生变化时,更新redis中的广告数据。
- 修改数据监控微服务,监控tb_ad表,当发生增删改操作时,提取position值(广告位置key),发送到rabbitmq
- 从rabbitmq中提取消息,通过OkHttpClient调用ad_update来实现对广告缓存数据的更新。

3.1. 发送消息到MQ
修改数据监控微服务,监控tb_ad表,当发生增删改操作时,提取position值(广告位置key),发送到rabbitmq
导入mq依赖
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId></dependency>
application添加
spring.rabbitmq.host=192.168.130.128
新建rqbbitmq配置类
package com.itheima.canal.config;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMQConfig {//定义队列名称public static final String AD_UPDATE_QUEUE="ad_update_queue";//声明队列@Beanpublic Queue queue(){return new Queue(AD_UPDATE_QUEUE);}}
修改BusinessListener类
package com.itheima.canal.listener;import com.alibaba.otter.canal.protocol.CanalEntry;import com.itheima.canal.config.RabbitMQConfig;import com.xpand.starter.canal.annotation.CanalEventListener;import com.xpand.starter.canal.annotation.ListenPoint;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;@CanalEventListener //声明当前类为canal的监听类public class BusinessListener {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** @param entryType 当前操作数据库的类型* @param rowData 当前操作数据库的数据*/@ListenPoint(schema = "changgou_business", table = "tb_ad") //监听哪一个数据库 哪张表 当表中发生变化 执行此方法public void adUpdate(CanalEntry.EntryType entryType, CanalEntry.RowData rowData) {System.out.println("广告表数据发生变化");//获取改变之前的数据//rowData.getBeforeColumnsList().forEach((c) -> System.out.println("改变前的数据" + c.getName() + ":" + c.getValue()));//获取改变之后的数据//rowData.getAfterColumnsList().forEach((c) -> System.out.println("改变后的数据"+c.getName()+":"+c.getValue()));for (CanalEntry.Column column : rowData.getAfterColumnsList()) {if ("position".equals(column.getName())){System.out.println("发送最新的消息给MQ"+column.getValue());//发送消息rabbitTemplate.convertAndSend("", RabbitMQConfig.AD_UPDATE_QUEUE,column.getValue());}}}}
访问http://192.168.130.128:15672 mq后台 账号密码guest
更新tb_ad数据库的任意一条数据 AD_UPDATE_QUEUE队列是否有消息
3.2. 消费mq消息执行更新
通过消费mq队列中的消息 执行指定地址 触发nginx中lua脚本 更新广告内容
在changgou_service_business工程pom.xml引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId><version>3.9.0</version></dependency>
application 在spring节点添加
rabbitmq:host: 192.168.130.128
com.changgou.business包下创建listener包,包下创建类
package com.changgou.business.listener;import okhttp3.*;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;@Componentpublic class AdListener {@RabbitListener(queues = "ad_update_queue")public void receiverMessage(String message) {System.out.println("接受到消息为" + message);//发起远程调用OkHttpClient okHttpClient = new OkHttpClient();String url = "http://192.168.130.128/ad_update?position=" + message;Request request = new Request.Builder().url(url).build();Call call = okHttpClient.newCall(request);call.enqueue(new Callback() {//请求失败@Overridepublic void onFailure(Call call, IOException e) {e.printStackTrace();}//请求成功@Overridepublic void onResponse(Call call, Response response) throws IOException {System.out.println("请求成功" + response.message());}});}}
启动启动类 查看控制台输出 消费者已经从队列中消费了消息 队列目前消息为0
4. 商品上架索引库导入数据
- 在数据监控微服务中监控tb_spu表的数据,当tb_spu发生更改且is_marketable为1时,表示商品上架,将spu的id发送到rabbitmq。
- 在rabbitmq管理后台创建商品上架交换器(fanout)。使用分列模式的交换器是考虑商品上架会有很多种逻辑需要处理,导入索引库只是其中一项,另外还有商品详细页静态化等操作。这样我们可以创建导入索引库的队列和商品详细页静态化队列并与商品上架交换器进行绑定。
- 搜索微服务从rabbitmq的导入索引库的队列中提取spu的id,通过feign调用商品微服务得到sku的列表,并且通过调用elasticsearch的高级restAPI 将sku列表导入到索引库。

4.1. 声明队列和交换机
更新rabbitmq配置类 创建交换器goods_up_exchange(类型为fanout),创建队列search_add_queue绑定交换器goods_up_exchange
package com.itheima.canal.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMQConfig {//定义交换机名称public static final String GOODS_UP_EXCHANGE = "goods_up_exchange";//定义队列名称public static final String AD_UPDATE_QUEUE = "ad_update_queue";public static final String SEARCH_ADD_QUEUE = "search_add_queue";//声明队列@Beanpublic Queue queue() {return new Queue(AD_UPDATE_QUEUE);}@Bean(SEARCH_ADD_QUEUE)public Queue SEARCH_ADD_QUEUE() {return new Queue(SEARCH_ADD_QUEUE);}//声明交换机@Bean(GOODS_UP_EXCHANGE)public Exchange GOODS_UP_EXCHANGE() {return ExchangeBuilder.fanoutExchange(GOODS_UP_EXCHANGE).durable(true).build();}//队列与交换机绑定@Bean()public Binding GOODS_UP_EXCHANGE_BINDING(@Qualifier(SEARCH_ADD_QUEUE) Queue queue, @Qualifier(GOODS_UP_EXCHANGE) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("").noargs();}}
4.2. 监听商品上架数据的变化
数据监微服务新增SpuListener 如果商品是由未上架状态变成上架状态 则是最新上架的视频
将此商品的spuid 发送到队列中
package com.itheima.canal.listener;import com.alibaba.otter.canal.protocol.CanalEntry;import com.itheima.canal.config.RabbitMQConfig;import com.xpand.starter.canal.annotation.CanalEventListener;import com.xpand.starter.canal.annotation.ListenPoint;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import java.util.HashMap;import java.util.Map;@CanalEventListenerpublic class SpuListener {@Autowiredprivate RabbitTemplate rabbitTemplate;@ListenPoint(schema = "changgou_goods", table = "tb_spu")public void goodsUp(CanalEntry.EntryType entryType, CanalEntry.RowData rowData) {//获取改变之前的数据 并将数据转换为mapMap<String, String> oldData = new HashMap<>();rowData.getBeforeColumnsList().forEach((c) -> oldData.put(c.getName(), c.getValue()));//改变之前的数据 并转换为mapMap<String, String> newData = new HashMap<>();rowData.getAfterColumnsList().forEach((c) -> newData.put(c.getName(), c.getValue()));//获取最新上架的商品 由上架状态0->1if ("0".equals(oldData.get("is_marketable")) && "1".equals(newData.get("is_marketable"))) {//将视频的spu id 发送到mq队列中rabbitTemplate.convertAndSend(RabbitMQConfig.GOODS_UP_EXCHANGE, "", newData.get("id"));}}}
4.3. 创建索引结构
新建changgou_service_search_api模块,并添加索引库实体类
<dependencies><dependency><groupId>com.changgou</groupId><artifactId>changgou_common</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId></dependency></dependencies>
在com.changgou.search.pojo 创建SkuInfo 实体类
@Document(indexName = "skuinfo", type = "docs")public class SkuInfo implements Serializable {//商品id,同时也是商品编号@Id@Field(index = true, store = true, type = FieldType.Keyword)private Long id;//SKU名称@Field(index = true, store = true, type = FieldType.Text, analyzer = "ik_smart")private String name;//商品价格,单位为:元@Field(index = true, store = true, type = FieldType.Double)private Long price;//库存数量@Field(index = true, store = true, type = FieldType.Integer)private Integer num;//商品图片@Field(index = false, store = true, type = FieldType.Text)private String image;//商品状态,1-正常,2-下架,3-删除@Field(index = true, store = true, type = FieldType.Keyword)private String status;//创建时间private Date createTime;//更新时间private Date updateTime;//是否默认@Field(index = true, store = true, type = FieldType.Keyword)private String isDefault;//SPUID@Field(index = true, store = true, type = FieldType.Long)private Long spuId;//类目ID@Field(index = true, store = true, type = FieldType.Long)private Long categoryId;//类目名称@Field(index = true, store = true, type = FieldType.Keyword)private String categoryName;//品牌名称@Field(index = true, store = true, type = FieldType.Keyword)private String brandName;//规格private String spec;//规格参数private Map<String, Object> specMap;//getter & setter略}
注意生成 get 和 set 方法
4.4. 搜索微服务搭建
创建changgou_service_search模块 导入依赖
<dependencies><dependency><groupId>com.changgou</groupId><artifactId>changgou_common</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId></dependency><dependency><groupId>com.changgou</groupId><artifactId>changgou_service_goods_api</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>com.changgou</groupId><artifactId>changgou_service_search_api</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>
application
server:port: 9009spring:application:name: searchrabbitmq:host: 192.168.130.128redis:host: 192.168.130.128main:allow-bean-definition-overriding: true #当遇到同样名字的时候,是否允许覆盖注册data:elasticsearch:cluster-name: elasticsearchcluster-nodes: 192.168.130.128:9300thymeleaf:cache: falseeureka:client:service-url:defaultZone: http://127.0.0.1:6868/eurekainstance:prefer-ip-address: truefeign:hystrix:enabled: trueclient:config:default: #配置全局的feign的调用超时时间 如果 有指定的服务配置 默认的配置不会生效connectTimeout: 600000 # 指定的是 消费者 连接服务提供者的连接超时时间 是否能连接 单位是毫秒readTimeout: 600000 # 指定的是调用服务提供者的 服务 的超时时间() 单位是毫秒#hystrix 配置hystrix:command:default:execution:timeout:#如果enabled设置为false,则请求超时交给ribbon控制enabled: falseisolation:strategy: SEMAPHORE
在com.changgou.search 创建启动类SearchApplication
package com.changgou.search;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cloud.netflix.eureka.EnableEurekaClient;import org.springframework.cloud.openfeign.EnableFeignClients;@SpringBootApplication@EnableEurekaClient //声明为Eureka客户端@EnableFeignClients(basePackages = {"com.changgou.goods.feign"}) //开启Feignpublic class SearchApplication {public static void main(String[] args) {SpringApplication.run(SearchApplication.class,args);}}
创建config包 将canal中的MQconfig复制过来
package com.changgou.search.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMQConfig {//定义交换机名称public static final String GOODS_UP_EXCHANGE = "goods_up_exchange";//定义队列名称public static final String AD_UPDATE_QUEUE = "ad_update_queue";public static final String SEARCH_ADD_QUEUE = "search_add_queue";//声明队列@Beanpublic Queue queue() {return new Queue(AD_UPDATE_QUEUE);}@Bean(SEARCH_ADD_QUEUE)public Queue SEARCH_ADD_QUEUE() {return new Queue(SEARCH_ADD_QUEUE);}//声明交换机@Bean(GOODS_UP_EXCHANGE)public Exchange GOODS_UP_EXCHANGE() {return ExchangeBuilder.fanoutExchange(GOODS_UP_EXCHANGE).durable(true).build();}//队列与交换机绑定@Bean()public Binding GOODS_UP_EXCHANGE_BINDING(@Qualifier(SEARCH_ADD_QUEUE) Queue queue, @Qualifier(GOODS_UP_EXCHANGE) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("").noargs();}}
4.5. 商品服务查询商品信息
SkuController新增方法
@GetMapping("/spu/{spuId}")public List<Sku> findSkuListBySpuId(@PathVariable("spuId") String spuId) {Map<String, Object> searchMap = new HashMap<>();if (!"all".equals(spuId)) {searchMap.put("spuId", spuId);}searchMap.put("status", "1");List<Sku> list = skuService.findList(searchMap);return list;}
changgou_service_goods_api新增common依赖
<dependencies><dependency><groupId>com.changgou</groupId><artifactId>changgou_common</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies>
在feign包下定义feign接口
package com.changgou.goods.feign;import com.changgou.goods.pojo.Sku;import org.springframework.cloud.openfeign.FeignClient;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import java.util.List;@FeignClient(name = "goods")public interface SkuFeign {@GetMapping("/sku/spu/{spuId}")List<Sku> findSkuListBySpuId(@PathVariable("spuId") String spuId);}
4.6. 搜索微服务批量导入数据逻辑
创建 com.changgou.search.dao包,并新增ESManagerMapper接口
package com.changgou.search.dao;import com.changgou.search.pojo.SkuInfo;import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;public interface ESManagerMapper extends ElasticsearchRepository<SkuInfo,Long> {}
创建 com.changgou.search.service包,包下创建接口EsManagerService
package com.changgou.search.service;public interface ESManagerService {//创建索引库结构void createMappingAndIndex();//导入全部数据进入esvoid importAll();//根据spuid查询skuList 再导入索引库void importDataBySpuId(String spuId);}
创建com.changgou.search.service.impl包,包下创建服务实现类
package com.changgou.search.service.impl;import com.alibaba.fastjson.JSON;import com.changgou.goods.feign.SkuFeign;import com.changgou.goods.pojo.Sku;import com.changgou.search.dao.ESManagerMapper;import com.changgou.search.pojo.SkuInfo;import com.changgou.search.service.ESManagerService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;import org.springframework.stereotype.Service;import java.util.List;import java.util.Map;@Servicepublic class ESManagerServiceImpl implements ESManagerService {@Autowiredprivate ElasticsearchTemplate elasticsearchTemplate;@Autowiredprivate SkuFeign skuFeign;@Autowiredprivate ESManagerMapper esManagerMapper;//创建索引结构@Overridepublic void createMappingAndIndex() {//创建索引elasticsearchTemplate.createIndex(SkuInfo.class);//创建映射elasticsearchTemplate.putMapping(SkuInfo.class);}//导入全部sku集合到索引库@Overridepublic void importAll() {//查询sku集合List<Sku> skuList = skuFeign.findSkuListBySpuId("all");if (skuList == null || skuList.size() <= 0) {throw new RuntimeException("当前没有数据被查询到,无法导入索引库");}//将skulist转换为jsonString jsonSkulist = JSON.toJSONString(skuList);//将json转换为skuinfoList<SkuInfo> skuInfos = JSON.parseArray(jsonSkulist, SkuInfo.class);for (SkuInfo skuInfo : skuInfos) {//将规格信息转换为mapMap map = JSON.parseObject(skuInfo.getSpec(), Map.class);skuInfo.setSpecMap(map);}//导入索引库esManagerMapper.saveAll(skuInfos);}//根据spuid查询skulist 添加到索引库@Overridepublic void importDataBySpuId(String spuId) {List<Sku> skuList = skuFeign.findSkuListBySpuId(spuId);if (skuList == null || skuList.size() <= 0) {throw new RuntimeException("当前没有数据被查询到,无法导入索引库");}//将集合转换为jsonString jsonSkuList = JSON.toJSONString(skuList);List<SkuInfo> skuInfos = JSON.parseArray(jsonSkuList, SkuInfo.class);for (SkuInfo skuInfo : skuInfos) {//将规格信息进行这Map map = JSON.parseObject(skuInfo.getSpec(), Map.class);skuInfo.setSpecMap(map);}//添加索引库esManagerMapper.saveAll(skuInfos);}}
创建com.changgou.search.controller.定义ESManagerController
package com.changgou.search.controller;import com.changgou.entity.Result;import com.changgou.entity.StatusCode;import com.changgou.search.service.ESManagerService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/manager")public class ESManagerController {@Autowiredprivate ESManagerService esManagerService;//创建索引库结构@GetMapping("/create")public Result create(){esManagerService.createMappingAndIndex();return new Result(true, StatusCode.OK,"创建索引结构成功");}//导入全部数据@GetMapping("importAll")public Result importAll(){esManagerService.importAll();return new Result(true, StatusCode.OK,"导入全部数据成功");}}
4.7. 接受mq消息执行导入
changgou_service_search工程创建com.changgou.search.listener包,包下创建类
package com.changgou.search.listener;import com.changgou.search.config.RabbitMQConfig;import com.changgou.search.service.ESManagerService;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Componentpublic class GoodsUpListener {@Autowiredprivate ESManagerService esManagerService;@RabbitListener(queues = RabbitMQConfig.SEARCH_ADD_QUEUE)public void receiveMessage(String spuId) {System.out.println("接受到的消息为" + spuId);//查询skulist 并导入到索引库中esManagerService.importDataBySpuId(spuId);}}
测试
先删除所有索引
改变changgou_goods库的tb_spu表 中任意一行数据的is_marketable 改为0 再改为
查看索引是否有被写入
5. 商品下架索引库删除数据
(1)在数据监控微服务中监控tb_spu表的数据,当tb_spu发生更改且is_marketable为0时,表示商品下架,将spu的id发送到rabbitmq。
(2)在rabbitmq管理后台创建商品下架交换器(fanout)。使用分列模式的交换器是考虑商品下架会有很多种逻辑需要处理,索引库删除数据只是其中一项,另外还有删除商品详细页等操作。
(3)搜索微服务从rabbitmq的的队列中提取spu的id,通过调用elasticsearch的高级restAPI 将相关的sku列表从索引库删除。

5.1. 创建交换机和队列
在MQconifig类中 添加交换机goods_down_exchange 队列 search_delete_queue 并进行绑定
package com.changgou.search.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMQConfig {//定义交换机名称public static final String GOODS_UP_EXCHANGE = "goods_up_exchange";public static final String GOODS_DOWN_EXCHANGE="goods_down_exchange";//定义队列名称public static final String AD_UPDATE_QUEUE = "ad_update_queue";public static final String SEARCH_ADD_QUEUE = "search_add_queue";public static final String SEARCH_DEL_QUEUE="search_del_queue";//声明队列@Beanpublic Queue queue() {return new Queue(AD_UPDATE_QUEUE);}@Bean(SEARCH_ADD_QUEUE)public Queue SEARCH_ADD_QUEUE() {return new Queue(SEARCH_ADD_QUEUE);}@Bean(SEARCH_DEL_QUEUE)public Queue SEARCH_DEL_QUEUE(){return new Queue(SEARCH_DEL_QUEUE);}//声明交换机@Bean(GOODS_UP_EXCHANGE)public Exchange GOODS_UP_EXCHANGE() {return ExchangeBuilder.fanoutExchange(GOODS_UP_EXCHANGE).durable(true).build();}@Bean(GOODS_DOWN_EXCHANGE)public Exchange GOODS_DOWN_EXCHANGE(){return ExchangeBuilder.fanoutExchange(GOODS_DOWN_EXCHANGE).durable(true).build();}//队列与交换机绑定@Beanpublic Binding GOODS_UP_EXCHANGE_BINDING(@Qualifier(SEARCH_ADD_QUEUE) Queue queue, @Qualifier(GOODS_UP_EXCHANGE) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("").noargs();}@Beanpublic Binding GOODS_DOWN_EXCHANGE_BINDING(@Qualifier(SEARCH_DEL_QUEUE)Queue queue,@Qualifier(GOODS_DOWN_EXCHANGE)Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("").noargs();}}
注意canal下和search下的MQconfig都要添加
5.2. canal监听下架
在com.itheima.canal.listener.SpuListener 添加下架逻辑判断
//获取最新下架的商品if ("1".equals(oldData.get("is_marketable")) && "0".equals(newData.get("is_marketable"))) {//将商品的spu id 发送到mq队列中rabbitTemplate.convertAndSend(RabbitMQConfig.GOODS_DOWN_EXCHANGE, "", newData.get("id"));}
5.3. 根据spuId删除索引数据
ESManagerService新增方法定义
//根据spuid删除es索引中相关的sku数据void delDataBySpuId(String spuId);
impl实现类
//根据spuid删除指定索引@Overridepublic void delDataBySpuId(String spuId) {List<Sku> skuList = skuFeign.findSkuListBySpuId(spuId);if (skuList == null || skuList.size() <= 0) {throw new RuntimeException("当前没有数据被查询到,无法导入索引库");}for (Sku sku : skuList) {esManagerMapper.deleteById(Long.parseLong(sku.getId()));}}
5.4. 接收mq消息,执行索引库删除
从rabbitmq中提取消息,调动根据spuId删除索引库数据的方法 changgou_service_search新增监听类
package com.changgou.search.listener;import com.changgou.search.config.RabbitMQConfig;import com.changgou.search.service.ESManagerService;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Componentpublic class GoodsDelListener {@Autowiredprivate ESManagerService esManagerService;@RabbitListener(queues = RabbitMQConfig.SEARCH_DEL_QUEUE)public void receiveMessage(String spuId){System.out.println("删除索引库id为"+spuId);//调用业务层完成索引库数据删除esManagerService.delDataBySpuId(spuId);}}
测试 将上架状态1 改为0 查看索引库
