Spring Boot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了spring-boot-starter-amqp 项目对消息各种支持。
springboot集成rabbitmq
1.引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.application.yml配置
spring:
rabbitmq:
addresses: localhost:5672
virtual-host: /rabbitmq
username: admin
password: admin
3.创建RabbitmqConfig.java
package com.itmck.springbootmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 太阳当空照,花儿对我笑
* <p>
* Create by M ChangKe 2021/7/17 16:00
**/
@Configuration
public class RabbitConfig {
@Bean
public Queue Queue() {
/**
* name:队列名
* durable:是否持久化,默认false.持久化队列会被存储在磁盘上.当消息代理重启仍然存在
* exclusive:默认false,只能被当前的连接使用.当前连接关闭后队列被删除,优先级高于durable
* autoDelete:默认false.没有生产者或者消费者,队列会被删除
* @return
*/
// return new Queue("hello");//源码可知当前和下面一样功能
return new Queue("hello", true, false, false);
}
}
4.创建生产者
package com.itmck.springbootmq.compnent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 太阳当空照,花儿对我笑
* <p>
* Create by M ChangKe 2021/7/17 16:01
**/
@Slf4j
@Component
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String message) {
log.info("发送消息:{}",message);
this.rabbitTemplate.convertAndSend("hello",message);
}
}
5.创建消费端
package com.itmck.springbootmq.compnent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 太阳当空照,花儿对我笑
* <p>
* Create by M ChangKe 2021/7/17 16:00
**/
@Slf4j
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
@RabbitHandler
public void process(String message) {
log.info("消费端接收消息: {}", message);
}
}
测试如下:
package com.itmck.springbootmq;
import com.itmck.springbootmq.compnent.HelloSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMqHelloTest {
@Autowired
private HelloSender helloSender;
@Test
public void hello() {
helloSender.send("hello rabbitmq");
}
}
直接启动测试类,因为客生产者以及消费者都在一个服务里面,所以打印如下:
通过api进行创建队列
上面是通过配置进行队列的创建,现在使用api进行队列的创建,这种方式可动态创建多个队列.
代码如下:
@Test
public void create() {
String queueName = "mck_queue";
String dlExchangeName = "mckExchange";
ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
try (
Connection connection = connectionFactory.createConnection();
Channel channel = connection.createChannel(true)
) {
log.debug("创建交换机");
channel.exchangeDeclare(dlExchangeName, "direct", true);//创建交换机
channel.queueDeclare(queueName, true, false, false, null);//创建队列
channel.queueBind(queueName, dlExchangeName, "DL_KEY", null);//将交换机和队列绑定
channel.close();
log.info("队列创建完成");
} catch (Exception e) {
log.error("队列创建失败", e);
}
}
步骤总结:
- 通过RabbitTemplate获取连接管道Channel实例
- 通过管道创建交换机 channel.exchangeDeclare(dlExchangeName, “direct”, true);
- 声明创建队列 channel.queueDeclare(queueName, true, false, false, null);
- 将交换机和队列绑定 channel.queueBind(queueName, dlExchangeName, “DL_KEY”, null);
控制台如下:
通过rabbitmq控制台可以看到创建完成,交换机如下:
队列如下
通过控制台进行创建队列
rabbitmq提供了控制台进行队列的操作登录控制台 ip:15672 注意:默认控制台端口是15672 不同于配置文件的5672
手动创建队列
点击队列名
创建交换机
同理队列
上面就是rabbitmq几种创建队列的方式
几种常用队列的使用
下面图示: P:代表publisher生产者 c:代表consumer消费者 红色长条:代表队列 X:代表交换机
1.简单模式
简单模式就是使用简单的队列进行监听.一个生产者一个消费者,一个队列.如下:
//通过javaconfig方式进行创建队列
@Configuration
public class RabbitConfig {
@Bean
public Queue queue() {
return new Queue("hello", true, false, false);
}
}
//创建生产者
@Slf4j
@Component
public class HelloSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String message) {
log.info("发送消息:{}",message);
this.rabbitTemplate.convertAndSend("hello",message);
}
}
//创建消费者(监听者),进行消费
@Slf4j
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
@RabbitHandler
public void process(String message) {
log.info("消费端接收消息: {}", message);
}
}
2.work queues模式
工作队列方式:在工人之间分配任务(竞争消费者模式),能者多劳动 默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮询。
代码与上述一样,开启多实例消费端
模拟一个生产者,两个消费者.默认情况下,轮询进行消费
idea开启多实例,配置如下.操作是先进行一个实例启动.然后修改端口再次运行.
3.Publish/Subscribe
发布/订阅模式:同时将向多个消费者传递一条消息。这种模式被称为“发布/订阅”。或者说,发布一条消息同时被多个消费者进行监听. 扇形交换机,该交换机会把消息发送到所有binding到该交换机上的queue。这种是publisher/subcribe模式。用来做广播最好。
所有该exchagne上指定的routing-key都会被ignore掉。
/**
* 太阳当空照,花儿对我笑
* <p>
* Create by M ChangKe 2021/7/17 16:01
**/
@Slf4j
@Component
public class HelloSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String message) {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
}
}
@Slf4j
@Component
@RabbitListener(queues = "fanout.A")
public class FanOutHelloReceiver1 {
@RabbitHandler
public void process(String message) {
log.info("消费端接收消息: {}", message);
}
}
@Slf4j
@Component
@RabbitListener(queues = "fanout.B")
public class FanOutHelloReceiver2 {
@RabbitHandler
public void process(String message) {
log.info("消费端接收消息: {}", message);
}
}
4.Routing路由方式
路由方式这里多了一个路由键,根据不同的路由键,可以将消息分别送到不同的队列,进行消费
Directed Exchange
路由键exchange,该交换机收到消息后会把消息发送到指定routing-key的queue中。那消息交换机是怎么知道的呢?其实,producer deliver消息的时候会把routing-key add到 message header中。routing-key只是一个messgae的attribute。
Default Exchange
这种是特殊的Direct Exchange,是rabbitmq内部默认的一个交换机。该交换机的name是空字符串,所有queue都默认binding 到该交换机上。所有binding到该交换机上的queue,routing-key都和queue的name一样。
@Configuration
public class RabbitConfig {
@Bean
public Queue directQueue() {
return new Queue("direct_queue");
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct_Exchange");
}
@Bean
public Binding bindingDirectExchange() { //直连交换机
return BindingBuilder
.bind(directQueue())
.to(directExchange())
.with("direct_key");
}
}
5.Topics主题模式
通配符交换机,exchange会把消息发送到一个或者多个满足通配符规则的routing-key的queue。其中表号匹配一个word,#匹配多个word和路径,路径之间通过.隔开。如满足a..c的routing-key有a.hello.c;满足#.hello的routing-key有a.b.c.helo。
@Configuration
public class TopicRabbitConfig {
final static String message = "topic.message";
final static String messages = "topic.messages";
@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}
@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("exchange");
}
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);
}
public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);
}
@Slf4j
@Component
@RabbitListener(queues = "topic.message")
public class FanOutHelloReceiver1 {
@RabbitHandler
public void process(String message) {
log.info("topic.message消费端接收消息: {}", message);
}
}
@Slf4j
@Component
@RabbitListener(queues = "topic.messages")
public class FanOutHelloReceiver2 {
@RabbitHandler
public void process(String message) {
log.info("topic.messages 消费端接收消息: {}", message);
}
}
运行send1此时 topic.message与topic.messages都可以收到消息
运行send2 此时只有topic.messages可以收到消息
6.RPC模式
不怎么用,省略
7.Publisher Confirms
发布确认模式:包括,是否发送到交换机的确认,以及消息是否路由到队列确认.消费者的nack/ack机制
先看一下知乎帖子:
其实这个确认机制如果不让回原队列,可以配置死信队列进行使用.nack消息,进入死信队列. 如果是程序性bug导致nack,那程序修复后,可以使死信队列中的消息重新入队列,二次消费即可
application.yml
spring:
rabbitmq:
addresses: localhost:5672
virtual-host: /rabbitmq
username: admin
password: admin
publisher-confirms: true
publisher-returns: true
template:
mandatory: true
RabbitmqConfig.java
使用javaconfig方式进行队列的创建与绑定
@Configuration
@EnableRabbit
public class RabbitmqConfig {
//创建队列
@Bean
public Queue ttlQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "spring.direct.deadExchange");
args.put("x-dead-letter-routing-key", "spring.deadRouting");
//args.put("x-message-ttl",10000); // 设置消息过期时间无效果
//args.put("x-expires",10000); // 设置队列过期时间无效果
return new Queue("spring.direct.ttl.queue", true, false, false, args);
}
//创建死信队列
@Bean
public Queue deadQueue() {
return new Queue("spring.direct.deadQueue");
}
//创建交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange("spring.direct.exchange");
}
// 这里直接将死信队列以及正常消费队列都绑定到一个交换机上
@Bean
public Binding queueBinding1() {
return BindingBuilder.bind(ttlQueue()).to(directExchange()).with("spring.routing");
}
@Bean
public Binding queueBinding2() {
return BindingBuilder.bind(deadQueue()).to(directExchange()).with("spring.deadRouting");
}
// 这里直接将死信队列以及正常消费队列都绑定到一个交换机上
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
/**
*
* 这是生产端,使用RabbitTemplate连接
*
* @param connectionFactory 连接工厂
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
/**
* 这是配置在消费端,SimpleRabbitListenerContainerFactory进行连接
* @param connectionFactory 连接工厂
* @return
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
生产者
@Component
@Slf4j
public class RabbitOrderSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
// @Autowired
// private IBrokerMessageLogService brokerMessageLogService;
@Autowired
private BrokerMessageLogServiceImpl brokerMessageLogService2;
/**
* 发送消息, 构建自定义对象消息
*
* @param order
*/
public void sendOrder(TOrder order) {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
/**
* 自定义对象,消息唯一ID,
* 每个发送的消息都需要配备一个 CorrelationData 相关数据对象,
* CorrelationData 对象内部只有一个 id 属性,用来表示当前消息唯一性。
* 真实场景是需要做业务无关消息 ID 生成,同时要记录下这个 id 用来纠错和对账。
*/
CorrelationData correlationData = new CorrelationData(order.getMessageId());
// 发送消息
rabbitTemplate.convertAndSend(Constant.ORDER_EXCHANGE, Constant.ORDER_ROUTING, order, correlationData);
log.info("消息已发送,messageId={}", order.getMessageId());
}
/**
* 成功接收后回调, 确认消息被rabbitmq成功接收
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
String messageId = correlationData.getId();
if (ack) {
// 如果成功接收了,就修改消息记录表的状态为success。
BrokerMessageLog brokerMessageLog = new BrokerMessageLog();
brokerMessageLog.setStatus(Constant.ORDER_SEND_SUCCESS);
brokerMessageLog.setUpdateTime(LocalDateTime.now());
QueryWrapper<BrokerMessageLog> queryWrapper = new QueryWrapper<BrokerMessageLog>();
queryWrapper.eq("message_id", messageId);
brokerMessageLogService2.update(brokerMessageLog, queryWrapper);
log.info("消息发送成功, messageId={}", messageId);
} else {
//失败则进行具体的后续操作:重试 或者补偿等手段
log.error("消息发送失败, messageId={}", messageId);
}
}
/**
* 失败后回调
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
byte[] body = message.getBody();
MessageProperties messageProperties = message.getMessageProperties();
log.error("消息发送失败, body={}", new String(body));
}
}
消费端
package com.guoj.rabbitmq.receive;
import com.guoj.rabbitmq.entity.TOrder;
import com.guoj.rabbitmq.utils.Constant;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@Slf4j
public class OrderReceive {
/**
* 使用这个注解,可以自动在rabbitmq中创建出交换机、队列及routingKey的绑定关系。
* 使用时,可以先启动消费方把这些关系都自动创建出来。
* <p>
* exchange 交换机
* ---------------------
* name:交换机名称
* durable: 是否持久化
* type: 消息模式(direct、topic、fanout、header)
* ignoreDeclarationExceptions: 忽略声明异常
* <p>
* value 队列
* --------------------
* value: 哪个队列
* durable:是否持久化
* <p>
* key 路由key
*/
@RabbitHandler
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(
name = Constant.ORDER_EXCHANGE,
durable = Constant.DURABLE,
type = Constant.MESSAGE_TYPE,
ignoreDeclarationExceptions = Constant.IGNORE_DECLARATION_EXCEPTIONS
),
value = @Queue(
value = Constant.ORDER_QUEUE,
durable = Constant.DURABLE
),
key = Constant.ORDER_CONSUMER_KEY
))
public void onOrderMessage(@Payload TOrder order, Channel channel,
@Headers Map<String, Object> headers)
throws Exception {
// 消费消息
log.info("-----------------------收到消息, 开始消费-----------------------");
log.info("订单id: {}", order.getId());
// 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
/**
* 手工ACK,实际中一般使用手工签收,自动签收容易丢失消息。
* 这行如果注释掉,也能收到消息,但是rabbitmq的消息还在,没有没签收。
* 第二个参数是multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
*/
channel.basicAck(deliveryTag, false);
/**
* 手工nack
*
* requeue 第三个参数:再次入队列,如果配合死信队列,这里设置为false.直接进入死信队列
*
*/
channel.basicNack(deliveryTag,false,false);
}
}
当channel.basicNack(deliveryTag,false,false); 时,消息会进入死信队列
常见问题
消息持久化
在生产环境中,我们需要考虑万一生产者挂了,消费者挂了,或者 rabbitmq 挂了怎么样。一般来说,如果生产者挂了或者消费者挂了,其实是没有影响,因为消息就在队列里面。那么万一 rabbitmq 挂了,之前在队列里面的消息怎么办,其实可以做消息持久化,RabbitMQ 会把信息保存在磁盘上。
做法是可以先从 Connection 对象中拿到一个 Channel 信道对象,然后再可以通过该对象设置 消息持久化。
生产者或者消费者断线重连
ACK 确认机制
每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么 非常不幸,这段数据就丢失了。因为我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完 成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。
如果一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了(注意是这种情况下)。
为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack。而应该是在处理完数据后发送ack。
在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了。
如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。