一、消息中间件介绍:
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)常见的消息中间件产品:
(1)ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。我们在本次课程中介绍 ActiveMQ的使用。
(2)RabbitMQ(elang语言)
AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。
(3)ZeroMQ
史上最快的消息队列系统
(4)Kafka
Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。
(5)RocketMQ:
阿里的开源项目,2012年己交给Apache。
二、目前项目中面临的问题:
问题: 我们已经完成了5个web模块和4个服务模块。其中运营商后台的调用关系最多,用到了商家商品服务、广告内容服务、搜索服务和页面生成服务。这种模块之间的依赖也称之为耦合。而耦合越多,之后的维护工作就越困难。那么如果改善系统模块调用关系、减少模块之间的耦合呢?我们接下来就介绍一种解决方案——消息中间件。
2.1 解决问题:
2.1.1 改造系统模块调用关系(解耦)
2.2 JMS简介
2.2.1什么是JMS
JMS(JavaMessaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。
JMS本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC(javaDatabase Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JML 客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。
JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一
些不同形式的数据,提供现有消息格式的一些级别的兼容性。
· TextMessage—一个字符串对象 · MapMessage—一套名称-值对 · ObjectMessage—一个序列化的 Java 对象 · BytesMessage—一个字节的数据流 · StreamMessage — Java 原始值的数据流
2.2.2 JMS消息传递类型
对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/ 订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
三、使用消息中间件ActiveMQ:
3.1 在docker下安装activeMQ:
【下载镜像】:
docker pull rmohr/activemq
【安装容器】:
docker run -id --name activemq -p 61616:61616 -p 8161:8161 rmohr/activemq
【查看效果】:
说明: 第一次进入,用户名,密码都是admin
3.2 在springboot下使用activemq:
3.2.1 创建一个生产者工程:
3.2.2 生产者工程使用步骤如下:
第一步:添加依赖如下 :
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--4.引入单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</dependency>
</dependencies>
第二步:创建application.yml文件:
server:
port: 8801
spring:
activemq:
broker-url: tcp://192.168.56.10:61616
jms:
pub-sub-domain: true #代表发布的是:发布/订阅消息,默认是点对点消息
第三步:创建启动类:
/**
* ------------------------------
* 功能:
* 作者:WF
* 微信:hbxfwf13590332912
* 创建时间:2021/8/7-15:21
* ------------------------------
*/
@SpringBootApplication
public class JmsProducerAppliation {
public static void main(String[] args) {
SpringApplication.run(JmsProducerAppliation.class);
}
}
第四步:创建单元测试类:
/**
* ------------------------------
* 功能:
* 作者:WF
* 微信:hbxfwf13590332912
* 创建时间:2021/8/7-15:07
* ------------------------------
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test01 {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
/**
* 功能: 发送[点对点]消息
* 参数:
* 返回值: void
* 时间: 2021/8/7 15:08
*/
@Test
public void test01(){
//1. 发送点对点消息
jmsMessagingTemplate.convertAndSend("zelin-01","发送第一个消息。。。");
}
/**
* 功能: 发送 [发布/订阅] 消息
* 参数:
* 返回值:
* 时间: 2021/8/7 15:37
*/
@Test
public void test02(){
//2. 发送发布/订阅消息
Destination dest = new ActiveMQTopic("zelin-topic");
jmsMessagingTemplate.convertAndSend(dest,"发送发布/订阅消息!");
}
}
3.2.3 创建一个消费者工程:
3.2.3 消费者工程使用步骤:
第一步:在工程pom.xml添加依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--4.引入单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</dependency>
</dependencies>
第二步:添加application.yml映射文件:
server:
port: 8802
spring:
activemq:
broker-url: tcp://192.168.56.10:61616
jms:
pub-sub-domain: true
第三步:复制配置,修改端口,创建一个新的消费者:
第四步:根据实际情况,定义监听方代码实现:
/**
* ------------------------------
* 功能:监听点对点消息(P2P)
* 作者:WF
* 微信:hbxfwf13590332912
* 创建时间:2021/8/7-15:11
* ------------------------------
*/
@Component
public class MyMessageListener {
@JmsListener(destination = "zelin-01")
public void getMessage01(String message){
System.out.println("得到消息:" + message);
}
}
/**
* ------------------------------
* 功能:监听: [发布/订阅] 消息
* 作者:WF
* 微信:hbxfwf13590332912
* 创建时间:2021/8/7-15:35
* ------------------------------
*/
@Component
public class MyMessageListener2 {
@JmsListener(destination = "zelin-topic")
public void getMessage(String message){
System.out.println("得到消息:" + message);
}
}
四、完成更新索引库的操作:
4.1 在zyg-manager-web中添加activemq依赖,并注掉对搜索服务的依赖:
4.2 在application.yml文件中添加配置:
spring:
activemq:
broker-url: tcp://192.168.56.10:61616
4.3 修改goodsController中的updateStatus方法:
/**
* 功能: 批量审核数据[数据一致性同步方案二: 使用消息中间件]
* 参数:
* 返回值: com.zelin.utils.R
* 时间: 2021/7/30 16:11
*/
@RequestMapping("updateStatus")
public R updateStatus(Long[] ids,String status){
//1. 审核商品
goodsService.updateStatus(ids,status);
//2. 根据商品id查询sku列表
List<ItemEntity> entities = itemService.findItemsbyGoodsId(ids);
//3. 将商品列表包装成字符串发送出去
if(entities != null && entities.size() > 0) {
jmsMessagingTemplate.convertAndSend("updateStatus", JSON.toJSONString(entities));
}else{
return R.error().put("msg","没有要审核的商品!");
}
return R.ok();
}
4.4 在zyg-search-service中添加依赖:
<!--引入activemq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
4.5 在zyg-search-service中application.yml配置
spring:
activemq:
broker-url: tcp://192.168.56.10:61616
4.6 定义监听器:
/**
* ------------------------------
* 功能:定义监听sku商品列表
* 作者:WF
* 微信:hbxfwf13590332912
* 创建时间:2021/8/9-14:41
* ------------------------------
*/
@Component
public class MyMessageListener {
@Autowired
private ItemSearchService itemSearchService;
@JmsListener(destination = "updateStatus")
public void getSkuList(String skuList){
//1. 得到sku列表,并转换为集合
List<ItemEntity> entities = JSON.parseArray(skuList, ItemEntity.class);
//2. 将sku商品列表添加到索引库中
itemSearchService.updateToIndex(entities);
//3. 打印提示信息
System.out.println("更新到索引库成功!");
}
}
4.7 商品审核效果:
五、完成生成静态页面的效果:
5.1 在zyg-manager-web中注掉对于生成静态页面服务依赖:
<!--引入生成静态页面的服务-->
<!--<dependency>-->
<!-- <groupId>com.zelin</groupId>-->
<!-- <artifactId>zyg-page-interface</artifactId>-->
<!-- <version>2.0</version>-->
<!--</dependency>-->
5.2 在zyg-manager-web中的goodsController修改生成静态页面的方法:
/**
* 功能: 根据商品id生成商品静态面[利用消息中间件发送消息]
* 参数:
* 返回值:
* 时间: 2021/8/6 14:31
*/
@RequestMapping("/item")
public void createHtml(String goodsId) throws IOException {
jmsMessagingTemplate.convertAndSend("createHtml",goodsId);
}
5.3 在zyg-page-service中添加依赖:
<!--2. 添加activemq依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
5.4 在zyg-page-service中的application.yml中添加配置:
spring:
activemq:
broker-url: tcp://192.168.56.10:61616
5.5 在zyg-page-service中添加监听器:
/**
* ------------------------------
* 功能:
* 作者:WF
* 微信:hbxfwf13590332912
* 创建时间:2021/8/9-15:02
* ------------------------------
*/
@Component
public class MyMessageListener {
@Autowired
private PageSerivce pageSerivce;
//1. 根据得到的商品id,生成静态页面
@JmsListener(destination = "createHtml")
public void createHtml(String goodsId) throws IOException {
System.out.println("goodsId = " + goodsId);
pageSerivce.createHtml(Long.parseLong(goodsId));
}
}
5.6 最后生成静态页面:
六、当我们删除商品时,从索引库删除相关sku商品及商品详情页面:
6.1 处理删除数据的序列化问题:(application.yml文件配置)
spring:
activemq:
broker-url: tcp://192.168.56.10:61616
packages:
trust-all: true
说明: 1、默认只处理java.lang包中的一些类的序列化,其它类不能处理 2、上面的配置packages.trust-all: 代表所有的包都可以进行序列化操作. 3、我们需要在消息发送方及消息监听方都做这一配置。(即: zyg-search-service,zyg-page-service,zyg-manager-web都要进行配置).
6.2 在goodsController中删除商品时,发送消息:
/**
* 逻辑删除
* 思路:
* ① 在删除商品时,同时删除索引库中相关sku的列表信息
* ② 同时删除此商品生成的静态页面
*/
@RequestMapping("/delete")
public R delete(@RequestBody Long[] ids){
//1. 根据商品id删除商品(逻辑删除)
goodsService.removeByIds(Arrays.asList(ids));
//2. 发送消息,访问搜索服务,从索引库中删除相关的sku列表
jmsMessagingTemplate.convertAndSend("deleteSku",Arrays.asList(ids));
//3. 发送消息,访问页面静态化服务,从生成的静态页面中删除此相关的静态页
jmsMessagingTemplate.convertAndSend("deletePage",Arrays.asList(ids));
return R.ok();
}
6.3 在zyg-search-service中监听删除的消息:
【在服务中定义删除功能】:
/**
* 功能: 从索引库中根据goodsid删除商品
* 参数:
* 返回值:
* 时间: 2021/8/9 15:45
*/
public void deleteSku(List<Long> goodsIds){
//1. 构造查询条件
Query query = new CriteriaQuery(new Criteria("goodsId").in(goodsIds));
//2. 从索引库中删除记录
restTemplate.delete(query,ItemEntity.class,IndexCoordinates.of("item"));
}
【定义监听器方法】:
/**
* 功能: 从索引库中根据商品id删除数据
* 参数:
* 返回值:
* 时间: 2021/8/9 15:44
*/
@JmsListener(destination = "deleteSku")
public void deleteSku(List<Long> ids){
itemSearchService.deleteSku(ids);
}
6.4 在zyg-page-service中监听删除的消息:
【在服务中定义删除功能】:
/**
* 功能: 根据商品id删除静态页面
* 参数:
* 返回值: void
* 时间: 2021/8/9 15:51
*/
@Override
public void deletePage(List<Long> ids) {
for (Long id : ids) {
String path = FILE_PATH + id + ".html";
System.out.println("path = " + path);
new File(path).delete();
}
}
【定义监听器方法】:
//2. 删除静态页面
@JmsListener(destination = "deletePage")
public void deletePage(List<Long> ids){
pageSerivce.deletePage(ids);
}