如何保证消息不丢失

方案一:开启事务模式

该方案会大幅降低性能,不建议使用

  1. channel.txSelect 是将信道设置成事务模式,
  2. channel.txCommit 用来提交事务
  3. channel.txRollback 用来回滚事务
  1. @Bean("rabbitTransactionManager")
  2. public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
  3. return new RabbitTransactionManager(connectionFactory);
  4. }
  1. @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
  2. public void sendMessage(Object msg) {
  3. // TODO 发送队列消息
  4. rabbitTemplate.convertAndSend(exchange,ingateQueue,msg);
  5. }

方案二:开启消息确认

  1. rabbitmq:
  2. publisher-confirms: true # 开启消息发送确认(保证消息正常到达broker)
  3. publisher-returns: true # 保证消息正确路由到相应的队列
  4. listener:
  5. direct:
  6. acknowledge-mode: manual # 采用手动应答ack
  1. // TODO 处理消息
  2. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //ACK确认

消费顺序处理

发送消息使用convertSendAndReceive方法

  1. # 输出时没有顺序,不需要等待,直接运行
  2. rabbitTemplate.convertAndSend()
  3. # 只有确定消费者接收到消息,才会发送下一条信息,每条消息之间会有间隔时间
  4. rabbitTemplate.convertSendAndReceive()