使用路由(direct)交换机,通过 routingKey 将消息分发到对应的队列。
每一条消息分别放入一个独立的队列,使用完成后删除该队列。
需要定义:死信队列、死信交换机和普通交换机。
package com.matrix.queue.rabbitmq.config;
import com.matrix.queue.rabbitmq.constant.QueuePassConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @ClassName: QueueConfiguration
* @Package: com.matrix.common.config
* @description: 消息队列-演示队列 配置类
* @author: swordmeng
* @date: 2021.11.10
* @version v1.0
*/
@Configuration
public class QueuePassConfiguration {
//====================1.声明正常队列====================
/**
* 排队 信道配置
* @return
*/
@Bean
public DirectExchange goodsExchange() {
return new DirectExchange(QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE, true, false);
}
/**
* 排队 队列
* @return:
*/
@Bean
public Queue goodsQueue() {
Map<String, Object> queueArgs = new HashMap<>();
queueArgs.put("x-dead-letter-exchange", QueuePassConstant.ANTIEPIDEMIC_QUEUE_DLX_EXCHANGE);
Queue queue = new Queue(QueuePassConstant.ANTIEPIDEMIC_QUEUE_PASS_NAME,true,false,false,queueArgs);
return queue;
}
/**
* 排队 队列绑定
* @return
*/
@Bean
public Binding goodsBinding() {
return BindingBuilder.bind(goodsQueue()).to(goodsExchange()).with(QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE_ROUTING_KEY);
}
普通队列和 绑定其实在 这里是用不到的
//====================2.声明死信队列====================
/**
* 死信 信道配置
* @return
*/
@Bean
public DirectExchange queueDlxExchange() {
return new DirectExchange(QueuePassConstant.ANTIEPIDEMIC_QUEUE_DLX_EXCHANGE, true, false);
}
/**
* 排队 死信队列
* @return
*/
@Bean
public Queue queueDlxQueue() {
Queue queue = new Queue(QueuePassConstant.ANTIEPIDEMIC_QUEUE_DLX_PASS_NAME,true,false,false);
return queue;
}
/**
* 绑定 死信队列
* @author swordmeng
* @Datetime 2021/11/12 9:40
* @return:
*/
@Bean
public Binding queueDlxBinding() {
return BindingBuilder.bind(queueDlxQueue()).to(queueDlxExchange()).with(QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE_ROUTING_KEY);
}
}
nacos配置
spring:
  rabbitmq:
    host: 119.3.13.80
    port: 5672
    username: admin
    password: matrix2019
    virtual-host: /alarm
    publisher-confirms: true # enable the publisher confirms mechanism
    publisher-returns: true # enable the publisher returns mechanism
    connection-timeout: 10000 # connection timeout
    listener: # consumer listener settings
      simple:
        concurrency: 5
        max-concurrency: 10
        acknowledge-mode: manual # messages are acknowledged manually
        prefetch: 2
rabbitmq配置类引入
package com.matrix.queue.rabbitmq.entity;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Holder for the settings found under the {@code spring.rabbitmq} configuration
 * prefix. Every value is kept as a plain string; accessors are generated by
 * Lombok's {@code @Data}.
 */
@Component
@ConfigurationProperties(prefix = "spring.rabbitmq")
@Data
public class RabbitMQProperties {

    /** RabbitMQ server address. */
    private String host;

    /** RabbitMQ server port. */
    private String port;

    /** User name used to log in to RabbitMQ. */
    private String username;

    /** Password used to log in to RabbitMQ. */
    private String password;

    /** RabbitMQ virtual host. */
    private String virtualHost;

    /** Value of the {@code publisher-confirms} setting. */
    private String publisherConfirms;

    /** Value of the {@code publisher-returns} setting. */
    private String publisherReturns;

    /** Value of the {@code connection-timeout} setting. */
    private String connectionTimeout;
}
生产者
package com.matrix.queue.rabbitmq.service.queuepass.service.impl;
import com.matrix.queue.rabbitmq.constant.QueuePassConstant;
import com.matrix.queue.rabbitmq.entity.QueuePassMessage;
import com.matrix.queue.rabbitmq.entity.RabbitMQProperties;
import com.matrix.queue.rabbitmq.service.comm.CommSendService;
import com.matrix.queue.rabbitmq.service.queuepass.service.IQueuePassService;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springblade.core.tool.utils.Func;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import static com.matrix.queue.rabbitmq.constant.QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE;
import static com.matrix.queue.rabbitmq.constant.QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE_ROUTING_KEY;
import static com.matrix.queue.rabbitmq.constant.QueuePassConstant.ANTIEPIDEMIC_QUEUE_SERVICE_NAME;
/**
* ClassName: GoodsOnlineServiceImpl
* Package: com.matrix.rabbitmq.service.impl
* Description: 消息队列实现类
* @Datetime: 2021/11/9 15:59
* @Author: swordmeng8@163.com
*/
@Slf4j
@Service(ANTIEPIDEMIC_QUEUE_SERVICE_NAME)
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class QueuePassServiceImpl implements IQueuePassService {
/** 通用发送方法 */
private final CommSendService commSendService;
//注入rabbitmq的配置
private final RabbitMQProperties rabbitMQConfig;
/**
* 发送消息
* @param reservationQueueId
* @param times
* @return
*/
@Override
public void send(Long reservationQueueId, long times, String callTime) {
//临时队列 尝试解决 mq阻塞问题 后续可能用到
try {
QueuePassMessage shopMessage = new QueuePassMessage(reservationQueueId,times,callTime);
Map<String, Object> queueArgs = new HashMap<>();
queueArgs.put("x-dead-letter-exchange", QueuePassConstant.ANTIEPIDEMIC_QUEUE_DLX_EXCHANGE);
queueArgs.put("x-dead-letter-routing-key", ANTIEPIDEMIC_QUEUE_EXCHANGE_ROUTING_KEY);//x-dead-letter-routing-key
//获取connectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
//配置链接属性
connectionFactory.setVirtualHost(rabbitMQConfig.getVirtualHost());
connectionFactory.setUsername(rabbitMQConfig.getUsername());
connectionFactory.setPassword(rabbitMQConfig.getPassword());
connectionFactory.setHost(rabbitMQConfig.getHost());
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//获取随机 队列名
String queueName = Func.randomUUID();
shopMessage.setQueueName(queueName);
//生成队列
channel.queueDeclare(queueName, true, false, false, queueArgs);
//获取随机routingKey
String routingKey = Func.randomUUID();
//绑定队列
channel.queueBind(queueName,QueuePassConstant.ANTIEPIDEMIC_QUEUE_EXCHANGE,routingKey);
commSendService.commSend(shopMessage,times,ANTIEPIDEMIC_QUEUE_EXCHANGE,routingKey);
channel.close();
connection.close();
log.info("排队过号时间修改增加消息队列:商品id:{},失效时间Long值:{},叫号时间LocalDateTime值:{},推送结果:{}",reservationQueueId,times,callTime,true);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
//最初的死信队列 但是 不同时时间有消息阻塞问题
QueuePassMessage shopMessage = new QueuePassMessage(reservationQueueId,times,callTime);
}
}
给每条消息分别定时
package com.matrix.queue.rabbitmq.service.comm;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
/**
 * Shared helper for publishing messages through {@link RabbitTemplate}.
 *
 * @author swordmeng8@163.com
 */
@Service
@AllArgsConstructor
public class CommSendService {

    /** Template used to convert and publish messages. */
    private final RabbitTemplate rabbitTemplate;

    /**
     * Serializes {@code obj} to a JSON string and publishes it with a per-message
     * expiration.
     *
     * @param obj                payload object, serialized with fastjson
     * @param times              value written (as a string) into the message's expiration property
     * @param exchange           exchange to publish to
     * @param exchangeRoutingKey routing key to publish with
     */
    public void commSend(Object obj, long times, String exchange, String exchangeRoutingKey) {
        String expiration = String.valueOf(times);
        MessagePostProcessor withExpiration = outgoing -> {
            outgoing.getMessageProperties().setExpiration(expiration);
            return outgoing;
        };
        String jsonBody = JSON.toJSONString(obj);
        rabbitTemplate.convertAndSend(exchange, exchangeRoutingKey, jsonBody, withExpiration);
    }
}
消费者
package com.matrix.queue.rabbitmq.listener;
import com.alibaba.fastjson.JSONObject;
import com.matrix.queue.rabbitmq.entity.QueuePassMessage;
import com.matrix.queue.rabbitmq.service.comm.BasicAckService;
import com.matrix.queue.reservationQueue.service.IReservationQueueService;
import com.rabbitmq.client.Channel;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import static com.matrix.queue.rabbitmq.constant.QueuePassConstant.ANTIEPIDEMIC_QUEUE_DLX_PASS_NAME;
/**
 * Listener for the queue-pass dead-letter queue.
 *
 * @author swordmeng8@163.com
 */
@Slf4j
@Component
@AllArgsConstructor
public class QueuePassListener {

    /** Reservation queue service used to check and update the pass status. */
    private final IReservationQueueService reservationQueueService;

    /** Service that performs the manual acknowledgement. */
    private final BasicAckService basicAckService;

    /**
     * Handles a message from the dead-letter queue: parses the payload, and if
     * the reservation queue entry passes the check, updates its pass status.
     *
     * <p>The message is acknowledged in a {@code finally} block. Acknowledgement
     * is manual here, so a message whose processing threw would otherwise be left
     * unacknowledged. Exceptions from processing still propagate to the caller
     * after the acknowledgement.
     *
     * @param message the received message; its payload is expected to be the JSON
     *                form of a {@link QueuePassMessage}
     * @param channel the channel the message was delivered on
     */
    @RabbitHandler
    @RabbitListener(queues = ANTIEPIDEMIC_QUEUE_DLX_PASS_NAME)
    public void handleQueuePassStatus(Message message, Channel channel) {
        try {
            QueuePassMessage queuePassMessage = JSONObject.parseObject(message.getPayload().toString(), QueuePassMessage.class);
            if (queuePassMessage == null) {
                // Nothing to process, but the message is still acknowledged below.
                log.warn("Queue-pass message payload could not be parsed into a QueuePassMessage, skipping");
                return;
            }
            Long reservationQueueId = queuePassMessage.getReservationQueueId();
            log.info("排队叫号消费消息,排队叫号消息信息:{}",queuePassMessage);

            // Only update the entry when the queue-pass check succeeds.
            boolean checkResult = reservationQueueService.checkQueuePass(reservationQueueId);
            if (checkResult) {
                // NOTE(review): the original comment mentions a temporary change
                // between status codes 9 and 3 - confirm which value is intended.
                boolean result = reservationQueueService.updateQueuePassStatus(9,reservationQueueId).isSuccess();
                if (!result) {
                    log.info(reservationQueueId+"排队自动过号失败");
                } else {
                    log.info(reservationQueueId+"排队自动过号成功");
                }
            }
        } finally {
            // Always acknowledge, whether or not processing succeeded.
            basicAckService.basicAck(message,channel);
        }
    }
}
签收
package com.matrix.queue.rabbitmq.service.comm;
import com.alibaba.fastjson.JSONObject;
import com.matrix.queue.rabbitmq.entity.QueuePassMessage;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* @className: BasicAckService
* @package: com.matrix.rabbitmq.service.comm
* @description: BasicAck
* @datetime: 2021/11/30 18:57
* @author: swordmeng8@163.com
*/
@Slf4j
@Service
public class BasicAckService {
/**
* 手动签收
* @param message
* @param channel
*/
public void basicAck(Message message, Channel channel){
try {
//手工签收
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
log.info("接受deliveryTag:{}",deliveryTag);
channel.basicAck(deliveryTag, false);
删除 对应的队列 这是 临时队列代码尝试解决 mq 阻塞问题
QueuePassMessage queuePassMessage = JSONObject.parseObject(message.getPayload().toString(), QueuePassMessage.class);
String queueName = queuePassMessage.getQueueName();
channel.queueDelete(queueName);
} catch (
IOException e) {
e.printStackTrace();
log.info("手动签收失败:Cause:{},Message:{}",e.getCause(),e.getMessage());
}
}
}