RabbitMQ整合SpringBoot
SpringBoot想整合RabbitMQ那肯定先导入依赖的啦!SpringBoot内部就有RabbitMQ的AMQP协议
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
或者在新建SpirngBoot项目时,勾选上Spring for RabbitMQ的依赖
然后添加配置,我们这里用yml配置,properties同理
spring:
application:
name: SpringBoot-RabbitMQ
rabbitmq:
host: xx.xx.xx.xxx #此处为你的rabbitmq的ip地址
port: 5672
username: admin
password: 123456
virtual-host: /ems
SpringBoot提供了一个模板对象,RabbitTemplate用来简化操作,如同RedisTemplate,使用的话直接在项目中注入即可,接下来我们将之前的五种模型都模拟一下:
1、helloworld模型
测试代码如下:
模拟生产者,但是当没有消费者时,该队列无法创建出来,因为没有意义
package com.zym.springboot_rabbitmq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = SpringbootRabbitmqApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
//注入
@Autowired
private RabbitTemplate rabbitTemplate;
//helloworld模型
@Test
public void test(){
//参数1:队列名称 参数2:消息内容
rabbitTemplate.convertAndSend("hello","hello,world");
}
}
消费者:
package com.zym.springboot_rabbitmq;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class Cutomer {
@RabbitHandler //代表取出消息的回调方法
public void receivel(String message){
System.out.println("消息为:"+message);
}
}
执行生产者,产生一条消息,观察后台打印语句:
没毛病,已经拿到了,再看管理页面:
也存在对应的队列,并且默认该队列持久化。当然如果想让他不持久呢?自动删除呢?
当然可以做到啦,我们要修改声明队列时的注解:
@RabbitListener(queuesToDeclare = @Queue(value = "hello",declare = "true",autoDelete = "true"))
先将之前创建好的队列删除,我们再执行一次,观察管理页面:
再次执行过后发现队列为空,说明我们的自动删除生效了。
2、workquene(工作队列)
生产者:
@Test
public void testWorkQuene(){
for (int i = 0 ; i < 10 ; i++){
rabbitTemplate.convertAndSend("work","第"+i+"条hello,workquene");
}
}
消费者(整合springboot后,我们不需要用两个类来代表两个消费者,可以利用@RabbitListener这个注解来进行模拟):
package com.zym.springboot_rabbitmq.work;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkQueneCutomer {
@RabbitListener(queuesToDeclare = @Queue("work"))
public void Cutomer1(String message){
System.out.println("消费者1消费了:"+message+"这条消息");
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void Cutomer2(String message){
System.out.println("消费者2消费了:"+message+"这条消息");
}
}
执行一次生产者,查看后台输出与管理页面:
管理页面:
后台输出页面:
如果要实现能者多劳的模式,需要额外的配置:
# 在我们的配置文件中,添加如下配置:
listener:
simple:
prefetch: 1 #每个消费者每次只消费一个
添加完成后我们让消费者1每执行一个睡2s,Thread.sleep(2000);
查看后台:
可以看到,实现了“能者多劳”!
3、fanout(广播模式)
生产者:
@Test
public void testFanOut(){
rabbitTemplate.convertAndSend("logs","","hello,fanout");
}
没有消费者的情况下,执行多少次也不会创建该交换机
消费者:
package com.zym.springboot_rabbitmq.fanout;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanOutCutomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //不写队列名称表示为临时队列
exchange = @Exchange(value = "logs",type = "fanout") //绑定的交换机与交换机类型
)
})
public void Cutomer1(String message){
System.out.println("消费者1消费了:"+message+"这条消息");
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //不写队列名称表示为临时队列
exchange = @Exchange(value = "logs",type = "fanout") //绑定的交换机与交换机类型
)
})
public void Cutomer2(String message){
System.out.println("消费者2消费了:"+message+"这条消息");
}
}
运行生产者,查看管理页面与后台输出:
管理页面:
后台页面:
4、Routing的Direct模型
生产者:
@Test
public void testRouteDirect(){
rabbitTemplate.convertAndSend("logs_direct","info","hello,info");
rabbitTemplate.convertAndSend("logs_direct","warn","hello,warn");
rabbitTemplate.convertAndSend("logs_direct","error","hello,error");
}
消费者:
package com.zym.springboot_rabbitmq.direct;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DirectCutomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "logs_direct",type = "direct"),
key = {"info","warn"}
)
})
public void DirectCutomer1(String message){
System.out.println("消费者1消费了"+message+"这条消息");
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "logs_direct",type = "direct"),
key = {"error"}
)
})
public void DirectCutomer2(String message){
System.out.println("消费者2消费了"+message+"这条消息");
}
}
我们可以看到,消费者1可以收到RoutingKey为info、warn的消息,消费者2只能收到error的消息
运行生产者,查看管理页面与后台输出:
管理页面:
后台输出:
5、Routing的Topic模型
生产者:
@Test
public void testRouteTopic(){
rabbitTemplate.convertAndSend("logs_topic","user.add.info","hello,user.add.info");
rabbitTemplate.convertAndSend("logs_topic","user.update.info","hello,user.update.info");
rabbitTemplate.convertAndSend("logs_topic","user.add.detail","hello,user.add.info");
rabbitTemplate.convertAndSend("logs_topic","mail.info.detail","hello,user.add.info");
rabbitTemplate.convertAndSend("logs_topic","test.mail.info.detail","hello,user.add.info");
}
消费者:
package com.zym.springboot_rabbitmq.topic;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicCutomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "logs_topic",type = "topic"),
key = "user.add.*"
)
})
public void TopicCutomer1(String message){
System.out.println("消费者1消费了"+message+"这条消息");
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "logs_topic",type = "topic"),
key = "user.*.*"
)
})
public void TopicCutomer2(String message){
System.out.println("消费者2消费了"+message+"这条消息");
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "logs_topic",type = "topic"),
key = "*.*.detail"
)
})
public void TopicCutomer3(String message){
System.out.println("消费者3消费了"+message+"这条消息");
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "logs_topic",type = "topic"),
key = "#.detail"
)
})
public void TopicCutomer4(String message){
System.out.println("消费者4消费了"+message+"这条消息");
}
}
消费者1:user.add.*
消费者2:user.*.*
消费者3:*.*.detail
消费者4:#.detail
管理页面:
后台输出:
Demo的码云地址:https://gitee.com/zym213/SpringBoot_RabbitMQ_Demo.git