1、引入 rabbitmq 依赖包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、修改 application.properties 配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root@mq
spring.rabbitmq.virtual-host=/
# 发送者开启 confirm 确认机制
# 监听消息是否 到达 exchange
spring.rabbitmq.publisher-confirms=true
# 发送者开启 return 确认机制
#监听消息是否 没有到达 queue
spring.rabbitmq.publisher-returns=true
####################################################
# 设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
#spring.rabbitmq.listener.simple.retry.enabled=true
3、定义交换机和队列
@Configuration
public class QueueConfig {
/**
* 确认队列
* @return
*/
@Bean(name = "confirmTestQueue")
public Queue confirmTestQueue() {
return new Queue("confirm_test_queue", true, false, false);
}
/**
* 交换机
* @return
*/
@Bean(name = "confirmTestExchange")
public FanoutExchange confirmTestExchange() {
return new FanoutExchange("confirmTestExchange");
}
/**
* 测试绑定队列和广播交换机
* @param confirmTestExchange
* @param confirmTestQueue
* @return
*/
@Bean
public Binding confirmTestFanoutExchangeAndQueue(
@Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
@Qualifier("confirmTestQueue") Queue confirmTestQueue) {
return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
}
}
4、回调确认配置
4.1、什么是确认
发送消息确认:用来确认生产者 producer 将消息发送到 broker ,broker 上的交换机 exchange 再投递给队列 queue 的过程中,消息是否成功投递。
- 消息从 producer 到 rabbitmq broker 有一个 confirmCallback 确认模式。
- 消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。
4.2、 ConfirmCallback 确认模式
@Slf4j
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
/**
* correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
* ack:消息投递到 broker 的状态,true 表示成功。
* cause:表示投递失败的原因。
* 消息没有到达 exchange 会回调
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("消息发送异常!");
} else {
log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
}
}
}
4.3、ReturnCallback 退回模式
//旧版本 RabbitTemplate.ReturnCallback
@Slf4j
public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {
/**
* 如果消息未能投递到目标 queue 里将触发回调 returnCallback ,
* 一旦向 queue 投递消息未成功,
* 这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。
* @param returnedMessage
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}"
, returnedMessage.getReplyCode()
, returnedMessage.getReplyText()
, returnedMessage.getMessage()
, returnedMessage.getRoutingKey());
}
}
4.4、配置 RabbitTemplate
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
//消息投递到 交换机 确认回调
template.setConfirmCallback(confirmCallbackService());
//消息投递到 队列 确认回调
template.setReturnsCallback(returnCallbackService());
return template;
}
4.5、配置消息过期(可选)
设置消息的过期时间
rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties messageProperties = message.getMessageProperties();
//设置单个消息过期
messageProperties.setExpiration();
return message;
}
});
设置队列过期(超时之后不进入死信)
@Bean
public Queue newMerchantQueue(){
Map<String,Object> args = new HashMap<>(3);
//消息过期后,进入到死信交换机
args.put("x-dead-letter-exchange","lock_merchant_dead_exchange");
//消息过期后,进入到死信交换机的路由key
args.put("x-dead-letter-routing-key","lock_merchant_routing_key");
//过期时间,单位毫秒
args.put("x-message-ttl",10000);
return QueueBuilder.durable("new_merchant_queue").withArguments(args).build();
}
5、消息监听
@Slf4j
@Component
//监听队列
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {
log.info("小富收到消息:{}", msg);
//TODO 具体业务
//手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,拒绝再次接收...");
// 拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
log.error("消息即将再次返回队列处理...");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
6、踩坑日志
6.1、不消息确认
开启消息确认机制,消费消息别忘了 channel.basicAck,否则消息会一直存在,导致重复消费。
6.2、消息无限投递
开启消息确认机制,发生异常后将消息重新投入队列, channel.basicNack 是从重新入队列头部。
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
解决方案
- 先将消息进行应答,重新发送消息到队列,此时,消息是入队尾
- 优化设置了消息重试次数,达到了重试上限以后,手动确认,队列删除此消息,并将消息持久化入 MySQL 并推送报警,进行人工处理和定时任务做补偿。
6.3、重复消费
借助 MySQL、或者 redis 将消息持久化,通过再消息中的唯一性属性校验。
7、延迟队列实现
7.1、方式
- 使用死信队列
- 使用插件 ,需要安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
7.2、死信队列实现
7.2.1、介绍
- RabbitMQ的延迟队列基于消息的存活时间TTL(Time To Live)和死信交换机DLE(Dead Letter Exchanges)实现
- TTL:RabbitMQ可以对队列和消息各自设置存活时间,规则是两者中较小的值,即队列无消费者连接的消息过期时间,或者消息在队列中一直未被消费的过期时间
- DLE:过期的消息通过绑定的死信交换机,路由到指定的死信队列,消费者实际上消费的是死信队列上的消息
进入死信队列的情况
- 消息被拒绝(basic.reject / basic.nack),并且requeue = false
- 消息TTL过期。TTL:Time To Live的简称,即过期时间。RabbitMQ可以对消息和队列设置TTL。
-
7.2.2、声明队列
```java /**
- 创建死信交换机
- @return */ @Bean public Exchange lockMerchantDeadExchange(){ return new TopicExchange(“lock_merchant_dead_exchange”,true,false); }
/**
- 创建死信队列
- @return */ @Bean public Queue lockMerchantDeadQueue(){ return QueueBuilder.durable(“lock_merchant_dead_queue”).build(); }
/**
- 绑定死信交换机和死信队列
@return */ @Bean public Binding lockMerchantBinding(){
return new Binding(“lock_merchant_dead_queue”
,Binding.DestinationType.QUEUE,
"lock_merchant_dead_exchange"
,"lock_merchant_routing_key"
,null);
}
/**
* 创建普通交换机
* @return
*/
@Bean
public Exchange newMerchantExchange(){
return new TopicExchange("new_merchant_exchange",true,false);
}
/**
* 创建普通队列
* @return
*/
@Bean
public Queue newMerchantQueue(){
Map<String,Object> args = new HashMap<>(3);
//消息过期后,进入到死信交换机
args.put("x-dead-letter-exchange","lock_merchant_dead_exchange");
//消息过期后,进入到死信交换机的路由key
args.put("x-dead-letter-routing-key","lock_merchant_routing_key");
//过期时间,单位毫秒
args.put("x-message-ttl",10000);
return QueueBuilder.durable("new_merchant_queue").withArguments(args).build();
}
/**
* 绑定交换机和队列
* @return
*/
@Bean
public Binding newMerchantBinding(){
return new Binding("new_merchant_queue",
Binding.DestinationType.QUEUE,
"new_merchant_exchange",
"new_merchant_routing_key",
null);
}
<a name="yACOr"></a>
##### 7.2.3、监听队列
```java
@Slf4j
@Component
//监听队列
@RabbitListener(queues = "lock_merchant_dead_queue")
public class DelayReceiverMessageHandler {
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {
log.info("小富收到消息:{}", msg);
//TODO 具体业务
log.info("队列延迟的时间 {} ",message.getMessageProperties().getDelay());
log.info("接受时间 {}", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
//手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,拒绝再次接收...");
// 拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
log.error("消息即将再次返回队列处理...");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
7.2.4、发送
rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties messageProperties = message.getMessageProperties();
//设置单个消息过期(优先级高于队列的过期时间配置)
messageProperties.setExpiration("6000");
return message;
}
});