1. day06 监听数据库更新广告缓存

2. canal

canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。

canal是应对阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。

阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。

06. day06 监听数据库更新广告缓存 - 图1

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

2.1. 环境部署

2.1.1. mysql开启binlog模式

  1. SHOW VARIABLES LIKE '%log_bin%'; -- 查看当前mysql是否开启binlog模式

如果log_bin的值为OFF是未开启,为ON是已开启。

修改/etc/my.cnf 需要开启binlog模式。

  1. vim /etc/my.cnf
  1. [mysqld]
  2. log-bin=mysql-bin
  3. binlog-format=ROW
  4. server_id=1

给root用户授权

  1. create user canal@'%' IDENTIFIED by 'canal';
  2. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
  3. FLUSH PRIVILEGES;

2.1.2. canal服务端安装配置

https://github.com/alibaba/canal/

上传到/usr/local/canal中

修改配置文件

  1. vi conf/example/instance.properties

06. day06 监听数据库更新广告缓存 - 图2

修改指定读取位置

  1. show master status; -- 查询filePosition的值

如果file中的binlog文件不为mysql-bin.000001 可以重置mysql

  1. reset master;

修改meta.data配置文件

  1. vim /usr/local/canal/conf/example/meta.dat
  2. #找到以下字段并修改
  3. "journalName":"mysql-bin.000001","position":120,"

启动服务

  1. cd /usr/local/canal
  2. ./bin/startup.sh
  3. cat /usr/local/canal/logs/canal/canal.log #如果显示server is running now 则启动成功

2.2. 数据监控微服务

当用户执行数据库的操作的时候,binlog 日志会被canal捕获到,并解析出数据。我们就可以将解析出来的数据进行相应的逻辑处理。

我们这里使用的一个开源的项目,它实现了springboot与canal的集成。比原生的canal更加优雅。

https://github.com/chenqian56131/spring-boot-starter-canal

2.2.1. 微服务搭建

创建changgou_canal项目 导入依赖sa

  1. <dependency>
  2. <groupId>com.xpand</groupId>
  3. <artifactId>starter-canal</artifactId>
  4. <version>0.0.1-SNAPSHOT</version>
  5. </dependency>

启动类

  1. package com.itheima.canal;
  2. import com.xpand.starter.canal.annotation.EnableCanalClient;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. @SpringBootApplication
  6. @EnableCanalClient //声明当前服务是canal客户端
  7. public class CanalApplication {
  8. public static void main(String[] args) {
  9. SpringApplication.run(CanalApplication.class, args);
  10. }
  11. }

application

  1. canal.client.instances.example.host=192.168.130.128
  2. canal.client.instances.example.port=11111
  3. canal.client.instances.example.batchSize=1000

Canal监听类

  1. package com.itheima.canal.listener;
  2. import com.alibaba.otter.canal.protocol.CanalEntry;
  3. import com.xpand.starter.canal.annotation.CanalEventListener;
  4. import com.xpand.starter.canal.annotation.ListenPoint;
  5. @CanalEventListener //声明当前类为canal的监听类
  6. public class BusinessListener {
  7. /**
  8. * @param entryType 当前操作数据库的类型
  9. * @param rowData 当前操作数据库的数据
  10. */
  11. @ListenPoint(schema = "changgou_business", table = "tb_ad") //监听哪一个数据库 哪张表 当表中发生变化 执行此方法
  12. public void adUpdate(CanalEntry.EntryType entryType, CanalEntry.RowData rowData) {
  13. System.out.println("广告表数据发生变化");
  14. //获取改变之前的数据
  15. rowData.getBeforeColumnsList().forEach((c) -> System.out.println("改变前的数据" + c.getName() + ":" + c.getValue()));
  16. //获取改变之后的数据
  17. rowData.getAfterColumnsList().forEach((c) -> System.out.println("改变后的数据"+c.getName()+":"+c.getValue()));
  18. }
  19. }

去数据库中更改tb_ad 表中任意一行数据的字段 查看控制台是否有对应内容输出

3. 首页广告缓存更新

当tb_ad(广告)表的数据发生变化时,更新redis中的广告数据。

  1. 修改数据监控微服务,监控tb_ad表,当发生增删改操作时,提取position值(广告位置key),发送到rabbitmq
  2. 从rabbitmq中提取消息,通过OkHttpClient调用ad_update来实现对广告缓存数据的更新。

06. day06 监听数据库更新广告缓存 - 图3

3.1. 发送消息到MQ

修改数据监控微服务,监控tb_ad表,当发生增删改操作时,提取position值(广告位置key),发送到rabbitmq

导入mq依赖

  1. <dependency>
  2. <groupId>org.springframework.amqp</groupId>
  3. <artifactId>spring-rabbit</artifactId>
  4. </dependency>

application添加

  1. spring.rabbitmq.host=192.168.130.128

新建rqbbitmq配置类

  1. package com.itheima.canal.config;
  2. import org.springframework.amqp.core.Queue;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class RabbitMQConfig {
  7. //定义队列名称
  8. public static final String AD_UPDATE_QUEUE="ad_update_queue";
  9. //声明队列
  10. @Bean
  11. public Queue queue(){
  12. return new Queue(AD_UPDATE_QUEUE);
  13. }
  14. }

修改BusinessListener类

  1. package com.itheima.canal.listener;
  2. import com.alibaba.otter.canal.protocol.CanalEntry;
  3. import com.itheima.canal.config.RabbitMQConfig;
  4. import com.xpand.starter.canal.annotation.CanalEventListener;
  5. import com.xpand.starter.canal.annotation.ListenPoint;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. @CanalEventListener //声明当前类为canal的监听类
  9. public class BusinessListener {
  10. @Autowired
  11. private RabbitTemplate rabbitTemplate;
  12. /**
  13. * @param entryType 当前操作数据库的类型
  14. * @param rowData 当前操作数据库的数据
  15. */
  16. @ListenPoint(schema = "changgou_business", table = "tb_ad") //监听哪一个数据库 哪张表 当表中发生变化 执行此方法
  17. public void adUpdate(CanalEntry.EntryType entryType, CanalEntry.RowData rowData) {
  18. System.out.println("广告表数据发生变化");
  19. //获取改变之前的数据
  20. //rowData.getBeforeColumnsList().forEach((c) -> System.out.println("改变前的数据" + c.getName() + ":" + c.getValue()));
  21. //获取改变之后的数据
  22. //rowData.getAfterColumnsList().forEach((c) -> System.out.println("改变后的数据"+c.getName()+":"+c.getValue()));
  23. for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
  24. if ("position".equals(column.getName())){
  25. System.out.println("发送最新的消息给MQ"+column.getValue());
  26. //发送消息
  27. rabbitTemplate.convertAndSend("", RabbitMQConfig.AD_UPDATE_QUEUE,column.getValue());
  28. }
  29. }
  30. }
  31. }

访问http://192.168.130.128:15672 mq后台 账号密码guest

更新tb_ad数据库的任意一条数据 AD_UPDATE_QUEUE队列是否有消息

3.2. 消费mq消息执行更新

通过消费mq队列中的消息 执行指定地址 触发nginx中lua脚本 更新广告内容

在changgou_service_business工程pom.xml引入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>com.squareup.okhttp3</groupId>
  7. <artifactId>okhttp</artifactId>
  8. <version>3.9.0</version>
  9. </dependency>

application 在spring节点添加

  1. rabbitmq:
  2. host: 192.168.130.128

com.changgou.business包下创建listener包,包下创建类

  1. package com.changgou.business.listener;
  2. import okhttp3.*;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import java.io.IOException;
  6. @Component
  7. public class AdListener {
  8. @RabbitListener(queues = "ad_update_queue")
  9. public void receiverMessage(String message) {
  10. System.out.println("接受到消息为" + message);
  11. //发起远程调用
  12. OkHttpClient okHttpClient = new OkHttpClient();
  13. String url = "http://192.168.130.128/ad_update?position=" + message;
  14. Request request = new Request.Builder().url(url).build();
  15. Call call = okHttpClient.newCall(request);
  16. call.enqueue(new Callback() {
  17. //请求失败
  18. @Override
  19. public void onFailure(Call call, IOException e) {
  20. e.printStackTrace();
  21. }
  22. //请求成功
  23. @Override
  24. public void onResponse(Call call, Response response) throws IOException {
  25. System.out.println("请求成功" + response.message());
  26. }
  27. });
  28. }
  29. }

启动启动类 查看控制台输出 消费者已经从队列中消费了消息 队列目前消息为0

4. 商品上架索引库导入数据

  1. 在数据监控微服务中监控tb_spu表的数据,当tb_spu发生更改且is_marketable为1时,表示商品上架,将spu的id发送到rabbitmq。
  2. 在rabbitmq管理后台创建商品上架交换器(fanout)。使用分列模式的交换器是考虑商品上架会有很多种逻辑需要处理,导入索引库只是其中一项,另外还有商品详细页静态化等操作。这样我们可以创建导入索引库的队列和商品详细页静态化队列并与商品上架交换器进行绑定。
  3. 搜索微服务从rabbitmq的导入索引库的队列中提取spu的id,通过feign调用商品微服务得到sku的列表,并且通过调用elasticsearch的高级restAPI 将sku列表导入到索引库。

06. day06 监听数据库更新广告缓存 - 图4

4.1. 声明队列和交换机

更新rabbitmq配置类 创建交换器goods_up_exchange(类型为fanout),创建队列search_add_queue绑定交换器goods_up_exchange

  1. package com.itheima.canal.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class RabbitMQConfig {
  8. //定义交换机名称
  9. public static final String GOODS_UP_EXCHANGE = "goods_up_exchange";
  10. //定义队列名称
  11. public static final String AD_UPDATE_QUEUE = "ad_update_queue";
  12. public static final String SEARCH_ADD_QUEUE = "search_add_queue";
  13. //声明队列
  14. @Bean
  15. public Queue queue() {
  16. return new Queue(AD_UPDATE_QUEUE);
  17. }
  18. @Bean(SEARCH_ADD_QUEUE)
  19. public Queue SEARCH_ADD_QUEUE() {
  20. return new Queue(SEARCH_ADD_QUEUE);
  21. }
  22. //声明交换机
  23. @Bean(GOODS_UP_EXCHANGE)
  24. public Exchange GOODS_UP_EXCHANGE() {
  25. return ExchangeBuilder.fanoutExchange(GOODS_UP_EXCHANGE).durable(true).build();
  26. }
  27. //队列与交换机绑定
  28. @Bean()
  29. public Binding GOODS_UP_EXCHANGE_BINDING(@Qualifier(SEARCH_ADD_QUEUE) Queue queue, @Qualifier(GOODS_UP_EXCHANGE) Exchange exchange) {
  30. return BindingBuilder.bind(queue).to(exchange).with("").noargs();
  31. }
  32. }

4.2. 监听商品上架数据的变化

数据监微服务新增SpuListener 如果商品是由未上架状态变成上架状态 则是最新上架的视频

将此商品的spuid 发送到队列中

  1. package com.itheima.canal.listener;
  2. import com.alibaba.otter.canal.protocol.CanalEntry;
  3. import com.itheima.canal.config.RabbitMQConfig;
  4. import com.xpand.starter.canal.annotation.CanalEventListener;
  5. import com.xpand.starter.canal.annotation.ListenPoint;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. @CanalEventListener
  11. public class SpuListener {
  12. @Autowired
  13. private RabbitTemplate rabbitTemplate;
  14. @ListenPoint(schema = "changgou_goods", table = "tb_spu")
  15. public void goodsUp(CanalEntry.EntryType entryType, CanalEntry.RowData rowData) {
  16. //获取改变之前的数据 并将数据转换为map
  17. Map<String, String> oldData = new HashMap<>();
  18. rowData.getBeforeColumnsList().forEach((c) -> oldData.put(c.getName(), c.getValue()));
  19. //改变之前的数据 并转换为map
  20. Map<String, String> newData = new HashMap<>();
  21. rowData.getAfterColumnsList().forEach((c) -> newData.put(c.getName(), c.getValue()));
  22. //获取最新上架的商品 由上架状态0->1
  23. if ("0".equals(oldData.get("is_marketable")) && "1".equals(newData.get("is_marketable"))) {
  24. //将视频的spu id 发送到mq队列中
  25. rabbitTemplate.convertAndSend(RabbitMQConfig.GOODS_UP_EXCHANGE, "", newData.get("id"));
  26. }
  27. }
  28. }

4.3. 创建索引结构

新建changgou_service_search_api模块,并添加索引库实体类

  1. <dependencies>
  2. <dependency>
  3. <groupId>com.changgou</groupId>
  4. <artifactId>changgou_common</artifactId>
  5. <version>1.0-SNAPSHOT</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
  10. </dependency>
  11. </dependencies>

在com.changgou.search.pojo 创建SkuInfo 实体类

  1. @Document(indexName = "skuinfo", type = "docs")
  2. public class SkuInfo implements Serializable {
  3. //商品id,同时也是商品编号
  4. @Id
  5. @Field(index = true, store = true, type = FieldType.Keyword)
  6. private Long id;
  7. //SKU名称
  8. @Field(index = true, store = true, type = FieldType.Text, analyzer = "ik_smart")
  9. private String name;
  10. //商品价格,单位为:元
  11. @Field(index = true, store = true, type = FieldType.Double)
  12. private Long price;
  13. //库存数量
  14. @Field(index = true, store = true, type = FieldType.Integer)
  15. private Integer num;
  16. //商品图片
  17. @Field(index = false, store = true, type = FieldType.Text)
  18. private String image;
  19. //商品状态,1-正常,2-下架,3-删除
  20. @Field(index = true, store = true, type = FieldType.Keyword)
  21. private String status;
  22. //创建时间
  23. private Date createTime;
  24. //更新时间
  25. private Date updateTime;
  26. //是否默认
  27. @Field(index = true, store = true, type = FieldType.Keyword)
  28. private String isDefault;
  29. //SPUID
  30. @Field(index = true, store = true, type = FieldType.Long)
  31. private Long spuId;
  32. //类目ID
  33. @Field(index = true, store = true, type = FieldType.Long)
  34. private Long categoryId;
  35. //类目名称
  36. @Field(index = true, store = true, type = FieldType.Keyword)
  37. private String categoryName;
  38. //品牌名称
  39. @Field(index = true, store = true, type = FieldType.Keyword)
  40. private String brandName;
  41. //规格
  42. private String spec;
  43. //规格参数
  44. private Map<String, Object> specMap;
  45. //getter & setter略
  46. }

注意生成 get 和 set 方法

4.4. 搜索微服务搭建

创建changgou_service_search模块 导入依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>com.changgou</groupId>
  4. <artifactId>changgou_common</artifactId>
  5. <version>1.0-SNAPSHOT</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.springframework.cloud</groupId>
  9. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
  14. </dependency>
  15. <dependency>
  16. <groupId>com.changgou</groupId>
  17. <artifactId>changgou_service_goods_api</artifactId>
  18. <version>1.0-SNAPSHOT</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>com.changgou</groupId>
  22. <artifactId>changgou_service_search_api</artifactId>
  23. <version>1.0-SNAPSHOT</version>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.springframework.boot</groupId>
  27. <artifactId>spring-boot-starter-amqp</artifactId>
  28. </dependency>
  29. </dependencies>

application

  1. server:
  2. port: 9009
  3. spring:
  4. application:
  5. name: search
  6. rabbitmq:
  7. host: 192.168.130.128
  8. redis:
  9. host: 192.168.130.128
  10. main:
  11. allow-bean-definition-overriding: true #当遇到同样名字的时候,是否允许覆盖注册
  12. data:
  13. elasticsearch:
  14. cluster-name: elasticsearch
  15. cluster-nodes: 192.168.130.128:9300
  16. thymeleaf:
  17. cache: false
  18. eureka:
  19. client:
  20. service-url:
  21. defaultZone: http://127.0.0.1:6868/eureka
  22. instance:
  23. prefer-ip-address: true
  24. feign:
  25. hystrix:
  26. enabled: true
  27. client:
  28. config:
  29. default: #配置全局的feign的调用超时时间 如果 有指定的服务配置 默认的配置不会生效
  30. connectTimeout: 600000 # 指定的是 消费者 连接服务提供者的连接超时时间 是否能连接 单位是毫秒
  31. readTimeout: 600000 # 指定的是调用服务提供者的 服务 的超时时间() 单位是毫秒
  32. #hystrix 配置
  33. hystrix:
  34. command:
  35. default:
  36. execution:
  37. timeout:
  38. #如果enabled设置为false,则请求超时交给ribbon控制
  39. enabled: false
  40. isolation:
  41. strategy: SEMAPHORE

在com.changgou.search 创建启动类SearchApplication

  1. package com.changgou.search;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
  5. import org.springframework.cloud.openfeign.EnableFeignClients;
  6. @SpringBootApplication
  7. @EnableEurekaClient //声明为Eureka客户端
  8. @EnableFeignClients(basePackages = {"com.changgou.goods.feign"}) //开启Feign
  9. public class SearchApplication {
  10. public static void main(String[] args) {
  11. SpringApplication.run(SearchApplication.class,args);
  12. }
  13. }

创建config包 将canal中的MQconfig复制过来

  1. package com.changgou.search.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class RabbitMQConfig {
  8. //定义交换机名称
  9. public static final String GOODS_UP_EXCHANGE = "goods_up_exchange";
  10. //定义队列名称
  11. public static final String AD_UPDATE_QUEUE = "ad_update_queue";
  12. public static final String SEARCH_ADD_QUEUE = "search_add_queue";
  13. //声明队列
  14. @Bean
  15. public Queue queue() {
  16. return new Queue(AD_UPDATE_QUEUE);
  17. }
  18. @Bean(SEARCH_ADD_QUEUE)
  19. public Queue SEARCH_ADD_QUEUE() {
  20. return new Queue(SEARCH_ADD_QUEUE);
  21. }
  22. //声明交换机
  23. @Bean(GOODS_UP_EXCHANGE)
  24. public Exchange GOODS_UP_EXCHANGE() {
  25. return ExchangeBuilder.fanoutExchange(GOODS_UP_EXCHANGE).durable(true).build();
  26. }
  27. //队列与交换机绑定
  28. @Bean()
  29. public Binding GOODS_UP_EXCHANGE_BINDING(@Qualifier(SEARCH_ADD_QUEUE) Queue queue, @Qualifier(GOODS_UP_EXCHANGE) Exchange exchange) {
  30. return BindingBuilder.bind(queue).to(exchange).with("").noargs();
  31. }
  32. }

4.5. 商品服务查询商品信息

SkuController新增方法

  1. @GetMapping("/spu/{spuId}")
  2. public List<Sku> findSkuListBySpuId(@PathVariable("spuId") String spuId) {
  3. Map<String, Object> searchMap = new HashMap<>();
  4. if (!"all".equals(spuId)) {
  5. searchMap.put("spuId", spuId);
  6. }
  7. searchMap.put("status", "1");
  8. List<Sku> list = skuService.findList(searchMap);
  9. return list;
  10. }

changgou_service_goods_api新增common依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>com.changgou</groupId>
  4. <artifactId>changgou_common</artifactId>
  5. <version>1.0-SNAPSHOT</version>
  6. </dependency>
  7. </dependencies>

在feign包下定义feign接口

  1. package com.changgou.goods.feign;
  2. import com.changgou.goods.pojo.Sku;
  3. import org.springframework.cloud.openfeign.FeignClient;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.PathVariable;
  6. import java.util.List;
  7. @FeignClient(name = "goods")
  8. public interface SkuFeign {
  9. @GetMapping("/sku/spu/{spuId}")
  10. List<Sku> findSkuListBySpuId(@PathVariable("spuId") String spuId);
  11. }

4.6. 搜索微服务批量导入数据逻辑

创建 com.changgou.search.dao包,并新增ESManagerMapper接口

  1. package com.changgou.search.dao;
  2. import com.changgou.search.pojo.SkuInfo;
  3. import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
  4. public interface ESManagerMapper extends ElasticsearchRepository<SkuInfo,Long> {
  5. }

创建 com.changgou.search.service包,包下创建接口EsManagerService

  1. package com.changgou.search.service;
  2. public interface ESManagerService {
  3. //创建索引库结构
  4. void createMappingAndIndex();
  5. //导入全部数据进入es
  6. void importAll();
  7. //根据spuid查询skuList 再导入索引库
  8. void importDataBySpuId(String spuId);
  9. }

创建com.changgou.search.service.impl包,包下创建服务实现类

  1. package com.changgou.search.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.changgou.goods.feign.SkuFeign;
  4. import com.changgou.goods.pojo.Sku;
  5. import com.changgou.search.dao.ESManagerMapper;
  6. import com.changgou.search.pojo.SkuInfo;
  7. import com.changgou.search.service.ESManagerService;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
  10. import org.springframework.stereotype.Service;
  11. import java.util.List;
  12. import java.util.Map;
  13. @Service
  14. public class ESManagerServiceImpl implements ESManagerService {
  15. @Autowired
  16. private ElasticsearchTemplate elasticsearchTemplate;
  17. @Autowired
  18. private SkuFeign skuFeign;
  19. @Autowired
  20. private ESManagerMapper esManagerMapper;
  21. //创建索引结构
  22. @Override
  23. public void createMappingAndIndex() {
  24. //创建索引
  25. elasticsearchTemplate.createIndex(SkuInfo.class);
  26. //创建映射
  27. elasticsearchTemplate.putMapping(SkuInfo.class);
  28. }
  29. //导入全部sku集合到索引库
  30. @Override
  31. public void importAll() {
  32. //查询sku集合
  33. List<Sku> skuList = skuFeign.findSkuListBySpuId("all");
  34. if (skuList == null || skuList.size() <= 0) {
  35. throw new RuntimeException("当前没有数据被查询到,无法导入索引库");
  36. }
  37. //将skulist转换为json
  38. String jsonSkulist = JSON.toJSONString(skuList);
  39. //将json转换为skuinfo
  40. List<SkuInfo> skuInfos = JSON.parseArray(jsonSkulist, SkuInfo.class);
  41. for (SkuInfo skuInfo : skuInfos) {
  42. //将规格信息转换为map
  43. Map map = JSON.parseObject(skuInfo.getSpec(), Map.class);
  44. skuInfo.setSpecMap(map);
  45. }
  46. //导入索引库
  47. esManagerMapper.saveAll(skuInfos);
  48. }
  49. //根据spuid查询skulist 添加到索引库
  50. @Override
  51. public void importDataBySpuId(String spuId) {
  52. List<Sku> skuList = skuFeign.findSkuListBySpuId(spuId);
  53. if (skuList == null || skuList.size() <= 0) {
  54. throw new RuntimeException("当前没有数据被查询到,无法导入索引库");
  55. }
  56. //将集合转换为json
  57. String jsonSkuList = JSON.toJSONString(skuList);
  58. List<SkuInfo> skuInfos = JSON.parseArray(jsonSkuList, SkuInfo.class);
  59. for (SkuInfo skuInfo : skuInfos) {
  60. //将规格信息进行这
  61. Map map = JSON.parseObject(skuInfo.getSpec(), Map.class);
  62. skuInfo.setSpecMap(map);
  63. }
  64. //添加索引库
  65. esManagerMapper.saveAll(skuInfos);
  66. }
  67. }

创建com.changgou.search.controller.定义ESManagerController

  1. package com.changgou.search.controller;
  2. import com.changgou.entity.Result;
  3. import com.changgou.entity.StatusCode;
  4. import com.changgou.search.service.ESManagerService;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. @RestController
  10. @RequestMapping("/manager")
  11. public class ESManagerController {
  12. @Autowired
  13. private ESManagerService esManagerService;
  14. //创建索引库结构
  15. @GetMapping("/create")
  16. public Result create(){
  17. esManagerService.createMappingAndIndex();
  18. return new Result(true, StatusCode.OK,"创建索引结构成功");
  19. }
  20. //导入全部数据
  21. @GetMapping("importAll")
  22. public Result importAll(){
  23. esManagerService.importAll();
  24. return new Result(true, StatusCode.OK,"导入全部数据成功");
  25. }
  26. }

4.7. 接受mq消息执行导入

changgou_service_search工程创建com.changgou.search.listener包,包下创建类

  1. package com.changgou.search.listener;
  2. import com.changgou.search.config.RabbitMQConfig;
  3. import com.changgou.search.service.ESManagerService;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class GoodsUpListener {
  9. @Autowired
  10. private ESManagerService esManagerService;
  11. @RabbitListener(queues = RabbitMQConfig.SEARCH_ADD_QUEUE)
  12. public void receiveMessage(String spuId) {
  13. System.out.println("接受到的消息为" + spuId);
  14. //查询skulist 并导入到索引库中
  15. esManagerService.importDataBySpuId(spuId);
  16. }
  17. }

测试

先删除所有索引

改变changgou_goods库的tb_spu表 中任意一行数据的is_marketable 改为0 再改为

查看索引是否有被写入

5. 商品下架索引库删除数据

(1)在数据监控微服务中监控tb_spu表的数据,当tb_spu发生更改且is_marketable为0时,表示商品下架,将spu的id发送到rabbitmq。

(2)在rabbitmq管理后台创建商品下架交换器(fanout)。使用分列模式的交换器是考虑商品下架会有很多种逻辑需要处理,索引库删除数据只是其中一项,另外还有删除商品详细页等操作。

(3)搜索微服务从rabbitmq的的队列中提取spu的id,通过调用elasticsearch的高级restAPI 将相关的sku列表从索引库删除。

06. day06 监听数据库更新广告缓存 - 图5

5.1. 创建交换机和队列

在MQconifig类中 添加交换机goods_down_exchange 队列 search_delete_queue 并进行绑定

  1. package com.changgou.search.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class RabbitMQConfig {
  8. //定义交换机名称
  9. public static final String GOODS_UP_EXCHANGE = "goods_up_exchange";
  10. public static final String GOODS_DOWN_EXCHANGE="goods_down_exchange";
  11. //定义队列名称
  12. public static final String AD_UPDATE_QUEUE = "ad_update_queue";
  13. public static final String SEARCH_ADD_QUEUE = "search_add_queue";
  14. public static final String SEARCH_DEL_QUEUE="search_del_queue";
  15. //声明队列
  16. @Bean
  17. public Queue queue() {
  18. return new Queue(AD_UPDATE_QUEUE);
  19. }
  20. @Bean(SEARCH_ADD_QUEUE)
  21. public Queue SEARCH_ADD_QUEUE() {
  22. return new Queue(SEARCH_ADD_QUEUE);
  23. }
  24. @Bean(SEARCH_DEL_QUEUE)
  25. public Queue SEARCH_DEL_QUEUE(){
  26. return new Queue(SEARCH_DEL_QUEUE);
  27. }
  28. //声明交换机
  29. @Bean(GOODS_UP_EXCHANGE)
  30. public Exchange GOODS_UP_EXCHANGE() {
  31. return ExchangeBuilder.fanoutExchange(GOODS_UP_EXCHANGE).durable(true).build();
  32. }
  33. @Bean(GOODS_DOWN_EXCHANGE)
  34. public Exchange GOODS_DOWN_EXCHANGE(){
  35. return ExchangeBuilder.fanoutExchange(GOODS_DOWN_EXCHANGE).durable(true).build();
  36. }
  37. //队列与交换机绑定
  38. @Bean
  39. public Binding GOODS_UP_EXCHANGE_BINDING(@Qualifier(SEARCH_ADD_QUEUE) Queue queue, @Qualifier(GOODS_UP_EXCHANGE) Exchange exchange) {
  40. return BindingBuilder.bind(queue).to(exchange).with("").noargs();
  41. }
  42. @Bean
  43. public Binding GOODS_DOWN_EXCHANGE_BINDING(@Qualifier(SEARCH_DEL_QUEUE)Queue queue,@Qualifier(GOODS_DOWN_EXCHANGE)Exchange exchange){
  44. return BindingBuilder.bind(queue).to(exchange).with("").noargs();
  45. }
  46. }

注意canal下和search下的MQconfig都要添加

5.2. canal监听下架

在com.itheima.canal.listener.SpuListener 添加下架逻辑判断

  1. //获取最新下架的商品
  2. if ("1".equals(oldData.get("is_marketable")) && "0".equals(newData.get("is_marketable"))) {
  3. //将商品的spu id 发送到mq队列中
  4. rabbitTemplate.convertAndSend(RabbitMQConfig.GOODS_DOWN_EXCHANGE, "", newData.get("id"));
  5. }

5.3. 根据spuId删除索引数据

ESManagerService新增方法定义

  1. //根据spuid删除es索引中相关的sku数据
  2. void delDataBySpuId(String spuId);

impl实现类

  1. //根据spuid删除指定索引
  2. @Override
  3. public void delDataBySpuId(String spuId) {
  4. List<Sku> skuList = skuFeign.findSkuListBySpuId(spuId);
  5. if (skuList == null || skuList.size() <= 0) {
  6. throw new RuntimeException("当前没有数据被查询到,无法导入索引库");
  7. }
  8. for (Sku sku : skuList) {
  9. esManagerMapper.deleteById(Long.parseLong(sku.getId()));
  10. }
  11. }

5.4. 接收mq消息,执行索引库删除

从rabbitmq中提取消息,调动根据spuId删除索引库数据的方法 changgou_service_search新增监听类

  1. package com.changgou.search.listener;
  2. import com.changgou.search.config.RabbitMQConfig;
  3. import com.changgou.search.service.ESManagerService;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class GoodsDelListener {
  9. @Autowired
  10. private ESManagerService esManagerService;
  11. @RabbitListener(queues = RabbitMQConfig.SEARCH_DEL_QUEUE)
  12. public void receiveMessage(String spuId){
  13. System.out.println("删除索引库id为"+spuId);
  14. //调用业务层完成索引库数据删除
  15. esManagerService.delDataBySpuId(spuId);
  16. }
  17. }

测试 将上架状态1 改为0 查看索引库