介绍
:::tips
WorkQueues工作队列模型就是让多个消费者绑定到一个队列,共同消费队列中的消息,每条消息只会被一个消费者接收
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度,长此以往,消息就会堆积越来越多,无法及时处理,此时就可以使用WorkQueues工作队列模型,多个消费者共同处理消息,就能大大提高速度 :::
引入依赖
:::tips 在父工程中引入SpringAMQP的依赖(在消息生产者和消息消费者中都引入依赖) :::
<!-- SpringAMQP依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
发送消息
添加配置
:::tips 在消息生产者的配置文件中添加配置 :::
spring:
rabbitmq:
#RabbitMQ服务的IP地址
host: IP地址
#RabbitMQ服务的端口号
port: 端口号
#RabbitMQ的虚拟主机
virtual-host: /
#RabbitMQ服务的用户名
username: 用户名
#RabbitMQ服务的密码
password: 密码
编写代码
:::tips 在消息生产者中需要发送消息的类里面注入RabbitTemplate对象,然后调用这个对象的convertAndSend方法(参数一是队列名称,参数二是消息内容)来发送消息 :::
@SpringBootTest
public class MyTest{
//注入RabbitTemplate对象
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() {
//指定队列名称
String queueName = "work.queue";
//构建消息
String message = "这是一条测试消息";
//发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
接收消息
添加配置
:::tips 在每个消息接收者的配置文件中添加配置 :::
spring:
rabbitmq:
#RabbitMQ服务的IP地址
host: IP地址
#RabbitMQ服务的端口号
port: 端口号
#RabbitMQ的虚拟主机
virtual-host: /
#RabbitMQ服务的用户名
username: 用户名
#RabbitMQ服务的密码
password: 密码
编写代码
:::tips 在每个消息消费者中新建一个listener包,然后在listener包下创建一个类,类上需要打上@Component注解将其注册到Spring容器中,否则无法接收消息
在这个类里面创建一个方法,方法上打上@RabbitListener注解,用来监听消息
- 指定@RabbitListener注解的queuesToDeclare属性为@Queue注解
- 指定@Queue注解的value属性为需要监听的队列名,如果此队列不存在会自动创建
在这个类里面创建一个方法,方法上打上@RabbitListener注解,指定注解的queues属性为消息队列名(可以指定多个),在方法中添加一个形参,形参的类型需要和消息生产者发送消息的类型保持一致,那么这个方法就会持续监听指定队列的消息,如果监听到消息,就会将消息传递给形参 :::
//将这个类注册到Spring容器中
@Component
public class RabbitMqListener{
//queuesToDeclare属性用于监听指定队列的消息(可以指定多个队列),如果队列不存在会自动创建
@RabbitListener(queuesToDeclare = {@Queue(value = "work.queue")})
public void listen(String msg) throws InterruptedException {
System.out.println("接收到消息:" + msg);
}
}
消息预取限制
:::tips 如果多个消息消费者处理消息的能力不同,但是消息却是会被平均分给每个消息消费者,导致处理速度快的消费者很快完成了消息,而处理速度慢的消费者却还在缓慢地处理消息,这样显然是不合理的
在所有消息消费者中添加以下配置就可以解决这个问题 :::
spring:
rabbitmq:
listener:
simple:
#限制每次只能获取一条消息,处理完后才能获取下一条消息
prefetch: 1