一、消息中间件介绍:

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)常见的消息中间件产品:
(1)ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。我们在本次课程中介绍 ActiveMQ的使用。
(2)RabbitMQ(elang语言)
AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。
(3)ZeroMQ
史上最快的消息队列系统
(4)Kafka
Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。
(5)RocketMQ
阿里的开源项目,2012年己交给Apache。

二、目前项目中面临的问题:

image.png

问题: 我们已经完成了5个web模块和4个服务模块。其中运营商后台的调用关系最多,用到了商家商品服务、广告内容服务、搜索服务和页面生成服务。这种模块之间的依赖也称之为耦合。而耦合越多,之后的维护工作就越困难。那么如果改善系统模块调用关系、减少模块之间的耦合呢?我们接下来就介绍一种解决方案——消息中间件。

2.1 解决问题:

2.1.1 改造系统模块调用关系(解耦)

image.png

2.2 JMS简介

2.2.1什么是JMS

JMS(JavaMessaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。
JMS本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC(javaDatabase Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JML 客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。
JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一
些不同形式的数据,提供现有消息格式的一些级别的兼容性。

· TextMessage—一个字符串对象 · MapMessage—一套名称-值对 · ObjectMessage—一个序列化的 Java 对象 · BytesMessage—一个字节的数据流 · StreamMessage — Java 原始值的数据流

2.2.2 JMS消息传递类型

对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
image.png
另一种是发布/ 订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
image.png

三、使用消息中间件ActiveMQ:

3.1 在docker下安装activeMQ:

下载镜像】:

  1. docker pull rmohr/activemq

安装容器】:

  1. docker run -id --name activemq -p 61616:61616 -p 8161:8161 rmohr/activemq

查看效果】:
image.png

说明: 第一次进入,用户名,密码都是admin

3.2 在springboot下使用activemq:

3.2.1 创建一个生产者工程:

image.png

3.2.2 生产者工程使用步骤如下:

第一步:添加依赖如下 :

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-activemq</artifactId>
  5. </dependency>
  6. <!--4.引入单元测试-->
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-test</artifactId>
  10. <scope>test</scope>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.springframework.boot</groupId>
  14. <artifactId>spring-boot-test</artifactId>
  15. </dependency>
  16. <dependency>
  17. <groupId>junit</groupId>
  18. <artifactId>junit</artifactId>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.springframework</groupId>
  22. <artifactId>spring-test</artifactId>
  23. </dependency>
  24. </dependencies>

第二步:创建application.yml文件:

  1. server:
  2. port: 8801
  3. spring:
  4. activemq:
  5. broker-url: tcp://192.168.56.10:61616
  6. jms:
  7. pub-sub-domain: true #代表发布的是:发布/订阅消息,默认是点对点消息

第三步:创建启动类:

  1. /**
  2. * ------------------------------
  3. * 功能:
  4. * 作者:WF
  5. * 微信:hbxfwf13590332912
  6. * 创建时间:2021/8/7-15:21
  7. * ------------------------------
  8. */
  9. @SpringBootApplication
  10. public class JmsProducerAppliation {
  11. public static void main(String[] args) {
  12. SpringApplication.run(JmsProducerAppliation.class);
  13. }
  14. }

第四步:创建单元测试类:

  1. /**
  2. * ------------------------------
  3. * 功能:
  4. * 作者:WF
  5. * 微信:hbxfwf13590332912
  6. * 创建时间:2021/8/7-15:07
  7. * ------------------------------
  8. */
  9. @RunWith(SpringRunner.class)
  10. @SpringBootTest
  11. public class Test01 {
  12. @Autowired
  13. private JmsMessagingTemplate jmsMessagingTemplate;
  14. /**
  15. * 功能: 发送[点对点]消息
  16. * 参数:
  17. * 返回值: void
  18. * 时间: 2021/8/7 15:08
  19. */
  20. @Test
  21. public void test01(){
  22. //1. 发送点对点消息
  23. jmsMessagingTemplate.convertAndSend("zelin-01","发送第一个消息。。。");
  24. }
  25. /**
  26. * 功能: 发送 [发布/订阅] 消息
  27. * 参数:
  28. * 返回值:
  29. * 时间: 2021/8/7 15:37
  30. */
  31. @Test
  32. public void test02(){
  33. //2. 发送发布/订阅消息
  34. Destination dest = new ActiveMQTopic("zelin-topic");
  35. jmsMessagingTemplate.convertAndSend(dest,"发送发布/订阅消息!");
  36. }
  37. }

3.2.3 创建一个消费者工程:

image.png

3.2.3 消费者工程使用步骤:

第一步:在工程pom.xml添加依赖:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-activemq</artifactId>
  5. </dependency>
  6. <!--4.引入单元测试-->
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-test</artifactId>
  10. <scope>test</scope>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.springframework.boot</groupId>
  14. <artifactId>spring-boot-test</artifactId>
  15. </dependency>
  16. <dependency>
  17. <groupId>junit</groupId>
  18. <artifactId>junit</artifactId>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.springframework</groupId>
  22. <artifactId>spring-test</artifactId>
  23. </dependency>
  24. </dependencies>

第二步:添加application.yml映射文件:

  1. server:
  2. port: 8802
  3. spring:
  4. activemq:
  5. broker-url: tcp://192.168.56.10:61616
  6. jms:
  7. pub-sub-domain: true

第三步:复制配置,修改端口,创建一个新的消费者:
image.png image.png
第四步:根据实际情况,定义监听方代码实现:

  1. /**
  2. * ------------------------------
  3. * 功能:监听点对点消息(P2P)
  4. * 作者:WF
  5. * 微信:hbxfwf13590332912
  6. * 创建时间:2021/8/7-15:11
  7. * ------------------------------
  8. */
  9. @Component
  10. public class MyMessageListener {
  11. @JmsListener(destination = "zelin-01")
  12. public void getMessage01(String message){
  13. System.out.println("得到消息:" + message);
  14. }
  15. }
  1. /**
  2. * ------------------------------
  3. * 功能:监听: [发布/订阅] 消息
  4. * 作者:WF
  5. * 微信:hbxfwf13590332912
  6. * 创建时间:2021/8/7-15:35
  7. * ------------------------------
  8. */
  9. @Component
  10. public class MyMessageListener2 {
  11. @JmsListener(destination = "zelin-topic")
  12. public void getMessage(String message){
  13. System.out.println("得到消息:" + message);
  14. }
  15. }

第五步:查看效果:
【发送消息】:
image.png
【接受消息】:
image.png

四、完成更新索引库的操作:

::【消息发送方】处理:

4.1 在zyg-manager-web中添加activemq依赖,并注掉对搜索服务的依赖:

image.png

4.2 在application.yml文件中添加配置:

  1. spring:
  2. activemq:
  3. broker-url: tcp://192.168.56.10:61616

4.3 修改goodsController中的updateStatus方法:

  1. /**
  2. * 功能: 批量审核数据[数据一致性同步方案二: 使用消息中间件]
  3. * 参数:
  4. * 返回值: com.zelin.utils.R
  5. * 时间: 2021/7/30 16:11
  6. */
  7. @RequestMapping("updateStatus")
  8. public R updateStatus(Long[] ids,String status){
  9. //1. 审核商品
  10. goodsService.updateStatus(ids,status);
  11. //2. 根据商品id查询sku列表
  12. List<ItemEntity> entities = itemService.findItemsbyGoodsId(ids);
  13. //3. 将商品列表包装成字符串发送出去
  14. if(entities != null && entities.size() > 0) {
  15. jmsMessagingTemplate.convertAndSend("updateStatus", JSON.toJSONString(entities));
  16. }else{
  17. return R.error().put("msg","没有要审核的商品!");
  18. }
  19. return R.ok();
  20. }

::【消息监听方】处理:

4.4 在zyg-search-service中添加依赖:

  1. <!--引入activemq-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-activemq</artifactId>
  5. </dependency>

4.5 在zyg-search-service中application.yml配置

  1. spring:
  2. activemq:
  3. broker-url: tcp://192.168.56.10:61616

4.6 定义监听器:

  1. /**
  2. * ------------------------------
  3. * 功能:定义监听sku商品列表
  4. * 作者:WF
  5. * 微信:hbxfwf13590332912
  6. * 创建时间:2021/8/9-14:41
  7. * ------------------------------
  8. */
  9. @Component
  10. public class MyMessageListener {
  11. @Autowired
  12. private ItemSearchService itemSearchService;
  13. @JmsListener(destination = "updateStatus")
  14. public void getSkuList(String skuList){
  15. //1. 得到sku列表,并转换为集合
  16. List<ItemEntity> entities = JSON.parseArray(skuList, ItemEntity.class);
  17. //2. 将sku商品列表添加到索引库中
  18. itemSearchService.updateToIndex(entities);
  19. //3. 打印提示信息
  20. System.out.println("更新到索引库成功!");
  21. }
  22. }

4.7 商品审核效果:

image.png
image.png

五、完成生成静态页面的效果:

::【消息发送方】处理:

5.1 在zyg-manager-web中注掉对于生成静态页面服务依赖:

  1. <!--引入生成静态页面的服务-->
  2. <!--<dependency>-->
  3. <!-- <groupId>com.zelin</groupId>-->
  4. <!-- <artifactId>zyg-page-interface</artifactId>-->
  5. <!-- <version>2.0</version>-->
  6. <!--</dependency>-->

5.2 在zyg-manager-web中的goodsController修改生成静态页面的方法:

  1. /**
  2. * 功能: 根据商品id生成商品静态面[利用消息中间件发送消息]
  3. * 参数:
  4. * 返回值:
  5. * 时间: 2021/8/6 14:31
  6. */
  7. @RequestMapping("/item")
  8. public void createHtml(String goodsId) throws IOException {
  9. jmsMessagingTemplate.convertAndSend("createHtml",goodsId);
  10. }

::【消息监听方】处理:

5.3 在zyg-page-service中添加依赖:

  1. <!--2. 添加activemq依赖-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-activemq</artifactId>
  5. </dependency>

5.4 在zyg-page-service中的application.yml中添加配置:

  1. spring:
  2. activemq:
  3. broker-url: tcp://192.168.56.10:61616

5.5 在zyg-page-service中添加监听器:

  1. /**
  2. * ------------------------------
  3. * 功能:
  4. * 作者:WF
  5. * 微信:hbxfwf13590332912
  6. * 创建时间:2021/8/9-15:02
  7. * ------------------------------
  8. */
  9. @Component
  10. public class MyMessageListener {
  11. @Autowired
  12. private PageSerivce pageSerivce;
  13. //1. 根据得到的商品id,生成静态页面
  14. @JmsListener(destination = "createHtml")
  15. public void createHtml(String goodsId) throws IOException {
  16. System.out.println("goodsId = " + goodsId);
  17. pageSerivce.createHtml(Long.parseLong(goodsId));
  18. }
  19. }

5.6 最后生成静态页面:

image.png

六、当我们删除商品时,从索引库删除相关sku商品及商品详情页面:

6.1 处理删除数据的序列化问题:(application.yml文件配置)

  1. spring:
  2. activemq:
  3. broker-url: tcp://192.168.56.10:61616
  4. packages:
  5. trust-all: true

说明: 1、默认只处理java.lang包中的一些类的序列化,其它类不能处理 2、上面的配置packages.trust-all: 代表所有的包都可以进行序列化操作. 3、我们需要在消息发送方及消息监听方都做这一配置。(即: zyg-search-service,zyg-page-service,zyg-manager-web都要进行配置).

6.2 在goodsController中删除商品时,发送消息:

  1. /**
  2. * 逻辑删除
  3. * 思路:
  4. * ① 在删除商品时,同时删除索引库中相关sku的列表信息
  5. * ② 同时删除此商品生成的静态页面
  6. */
  7. @RequestMapping("/delete")
  8. public R delete(@RequestBody Long[] ids){
  9. //1. 根据商品id删除商品(逻辑删除)
  10. goodsService.removeByIds(Arrays.asList(ids));
  11. //2. 发送消息,访问搜索服务,从索引库中删除相关的sku列表
  12. jmsMessagingTemplate.convertAndSend("deleteSku",Arrays.asList(ids));
  13. //3. 发送消息,访问页面静态化服务,从生成的静态页面中删除此相关的静态页
  14. jmsMessagingTemplate.convertAndSend("deletePage",Arrays.asList(ids));
  15. return R.ok();
  16. }

6.3 在zyg-search-service中监听删除的消息:

【在服务中定义删除功能】:

  1. /**
  2. * 功能: 从索引库中根据goodsid删除商品
  3. * 参数:
  4. * 返回值:
  5. * 时间: 2021/8/9 15:45
  6. */
  7. public void deleteSku(List<Long> goodsIds){
  8. //1. 构造查询条件
  9. Query query = new CriteriaQuery(new Criteria("goodsId").in(goodsIds));
  10. //2. 从索引库中删除记录
  11. restTemplate.delete(query,ItemEntity.class,IndexCoordinates.of("item"));
  12. }

【定义监听器方法】:

  1. /**
  2. * 功能: 从索引库中根据商品id删除数据
  3. * 参数:
  4. * 返回值:
  5. * 时间: 2021/8/9 15:44
  6. */
  7. @JmsListener(destination = "deleteSku")
  8. public void deleteSku(List<Long> ids){
  9. itemSearchService.deleteSku(ids);
  10. }

6.4 在zyg-page-service中监听删除的消息:

【在服务中定义删除功能】:

  1. /**
  2. * 功能: 根据商品id删除静态页面
  3. * 参数:
  4. * 返回值: void
  5. * 时间: 2021/8/9 15:51
  6. */
  7. @Override
  8. public void deletePage(List<Long> ids) {
  9. for (Long id : ids) {
  10. String path = FILE_PATH + id + ".html";
  11. System.out.println("path = " + path);
  12. new File(path).delete();
  13. }
  14. }

【定义监听器方法】:

  1. //2. 删除静态页面
  2. @JmsListener(destination = "deletePage")
  3. public void deletePage(List<Long> ids){
  4. pageSerivce.deletePage(ids);
  5. }

6.5 运行效果(略):