Author:Gorit
Date:2020年11月29日
reference:《一步一步学 SpringBoot2微服务项目实战》

一、异步消息与异步调用

本节内容主要围绕 ActiveMQ的安装和使用, SpringBoot 集成 ActiveMQ,利用 ActiveMQ 实现一个异步发表朋友圈以及 SpringBoot 异步调用 @Async

二、JMS 消息介绍

JMS (Java Message Service)即 Java 消息服务,是一组 Java 应用程序接口,提供消息的创建,发送,读取等一系列服务。JMS 提供了一组公共应用程序接口和响应的语法

JMS 还支持两种消息发送 和 消息接收模型

2.1 JMS通信类型 (P2P)

采用点对点的方式发送消息。P2P 的模型是基于 队列, 消费生产者(Producer)发送消息到队列,消息消费者(Consumer)从队列中接收消息,队列的存在,使得消息的异步成为可能。

image.png

P2P的特点

  1. 每个消息只有一个消费者(消息一旦被消费,消息就不存在队列之中了)
  2. 发送者 和 接受者在时间上没有依赖
  3. 接受者不管是否运行,都不会影响到消息被发送到队列中
  4. 接收者在成功接收消息之后需向队列应答成功

2.2 JMS通信类型 (Pub/Sub)

即 发布/订阅模型,发布-订阅模型定义了如何像一个内容节点发布 和 订阅消息,这个内容节点称为 topic(主题),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者则从主题订阅消息。主题使得消息的订阅者 和 消息的发布者相互保持独立,不需要进行接触,即可进行消息的传递

发布 — 订阅模型在消息一对多广播时采用
image.png
Pub/Sub 特点

  1. 每个消息可以有多个消费者
  2. 发布者 与 订阅者有时间上的依赖性
  3. 针对某个主题的(Topic)的订阅者,必须创建一个订阅者之后才能消费发布者的消息
  4. 为了消费信息,订阅者必须保持运行状态

三、SpringBoot 集成 ActiveMQ

3.1 ActiveMQ 概述

MQ 即 MessageQueue,中文名是消息队列,是一个消息接收和转发的一个容器,可用于消息推送。 ActiveMQ 是 Apache 提供的一个开源消息系统,完全采用 Java 实现,因此能很好的支持 JavaEE 提出的 JMS (Java Message Service,Java 消息服务)规范。

3.2 使用 ActiveMQ

提前下载好 ActiveMQ
apache-activemq-5.16.0.zip

然后解压,进入 apache-activemq-5.16.0\bin\win64 目录下,打开 activemq.bat 启动服务

前提是一定要安装好 Java 环境
image.png
然后登陆 http://127.0.0.1:8161/
账号和密码都是:admin

image.png

3.3 开发一个简单的朋友圈发说说的功能

  1. 导入 Maven 坐标依赖

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.springframework.boot</groupId>
    4. <artifactId>spring-boot-starter-web</artifactId>
    5. </dependency>
    6. <dependency>
    7. <groupId>org.springframework.boot</groupId>
    8. <artifactId>spring-boot-starter-activemq</artifactId>
    9. </dependency>
    10. <dependency>
    11. <groupId>org.springframework.boot</groupId>
    12. <artifactId>spring-boot-starter-data-jpa</artifactId>
    13. </dependency>
    14. <dependency>
    15. <groupId>mysql</groupId>
    16. <artifactId>mysql-connector-java</artifactId>
    17. <version>8.0.21</version>
    18. <scope>runtime</scope>
    19. </dependency>
    20. <dependency>
    21. <groupId>org.springframework.boot</groupId>
    22. <artifactId>spring-boot-starter-test</artifactId>
    23. <scope>test</scope>
    24. <exclusions>
    25. <exclusion>
    26. <groupId>org.junit.vintage</groupId>
    27. <artifactId>junit-vintage-engine</artifactId>
    28. </exclusion>
    29. </exclusions>
    30. </dependency>
    31. </dependencies>
    32. <dependencyManagement>
    33. <dependencies>
    34. <dependency>
    35. <groupId>org.springframework.boot</groupId>
    36. <artifactId>spring-boot-dependencies</artifactId>
    37. <version>${spring-boot.version}</version>
    38. <type>pom</type>
    39. <scope>import</scope>
    40. </dependency>
    41. </dependencies>
    42. </dependencyManagement>
  2. 配置文件编写 ```xml spring.activemq.broker-url=tcp://127.0.0.1:61616 spring.activemq.user=admin spring.activemq.password=admin spring.activemq.pool.enabled=false spring.activemq.in-memory=true spring.activemq.pool.max-connections=100 spring.jms.pub-sub-domain=true spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.username=root spring.datasource.password=root spring.datasource.url=jdbc:mysql://localhost:3306/springboot?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC

spring.jpa.show-sql=true spring.jpa.database=mysql spring.jpa.hibernate.ddl-auto=update


3. 创建实体类
```java
package com.example.entity;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.util.Date;

/**
 * @Classname Mood
 * @Description 说说实体类
 * @Date 2020/11/29 0:30
 * @Created by CodingGorit
 * @Version 1.0
 */
@Entity
@Table
public class Mood {

    @Id
    private String id;
    // 朋友圈内容
    private String content;
    // 用户 ID
    private String userId;
    // 点赞数量
    private Integer praiseNum;
    // 发布日期
    private Date publishTime;

    // getter 和 setter 省略
}
  1. 持久层接口 ```java package com.example.repository;

import com.example.entity.Mood; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository;

/**

  • @Classname ModdRepository
  • @Description TODO
  • @Date 2020/11/29 0:39
  • @Created by CodingGorit
  • @Version 1.0 */ @Repository public interface ModdRepository extends JpaRepository { } ```
  1. service 接口与实现类 ```java package com.example.service;

import com.example.entity.Mood;

/**

  • @Classname MoodService
  • @Description TODO
  • @Date 2020/11/29 0:40
  • @Created by CodingGorit
  • @Version 1.0 */ public interface MoodService {

    Mood save(Mood mood); }

```java
package com.example.service;

import com.example.entity.Mood;
import com.example.repository.ModdRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Classname MoodServiceImpl
 * @Description 说说服务层
 * @Date 2020/11/29 0:40
 * @Created by CodingGorit
 * @Version 1.0
 */
@Service
public class MoodServiceImpl implements MoodService{

    @Autowired
    private ModdRepository moddRepository;

    @Override
    public Mood save(Mood mood) {
        return moddRepository.save(mood);
    }
}
  1. 添加说说 ```java package com.example.demo;

import com.example.entity.Mood; import com.example.service.MoodService; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource; import java.util.Date;

@SpringBootTest class DemoApplicationTests {

@Resource
private MoodService moodService;

@Test
void contextLoads() {
    Mood mood = new Mood();
    mood.setId("1");
    mood.setUserId("1");
    mood.setPraiseNum(0);
    // 说说内容
    mood.setContent("这是第一条说说");
    mood.setPublishTime(new Date());
    Mood mood1 =  moodService.save(mood);
}

}

执行完测试之后,我们发现数据库中已经出现了一条记录,这就是 Spring Data Jpa的魅力<br />![image.png](https://cdn.nlark.com/yuque/0/2020/png/473179/1606583133146-8ec73266-9ec4-4ba5-aca8-202ef4d52969.png#align=left&display=inline&height=417&margin=%5Bobject%20Object%5D&name=image.png&originHeight=417&originWidth=839&size=47633&status=done&style=none&width=839)<br />但是这还没完,我们并没有用到消息队列。实现这个功能有很多方法,但是我们知道微信的用户群体非常多,每天有几亿用户发不同的说说,如果都按照这么操作的话。用户每发一条说说,后端开辟一个线程,将说说的内容实时保存到数据库中,但是后端的系统资源非常宝贵。这样会对服务器和数据库造成巨大的压力。因此接下来我们会使用 ActiveMQ 做异步消费来抵抗用户量极大的发表说说产生的压力,从而提高系统性能。

<a name="8EsPy"></a>
### 3.4 使用 ActiveMQ 完善的发说说功能

1. 在主配置类开启 Jms @EnableJms
1. 编写说说发布者
```java
package com.example.service.producer;

import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;


/**
 * @Classname MoodProducer
 * @Description 生产者
 * @Date 2020/11/29 10:33
 * @Created by CodingGorit
 * @Version 1.0
 */
@Service
public class MoodProducer {

    // 发消息的工具类,也可以注入 JmsTemplate,
    // 下面这个对 JmsTemplate 进行了封装,参数 destination 是发送到队列的,message 是待发送的消息
    @Resource
    private JmsMessagingTemplate jmt;

    public void sendMessage(Destination destination, final String message) {
        jmt.convertAndSend(destination,message);
    }

}
  1. 编写说说消费者 ```java package com.example.consumer;

import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component;

/**

  • @Classname MoodConsumer
  • @Description 消费者
  • @Date 2020/11/29 10:42
  • @Created by CodingGorit
  • @Version 1.0 */ @Component public class MoodConsumer {

    // 使用 JmsListener 配置消费者监听队列 mood.queue,text 是接收到的消息 @JmsListener(destination = “mood.queue”) public void receiveQueue(String text) {

     System.out.println("用户发表说说【"+text+"】成功");
    

    } } ```

  1. 测试

     @Test
     void testActiveMQ() {
         Destination destination = new ActiveMQQueue("mood.queue");
         moodProducer.sendMessage(destination,"Hello msg");
     }
    
  2. 打开 activeMQ 的管理界面

image.png
可以看到消息发布者的消息
image.png