使用路由(direct)交换机,通过 routingKey 将消息分发到对应的队列。
每一条消息分别放入一个独立的队列,消息使用完成后删除该队列。

定义死信队列、死信交换机和普通交换机

  1. package com.matrix.queue.rabbitmq.config;
  2. import com.matrix.queue.rabbitmq.constant.QueuePassConstant;
  3. import org.springframework.amqp.core.Binding;
  4. import org.springframework.amqp.core.BindingBuilder;
  5. import org.springframework.amqp.core.DirectExchange;
  6. import org.springframework.amqp.core.Queue;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. /**
  12. * @ClassName: QueueConfiguration
  13. * @Package: com.matrix.common.config
  14. * @description: 消息队列-演示队列 配置类
  15. * @author: swordmeng
  16. * @date: 2021.11.10
  17. * @version v1.0
  18. */
  19. @Configuration
  20. public class QueuePassConfiguration {
  21. //====================1.声明正常队列====================
  22. /**
  23. * 排队 信道配置
  24. * @return
  25. */
  26. @Bean
  27. public DirectExchange goodsExchange() {
  28. return new DirectExchange(QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE, true, false);
  29. }
  30. /**
  31. * 排队 队列
  32. * @return:
  33. */
  34. @Bean
  35. public Queue goodsQueue() {
  36. Map<String, Object> queueArgs = new HashMap<>();
  37. queueArgs.put("x-dead-letter-exchange", QueuePassConstant.ANTIEPIDEMIC_QUEUE_DLX_EXCHANGE);
  38. Queue queue = new Queue(QueuePassConstant.ANTIEPIDEMIC_QUEUE_PASS_NAME,true,false,false,queueArgs);
  39. return queue;
  40. }
  41. /**
  42. * 排队 队列绑定
  43. * @return
  44. */
  45. @Bean
  46. public Binding goodsBinding() {
  47. return BindingBuilder.bind(goodsQueue()).to(goodsExchange()).with(QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE_ROUTING_KEY);
  48. }
  49. 普通队列和 绑定其实在 这里是用不到的
  50. //====================2.声明死信队列====================
  51. /**
  52. * 死信 信道配置
  53. * @return
  54. */
  55. @Bean
  56. public DirectExchange queueDlxExchange() {
  57. return new DirectExchange(QueuePassConstant.ANTIEPIDEMIC_QUEUE_DLX_EXCHANGE, true, false);
  58. }
  59. /**
  60. * 排队 死信队列
  61. * @return
  62. */
  63. @Bean
  64. public Queue queueDlxQueue() {
  65. Queue queue = new Queue(QueuePassConstant.ANTIEPIDEMIC_QUEUE_DLX_PASS_NAME,true,false,false);
  66. return queue;
  67. }
  68. /**
  69. * 绑定 死信队列
  70. * @author swordmeng
  71. * @Datetime 2021/11/12 9:40
  72. * @return:
  73. */
  74. @Bean
  75. public Binding queueDlxBinding() {
  76. return BindingBuilder.bind(queueDlxQueue()).to(queueDlxExchange()).with(QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE_ROUTING_KEY);
  77. }
  78. }

nacos配置

  # RabbitMQ connection and listener settings (stored in nacos).
  # NOTE(review): host and credentials are committed in plain text here;
  # consider moving them to a secret store or environment placeholders.
  spring:
    rabbitmq:
      host: 119.3.13.80
      port: 5672
      username: admin
      password: matrix2019
      virtual-host: /alarm
      publisher-confirms: true # enable publisher confirms
      publisher-returns: true # enable publisher returns
      connection-timeout: 10000 # connection timeout (a bare number is read as milliseconds)
      listener: # consumer listener container
        simple:
          concurrency: 5
          max-concurrency: 10
          acknowledge-mode: manual # consumers acknowledge messages manually
          prefetch: 2

rabbitmq配置类引入

  1. package com.matrix.queue.rabbitmq.entity;
  2. import lombok.Data;
  3. import org.springframework.boot.context.properties.ConfigurationProperties;
  4. import org.springframework.stereotype.Component;
  5. @Data
  6. @Component
  7. @ConfigurationProperties(prefix = "spring.rabbitmq")
  8. public class RabbitMQProperties {
  9. /**
  10. * rabbit 地址
  11. */
  12. private String host;
  13. /**
  14. * rabbit 端口
  15. */
  16. private String port;
  17. /**
  18. * rabbit 用户名
  19. */
  20. private String username;
  21. /**
  22. * rabbit password
  23. */
  24. private String password;
  25. /**
  26. * rabbit virtualHost
  27. */
  28. private String virtualHost;
  29. /**
  30. * rabbit publisher-confirms
  31. */
  32. private String publisherConfirms;
  33. /**
  34. * rabbit publisher-returns
  35. */
  36. private String publisherReturns;
  37. /**
  38. * rabbit connection-timeout
  39. */
  40. private String connectionTimeout;
  41. }

生产者

  1. package com.matrix.queue.rabbitmq.service.queuepass.service.impl;
  2. import com.matrix.queue.rabbitmq.constant.QueuePassConstant;
  3. import com.matrix.queue.rabbitmq.entity.QueuePassMessage;
  4. import com.matrix.queue.rabbitmq.entity.RabbitMQProperties;
  5. import com.matrix.queue.rabbitmq.service.comm.CommSendService;
  6. import com.matrix.queue.rabbitmq.service.queuepass.service.IQueuePassService;
  7. import com.rabbitmq.client.Channel;
  8. import com.rabbitmq.client.Connection;
  9. import com.rabbitmq.client.ConnectionFactory;
  10. import lombok.RequiredArgsConstructor;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.springblade.core.tool.utils.Func;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.stereotype.Service;
  15. import java.io.IOException;
  16. import java.util.HashMap;
  17. import java.util.Map;
  18. import java.util.concurrent.TimeoutException;
  19. import static com.matrix.queue.rabbitmq.constant.QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE;
  20. import static com.matrix.queue.rabbitmq.constant.QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE_ROUTING_KEY;
  21. import static com.matrix.queue.rabbitmq.constant.QueuePassConstant.ANTIEPIDEMIC_QUEUE_SERVICE_NAME;
  22. /**
  23. * ClassName: GoodsOnlineServiceImpl
  24. * Package: com.matrix.rabbitmq.service.impl
  25. * Description: 消息队列实现类
  26. * @Datetime: 2021/11/9 15:59
  27. * @Author: swordmeng8@163.com
  28. */
  29. @Slf4j
  30. @Service(ANTIEPIDEMIC_QUEUE_SERVICE_NAME)
  31. @RequiredArgsConstructor(onConstructor = @__(@Autowired))
  32. public class QueuePassServiceImpl implements IQueuePassService {
  33. /** 通用发送方法 */
  34. private final CommSendService commSendService;
  35. //注入rabbitmq的配置
  36. private final RabbitMQProperties rabbitMQConfig;
  37. /**
  38. * 发送消息
  39. * @param reservationQueueId
  40. * @param times
  41. * @return
  42. */
  43. @Override
  44. public void send(Long reservationQueueId, long times, String callTime) {
  45. //临时队列 尝试解决 mq阻塞问题 后续可能用到
  46. try {
  47. QueuePassMessage shopMessage = new QueuePassMessage(reservationQueueId,times,callTime);
  48. Map<String, Object> queueArgs = new HashMap<>();
  49. queueArgs.put("x-dead-letter-exchange", QueuePassConstant.ANTIEPIDEMIC_QUEUE_DLX_EXCHANGE);
  50. queueArgs.put("x-dead-letter-routing-key", ANTIEPIDEMIC_QUEUE_EXCHANGE_ROUTING_KEY);//x-dead-letter-routing-key
  51. //获取connectionFactory
  52. ConnectionFactory connectionFactory = new ConnectionFactory();
  53. //配置链接属性
  54. connectionFactory.setVirtualHost(rabbitMQConfig.getVirtualHost());
  55. connectionFactory.setUsername(rabbitMQConfig.getUsername());
  56. connectionFactory.setPassword(rabbitMQConfig.getPassword());
  57. connectionFactory.setHost(rabbitMQConfig.getHost());
  58. Connection connection = connectionFactory.newConnection();
  59. Channel channel = connection.createChannel();
  60. //获取随机 队列名
  61. String queueName = Func.randomUUID();
  62. shopMessage.setQueueName(queueName);
  63. //生成队列
  64. channel.queueDeclare(queueName, true, false, false, queueArgs);
  65. //获取随机routingKey
  66. String routingKey = Func.randomUUID();
  67. //绑定队列
  68. channel.queueBind(queueName,QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE,routingKey);
  69. commSendService.commSend(shopMessage,times,ANTIEPIDEMIC_QUEUE_EXCHANGE,routingKey);
  70. channel.close();
  71. connection.close();
  72. log.info("排队过号时间修改增加消息队列:商品id:{},失效时间Long值:{},叫号时间LocalDateTime值:{},推送结果:{}",reservationQueueId,times,callTime,true);
  73. } catch (IOException e) {
  74. e.printStackTrace();
  75. } catch (TimeoutException e) {
  76. e.printStackTrace();
  77. }
  78. //最初的死信队列 但是 不同时时间有消息阻塞问题
  79. QueuePassMessage shopMessage = new QueuePassMessage(reservationQueueId,times,callTime);
  80. }
  81. }

给每条消息分别定时

  1. package com.matrix.queue.rabbitmq.service.comm;
  2. import com.alibaba.fastjson.JSON;
  3. import lombok.AllArgsConstructor;
  4. import org.springframework.amqp.AmqpException;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.core.MessagePostProcessor;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.stereotype.Service;
  9. /**
  10. * @className: CommSendService
  11. * @package: com.matrix.rabbitmq.service
  12. * @description: 消息队列通用发送类
  13. * @datetime: 2021/11/30 18:50
  14. * @author: swordmeng8@163.com
  15. */
  16. @Service
  17. @AllArgsConstructor
  18. public class CommSendService {
  19. /** 使用RabbitTemplate,这提供了接收/发送等等方法 */
  20. private final RabbitTemplate rabbitTemplate;
  21. public void commSend(Object obj, long times,String exchange,String exchangeRoutingKey){
  22. MessagePostProcessor processor = new MessagePostProcessor(){
  23. @Override
  24. public Message postProcessMessage(Message message) throws AmqpException {
  25. message.getMessageProperties().setExpiration(String.valueOf(times));
  26. return message;
  27. }
  28. };
  29. rabbitTemplate.convertAndSend(
  30. exchange,
  31. exchangeRoutingKey,
  32. JSON.toJSONString(obj), processor);
  33. }
  34. }

消费者

  1. package com.matrix.queue.rabbitmq.listener;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.matrix.queue.rabbitmq.entity.QueuePassMessage;
  4. import com.matrix.queue.rabbitmq.service.comm.BasicAckService;
  5. import com.matrix.queue.reservationQueue.service.IReservationQueueService;
  6. import com.rabbitmq.client.Channel;
  7. import lombok.AllArgsConstructor;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  10. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  11. import org.springframework.messaging.Message;
  12. import org.springframework.stereotype.Component;
  13. import static com.matrix.queue.rabbitmq.constant.QueuePassConstant.ANTIEPIDEMIC_QUEUE_DLX_PASS_NAME;
  14. /**
  15. * ClassName: MqMessageController
  16. * Package: com.matrix.rabbitmq.listener
  17. * Description: 处理消息监听逻辑
  18. * @Datetime: 2021/11/10 11:12
  19. * @Author: swordmeng8@163.com
  20. */
  21. @Slf4j
  22. @Component
  23. @AllArgsConstructor
  24. public class QueuePassListener {
  25. /**排队信息*/
  26. private final IReservationQueueService reservationQueueService;
  27. /** 签收服务 */
  28. private final BasicAckService basicAckService;
  29. /**
  30. * 描述:处理排队过号状态
  31. * 使用 @Payload 和 @Headers 注解可以获取消息中的 body 与 headers 信息
  32. * @param message
  33. * @param channel
  34. */
  35. @RabbitHandler
  36. @RabbitListener(queues = ANTIEPIDEMIC_QUEUE_DLX_PASS_NAME)
  37. public void handleQueuePassStatus(Message message, Channel channel) {
  38. QueuePassMessage queuePassMessage = JSONObject.parseObject(message.getPayload().toString(), QueuePassMessage.class);
  39. Long reservationQueueId = queuePassMessage.getReservationQueueId();
  40. log.info("排队叫号消费消息,排队叫号消息信息:{}",queuePassMessage);
  41. // 校验排队加号时间
  42. boolean checkResult = reservationQueueService.checkQueuePass(reservationQueueId);
  43. if (checkResult) {
  44. // 更新排队过号状态和过号时间
  45. //临时修改 状态码 9为3
  46. boolean result = reservationQueueService.updateQueuePassStatus(9,reservationQueueId).isSuccess();
  47. // 添加商品操作日志
  48. if (!result) {
  49. log.info(reservationQueueId+"排队自动过号失败");
  50. } else {
  51. log.info(reservationQueueId+"排队自动过号成功");
  52. }
  53. }
  54. // 签收
  55. basicAckService.basicAck(message,channel);
  56. }
  57. }

签收

  1. package com.matrix.queue.rabbitmq.service.comm;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.matrix.queue.rabbitmq.entity.QueuePassMessage;
  4. import com.rabbitmq.client.Channel;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.amqp.support.AmqpHeaders;
  7. import org.springframework.messaging.Message;
  8. import org.springframework.messaging.MessageHeaders;
  9. import org.springframework.stereotype.Service;
  10. import java.io.IOException;
  11. /**
  12. * @className: BasicAckService
  13. * @package: com.matrix.rabbitmq.service.comm
  14. * @description: BasicAck
  15. * @datetime: 2021/11/30 18:57
  16. * @author: swordmeng8@163.com
  17. */
  18. @Slf4j
  19. @Service
  20. public class BasicAckService {
  21. /**
  22. * 手动签收
  23. * @param message
  24. * @param channel
  25. */
  26. public void basicAck(Message message, Channel channel){
  27. try {
  28. //手工签收
  29. Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
  30. log.info("接受deliveryTag:{}",deliveryTag);
  31. channel.basicAck(deliveryTag, false);
  32. 删除 对应的队列 这是 临时队列代码尝试解决 mq 阻塞问题
  33. QueuePassMessage queuePassMessage = JSONObject.parseObject(message.getPayload().toString(), QueuePassMessage.class);
  34. String queueName = queuePassMessage.getQueueName();
  35. channel.queueDelete(queueName);
  36. } catch (
  37. IOException e) {
  38. e.printStackTrace();
  39. log.info("手动签收失败:Cause:{},Message:{}",e.getCause(),e.getMessage());
  40. }
  41. }
  42. }