六、死信队列
6.1 死信的概念
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
6.2 死信的来源
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到mq中)
- 消息被拒绝(basic.reject或basic.nack)并且requeue=false
6.3 代码实践
6.3.1 消息TTL过期
```java package com.rem.rabbitmq.ee.HdeadLetterQueue;
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rem.rabbitmq.ee.RabbitMqUtils;
import java.util.Scanner;
/**
- 死信队列
- 某些时候由于特定的原因导致queue中的某些消息无法被消费 这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
- 死信来源
- 消息TTL过去
- 队列达到最大长度
- 消息被拒绝 *
- @author Rem
- @date 2021-12-27 */
public class TTLProducer {
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_NORMAL_TTL, BuiltinExchangeType.DIRECT);System.err.println("开始发送消息...");Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();/*** 发送消息* 设置消息过期时间 ms*/AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();channel.basicPublish(RabbitMqUtils.EXCHANGE_NORMAL_TTL, RabbitMqUtils.ROUTING_KEY_NORMAL_TTL, properties, message.getBytes());System.err.println("发送消息完毕" + message);}}
}
```javapackage com.rem.rabbitmq.ee.HdeadLetterQueue;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.HashMap;import java.util.Map;/*** 普通消费者-验证ttl过期** @author Rem* @date 2021-12-27*/public class ConsumerTTL15 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信交换机channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_DEAD, BuiltinExchangeType.DIRECT);//声明死信队列channel.queueDeclare(RabbitMqUtils.QUEUE_DEAD, false, false, false, null);//死信队列绑定死信交换机与routingkeychannel.queueBind(RabbitMqUtils.QUEUE_DEAD, RabbitMqUtils.EXCHANGE_DEAD, RabbitMqUtils.ROUTING_KEY_DEAD);/********************************************************************************///声明普通交换机channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_NORMAL_TTL, BuiltinExchangeType.DIRECT);//声明普通队列 并且绑定死信交换机//设置参数Map<String, Object> arguments = new HashMap<>();//设置过期时间 单位ms 10s 一般都在生产方设置//arguments.put("x-message-ttl", 10000);arguments.put("x-dead-letter-exchange", RabbitMqUtils.EXCHANGE_DEAD);arguments.put("x-dead-letter-routing-key", RabbitMqUtils.ROUTING_KEY_DEAD);channel.queueDeclare(RabbitMqUtils.QUEUE_NORMAL_TTL, false, false, false, arguments);//普通交换机与普通队列通过普通routingkey 进行绑定channel.queueBind(RabbitMqUtils.QUEUE_NORMAL_TTL, RabbitMqUtils.EXCHANGE_NORMAL_TTL, RabbitMqUtils.ROUTING_KEY_NORMAL_TTL);//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息:" + new String(message.getBody()));//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");System.out.println("消费者-NORMAL-TTL 等待消费");channel.basicConsume(RabbitMqUtils.QUEUE_NORMAL_TTL, true, deliverCallback, cancelCallback);}}
package com.rem.rabbitmq.ee.HdeadLetterQueue;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;/*** 死信消费者** @author Rem* @date 2021-12-27*/public class ConsumerDead16 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信交换机// channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_DEAD, BuiltinExchangeType.DIRECT);//声明死信队列// channel.queueDeclare(RabbitMqUtils.QUEUE_DEAD, false, false, false, null);//死信队列绑定死信交换机与routingkey// channel.queueBind(RabbitMqUtils.QUEUE_DEAD, RabbitMqUtils.EXCHANGE_DEAD, RabbitMqUtils.ROUTING_KEY_DEAD);//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的死信消息:" + new String(message.getBody()));//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");System.out.println("消费者-DEAD 等待消费");channel.basicConsume(RabbitMqUtils.QUEUE_DEAD, true, deliverCallback, cancelCallback);}}
6.3.2 消息到达最大长度
package com.rem.rabbitmq.ee.HdeadLetterQueue;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.Scanner;/*** 死信队列* 某些时候由于特定的原因导致queue中的某些消息无法被消费 这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。* <p>* 死信来源* * 消息TTL过去* * 队列达到最大长度* * 消息被拒绝** @author Rem* @date 2021-12-29*/public class MaxLengthProducer {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_NORMAL_MAXLENGTH, BuiltinExchangeType.DIRECT);System.err.println("开始发送消息...");Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();channel.basicPublish(RabbitMqUtils.EXCHANGE_NORMAL_MAXLENGTH, RabbitMqUtils.ROUTING_KEY_NORMAL_MAXLENGTH, null, message.getBytes());System.err.println("发送消息完毕" + message);}}}
package com.rem.rabbitmq.ee.HdeadLetterQueue;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.HashMap;import java.util.Map;/*** 普通消费者-验证队列最大长度** @author Rem* @date 2021-12-27*/public class ConsumerMaxLength17 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信交换机channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_DEAD, BuiltinExchangeType.DIRECT);//声明死信队列channel.queueDeclare(RabbitMqUtils.QUEUE_DEAD, false, false, false, null);//死信队列绑定死信交换机与routingkeychannel.queueBind(RabbitMqUtils.QUEUE_DEAD, RabbitMqUtils.EXCHANGE_DEAD, RabbitMqUtils.ROUTING_KEY_DEAD);/********************************************************************************///声明普通交换机channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_NORMAL_MAXLENGTH, BuiltinExchangeType.DIRECT);//声明普通队列 并且绑定死信交换机//设置参数Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", RabbitMqUtils.EXCHANGE_DEAD);arguments.put("x-dead-letter-routing-key", RabbitMqUtils.ROUTING_KEY_DEAD);/*** 设置队列最大长度*/arguments.put("x-max-length", 6);channel.queueDeclare(RabbitMqUtils.QUEUE_NORMAL_MAXLENGTH, false, false, false, arguments);//普通交换机与普通队列通过普通routingkey 进行绑定channel.queueBind(RabbitMqUtils.QUEUE_NORMAL_MAXLENGTH, RabbitMqUtils.EXCHANGE_NORMAL_MAXLENGTH, RabbitMqUtils.ROUTING_KEY_NORMAL_MAXLENGTH);//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息:" + new String(message.getBody()));//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");System.out.println("消费者-NORMAL-MaxLength 等待消费");channel.basicConsume(RabbitMqUtils.QUEUE_NORMAL_MAXLENGTH, true, deliverCallback, cancelCallback);}}
6.3.3 消息被拒绝
package com.rem.rabbitmq.ee.HdeadLetterQueue;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.Scanner;/*** 死信队列* 某些时候由于特定的原因导致queue中的某些消息无法被消费 这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。* <p>* 死信来源* * 消息TTL过去* * 队列达到最大长度* * 消息被拒绝** @author Rem* @date 2021-12-29*/public class RejectProducer {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_NORMAL_REJECT, BuiltinExchangeType.DIRECT);System.err.println("开始发送消息...");Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();channel.basicPublish(RabbitMqUtils.EXCHANGE_NORMAL_REJECT, RabbitMqUtils.ROUTING_KEY_NORMAL_REJECT, null, message.getBytes());System.err.println("发送消息完毕" + message);}}}
package com.rem.rabbitmq.ee.HdeadLetterQueue;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.HashMap;import java.util.Map;/*** 普通消费者-验证拒绝消息** @author Rem* @date 2021-12-27*/public class ConsumerReject18 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信交换机channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_DEAD, BuiltinExchangeType.DIRECT);//声明死信队列channel.queueDeclare(RabbitMqUtils.QUEUE_DEAD, false, false, false, null);//死信队列绑定死信交换机与routingkeychannel.queueBind(RabbitMqUtils.QUEUE_DEAD, RabbitMqUtils.EXCHANGE_DEAD, RabbitMqUtils.ROUTING_KEY_DEAD);/********************************************************************************///声明普通交换机channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_NORMAL_REJECT, BuiltinExchangeType.DIRECT);//声明普通队列 并且绑定死信交换机//设置参数Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", RabbitMqUtils.EXCHANGE_DEAD);arguments.put("x-dead-letter-routing-key", RabbitMqUtils.ROUTING_KEY_DEAD);/*** 设置队列最大长度*/arguments.put("x-max-length", 6);channel.queueDeclare(RabbitMqUtils.QUEUE_NORMAL_REJECT, false, false, false, arguments);//普通交换机与普通队列通过普通routingkey 进行绑定channel.queueBind(RabbitMqUtils.QUEUE_NORMAL_REJECT, RabbitMqUtils.EXCHANGE_NORMAL_REJECT, RabbitMqUtils.ROUTING_KEY_NORMAL_REJECT);//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("接收到的消息,并且拒绝:" + new String(message.getBody()));/*** 拒绝消息* DeliveryTag – 来自收到的AMQP.Basic.GetOk或AMQP.Basic.Deliver的标签* requeue – 如果被拒绝的消息应该重新排队而不是丢弃/死信,则为真*/channel.basicReject(message.getEnvelope().getDeliveryTag(), false);};//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");System.out.println("消费者-NORMAL-REJECT 等待消费");/*** 设置自动应答为false*/boolean autoAck = false;channel.basicConsume(RabbitMqUtils.QUEUE_NORMAL_REJECT, autoAck, deliverCallback, cancelCallback);}}
七、延迟队列
使用死信队列实现延迟队列
package com.rem.rabbitmq.ee.Idelay;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.Scanner;/*** 延迟队列 -死信队列中ttl的实现* 设置ttl过期两种方法* ** 在队列上设置过期时间 arguments.put("x-message-ttl", 10000) 一旦消息过期就被丢弃,如果配置了死信队列就丢到死信队列* ** 在消息上设置过期时间 new AMQP.BasicProperties().builder().expiration("10000").build() 如果不设置代表永不过期;如果设置为0代表直接发送给消费者,否则丢弃消息** @author Rem* @date 2021-12-27*/public class DelayProducer {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_NORMAL_TTL, BuiltinExchangeType.DIRECT);System.err.println("开始发送消息...");Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();/*** 发送消息* 设置消息过期时间 ms*/AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("8000").build();channel.basicPublish(RabbitMqUtils.EXCHANGE_NORMAL_TTL, RabbitMqUtils.ROUTING_KEY_NORMAL_TTL, properties, message.getBytes());System.err.println("发送消息完毕" + message);}}}
package com.rem.rabbitmq.ee.Idelay;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.HashMap;import java.util.Map;/*** 普通消费者-延迟队列** @author Rem* @date 2021-12-27*/public class Consumer19 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信交换机channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_DEAD, BuiltinExchangeType.DIRECT);//声明死信队列channel.queueDeclare(RabbitMqUtils.QUEUE_DEAD, false, false, false, null);//死信队列绑定死信交换机与routingkeychannel.queueBind(RabbitMqUtils.QUEUE_DEAD, RabbitMqUtils.EXCHANGE_DEAD, RabbitMqUtils.ROUTING_KEY_DEAD);/********************************************************************************///声明普通交换机channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_NORMAL_TTL, BuiltinExchangeType.DIRECT);//声明普通队列 并且绑定死信交换机//设置参数Map<String, Object> arguments = new HashMap<>();//设置过期时间 单位ms 10s 一般都在生产方设置//arguments.put("x-message-ttl", 10000);arguments.put("x-dead-letter-exchange", RabbitMqUtils.EXCHANGE_DEAD);arguments.put("x-dead-letter-routing-key", RabbitMqUtils.ROUTING_KEY_DEAD);channel.queueDeclare(RabbitMqUtils.QUEUE_NORMAL_TTL, false, false, false, arguments);//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的死信消息:" + new String(message.getBody()));//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");/*** 直接延迟队列接收消息*/System.out.println("消费者-DEAD 等待消费");channel.basicConsume(RabbitMqUtils.QUEUE_DEAD, true, deliverCallback, cancelCallback);}}
安装插件实现延迟队列
#拷贝到rabbitmq容器 773067241f96 中docker cp /home/rabbitmq/rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins#进入容器docker exec -it rabbitmq /bin/bash#启用插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange#查看rabbitmq-plugins list#重新启动容器docker restart rabbitmq
集成springboot代码实现
server:servlet:context-path: /rabbitspring:rabbitmq:host: remzhi.topport: 5672username: rempassword: 123456
package com.rem.rabbitmq.boot.Adelay;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.QueueBuilder;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** 使用死信队列实现 延迟队列** @author Rem* @date 2021-12-29*/@Configurationpublic class QueueConfig {/*** 普通交换机 队列 routingKey*/public static final String X_EXCHANGE = "X";public static final String XA_ROUTING_KEY = "XA";public static final String XB_ROUTING_KEY = "XB";public static final String QA_QUEUE = "QA";public static final String QB_QUEUE = "QB";public static final String XC_ROUTING_KEY = "XC";public static final String QC_QUEUE = "QC";/*** 死信交换机 队列 routingKey*/public static final String Y_DEAD_EXCHANGE = "Y";public static final String YD_DEAD_ROUTING_KEY = "YD";public static final String QD_DEAD_QUEUE = "QD";/*********************************普通队列QA*********************************************************//*** 声明普通交换机X** @return*/@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}/*** 声明普通队列QA并且绑定死信交换机** @return*/@Bean("queueA")public Queue queueA() {/*** 手动设置参数*//* Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);arguments.put("x-dead-letter-routing-key", YD_DEAD_ROUTING_KEY);arguments.put("x-message-ttl", 10000);return QueueBuilder.durable(QA_QUEUE).withArguments(arguments).build();*//*** 根据内置api设置参数 过期时间 ms*/return QueueBuilder.durable(QA_QUEUE).deadLetterExchange(Y_DEAD_EXCHANGE).deadLetterRoutingKey(YD_DEAD_ROUTING_KEY).ttl(10000).build();}/*** 普通交换机与普通队列QA通过 普通routingKey XA 绑定** @param queueA* @param xExchange* @return*/@Beanpublic Binding queueaBindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with(XA_ROUTING_KEY);}/*********************************普通队列QB*********************************************************//*** 声明普通队列QB 并且绑定死信交换机** @return*/@Bean("queueB")public Queue queueB() {/*** 根据内置api设置参数*/return QueueBuilder.durable(QB_QUEUE).deadLetterExchange(Y_DEAD_EXCHANGE).deadLetterRoutingKey(YD_DEAD_ROUTING_KEY).ttl(40000).build();}/*** 普通交换机与普通队列通过QB 普通routingKey 绑定** @param queueB* @param xExchange* @return*/@Beanpublic Binding queuebBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with(XB_ROUTING_KEY);}/*********************************死信队列QD*********************************************************//*** 声明死信交换机Y** @return*/@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_EXCHANGE);}/*** 声明死信队列** @return*/@Bean("queueD")public Queue queueD() {return new Queue(QD_DEAD_QUEUE);}/*** 死信交换机与死信队列通过 死信routingKey XA 绑定** @param queueD* @param yExchange* @return*/@Beanpublic Binding deadLetterBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) {return BindingBuilder.bind(queueD).to(yExchange).with(YD_DEAD_ROUTING_KEY);}/*********************************普通队列QC*********************************************************//*** 声明普通队列QC 并且绑定死信交换机* 不设置过期时间 交给生产方设置* *因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,* *而第二个消息的延时时长很短,第二个消息并不会优先得到执行。** @return*/@Bean("queueC")public Queue queueC() {/*** 根据内置api设置参数*/return QueueBuilder.durable(QC_QUEUE).deadLetterExchange(Y_DEAD_EXCHANGE).deadLetterRoutingKey(YD_DEAD_ROUTING_KEY).build();}/*** 普通交换机与普通队列通过QC 普通routingKey 绑定** @param queueC* @param xExchange* @return*/@Beanpublic Binding queuecBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with(XC_ROUTING_KEY);}}
package com.rem.rabbitmq.boot.Adelay;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** 使用插件实现rabbitMQ 延迟队列** @author Rem* @date 2022-01-03*/@Configurationpublic class DelayQueueConfig {/*** 延迟交换机 队列 routingKey*/public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";public static final String DELAY_ROUTING_KEY = "DELAY_ROUTING_KEY";public static final String DELAY_QUEUE = "DELAY_QUEUE";@Beanpublic DirectExchange delayExchange() {//自定义参数// Map<String, Object> arguments = new HashMap<>();// //自定义交换机的类型// arguments.put("x-delayed-type", "direct");// return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, arguments);//使用api设置return ExchangeBuilder.directExchange(DELAY_EXCHANGE).delayed().build();}@Beanpublic Queue delayQueue() {return QueueBuilder.durable(DELAY_QUEUE).build();}@Beanpublic Binding bindingDelayedQueue(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayExchange") Exchange delayExchange) {return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();}}
package com.rem.rabbitmq.boot.Adelay;import io.swagger.annotations.Api;import io.swagger.annotations.ApiOperation;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;/*** @author Rem* @date 2021-12-30*/@Slf4j@RestController@RequestMapping("/delay")@Api(tags = "延迟队列")public class SendMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@ApiOperation(value = "队列设置过期时间 的延迟消息")@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message) {log.info("当前时间:{},发送一条信息给两个TTL队列:{}", LocalDateTime.now(), message);rabbitTemplate.convertAndSend(QueueConfig.X_EXCHANGE, QueueConfig.XA_ROUTING_KEY, "消息来自ttl为10S的队列:" + message);rabbitTemplate.convertAndSend(QueueConfig.X_EXCHANGE, QueueConfig.XB_ROUTING_KEY, "消息来自ttl为40S的队列:" + message);}@ApiOperation(value = "消息设置过期时间 的延迟消息")@GetMapping("/sendMsg/{message}/{ttlTime}")public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {log.info("当前时间:{},发送一条信息:{},,并且设置ttl:{}", LocalDateTime.now(), message, ttlTime);/*** setExpiration 设置过期时间*/MessagePostProcessor messagePostProcessor = messageProperties -> {messageProperties.getMessageProperties().setExpiration(ttlTime);return messageProperties;};rabbitTemplate.convertAndSend(QueueConfig.X_EXCHANGE, QueueConfig.XC_ROUTING_KEY, message, messagePostProcessor);}@ApiOperation(value = "设置插件-发送生产端设置过期时间 的延迟消息")@GetMapping("/sendDelayMsg/{message}/{ttlTime}")public void sendDelayMsg(@PathVariable String message, @PathVariable Integer ttlTime) {log.info("当前时间:{},发送一条信息:{},,使用延迟队列插件 并且设置ttl:{}", LocalDateTime.now(), message, ttlTime);/*** setDelay 设置延迟时间*/MessagePostProcessor messagePostProcessor = messageProperties -> {messageProperties.getMessageProperties().setDelay(ttlTime);return messageProperties;};rabbitTemplate.convertAndSend(DelayQueueConfig.DELAY_EXCHANGE, DelayQueueConfig.DELAY_ROUTING_KEY, message, messagePostProcessor);}}
/*** 监听死信队列里的消息** @param message*/@RabbitListener(queues = QueueConfig.QD_DEAD_QUEUE)public void receiveD(Message message) {String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列信息{}", LocalDateTime.now(), msg);}/*** 监听使用插件的延迟队列消息** @param message*/@RabbitListener(queues = DelayQueueConfig.DELAY_QUEUE)public void receiveDelay(Message message) {String msg = new String(message.getBody());log.info("当前时间:{},收到延迟队列信息{}", LocalDateTime.now(), msg);}
八、发布确认高级
server:servlet:context-path: /rabbitspring:rabbitmq:host: remzhi.topport: 5672username: rempassword: 123456#设置发布确认模式# correlated :发布消息成功到交换器后会触发回调方法# simple:其一效果和CORRELATED值一样会触发回调方法#其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法 等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑#要注意的点是 waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到brokerpublisher-confirm-type: correlated#设置消息回退 默认false#true时 当无法被路由的消息会回退给生产者publisher-returns: true
package com.rem.rabbitmq.boot.Bconfirm;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** @author Rem* @date 2021-12-29*/@Configurationpublic class ConfirmConfig {/*** 发布确认交换机 队列 routingKey*/public static final String CONFIRM_EXCHANGE = "CONFIRM_EXCHANGE";public static final String CONFIRM_ROUTING_KEY = "CONFIRM_ROUTING_KEY";public static final String CONFIRM_QUEUE = "CONFIRM_QUEUE";/*** 声明业务交换机** @return*/@Bean("confirmExchange")public DirectExchange confirmExchange() {return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).build();}/*** 声明发布确认队列** @return*/@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE).build();}/*** 声明绑定关系** @param confirmQueue* @param confirmExchange* @return*/@Beanpublic Binding confirmQueueaBinding(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) {return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}}
package com.rem.rabbitmq.boot.Bconfirm;import io.swagger.annotations.Api;import io.swagger.annotations.ApiOperation;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;/*** @author Rem* @date 2021-12-30*/@Slf4j@RestController@RequestMapping("/comfirm")@Api(tags = "发布确认")public class ConfirmController {@Autowiredprivate RabbitTemplate rabbitTemplate;@ApiOperation(value = "发送一条确认的消息")@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message) {log.info("当前时间:{},发送消息开始:{}", LocalDateTime.now(), message);CorrelationData correlationData = new CorrelationData("111");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE, ConfirmConfig.CONFIRM_ROUTING_KEY, ConfirmConfig.CONFIRM_ROUTING_KEY + ":" + message, correlationData);/*** 发送到一个假的exchange上* 消息ack为false*/CorrelationData correlationData2 = new CorrelationData("222");rabbitTemplate.convertAndSend("FAKE_EXCHANGE", ConfirmConfig.CONFIRM_ROUTING_KEY, ConfirmConfig.CONFIRM_ROUTING_KEY + ":" + message, correlationData2);/*** 发送到一个假的routingKey上* 消息会被退回*/CorrelationData correlationData3 = new CorrelationData("333");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE, "FAKE_ROUTING_KEY", "FAKE_ROUTING_KEY" + ":" + message, correlationData3);log.info("当前时间:{},发送消息完毕:{}", LocalDateTime.now(), message);}}
/*** 发布确认队列监听** @param message*/@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)public void receiveConfirem(Message message) {String msg = new String(message.getBody());log.info("发布确认消费者收到的消息是:{}", msg);}/*** 报警队列监听** @param message*/@RabbitListener(queues = BackupExchangeConfig.WARING_QUEUE)public void receiveBackupWaring(Message message) {String msg = new String(message.getBody());log.info("报警队列收到的消息是:{}", msg);}
备份交换机
package com.rem.rabbitmq.boot.CbackupExchange;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** @author Rem* @date 2022/1/8*/@Configurationpublic class BackupExchangeConfig {/*** 普通交换机(发布确认) 队列 routingKey*/public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";public static final String NORMAL_ROUTING_KEY = "NORMAL_ROUTING_KEY";public static final String NORMAL_QUEUE = "NORMAL_QUEUE";/*** 备份交换机 备份队列 报警队列*/public static final String BACKUP_EXCHANGE = "BACKUP_EXCHANGE";public static final String BACKUP_QUEUE = "BACKUP_QUEUE";public static final String WARING_QUEUE = "WARING_QUEUE";/*** 声明备份交换机** @return*/@Bean("backupExchange")public FanoutExchange backupExchange() {return ExchangeBuilder.fanoutExchange(BACKUP_EXCHANGE).build();}/*** 声明备份队列** @return*/@Bean("backupQueue")public Queue backupQueue() {return QueueBuilder.durable(BACKUP_QUEUE).build();}/*** 声明报警队列** @return*/@Bean("waringQueue")public Queue waringQueue() {return QueueBuilder.durable(WARING_QUEUE).build();}/*** 声明备份队列绑定关系** @param backupQueue* @param backupExchange* @return*/@Beanpublic Binding backupQueueBinding(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) {return BindingBuilder.bind(backupQueue).to(backupExchange);}/*** 声明报警队列绑定关系** @param waringQueue* @param backupExchange* @return*/@Beanpublic Binding waringQueueaBinding(@Qualifier("waringQueue") Queue waringQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) {return BindingBuilder.bind(waringQueue).to(backupExchange);}/*** 声明业务交换机* **绑定备份交换机** @return*/@Bean("normalExchange")public DirectExchange normalExchange() {return ExchangeBuilder.directExchange(NORMAL_EXCHANGE).alternate(BACKUP_EXCHANGE).build();}/*** 声明发布确认队列** @return*/@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).build();}/*** 声明普通绑定关系** @param normalQueue* @param normalExchange* @return*/@Beanpublic Binding normalQueueaBinding(@Qualifier("normalQueue") Queue normalQueue, @Qualifier("normalExchange") DirectExchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);}}
package com.rem.rabbitmq.boot;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.ReturnedMessage;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** @author Rem* @date 2022/1/5*/@Component@Slf4jpublic class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;//依赖注入rabbitTemplate之后再设置它的回调对象@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}/*** 回调接口* 当发送的消息传给不存在的交换机 ack为false* 当发送的消息传给存在的交换机,但是routingkey错误时 ack依然是true 表示交换机收到了消息但是是路由出错** @param correlationData 发布确认基类* @param ack* @param cause*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (correlationData != null) {String id = correlationData.getId();if (ack) {log.info("交换机已经收到id为:{}的消息", id);} else {log.warn("交换机还未收到id为:{}消息,由于原因:{}", id, cause);}}}/*** 只有消息不可到达时 才会回退消息* 回退消息** @param returned*/@Overridepublic void returnedMessage(ReturnedMessage returned) {String message = new String(returned.getMessage().getBody());log.error("消息{},被交换机{}退回,退回原因:{},路由key:{}",message, returned.getExchange(), returned.getReplyText(), returned.getRoutingKey());}}
package com.rem.rabbitmq.boot.CbackupExchange;import io.swagger.annotations.Api;import io.swagger.annotations.ApiOperation;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;/*** @author Rem* @date 2022/1/8*/@Slf4j@RestController@RequestMapping("/backup")@Api(tags = "备份交换机")public class BackupExchangeController {@Autowiredprivate RabbitTemplate rabbitTemplate;@ApiOperation(value = "备份交换机发送一条确认的消息")@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message) {log.info("当前时间:{},发送消息开始:{}", LocalDateTime.now(), message);CorrelationData correlationData = new CorrelationData("111");rabbitTemplate.convertAndSend(BackupExchangeConfig.NORMAL_EXCHANGE, BackupExchangeConfig.NORMAL_ROUTING_KEY, BackupExchangeConfig.NORMAL_ROUTING_KEY + ":" + message, correlationData);/*** 发送到一个假的exchange上* 消息ack为false*/CorrelationData correlationData2 = new CorrelationData("222");rabbitTemplate.convertAndSend("FAKE_EXCHANGE", BackupExchangeConfig.NORMAL_EXCHANGE, BackupExchangeConfig.NORMAL_ROUTING_KEY + ":" + message, correlationData2);/*** 发送到一个假的routingKey上* 设置了备份交换机 消息不会被退回 (备份交换机的级别比设置退回消息权限高)*/CorrelationData correlationData3 = new CorrelationData("333");rabbitTemplate.convertAndSend(BackupExchangeConfig.NORMAL_EXCHANGE, "FAKE_ROUTING_KEY", "FAKE_ROUTING_KEY" + ":" + message, correlationData3);log.info("当前时间:{},发送消息完毕:{}", LocalDateTime.now(), message);}}
九、rabbitMQ中其他知识点
9.1 幂等性
用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。
解决思路
- redis天然具有幂等性 可以使用redis执行setnx,从而实现不重复消费
- 唯一id加指纹码机制,在分布式系统中生成唯一的mq id 来实现
9.2 优先级队列
```java package com.rem.rabbitmq.boot.Dpriority;
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
/**
- 优先级队列 *
- @author Rem
- @date 2021-12-29 */
@Configuration public class PriotiryConfig {
public static final String PRIOTIRY_EXCHANGE = "PRIOTIRY_EXCHANGE";public static final String PRIOTIRY_ROUTING_KEY = "PRIOTIRY_ROUTING_KEY";public static final String PRIOTIRY_QUEUE = "PRIOTIRY_QUEUE";@Bean("priotiryExchange")public DirectExchange priotiryExchange() {return ExchangeBuilder.directExchange(PRIOTIRY_EXCHANGE).build();}/*** 设置优先级 0~255 官网推荐1-10 设置高比较吃内存和cpu* 发送的消息也必须设置优先级** @return*/@Bean("priotiryQueue")public Queue priotiryQueue() {return QueueBuilder.durable(PRIOTIRY_QUEUE).maxPriority(5).build();}@Beanpublic Binding priotiryQueueaBinding(@Qualifier("priotiryQueue") Queue priotiryQueue, @Qualifier("priotiryExchange") DirectExchange priotiryExchange) {return BindingBuilder.bind(priotiryQueue).to(priotiryExchange).with(PRIOTIRY_ROUTING_KEY);}
}
```javapackage com.rem.rabbitmq.boot.Dpriority;import io.swagger.annotations.Api;import io.swagger.annotations.ApiOperation;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;/*** @author Rem* @date 2021-12-30*/@Slf4j@RestController@RequestMapping("/priotiry")@Api(tags = "优先级队列")public class PriotiryController {@Autowiredprivate RabbitTemplate rabbitTemplate;@ApiOperation(value = "发送一条的消息并且遍历")@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message) {for (int i = 0; i < 10; i++) {String msg = i + ":" + message;log.info("当前时间:{},发送消息开始:{}", LocalDateTime.now(), msg);if (i % 3 == 0) {//给消息赋予一个priority属性Integer finalI = i;MessagePostProcessor messagePostProcessor = messageProperties -> {messageProperties.getMessageProperties().setPriority(finalI);return messageProperties;};rabbitTemplate.convertAndSend(PriotiryConfig.PRIOTIRY_EXCHANGE, PriotiryConfig.PRIOTIRY_ROUTING_KEY, msg, messagePostProcessor);} else {rabbitTemplate.convertAndSend(PriotiryConfig.PRIOTIRY_EXCHANGE, PriotiryConfig.PRIOTIRY_ROUTING_KEY, msg);}log.info("当前时间:{},发送消息完毕:{}", LocalDateTime.now(), msg);}}}
/*** 优先级队列监听** @param message*/@RabbitListener(queues = PriotiryConfig.PRIOTIRY_QUEUE)public void receivePriotiry(Message message) {String msg = new String(message.getBody());log.info("优先级队列收到的消息是:{}", msg);}
9.3 惰性队列
package com.rem.rabbitmq.boot.Elazy;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** 惰性队列* 在发送1百万条消息,每条消息大概占1KB的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB** @author Rem* @date 2021-12-29*/@Configurationpublic class LazyConfig {public static final String LAZY_EXCHANGE = "LAZY_EXCHANGE";public static final String LAZY_ROUTING_KEY = "LAZY_ROUTING_KEY";public static final String LAZY_QUEUE = "LAZY_QUEUE";@Bean("lazyExchange")public DirectExchange lazyExchange() {return ExchangeBuilder.directExchange(LAZY_EXCHANGE).build();}/*** 声明队列为惰性队列 消息直接存放在磁盘中** @return*/@Bean("lazyQueue")public Queue lazyQueue() {return QueueBuilder.durable(LAZY_QUEUE).lazy().build();}@Beanpublic Binding lazyQueueaBinding(@Qualifier("lazyQueue") Queue lazyQueue, @Qualifier("lazyExchange") DirectExchange lazyExchange) {return BindingBuilder.bind(lazyQueue).to(lazyExchange).with(LAZY_ROUTING_KEY);}}
package com.rem.rabbitmq.boot.Elazy;import io.swagger.annotations.Api;import io.swagger.annotations.ApiOperation;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;/*** @author Rem* @date 2021-12-30*/@Slf4j@RestController@RequestMapping("/lazy")@Api(tags = "惰性队列")public class LazyController {@Autowiredprivate RabbitTemplate rabbitTemplate;@ApiOperation(value = "发送一条的消息")@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message) {log.info("当前时间:{},发送消息开始:{}", LocalDateTime.now(), message);rabbitTemplate.convertAndSend(LazyConfig.LAZY_EXCHANGE, LazyConfig.LAZY_ROUTING_KEY, message);log.info("当前时间:{},发送消息完毕:{}", LocalDateTime.now(), message);}}
/*** 惰性队列监听** @param message*/@RabbitListener(queues = LazyConfig.LAZY_QUEUE)public void receiveLazy(Message message) {String msg = new String(message.getBody());log.info("惰性队列 的消息是:{}", msg);}
十、rabbitMQ集群
10.1 docker 搭建rabbitMQ集群
创建三个子文件夹
mkdir -p /home/rabbitmqcd /mydata/rabbitmq-clustermkdir rabbitmq-1 rabbitmq-2 rabbitmq-3
分别创建三个docker容器
docker run --hostname rabbitmq-1 --name rabbitmq-1 \-p 15675:15672 \-p 5675:5672 \-v /home/rabbitmq/rabbitmq-1:/var/lib/rabbitmq \-e RABBITMQ_ERLANG_COOKIE='rabbitmq-cookie' \-id rabbitmq:management
docker run --hostname rabbitmq-2 --name rabbitmq-2 \-p 15673:15672 \-p 5673:5672 \-v /home/rabbitmq/rabbitmq-2:/var/lib/rabbitmq \-e RABBITMQ_ERLANG_COOKIE='rabbitmq-cookie' \--link rabbitmq-1:rabbitmq-1 \-id rabbitmq:management
docker run --hostname rabbitmq-3 --name rabbitmq-3 \-p 15674:15672 \-p 5674:5672 \-v /home/rabbitmq/rabbitmq-3:/var/lib/rabbitmq \-e RABBITMQ_ERLANG_COOKIE='rabbitmq-cookie' \--link rabbitmq-1:rabbitmq-1 \--link rabbitmq-2:rabbitmq-2 \-id rabbitmq:management
让节点加入集群
docker exec -it rabbitmq-1 /bin/bashrabbitmqctl stop_apprabbitmqctl resetrabbitmqctl start_app
docker exec -it rabbitmq-2 /bin/bashrabbitmqctl stop_apprabbitmqctl resetrabbitmqctl join_cluster --ram rabbit@rabbitmq-1rabbitmqctl start_app
docker exec -it rabbitmq-1 /bin/bashrabbitmqctl stop_apprabbitmqctl resetrabbitmqctl start_app
10.2 镜像队列
引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。
自定义一个已rem 开头的 自定义镜像队列

创建结果
创建一个已rem开始的队列 在控制台就可以看到备份了一份,如果1号宕机,就会自动在三号再备份一份
10.3 高可用负载均衡
HAProxy提供高可用性、负载均衡及基于TCPHTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,HAProxy实现了一种事件驱动、单一进程模型,此模型支持非常大的井发连接数。
如果前面配置的HAProxy主机突然宕机或者网卡失效,那么虽然RbbitMQ集群没有任何故障但是对于外界的客户端来说所有的连接都会被断开结果将是灾难性的为了确保负载均衡服务的可靠性同样显得十分重要,这里就要引入Keepalived它能够通过自身健康检查、资源接管功能做高可用(双机热备),实现故障转移.
搭建效果图
