SpringBoot RabbitMQ 事务补偿

项目整合流程

创建一个 Maven工程,引入 amqp 包

  1. <!--amqp 支持-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

在全局文件中配置 rabbitMQ 服务信息

其中,spring.rabbitmq.addresses参数值为 rabbitmq 服务器地址

  1. spring.rabbitmq.addresses=197.168.24.206:5672
  2. spring.rabbitmq.username=guest
  3. spring.rabbitmq.password=guest
  4. spring.rabbitmq.virtual-host=/

Rabbitmq 配置类

  1. @Slf4j
  2. @Configuration
  3. public class RabbitConfig {
  4. /**
  5. * 初始化连接工厂
  6. * @param addresses
  7. * @param userName
  8. * @param password
  9. * @param vhost
  10. * @return
  11. */
  12. @Bean
  13. ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.addresses}") String addresses,
  14. @Value("${spring.rabbitmq.username}") String userName,
  15. @Value("${spring.rabbitmq.password}") String password,
  16. @Value("${spring.rabbitmq.virtual-host}") String vhost) {
  17. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  18. connectionFactory.setAddresses(addresses);
  19. connectionFactory.setUsername(userName);
  20. connectionFactory.setPassword(password);
  21. connectionFactory.setVirtualHost(vhost);
  22. return connectionFactory;
  23. }
  24. /**
  25. * 重新实例化 RabbitAdmin 操作类
  26. * @param connectionFactory
  27. * @return
  28. */
  29. @Bean
  30. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
  31. return new RabbitAdmin(connectionFactory);
  32. }
  33. /**
  34. * 重新实例化 RabbitTemplate 操作类
  35. * @param connectionFactory
  36. * @return
  37. */
  38. @Bean
  39. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
  40. RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
  41. //数据转换为json存入消息队列
  42. rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
  43. return rabbitTemplate;
  44. }
  45. /**
  46. * 将 RabbitUtil 操作工具类加入IOC容器
  47. * @return
  48. */
  49. @Bean
  50. public RabbitUtil rabbitUtil(){
  51. return new RabbitUtil();
  52. }
  53. }

RabbitUtil 工具类

  1. public class RabbitUtil {
  2. private static final Logger logger = LoggerFactory.getLogger(RabbitUtil.class);
  3. @Autowired
  4. private RabbitAdmin rabbitAdmin;
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. /**
  8. * 创建Exchange
  9. * @param exchangeName
  10. */
  11. public void addExchange(String exchangeType, String exchangeName){
  12. Exchange exchange = createExchange(exchangeType, exchangeName);
  13. rabbitAdmin.declareExchange(exchange);
  14. }
  15. /**
  16. * 删除一个Exchange
  17. * @param exchangeName
  18. */
  19. public boolean deleteExchange(String exchangeName){
  20. return rabbitAdmin.deleteExchange(exchangeName);
  21. }
  22. /**
  23. * 创建一个指定的Queue
  24. * @param queueName
  25. * @return queueName
  26. */
  27. public void addQueue(String queueName){
  28. Queue queue = createQueue(queueName);
  29. rabbitAdmin.declareQueue(queue);
  30. }
  31. /**
  32. * 删除一个queue
  33. * @return queueName
  34. * @param queueName
  35. */
  36. public boolean deleteQueue(String queueName){
  37. return rabbitAdmin.deleteQueue(queueName);
  38. }
  39. /**
  40. * 按照筛选条件,删除队列
  41. * @param queueName
  42. * @param unused 是否被使用
  43. * @param empty 内容是否为空
  44. */
  45. public void deleteQueue(String queueName, boolean unused, boolean empty){
  46. rabbitAdmin.deleteQueue(queueName,unused,empty);
  47. }
  48. /**
  49. * 清空某个队列中的消息,注意,清空的消息并没有被消费
  50. * @return queueName
  51. * @param queueName
  52. */
  53. public void purgeQueue(String queueName){
  54. rabbitAdmin.purgeQueue(queueName, false);
  55. }
  56. /**
  57. * 判断指定的队列是否存在
  58. * @param queueName
  59. * @return
  60. */
  61. public boolean existQueue(String queueName){
  62. return rabbitAdmin.getQueueProperties(queueName) == null ? false : true;
  63. }
  64. /**
  65. * 绑定一个队列到一个匹配型交换器使用一个routingKey
  66. * @param exchangeType
  67. * @param exchangeName
  68. * @param queueName
  69. * @param routingKey
  70. * @param isWhereAll
  71. * @param headers EADERS模式类型设置,其他模式类型传空
  72. */
  73. public void addBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){
  74. Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
  75. rabbitAdmin.declareBinding(binding);
  76. }
  77. /**
  78. * 声明绑定
  79. * @param binding
  80. */
  81. public void addBinding(Binding binding){
  82. rabbitAdmin.declareBinding(binding);
  83. }
  84. /**
  85. * 解除交换器与队列的绑定
  86. * @param exchangeType
  87. * @param exchangeName
  88. * @param queueName
  89. * @param routingKey
  90. * @param isWhereAll
  91. * @param headers
  92. */
  93. public void removeBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){
  94. Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
  95. removeBinding(binding);
  96. }
  97. /**
  98. * 解除交换器与队列的绑定
  99. * @param binding
  100. */
  101. public void removeBinding(Binding binding){
  102. rabbitAdmin.removeBinding(binding);
  103. }
  104. /**
  105. * 创建一个交换器、队列,并绑定队列
  106. * @param exchangeType
  107. * @param exchangeName
  108. * @param queueName
  109. * @param routingKey
  110. * @param isWhereAll
  111. * @param headers
  112. */
  113. public void andExchangeBindingQueue(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){
  114. //声明交换器
  115. addExchange(exchangeType, exchangeName);
  116. //声明队列
  117. addQueue(queueName);
  118. //声明绑定关系
  119. addBinding(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);
  120. }
  121. /**
  122. * 发送消息
  123. * @param exchange
  124. * @param routingKey
  125. * @param object
  126. */
  127. public void convertAndSend(String exchange, String routingKey, final Object object){
  128. rabbitTemplate.convertAndSend(exchange, routingKey, object);
  129. }
  130. /**
  131. * 转换Message对象
  132. * @param messageType
  133. * @param msg
  134. * @return
  135. */
  136. public Message getMessage(String messageType, Object msg){
  137. MessageProperties messageProperties = new MessageProperties();
  138. messageProperties.setContentType(messageType);
  139. Message message = new Message(msg.toString().getBytes(),messageProperties);
  140. return message;
  141. }
  142. /**
  143. * 声明交换机
  144. * @param exchangeType
  145. * @param exchangeName
  146. * @return
  147. */
  148. private Exchange createExchange(String exchangeType, String exchangeName){
  149. if(ExchangeType.DIRECT.equals(exchangeType)){
  150. return new DirectExchange(exchangeName);
  151. }
  152. if(ExchangeType.TOPIC.equals(exchangeType)){
  153. return new TopicExchange(exchangeName);
  154. }
  155. if(ExchangeType.HEADERS.equals(exchangeType)){
  156. return new HeadersExchange(exchangeName);
  157. }
  158. if(ExchangeType.FANOUT.equals(exchangeType)){
  159. return new FanoutExchange(exchangeName);
  160. }
  161. return null;
  162. }
  163. /**
  164. * 声明绑定关系
  165. * @param exchangeType
  166. * @param exchangeName
  167. * @param queueName
  168. * @param routingKey
  169. * @param isWhereAll
  170. * @param headers
  171. * @return
  172. */
  173. private Binding bindingBuilder(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){
  174. if(ExchangeType.DIRECT.equals(exchangeType)){
  175. return BindingBuilder.bind(new Queue(queueName)).to(new DirectExchange(exchangeName)).with(routingKey);
  176. }
  177. if(ExchangeType.TOPIC.equals(exchangeType)){
  178. return BindingBuilder.bind(new Queue(queueName)).to(new TopicExchange(exchangeName)).with(routingKey);
  179. }
  180. if(ExchangeType.HEADERS.equals(exchangeType)){
  181. if(isWhereAll){
  182. return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAll(headers).match();
  183. }else{
  184. return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAny(headers).match();
  185. }
  186. }
  187. if(ExchangeType.FANOUT.equals(exchangeType)){
  188. return BindingBuilder.bind(new Queue(queueName)).to(new FanoutExchange(exchangeName));
  189. }
  190. return null;
  191. }
  192. /**
  193. * 声明队列
  194. * @param queueName
  195. * @return
  196. */
  197. private Queue createQueue(String queueName){
  198. return new Queue(queueName);
  199. }
  200. /**
  201. * 交换器类型
  202. */
  203. public final static class ExchangeType {
  204. /**
  205. * 直连交换机(全文匹配)
  206. */
  207. public final static String DIRECT = "DIRECT";
  208. /**
  209. * 通配符交换机(两种通配符:*只能匹配一个单词,#可以匹配零个或多个)
  210. */
  211. public final static String TOPIC = "TOPIC";
  212. /**
  213. * 头交换机(自定义键值对匹配,根据发送消息内容中的headers属性进行匹配)
  214. */
  215. public final static String HEADERS = "HEADERS";
  216. /**
  217. * 扇形(广播)交换机 (将消息转发到所有与该交互机绑定的队列上)
  218. */
  219. public final static String FANOUT = "FANOUT";
  220. }
  221. }

队列监听类(静态)

  1. @Slf4j
  2. @Configuration
  3. public class DirectConsumeListener {
  4. /**
  5. * 监听指定队列,名称:mq.direct.1
  6. * @param message
  7. * @param channel
  8. * @throws IOException
  9. */
  10. @RabbitListener(queues = "mq.direct.1")
  11. public void consume(Message message, Channel channel) throws IOException {
  12. log.info("DirectConsumeListener,收到消息: {}", message.toString());
  13. }
  14. }

如果需要监听指定的队列,只需要方法上加上@RabbitListener(queues = "")即可,同时填写对应的队列名称。

队列监听类(动态)

如果动态监听队列,而不是通过写死在方法上,需要重新实例化一个SimpleMessageListenerContainer对象,这个对象就是监听容器。

  1. @Slf4j
  2. @Configuration
  3. public class DynamicConsumeListener {
  4. /**
  5. * 使用SimpleMessageListenerContainer实现动态监听
  6. * @param connectionFactory
  7. * @return
  8. */
  9. @Bean
  10. public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
  11. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
  12. container.setMessageListener((MessageListener) message -> {
  13. log.info("ConsumerMessageListen,收到消息: {}", message.toString());
  14. });
  15. return container;
  16. }
  17. }

如果想向SimpleMessageListenerContainer添加监听队列或者移除队列,只需通过如下方式即可操作。

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("/consumer")
  4. public class ConsumerController {
  5. @Autowired
  6. private SimpleMessageListenerContainer container;
  7. @Autowired
  8. private RabbitUtil rabbitUtil;
  9. /**
  10. * 添加队列到监听器
  11. * @param consumerInfo
  12. */
  13. @PostMapping("addQueue")
  14. public void addQueue(@RequestBody ConsumerInfo consumerInfo) {
  15. boolean existQueue = rabbitUtil.existQueue(consumerInfo.getQueueName());
  16. if(!existQueue){
  17. throw new CommonExecption("当前队列不存在");
  18. }
  19. //消费mq消息的类
  20. container.addQueueNames(consumerInfo.getQueueName());
  21. //打印监听容器中正在监听到队列
  22. log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));
  23. }
  24. /**
  25. * 移除正在监听的队列
  26. * @param consumerInfo
  27. */
  28. @PostMapping("removeQueue")
  29. public void removeQueue(@RequestBody ConsumerInfo consumerInfo) {
  30. //消费mq消息的类
  31. container.removeQueueNames(consumerInfo.getQueueName());
  32. //打印监听容器中正在监听到队列
  33. log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));
  34. }
  35. /**
  36. * 查询监听容器中正在监听到队列
  37. */
  38. @PostMapping("queryListenerQueue")
  39. public void queryListenerQueue() {
  40. log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));
  41. }
  42. }

发送消息到交换器

先编写一个请求参数实体类
  1. @Data
  2. public class ProduceInfo implements Serializable {
  3. private static final long serialVersionUID = 1l;
  4. /**
  5. * 交换器名称
  6. */
  7. private String exchangeName;
  8. /**
  9. * 路由键key
  10. */
  11. private String routingKey;
  12. /**
  13. * 消息内容
  14. */
  15. public String msg;
  16. }

编写接口api
  1. @RestController
  2. @RequestMapping("/produce")
  3. public class ProduceController {
  4. @Autowired
  5. private RabbitUtil rabbitUtil;
  6. /**
  7. * 发送消息到交换器
  8. * @param produceInfo
  9. */
  10. @PostMapping("sendMessage")
  11. public void sendMessage(@RequestBody ProduceInfo produceInfo) {
  12. rabbitUtil.convertAndSend(produceInfo.getExchangeName(), produceInfo.getRoutingKey(), produceInfo);
  13. }
  14. }

当然,也可以直接使用rabbitTemplate操作类,来实现发送消息。

  1. rabbitTemplate.convertAndSend(exchange, routingKey, message);

参数内容解释

  • exchange:表示交换器名称
  • routingKey:表示路由键key
  • message:表示消息

    交换器、队列维护操作

    如果想通过接口对 rabbitMQ 中的交换器、队列以及绑定关系进行维护,通过如下方式接口操作,即可实现!

  • 先写一个请求参数实体类

    1. @Data
    2. public class QueueConfig implements Serializable{
    3. private static final long serialVersionUID = 1l;
    4. /**
    5. * 交换器类型
    6. */
    7. private String exchangeType;
    8. /**
    9. * 交换器名称
    10. */
    11. private String exchangeName;
    12. /**
    13. * 队列名称
    14. */
    15. private String queueName;
    16. /**
    17. * 路由键key
    18. */
    19. private String routingKey;
    20. }
  • 编写接口api ```java /**

    • rabbitMQ管理操作控制层 */ @RestController @RequestMapping(“/config”) public class RabbitController {
  1. @Autowired
  2. private RabbitUtil rabbitUtil;
  3. /**
  4. * 创建交换器
  5. * @param config
  6. */
  7. @PostMapping("addExchange")
  8. public void addExchange(@RequestBody QueueConfig config) {
  9. rabbitUtil.addExchange(config.getExchangeType(), config.getExchangeName());
  10. }
  11. /**
  12. * 删除交换器
  13. * @param config
  14. */
  15. @PostMapping("deleteExchange")
  16. public void deleteExchange(@RequestBody QueueConfig config) {
  17. rabbitUtil.deleteExchange(config.getExchangeName());
  18. }
  19. /**
  20. * 添加队列
  21. * @param config
  22. */
  23. @PostMapping("addQueue")
  24. public void addQueue(@RequestBody QueueConfig config) {
  25. rabbitUtil.addQueue(config.getQueueName());
  26. }
  27. /**
  28. * 删除队列
  29. * @param config
  30. */
  31. @PostMapping("deleteQueue")
  32. public void deleteQueue(@RequestBody QueueConfig config) {
  33. rabbitUtil.deleteQueue(config.getQueueName());
  34. }
  35. /**
  36. * 清空队列数据
  37. * @param config
  38. */
  39. @PostMapping("purgeQueue")
  40. public void purgeQueue(@RequestBody QueueConfig config) {
  41. rabbitUtil.purgeQueue(config.getQueueName());
  42. }
  43. /**
  44. * 添加绑定
  45. * @param config
  46. */
  47. @PostMapping("addBinding")
  48. public void addBinding(@RequestBody QueueConfig config) {
  49. rabbitUtil.addBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);
  50. }
  51. /**
  52. * 解除绑定
  53. * @param config
  54. */
  55. @PostMapping("removeBinding")
  56. public void removeBinding(@RequestBody QueueConfig config) {
  57. rabbitUtil.removeBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);
  58. }
  59. /**
  60. * 创建头部类型的交换器
  61. * 判断条件是所有的键值对都匹配成功才发送到队列
  62. * @param config
  63. */
  64. @PostMapping("andExchangeBindingQueueOfHeaderAll")
  65. public void andExchangeBindingQueueOfHeaderAll(@RequestBody QueueConfig config) {
  66. HashMap<String, Object> header = new HashMap<>();
  67. header.put("queue", "queue");
  68. header.put("bindType", "whereAll");
  69. rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), null, true, header);
  70. }
  71. /**
  72. * 创建头部类型的交换器
  73. * 判断条件是只要有一个键值对匹配成功就发送到队列
  74. * @param config
  75. */
  76. @PostMapping("andExchangeBindingQueueOfHeaderAny")
  77. public void andExchangeBindingQueueOfHeaderAny(@RequestBody QueueConfig config) {
  78. HashMap<String, Object> header = new HashMap<>();
  79. header.put("queue", "queue");
  80. header.put("bindType", "whereAny");
  81. rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), null, false, header);
  82. }

}

  1. :::info
  2. RabbitMQ 管理器基本的 crud 全部开发完成
  3. :::
  4. <a name="Oc6U8"></a>
  5. ### 利用 MQ 实现事务补偿
  6. 以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:**支付订单、扣减库存、生成相应单据、发红包、发短信通知等等**。<br />在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取 MQ 的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。<br />这种是利用 MQ 实现业务解耦,其它的场景包括最终一致性、广播、错峰流控等等。<br />利用 MQ 实现业务解耦的过程其实也很简单。<br />当主流程结束之后,将消息推送到发红包、发短信交换器中即可
  7. <a name="YV7Aa"></a>
  8. #### 监听下订单操作
  9. ```java
  10. @Service
  11. public class OrderService {
  12. @Autowired
  13. private RabbitUtil rabbitUtil;
  14. /**
  15. * 创建订单
  16. * @param order
  17. */
  18. @Transactional
  19. public void createOrder(Order order){
  20. //1、创建订单
  21. //2、调用库存接口,减库存
  22. //3、向客户发放红包
  23. rabbitUtil.convertAndSend("exchange.send.bonus", null, order);
  24. //4、发短信通知
  25. rabbitUtil.convertAndSend("exchange.sms.message", null, order);
  26. }
  27. }

监听发红包操作

  1. /**
  2. * 监听发红包
  3. * @param message
  4. * @param channel
  5. * @throws IOException
  6. */
  7. @RabbitListener(queues = "exchange.send.bonus")
  8. public void consume(Message message, Channel channel) throws IOException {
  9. String msgJson = new String(message.getBody(),"UTF-8");
  10. log.info("收到消息: {}", message.toString());
  11. //调用发红包接口
  12. }

监听发短信操作

  1. /**
  2. * 监听发短信
  3. * @param message
  4. * @param channel
  5. * @throws IOException
  6. */
  7. @RabbitListener(queues = "exchange.sms.message")
  8. public void consume(Message message, Channel channel) throws IOException {
  9. String msgJson = new String(message.getBody(),"UTF-8");
  10. log.info("收到消息: {}", message.toString());
  11. //调用发短信接口
  12. }

:::danger 既然 MQ 这么好用,那么是不是完全可以将以前的业务也按照整个模型进行拆分呢?
答案显然不是!
当引入 MQ 之后业务的确是解耦了,但是当 MQ 一旦挂了,所有的服务基本都挂了,是不是很可怕! :::