**,其实也有点这个意思,
死信队列 是 因为下列原因:
1.消息被拒绝(basic.reject或basic.nack)并且requeue=false.
2.消息TTL过期
3.队列达到最大长度(队列满了,无法再添加数据到mq中)
应用场景分析
在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丢弃消息

  1. package com.itmayiedu.rabbitmq.config;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import org.springframework.amqp.core.Binding;
  5. import org.springframework.amqp.core.BindingBuilder;
  6. import org.springframework.amqp.core.DirectExchange;
  7. import org.springframework.amqp.core.FanoutExchange;
  8. import org.springframework.amqp.core.Queue;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.stereotype.Component;
  11. //Fanout 类型 发布订阅模式
  12. @Component
  13. public class FanoutConfig {
  14. /**
  15. * 定义死信队列相关信息
  16. */
  17. public final static String deadQueueName = "dead_queue";
  18. public final static String deadRoutingKey = "dead_routing_key";
  19. public final static String deadExchangeName = "dead_exchange";
  20. /**
  21. * 死信队列 交换机标识符
  22. */
  23. public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
  24. /**
  25. * 死信队列交换机绑定键标识符
  26. */
  27. public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
  28. // 邮件队列
  29. private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
  30. // 短信队列
  31. private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
  32. // fanout 交换机
  33. private String EXCHANGE_NAME = "fanoutExchange";
  34. // 1.定义邮件队列
  35. @Bean
  36. public Queue fanOutEamilQueue() {
  37. // 将普通队列绑定到死信队列交换机上
  38. Map<String, Object> args = new HashMap<>(2);
  39. args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
  40. args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
  41. Queue queue = new Queue(FANOUT_EMAIL_QUEUE, true, false, false, args);
  42. return queue;
  43. }
  44. // 2.定义短信队列
  45. @Bean
  46. public Queue fanOutSmsQueue() {
  47. return new Queue(FANOUT_SMS_QUEUE);
  48. }
  49. // 2.定义交换机
  50. @Bean
  51. FanoutExchange fanoutExchange() {
  52. return new FanoutExchange(EXCHANGE_NAME);
  53. }
  54. // 3.队列与交换机绑定邮件队列
  55. @Bean
  56. Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
  57. return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
  58. }
  59. // 4.队列与交换机绑定短信队列
  60. @Bean
  61. Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
  62. return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
  63. }
  64. /**
  65. * 配置死信队列
  66. *
  67. * @return
  68. */
  69. @Bean
  70. public Queue deadQueue() {
  71. Queue queue = new Queue(deadQueueName, true);
  72. return queue;
  73. }
  74. @Bean
  75. public DirectExchange deadExchange() {
  76. return new DirectExchange(deadExchangeName);
  77. }
  78. @Bean
  79. public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
  80. return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
  81. }
  82. }
@Component
public class FanoutProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(String queueName) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("email", "644064779");
        jsonObject.put("timestamp", 0);

        String jsonString = jsonObject.toJSONString();
        System.out.println("jsonString:" + jsonString);
        // 生产者发送消息的时候需要设置消息id

        Message message = MessageBuilder.withBody(jsonString.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
                .setMessageId(UUID.randomUUID() + "").setExpiration("1000").build();

        amqpTemplate.convertAndSend(queueName, message);
    }
}
//邮件队列
@Component
public class FanoutEamilConsumer {

    @RabbitListener(queues = "fanout_email_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        System.out.println("邮件消费者获取生产者消息msg:" + msg + ",消息id:" + messageId);
        JSONObject jsonObject = JSONObject.parseObject(msg);
        Integer timestamp = jsonObject.getInteger("timestamp");
        try {
            int result = 1 / timestamp;
            System.out.println("result:" + result);
            // 通知mq服务器删除该消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            e.printStackTrace();
            // // 丢弃该消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }

    }

}

RabbitMQ解决分布式事务原理: 采用最终一致性原理。
需要保证以下三要素
1、确认生产者一定要将数据投递到MQ服务器中(采用MQ消息确认机制)
2、MQ消费者消息能够正确消费消息,采用手动ACK模式(注意重试幂等性问题)
3、如何保证第一个事务先执行,采用补偿机制,在创建一个补单消费者进行监听,如果订单没有创建成功,进行补单。

@Service
public class OrderService extends BaseApiService implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public ResponseBase addOrderAndDispatch() {
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setName("蚂蚁课堂永久会员充值");
        orderEntity.setOrderCreatetime(new Date());
        // 价格是300元
        orderEntity.setOrderMoney(300d);
        // 状态为 未支付
        orderEntity.setOrderState(0);
        Long commodityId = 30l;
        // 商品id
        orderEntity.setCommodityId(commodityId);
        String orderId = UUID.randomUUID().toString();
        orderEntity.setOrderId(orderId);
        // ##################################################
        // 1.先下单,创建订单 (往订单数据库中插入一条数据)
        int orderResult = orderMapper.addOrder(orderEntity);
        System.out.println("orderResult:" + orderResult);
        if (orderResult <= 0) {
            return setResultError("下单失败!");
        }
        // 2.使用消息中间件将参数存在派单队列中
        send(orderId);
        return setResultSuccess();
    }

    private void send(String orderId) {
        JSONObject jsonObect = new JSONObject();
        jsonObect.put("orderId", orderId);
        String msg = jsonObect.toJSONString();
        System.out.println("msg:" + msg);
        // 封装消息
        Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("utf-8").setMessageId(orderId).build();
        // 构建回调返回的数据
        CorrelationData correlationData = new CorrelationData(orderId);
        // 发送消息
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.convertAndSend("order_exchange_name", "orderRoutingKey", message, correlationData);

    }

    // 生产消息确认机制
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String orderId = correlationData.getId();
        System.out.println("消息id:" + correlationData.getId());
        if (ack) {
            System.out.println("消息发送确认成功");
        } else {
            send(orderId);
            System.out.println("消息发送确认失败:" + cause);
        }

    }

}
@Component
public class RabbitmqConfig {

    // 下单并且派单存队列
    public static final String ORDER_DIC_QUEUE = "order_dic_queue";
    // 补单队列,判断订单是否已经被创建
    public static final String ORDER_CREATE_QUEUE = "order_create_queue";
    // 下单并且派单交换机
    private static final String ORDER_EXCHANGE_NAME = "order_exchange_name";

    // 1.定义订单队列
    @Bean
    public Queue directOrderDicQueue() {
        return new Queue(ORDER_DIC_QUEUE);
    }

    // 2.定义补订单队列
    @Bean
    public Queue directCreateOrderQueue() {
        return new Queue(ORDER_CREATE_QUEUE);
    }

    // 2.定义交换机
    @Bean
    DirectExchange directOrderExchange() {
        return new DirectExchange(ORDER_EXCHANGE_NAME);
    }

    // 3.订单队列与交换机绑定
    @Bean
    Binding bindingExchangeOrderDicQueue() {
        return BindingBuilder.bind(directOrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");
    }

    // 3.补单队列与交换机绑定
    @Bean
    Binding bindingExchangeCreateOrder() {
        return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange()).with("orderRoutingKey");
    }

}

补单消费者

@Component
public class CreateOrderConsumer {
    @Autowired
    private OrderMapper orderMapper;

    @RabbitListener(queues = "order_create_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        System.out.println("补单消费者" + msg + ",消息id:" + messageId);
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String orderId = jsonObject.getString("orderId");
        // 判断订单是否存在,如果不存在 实现自动补单机制
        OrderEntity orderEntityResult = orderMapper.findOrderId(orderId);
        if (orderEntityResult != null) {
            System.out.println("订单已经存在 无需补单  orderId:" + orderId);
            return;
        }
        // 订单不存在 ,则需要进行补单

        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setName("蚂蚁课堂永久会员充值");
        orderEntity.setOrderCreatetime(new Date());
        // 价格是300元
        orderEntity.setOrderMoney(300d);
        // 状态为 未支付
        orderEntity.setOrderState(0);
        Long commodityId = 30l;
        // 商品id
        orderEntity.setCommodityId(commodityId);
        orderEntity.setOrderId(orderId);
        // ##################################################
        // 1.先下单,创建订单 (往订单数据库中插入一条数据)
        try {
            int orderResult = orderMapper.addOrder(orderEntity);
            System.out.println("orderResult:" + orderResult);
            if (orderResult >= 0) {
                // 手动签收消息,通知mq服务器端删除该消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        } catch (Exception e) {
            // 丢弃该消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }

    }
}

记得将订单号存放到redis中来实现幂等性
在死性队列保存异常问题 定时job

/**
 * 派单服务
 *
 */
@Component
public class DispatchConsumer {
    @Autowired
    private DispatchMapper dispatchMapper;

    @RabbitListener(queues = "order_dic_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        System.out.println("派单服务平台" + msg + ",消息id:" + messageId);
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String orderId = jsonObject.getString("orderId");
        if (StringUtils.isEmpty(orderId)) {
            // 日志记录
            return;
        }
        DispatchEntity dispatchEntity = new DispatchEntity();
        // 订单id
        dispatchEntity.setOrderId(orderId);
        // 外卖员id
        dispatchEntity.setTakeoutUserId(12l);
        // 外卖路线
        dispatchEntity.setDispatchRoute("40,40");
        try {
            int insertDistribute = dispatchMapper.insertDistribute(dispatchEntity);
            if (insertDistribute > 0) {
                // 手动签收消息,通知mq服务器端删除该消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        } catch (Exception e) {
            e.printStackTrace();
            // // 丢弃该消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }

}

RabbitMQ环境搭建.docx

练习导入代码.zip上课代码.zip