一、先来一张 RabbitMQ 流程图

image.png
本文内容主要围绕这个流程图展开,利用 RabbitMQ 消息队列,实现生产者与消费者解耦,所以有必要先贴出来,涵盖了 RabbitMQ 很多知识点,如:

  • 消息发送确认机制
  • 消费确认机制
  • 消息的重新投递
  • 消费幂等性, 等等

    二、实现思路

  • 1.在虚拟机创建一个CentOS7上,并安装 RabbitMQ

  • 2.开放QQ邮箱或者其它邮箱授权码,用于发送邮件
  • 3.创建邮件发送项目并编写代码
  • 4.发送邮件测试
  • 5.消息发送失败处理

    三、RabbitMQ安装

    RabbitMQ 基于 erlang 进行通信,相比其它的软件,安装有些麻烦,不过本例采用rpm方式安装,任何新手都可以完成安装,过程如下!

    3.1、安装前命令准备

    输入如下命令,完成安装前的环境准备。
    yum install lsof build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz wget vim

    3.2、下载 RabbitMQ、erlang、socat 的安装包

    本次下载的是RabbitMQ-3.6.5版本,采用rpm一键安装,适合新手直接上手。
    先创建一个rabbitmq目录,本例的目录路径为/usr/app/rabbitmq,然后在目录下执行如下命令,下载安装包!

  • 下载erlang

wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm

  • 下载socat

wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm

  • 下载rabbitMQ

wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
最终目录文件如下:
image.png

3.3、安装软件包

下载完之后,按顺序依次安装软件包,这个很重要哦~

  • 安装erlang

rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

  • 安装socat

rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm

  • 安装rabbitmq

rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
安装完成之后,修改rabbitmq的配置,默认配置文件在/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin目录下。
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
修改loopback_users节点的值!
image.png
最后只需通过如下命令,启动服务即可!
rabbitmq-server start &
运行脚本之后,如果报错,例如下图!
image.png
解决办法如下:
vim /etc/rabbitmq/rabbitmq-env.conf
在文件里添加一行,如下配置!
NODENAME=rabbit@localhost
然后,再保存!再次以下命令启动服务!
rabbitmq-server start &
通过如下命令,查询服务是否启动成功!
lsof -i:5672
如果出现5672已经被监听,说明已经启动成功!
image.png

3.4、启动可视化的管控台

输入如下命令,启动控制台!
rabbitmq-plugins enable rabbitmq_management
用浏览器打开http://ip:15672,这里的ip就是 CentOS 系统的 ip,结果如下:
image.png
账号、密码,默认为guest,如果出现无法访问,检测防火墙是否开启,如果开启将其关闭即可!
登录之后的监控平台,界面如下:
image.png

四、邮箱授权码的获取

获取邮箱授权码的目的,主要是为了通过代码进行发送邮件,例如 QQ 邮箱授权码获取方式,如下图:
点击【开启】按钮,然后发送短信,即可获取授权码,该授权码就是配置文件spring.mail.password需要的密码!
image.png

五、项目介绍

  • springboot版本:2.1.5.RELEASE
  • RabbitMQ版本:3.6.5
  • SendMailUtil:发送邮件工具类
  • ProduceServiceImpl:生产者,发送消息
  • ConsumerMailService:消费者,消费消息,发送邮件

    六、代码实现

    6.1、创建项目

    在 IDEA 下创建一个名称为smail的 Springboot 项目,pom文件中加入amqp和mail。
    1. <dependencies>
    2. <!--spring boot核心-->
    3. <dependency>
    4. <groupId>org.springframework.boot</groupId>
    5. <artifactId>spring-boot-starter</artifactId>
    6. </dependency>
    7. <!--spring boot 测试-->
    8. <dependency>
    9. <groupId>org.springframework.boot</groupId>
    10. <artifactId>spring-boot-starter-test</artifactId>
    11. <scope>test</scope>
    12. </dependency>
    13. <!--springmvc web-->
    14. <dependency>
    15. <groupId>org.springframework.boot</groupId>
    16. <artifactId>spring-boot-starter-web</artifactId>
    17. </dependency>
    18. <!--开发环境调试-->
    19. <dependency>
    20. <groupId>org.springframework.boot</groupId>
    21. <artifactId>spring-boot-devtools</artifactId>
    22. <optional>true</optional>
    23. </dependency>
    24. <!--mail 支持-->
    25. <dependency>
    26. <groupId>org.springframework.boot</groupId>
    27. <artifactId>spring-boot-starter-mail</artifactId>
    28. </dependency>
    29. <!--amqp 支持-->
    30. <dependency>
    31. <groupId>org.springframework.boot</groupId>
    32. <artifactId>spring-boot-starter-amqp</artifactId>
    33. </dependency>
    34. <!-- commons-lang3 -->
    35. <dependency>
    36. <groupId>org.apache.commons</groupId>
    37. <artifactId>commons-lang3</artifactId>
    38. <version>3.4</version>
    39. </dependency>
    40. <!--lombok-->
    41. <dependency>
    42. <groupId>org.projectlombok</groupId>
    43. <artifactId>lombok</artifactId>
    44. <version>1.16.10</version>
    45. </dependency>
    46. </dependencies>

    6.2、配置rabbitMQ、mail

    在application.properties文件中,配置amqp和mail! ```java

    rabbitmq

    spring.rabbitmq.host=192.168.0.103 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest

    开启confirms回调 P -> Exchange

    spring.rabbitmq.publisher-confirms=true

    开启returnedMessage回调 Exchange -> Queue

    spring.rabbitmq.publisher-returns=true

    设置手动确认(ack) Queue -> C

    spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.simple.prefetch=100

mail

spring.mail.default-encoding=UTF-8 spring.mail.host=smtp.qq.com spring.mail.username=1370887518@qq.com spring.mail.password=获取的邮箱授权码 spring.mail.from=1370887518@qq.com spring.mail.properties.mail.smtp.auth=true spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true

  1. 其中,spring.mail.password第四步中获取的授权码,同时usernamefrom要一致!
  2. <a name="ilzzJ"></a>
  3. ### **6.3、RabbitConfig配置类**
  4. ```java
  5. @Configuration
  6. @Slf4j
  7. public class RabbitConfig {
  8. // 发送邮件
  9. public static final String MAIL_QUEUE_NAME = "mail.queue";
  10. public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
  11. public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";
  12. @Autowired
  13. private CachingConnectionFactory connectionFactory;
  14. @Bean
  15. public RabbitTemplate rabbitTemplate() {
  16. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  17. rabbitTemplate.setMessageConverter(converter());
  18. // 消息是否成功发送到Exchange
  19. rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
  20. if (ack) {
  21. log.info("消息成功发送到Exchange");
  22. } else {
  23. log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
  24. }
  25. });
  26. // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
  27. rabbitTemplate.setMandatory(true);
  28. // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
  29. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
  30. log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
  31. });
  32. return rabbitTemplate;
  33. }
  34. @Bean
  35. public Jackson2JsonMessageConverter converter() {
  36. return new Jackson2JsonMessageConverter();
  37. }
  38. @Bean
  39. public Queue mailQueue() {
  40. return new Queue(MAIL_QUEUE_NAME, true);
  41. }
  42. @Bean
  43. public DirectExchange mailExchange() {
  44. return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);
  45. }
  46. @Bean
  47. public Binding mailBinding() {
  48. return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
  49. }
  50. }

6.4、Mail 邮件实体类

  1. @Getter
  2. @Setter
  3. @NoArgsConstructor
  4. @AllArgsConstructor
  5. public class Mail {
  6. @Pattern(regexp = "^([a-z0-9A-Z]+[-|\\.]?)+[a-z0-9A-Z]@([a-z0-9A-Z]+(-[a-z0-9A-Z]+)?\\.)+[a-zA-Z]{2,}$", message = "邮箱格式不正确")
  7. private String to;
  8. @NotBlank(message = "标题不能为空")
  9. private String title;
  10. @NotBlank(message = "正文不能为空")
  11. private String content;
  12. private String msgId;// 消息id
  13. }

6.5、SendMailUtil邮件发送类

  1. @Component
  2. @Slf4j
  3. public class SendMailUtil {
  4. @Value("${spring.mail.from}")
  5. private String from;
  6. @Autowired
  7. private JavaMailSender mailSender;
  8. /**
  9. * 发送简单邮件
  10. *
  11. * @param mail
  12. */
  13. public boolean send(Mail mail) {
  14. String to = mail.getTo();// 目标邮箱
  15. String title = mail.getTitle();// 邮件标题
  16. String content = mail.getContent();// 邮件正文
  17. SimpleMailMessage message = new SimpleMailMessage();
  18. message.setFrom(from);
  19. message.setTo(to);
  20. message.setSubject(title);
  21. message.setText(content);
  22. try {
  23. mailSender.send(message);
  24. log.info("邮件发送成功");
  25. return true;
  26. } catch (MailException e) {
  27. log.error("邮件发送失败, to: {}, title: {}", to, title, e);
  28. return false;
  29. }
  30. }
  31. }

6.6、ProduceServiceImpl 生产者类

  1. @Service
  2. public class ProduceServiceImpl implements ProduceService {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @Override
  6. public boolean send(Mail mail) {
  7. //创建uuid
  8. String msgId = UUID.randomUUID().toString().replaceAll("-", "");
  9. mail.setMsgId(msgId);
  10. //发送消息到rabbitMQ
  11. CorrelationData correlationData = new CorrelationData(msgId);
  12. rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME, MessageHelper.objToMsg(mail), correlationData);
  13. return true;
  14. }
  15. }

6.7、ConsumerMailService 消费者类

  1. @Component
  2. @Slf4j
  3. public class ConsumerMailService {
  4. @Autowired
  5. private SendMailUtil sendMailUtil;
  6. @RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)
  7. public void consume(Message message, Channel channel) throws IOException {
  8. //将消息转化为对象
  9. String str = new String(message.getBody());
  10. Mail mail = JsonUtil.strToObj(str, Mail.class);
  11. log.info("收到消息: {}", mail.toString());
  12. MessageProperties properties = message.getMessageProperties();
  13. long tag = properties.getDeliveryTag();
  14. boolean success = sendMailUtil.send(mail);
  15. if (success) {
  16. channel.basicAck(tag, false);// 消费确认
  17. } else {
  18. channel.basicNack(tag, false, true);
  19. }
  20. }
  21. }

6.8、TestController 控制层类

  1. @RestController
  2. @RequestMapping("/test")
  3. @Slf4j
  4. public class TestController {
  5. @Autowired
  6. private ProduceService testService;
  7. @PostMapping("send")
  8. public boolean sendMail(Mail mail) {
  9. return testService.send(mail);
  10. }
  11. }

七、测试服务

启动 SpringBoot 服务之后,用 postman 模拟请求接口。
image.png
查看控制台信息。
image.png
查询接受者邮件信息。
image.png
邮件发送成功!

八、消息发送失败处理

虽然,上面案例可以成功的实现消息的发送,但是上面的流程很脆弱,例如: rabbitMQ 突然蹦了、邮件发送失败了、重启 rabbitMQ 服务器出现消息重复消费,应该怎处理呢?
很显然,我们需要对原有的逻辑进行升级改造,因此我们需要引入数据库来记录消息的发送情况。

8.1、创建消息投递日志表

  1. CREATE TABLE `msg_log` (
  2. `msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一标识',
  3. `msg` text COMMENT '消息体, json格式化',
  4. `exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交换机',
  5. `routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键',
  6. `status` int(11) NOT NULL DEFAULT '0' COMMENT '状态: 0投递中 1投递成功 2投递失败 3已消费',
  7. `try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
  8. `next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',
  9. `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  10. `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  11. PRIMARY KEY (`msg_id`),
  12. UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE
  13. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投递日志';

8.2、编写 MsgLog 相关服务类

  1. public interface MsgLogService {
  2. /**
  3. * 插入消息日志
  4. * @param msgLog
  5. */
  6. void insert(MsgLog msgLog);
  7. /**
  8. * 更新消息状态
  9. * @param msgId
  10. * @param status
  11. */
  12. void updateStatus(String msgId, Integer status);
  13. /**
  14. * 查询消息
  15. * @param msgId
  16. * @return
  17. */
  18. MsgLog selectByMsgId(String msgId);
  19. }

8.3、改写服务逻辑

在生产服务类中,新增数据写入。
image.png
同时,在RabbitConfig服务配置,当消息发送成功之后,新增更新消息状态逻辑。
image.png
改造消费者ConsumerMailService,每次消费的时候,从数据库中查询,如果消息已经被消费,不用再重复发送数据!
image.png
这样即可保证,如果 rabbitMQ 服务器,即使重启之后重新推送消息,通过数据库判断,也不会重复消费进而发生业务异常!

8.4、利用定数任务对消息投递失败进行补偿

当 rabbitMQ 服务器突然挂掉之后,生成者就无法正常进行投递数据,此时因为消息已经被记录到数据库,因此我们可以利用定数任务查询出没有投递成功的消息,进行补偿投递。
image.png
利用定数任务,对投递失败的消息进行补偿投递,基本可以保证消息 100% 消费成功!

九、总结

本文主要是通过发送邮件这个业务案例,来讲解 Springboot 与 rabbitMQ 技术的整合和使用!
当然解决这个业务需求的技术方案还有很多,例如 Springboot 与 rocketMQ 也可以实现这个需求,这个会在后期的文章讲解!