参考文章:https://blog.csdn.net/qq_37892957/article/details/89296157

安装

MAC系统

安装命令:brew install rabbitmq

安装的路径是 /usr/local/Cellar/rabbitmq/3.8.3,具体情况要视版本而定,我安装的版本是 3.8.3。

接下来就可以启动了,进入安装目录,执行命令:./sbin/rabbitmq-server

接下来可以在浏览器打开 http://localhost:15672,可以看到 RabbitMQ 的管理页面。

登录账号密码:guest/guest

导入Maven依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

TTL 方式

application.properties 配置信息

  1. # 应用名称
  2. spring.application.name=rabbitMq
  3. # 应用服务 WEB 访问端口
  4. server.port=8080
  5. spring.rabbitmq.host=127.0.0.1
  6. spring.rabbitmq.port=5672
  7. spring.rabbitmq.username=guest
  8. spring.rabbitmq.password=guest
  9. # 开启消息确认机制 confirm 异步
  10. spring.rabbitmq.publisher-confirm-type=correlated
  11. # 之前的旧版本 开启消息确认机制的方式
  12. # spring.rabbitmq.publisher-confirms=true
  13. # 开启return机制
  14. spring.rabbitmq.publisher-returns=true
  15. # 消息开启手动确认
  16. spring.rabbitmq.listener.direct.acknowledge-mode=manual
  17. spring.rabbitmq.listener.simple.acknowledge-mode=manual
  18. # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发
  19. spring.rabbitmq.listener.simple.prefetch=1
  20. # 是否支持重试
  21. spring.rabbitmq.listener.simple.retry.enabled=true
  22. #消费者最小数量
  23. spring.rabbitmq.listener.simple.concurrency=1
  24. #消费之最大数量
  25. spring.rabbitmq.listener.simple.max-concurrency=10

RabbitmqConfig 配置信息

  1. package com.bean.springcloudproduct.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  5. import org.springframework.context.annotation.Bean;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. /**
  9. * @Description //TODO
  10. * @Author huangwb
  11. **/
  12. @Configuration
  13. public class RabbitmqConfig {
  14. /**
  15. * 死信交换机
  16. *
  17. * @return
  18. */
  19. @Bean
  20. public DirectExchange userOrderDelayExchange() {
  21. return new DirectExchange("user.order.delay_exchange");
  22. }
  23. /**
  24. * 死信队列
  25. *
  26. * @return
  27. */
  28. @Bean
  29. public Queue userOrderDelayQueue() {
  30. Map<String, Object> map = new HashMap<>(16);
  31. map.put("x-dead-letter-exchange", "user.order.receive_exchange");
  32. map.put("x-dead-letter-routing-key", "user.order.receive_key");
  33. return new Queue("user.order.delay_queue", true, false, false, map);
  34. }
  35. /**
  36. * 给死信队列绑定交换机
  37. *
  38. * @return
  39. */
  40. @Bean
  41. public Binding userOrderDelayBinding() {
  42. return BindingBuilder.bind(userOrderDelayQueue()).to(userOrderDelayExchange()).with("user.order.delay_key");
  43. }
  44. /**
  45. * 死信接收交换机
  46. *
  47. * @return
  48. */
  49. @Bean
  50. public DirectExchange userOrderReceiveExchange() {
  51. return new DirectExchange("user.order.receive_exchange");
  52. }
  53. /**
  54. * 死信接收队列
  55. *
  56. * @return
  57. */
  58. @Bean
  59. public Queue userOrderReceiveQueue() {
  60. return new Queue("user.order.receive_queue");
  61. }
  62. /**
  63. * 死信交换机绑定消费队列
  64. *
  65. * @return
  66. */
  67. @Bean
  68. public Binding userOrderReceiveBinding() {
  69. return BindingBuilder.bind(userOrderReceiveQueue()).to(userOrderReceiveExchange()).with("user.order.receive_key");
  70. }
  71. }

生产者

  1. @Slf4j
  2. @Service
  3. public class DeadLetterSenderServiceImpl implements DeadLetterSenderService {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @Override
  7. public void sendLetterSenderMsg() {
  8. User user = new User(1, "confirm", "confirm123456");
  9. MessagePostProcessor postProcessor = message -> {
  10. //在这里也可以设置超时时间,也可以设置x-message-ttl
  11. message.getMessageProperties().setExpiration("5000");
  12. return message;
  13. };
  14. rabbitTemplate.setMandatory(true);
  15. rabbitTemplate.setConfirmCallback(confirmCallback);
  16. rabbitTemplate.setReturnCallback(returnCallback);
  17. CorrelationData correlationData = new CorrelationData("confirm-" + System.currentTimeMillis());
  18. this.rabbitTemplate.convertAndSend("user.order.delay_exchange", "user.order.delay_key", user, postProcessor, correlationData);
  19. }
  20. /**
  21. * 配置 confirm 机制
  22. * 交换机错误出发
  23. */
  24. private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
  25. /**
  26. * @param correlationData 消息相关的数据,一般用于获取 唯一标识 id
  27. * @param b true 消息确认成功,false 失败
  28. * @param s 确认失败的原因
  29. */
  30. @Override
  31. public void confirm(CorrelationData correlationData, boolean b, String s) {
  32. if (b) {
  33. System.out.println("confirm 消息确认成功..." + correlationData.getId() + new Date());
  34. } else {
  35. System.out.println("confirm 消息确认失败..." + correlationData.getId() + " cause: " + s);
  36. }
  37. }
  38. };
  39. /**
  40. * 配置 return 消息机制
  41. * 找不到路由才会触发
  42. */
  43. private final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
  44. /**
  45. * return 的回调方法(找不到路由才会触发)
  46. * @param message 消息的相关信息
  47. * @param i 错误状态码
  48. * @param s 错误状态码对应的文本信息
  49. * @param s1 交换机的名字
  50. * @param s2 路由的key
  51. */
  52. @Override
  53. public void returnedMessage(Message message, int i, String s, String s1, String s2) {
  54. System.out.println("消息:" + message);
  55. System.out.println(new String(message.getBody()));
  56. System.out.println("回应码:" + i);
  57. System.out.println("回应信息:" + s);
  58. System.out.println("交换机:" + s1);
  59. System.out.println("路由键:" + s2);
  60. }
  61. };
  62. }

消费者

  1. Slf4j
  2. @Component
  3. public class DeadLetterSenderListener {
  4. /**
  5. * @Description 延迟队列
  6. * @Author jxb
  7. * @Date 2019-04-04 16:34:28
  8. */
  9. @RabbitListener(queues = "user.order.receive_queue")
  10. public void getDLMessage(User user, Channel channel, Message message) throws IOException {
  11. try {
  12. System.out.println("延迟队列参数数据 : " + user + new Date());
  13. //basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
  14. //deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
  15. //multiple(false/true):是否批量确认,false只确认当前consumer一个消息收到,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
  16. //举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。
  17. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  18. System.out.println("延迟队列接受到的消息为:" + new String(message.getBody()));
  19. } catch (Exception e) {
  20. if (message.getMessageProperties().getRedelivered()) {
  21. log.error("延迟队列消息已重复处理失败,拒绝再次接收...");
  22. // 拒绝消息
  23. //basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
  24. //deliveryTag:表示消息投递序号。
  25. //requeue:值为 true 消息将重新入队列。
  26. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  27. } else {
  28. log.error("延迟队列消息即将再次返回队列处理...");
  29. //basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。
  30. //deliveryTag:表示消息投递序号。
  31. //multiple:是否批量确认。
  32. //requeue:值为 true 消息将重新入队列。
  33. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  34. }
  35. }
  36. }
  37. }

DLX

首先我们需要下载并安装RabbitMQ的延迟插件。

地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

将插件文件复制到RabbitMQ安装目录的plugins目录下;

进入RabbitMQ安装目录的sbin目录下,使用如下命令启用延迟插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

代码实现:

配置文件:

  1. /**
  2. * 延时队列交换机 DLX方式
  3. * 注意这里的交换机类型:CustomExchange
  4. *
  5. * @return
  6. */
  7. @Bean
  8. public CustomExchange delayExchange() {
  9. Map<String, Object> args = new HashMap<>(1);
  10. args.put("x-delayed-type", "direct");
  11. return new CustomExchange("exchange.xdelay.delayed", "x-delayed-message", true, false, args);
  12. }
  13. /**
  14. * 延时队列
  15. * DLX方式
  16. * @return
  17. */
  18. @Bean
  19. public Queue immediateQueue() {
  20. // 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
  21. return new Queue("queue.xdelay.immediate", true);
  22. }
  23. /**
  24. * 给延时队列绑定交换机
  25. * DLX方式
  26. * @return
  27. */
  28. @Bean
  29. public Binding bindingNotify() {
  30. return BindingBuilder.bind(immediateQueue()).to(delayExchange()).with("exchange.xdelay.delayed").noargs();
  31. }

生产者

  1. @Override
  2. public void sendLetterDLXSenderMsg() {
  3. User user = new User(1, "confirm", "confirm123456");
  4. MessagePostProcessor postProcessor = message -> {
  5. message.getMessageProperties().setDelay(5000);
  6. return message;
  7. };
  8. rabbitTemplate.setMandatory(true);
  9. rabbitTemplate.setConfirmCallback(confirmCallback);
  10. // rabbitTemplate.setReturnCallback(returnCallback);
  11. CorrelationData correlationData = new CorrelationData("confirm-" + System.currentTimeMillis());
  12. this.rabbitTemplate.convertAndSend("exchange.xdelay.delayed", "exchange.xdelay.delayed", user, postProcessor, correlationData);
  13. }

消费者

  1. /**
  2. * DLX方式
  3. * @param user
  4. * @param channel
  5. * @param message
  6. * @throws IOException
  7. */
  8. @RabbitListener(queues = "queue.xdelay.immediate")
  9. public void getDLXMessage(User user, Channel channel, Message message) throws IOException {
  10. try {
  11. System.out.println("DLX延迟队列参数数据 : " + user + new Date());
  12. //basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
  13. //deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
  14. //multiple(false/true):是否批量确认,false只确认当前consumer一个消息收到,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
  15. //举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。
  16. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  17. System.out.println("DLX延迟队列接受到的消息为:" + new String(message.getBody()));
  18. } catch (Exception e) {
  19. if (message.getMessageProperties().getRedelivered()) {
  20. log.error("DLX延迟队列消息已重复处理失败,拒绝再次接收...");
  21. // 拒绝消息
  22. //basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
  23. //deliveryTag:表示消息投递序号。
  24. //requeue:值为 true 消息将重新入队列。
  25. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  26. } else {
  27. log.error("DLX延迟队列消息即将再次返回队列处理...");
  28. //basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。
  29. //deliveryTag:表示消息投递序号。
  30. //multiple:是否批量确认。
  31. //requeue:值为 true 消息将重新入队列。
  32. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  33. }
  34. }
  35. }