使用路由(direct)交换机,通过 routingKey 将消息分发到对应的队列。
每一条消息分别放入一个独立的队列,使用完成后删除该队列。
定义死信队列、死信交换机和普通交换机。
package com.matrix.queue.rabbitmq.config;

import com.matrix.queue.rabbitmq.constant.QueuePassConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * RabbitMQ topology for the queue-pass feature: a normal direct exchange with its
 * queue and binding, plus a dead-letter direct exchange with its queue and binding.
 *
 * @author swordmeng
 * @date 2021.11.10
 * @version v1.0
 */
@Configuration
public class QueuePassConfiguration {

    //==================== 1. Normal queue ====================

    /**
     * Declares the normal direct exchange (durable, not auto-deleted).
     *
     * @return the normal exchange
     */
    @Bean
    public DirectExchange goodsExchange() {
        return new DirectExchange(QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE, true, false);
    }

    /**
     * Declares the normal queue (durable, non-exclusive, not auto-deleted) whose
     * {@code x-dead-letter-exchange} argument points at the dead-letter exchange.
     *
     * @return the normal queue
     */
    @Bean
    public Queue goodsQueue() {
        Map<String, Object> queueArgs = new HashMap<>();
        queueArgs.put("x-dead-letter-exchange", QueuePassConstant.ANTIEPIDEMIC_QUEUE_DLX_EXCHANGE);
        return new Queue(QueuePassConstant.ANTIEPIDEMIC_QUEUE_PASS_NAME, true, false, false, queueArgs);
    }

    /**
     * Binds the normal queue to the normal exchange.
     *
     * @return the binding of {@link #goodsQueue()} to {@link #goodsExchange()}
     */
    @Bean
    public Binding goodsBinding() {
        return BindingBuilder.bind(goodsQueue())
                .to(goodsExchange())
                .with(QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE_ROUTING_KEY);
    }

    // Author's note: the normal queue and its binding declared above are not actually used here.

    //==================== 2. Dead-letter queue ====================

    /**
     * Declares the dead-letter direct exchange (durable, not auto-deleted).
     *
     * @return the dead-letter exchange
     */
    @Bean
    public DirectExchange queueDlxExchange() {
        return new DirectExchange(QueuePassConstant.ANTIEPIDEMIC_QUEUE_DLX_EXCHANGE, true, false);
    }

    /**
     * Declares the dead-letter queue (durable, non-exclusive, not auto-deleted).
     *
     * @return the dead-letter queue
     */
    @Bean
    public Queue queueDlxQueue() {
        return new Queue(QueuePassConstant.ANTIEPIDEMIC_QUEUE_DLX_PASS_NAME, true, false, false);
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange. The routing key is the
     * same constant used by {@link #goodsBinding()}.
     *
     * @author swordmeng
     * @Datetime 2021/11/12 9:40
     * @return the binding of {@link #queueDlxQueue()} to {@link #queueDlxExchange()}
     */
    @Bean
    public Binding queueDlxBinding() {
        return BindingBuilder.bind(queueDlxQueue())
                .to(queueDlxExchange())
                .with(QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE_ROUTING_KEY);
    }
}
nacos配置
# RabbitMQ connection and listener settings (held in Nacos).
# NOTE(review): host and credentials are stored in plain text here; consider
# moving them to a secret store or encrypted configuration.
spring:
  rabbitmq:
    host: 119.3.13.80
    port: 5672
    username: admin
    password: matrix2019
    virtual-host: /alarm
    # NOTE(review): on Spring Boot 2.2+ this key is deprecated in favour of
    # publisher-confirm-type; confirm the Boot version in use.
    publisher-confirms: true # whether to enable the publisher confirms mechanism
    publisher-returns: true # whether to enable the publisher returns mechanism
    connection-timeout: 10000 # connection timeout (original note: "expiry time")
    listener: # consumer listener
      simple:
        concurrency: 5
        max-concurrency: 10
        acknowledge-mode: manual # manual ACK
        prefetch: 2
rabbitmq配置类引入
package com.matrix.queue.rabbitmq.entity;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Holder for the {@code spring.rabbitmq.*} configuration values.
 *
 * <p>Every value is kept as a raw {@link String}; accessors are generated by Lombok.
 */
@Component
@ConfigurationProperties(prefix = "spring.rabbitmq")
@Data
public class RabbitMQProperties {

    /** Broker address ({@code host}). */
    private String host;

    /** Broker port ({@code port}), kept as text. */
    private String port;

    /** Login user name ({@code username}). */
    private String username;

    /** Login password ({@code password}). */
    private String password;

    /** Virtual host ({@code virtual-host}). */
    private String virtualHost;

    /** Value of {@code publisher-confirms}. */
    private String publisherConfirms;

    /** Value of {@code publisher-returns}. */
    private String publisherReturns;

    /** Value of {@code connection-timeout}. */
    private String connectionTimeout;
}
生产者
package com.matrix.queue.rabbitmq.service.queuepass.service.impl;

import com.matrix.queue.rabbitmq.constant.QueuePassConstant;
import com.matrix.queue.rabbitmq.entity.QueuePassMessage;
import com.matrix.queue.rabbitmq.entity.RabbitMQProperties;
import com.matrix.queue.rabbitmq.service.comm.CommSendService;
import com.matrix.queue.rabbitmq.service.queuepass.service.IQueuePassService;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springblade.core.tool.utils.Func;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static com.matrix.queue.rabbitmq.constant.QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE;
import static com.matrix.queue.rabbitmq.constant.QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE_ROUTING_KEY;
import static com.matrix.queue.rabbitmq.constant.QueuePassConstant.ANTIEPIDEMIC_QUEUE_SERVICE_NAME;

/**
 * Producer for queue-pass messages: each message gets its own freshly declared queue.
 *
 * @Datetime: 2021/11/9 15:59
 * @Author: swordmeng8@163.com
 */
@Slf4j
@Service(ANTIEPIDEMIC_QUEUE_SERVICE_NAME)
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class QueuePassServiceImpl implements IQueuePassService {

    /** Shared send helper. */
    private final CommSendService commSendService;

    /** Injected RabbitMQ connection settings. */
    private final RabbitMQProperties rabbitMQConfig;

    /**
     * Publishes one queue-pass message through a dedicated, randomly named queue.
     *
     * <p>The queue is declared with dead-letter arguments pointing at the dead-letter
     * exchange and routing key, bound to the normal exchange under a random routing key,
     * and the message is then sent with {@code times} as its expiration. Author's note:
     * the per-message temporary queue is an attempt to work around messages blocking each
     * other when a single dead-letter-backed queue held messages with different expiry
     * times.
     *
     * <p>Failures while talking to the broker are logged and not rethrown.
     *
     * @param reservationQueueId id of the reservation queue entry
     * @param times              message expiration passed on to the send helper
     * @param callTime           call time carried in the message
     */
    @Override
    public void send(Long reservationQueueId, long times, String callTime) {
        QueuePassMessage shopMessage = new QueuePassMessage(reservationQueueId, times, callTime);

        Map<String, Object> queueArgs = new HashMap<>();
        queueArgs.put("x-dead-letter-exchange", QueuePassConstant.ANTIEPIDEMIC_QUEUE_DLX_EXCHANGE);
        queueArgs.put("x-dead-letter-routing-key", ANTIEPIDEMIC_QUEUE_EXCHANGE_ROUTING_KEY);

        // Random queue name and routing key; the name travels in the message body.
        String queueName = Func.randomUUID();
        String routingKey = Func.randomUUID();
        shopMessage.setQueueName(queueName);

        ConnectionFactory connectionFactory = newConnectionFactory();

        // try-with-resources closes the channel, then the connection, even when
        // declaring, binding or sending fails.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(queueName, true, false, false, queueArgs);
            channel.queueBind(queueName, ANTIEPIDEMIC_QUEUE_EXCHANGE, routingKey);
            commSendService.commSend(shopMessage, times, ANTIEPIDEMIC_QUEUE_EXCHANGE, routingKey);
            log.info("排队过号时间修改增加消息队列:商品id:{},失效时间Long值:{},叫号时间LocalDateTime值:{},推送结果:{}",
                    reservationQueueId, times, callTime, true);
        } catch (IOException | TimeoutException e) {
            log.error("排队过号消息发送失败:reservationQueueId:{},queueName:{}",
                    reservationQueueId, queueName, e);
        }
    }

    /**
     * Builds a client connection factory from the injected settings.
     *
     * <p>The port is applied only when it is present and numeric; otherwise the client
     * library's own default is left in place.
     */
    private ConnectionFactory newConnectionFactory() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost(rabbitMQConfig.getVirtualHost());
        connectionFactory.setUsername(rabbitMQConfig.getUsername());
        connectionFactory.setPassword(rabbitMQConfig.getPassword());
        connectionFactory.setHost(rabbitMQConfig.getHost());

        String port = rabbitMQConfig.getPort();
        if (port != null && !port.trim().isEmpty()) {
            try {
                connectionFactory.setPort(Integer.parseInt(port.trim()));
            } catch (NumberFormatException e) {
                log.warn("Ignoring non-numeric RabbitMQ port '{}'", port, e);
            }
        }
        return connectionFactory;
    }
}
给每条消息分别定时
package com.matrix.queue.rabbitmq.service.comm;import com.alibaba.fastjson.JSON;import lombok.AllArgsConstructor;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Service;/*** @className: CommSendService* @package: com.matrix.rabbitmq.service* @description: 消息队列通用发送类* @datetime: 2021/11/30 18:50* @author: swordmeng8@163.com*/@Service@AllArgsConstructorpublic class CommSendService {/** 使用RabbitTemplate,这提供了接收/发送等等方法 */private final RabbitTemplate rabbitTemplate;public void commSend(Object obj, long times,String exchange,String exchangeRoutingKey){MessagePostProcessor processor = new MessagePostProcessor(){@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration(String.valueOf(times));return message;}};rabbitTemplate.convertAndSend(exchange,exchangeRoutingKey,JSON.toJSONString(obj), processor);}}
消费者
package com.matrix.queue.rabbitmq.listener;import com.alibaba.fastjson.JSONObject;import com.matrix.queue.rabbitmq.entity.QueuePassMessage;import com.matrix.queue.rabbitmq.service.comm.BasicAckService;import com.matrix.queue.reservationQueue.service.IReservationQueueService;import com.rabbitmq.client.Channel;import lombok.AllArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.messaging.Message;import org.springframework.stereotype.Component;import static com.matrix.queue.rabbitmq.constant.QueuePassConstant.ANTIEPIDEMIC_QUEUE_DLX_PASS_NAME;/*** ClassName: MqMessageController* Package: com.matrix.rabbitmq.listener* Description: 处理消息监听逻辑* @Datetime: 2021/11/10 11:12* @Author: swordmeng8@163.com*/@Slf4j@Component@AllArgsConstructorpublic class QueuePassListener {/**排队信息*/private final IReservationQueueService reservationQueueService;/** 签收服务 */private final BasicAckService basicAckService;/*** 描述:处理排队过号状态* 使用 @Payload 和 @Headers 注解可以获取消息中的 body 与 headers 信息* @param message* @param channel*/@RabbitHandler@RabbitListener(queues = ANTIEPIDEMIC_QUEUE_DLX_PASS_NAME)public void handleQueuePassStatus(Message message, Channel channel) {QueuePassMessage queuePassMessage = JSONObject.parseObject(message.getPayload().toString(), QueuePassMessage.class);Long reservationQueueId = queuePassMessage.getReservationQueueId();log.info("排队叫号消费消息,排队叫号消息信息:{}",queuePassMessage);// 校验排队加号时间boolean checkResult = reservationQueueService.checkQueuePass(reservationQueueId);if (checkResult) {// 更新排队过号状态和过号时间//临时修改 状态码 9为3boolean result = reservationQueueService.updateQueuePassStatus(9,reservationQueueId).isSuccess();// 添加商品操作日志if (!result) {log.info(reservationQueueId+"排队自动过号失败");} else {log.info(reservationQueueId+"排队自动过号成功");}}// 签收basicAckService.basicAck(message,channel);}}
签收
package com.matrix.queue.rabbitmq.service.comm;

import com.alibaba.fastjson.JSONObject;
import com.matrix.queue.rabbitmq.entity.QueuePassMessage;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * Manual acknowledgement helper that also removes the per-message temporary queue.
 *
 * @datetime: 2021/11/30 18:57
 * @author: swordmeng8@163.com
 */
@Slf4j
@Service
public class BasicAckService {

    /**
     * Acknowledges the message, then deletes the queue named in its payload.
     *
     * <p>Nothing is acknowledged when the delivery-tag header is missing, and no queue is
     * deleted when the payload carries no queue name. I/O failures are logged and not
     * rethrown.
     *
     * @param message delivered message; its payload is read as a JSON string
     * @param channel channel the message was delivered on
     */
    public void basicAck(Message message, Channel channel) {
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        log.info("接受deliveryTag:{}", deliveryTag);
        if (deliveryTag == null) {
            // Without a tag there is nothing to acknowledge; unboxing null would throw.
            log.error("手动签收失败:消息缺少deliveryTag");
            return;
        }

        try {
            // Manual acknowledgement of this single delivery.
            channel.basicAck(deliveryTag, false);

            // Delete the matching queue. Author's note: this is the temporary-queue code
            // that attempts to work around the MQ blocking problem.
            QueuePassMessage queuePassMessage =
                    JSONObject.parseObject(message.getPayload().toString(), QueuePassMessage.class);
            String queueName = queuePassMessage == null ? null : queuePassMessage.getQueueName();
            if (queueName != null && !queueName.isEmpty()) {
                channel.queueDelete(queueName);
            }
        } catch (IOException e) {
            log.error("手动签收失败:Cause:{},Message:{}", e.getCause(), e.getMessage(), e);
        }
    }
}
