RabbitMQ整合SpringBoot

SpringBoot想整合RabbitMQ那肯定先导入依赖的啦!SpringBoot内部就有RabbitMQ的AMQP协议

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

或者在新建SpirngBoot项目时,勾选上Spring for RabbitMQ的依赖

然后添加配置,我们这里用yml配置,properties同理

  1. spring:
  2. application:
  3. name: SpringBoot-RabbitMQ
  4. rabbitmq:
  5. host: xx.xx.xx.xxx #此处为你的rabbitmq的ip地址
  6. port: 5672
  7. username: admin
  8. password: 123456
  9. virtual-host: /ems

SpringBoot提供了一个模板对象,RabbitTemplate用来简化操作,如同RedisTemplate,使用的话直接在项目中注入即可,接下来我们将之前的五种模型都模拟一下:

1、helloworld模型

测试代码如下:

模拟生产者,但是当没有消费者时,该队列无法创建出来,因为没有意义

  1. package com.zym.springboot_rabbitmq;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.boot.test.context.SpringBootTest;
  7. import org.springframework.test.context.junit4.SpringRunner;
  8. @SpringBootTest(classes = SpringbootRabbitmqApplication.class)
  9. @RunWith(SpringRunner.class)
  10. public class TestRabbitMQ {
  11. //注入
  12. @Autowired
  13. private RabbitTemplate rabbitTemplate;
  14. //helloworld模型
  15. @Test
  16. public void test(){
  17. //参数1:队列名称 参数2:消息内容
  18. rabbitTemplate.convertAndSend("hello","hello,world");
  19. }
  20. }

消费者:

  1. package com.zym.springboot_rabbitmq;
  2. import org.springframework.amqp.rabbit.annotation.Queue;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. @RabbitListener(queuesToDeclare = @Queue("hello"))
  8. public class Cutomer {
  9. @RabbitHandler //代表取出消息的回调方法
  10. public void receivel(String message){
  11. System.out.println("消息为:"+message);
  12. }
  13. }

执行生产者,产生一条消息,观察后台打印语句:
1.png

没毛病,已经拿到了,再看管理页面:
2.png

也存在对应的队列,并且默认该队列持久化。当然如果想让他不持久呢?自动删除呢?

当然可以做到啦,我们要修改声明队列时的注解:

  1. @RabbitListener(queuesToDeclare = @Queue(value = "hello",declare = "true",autoDelete = "true"))

先将之前创建好的队列删除,我们再执行一次,观察管理页面:
3.png

再次执行过后发现队列为空,说明我们的自动删除生效了。

2、workquene(工作队列)

生产者:

  1. @Test
  2. public void testWorkQuene(){
  3. for (int i = 0 ; i < 10 ; i++){
  4. rabbitTemplate.convertAndSend("work","第"+i+"条hello,workquene");
  5. }
  6. }

消费者(整合springboot后,我们不需要用两个类来代表两个消费者,可以利用@RabbitListener这个注解来进行模拟):

  1. package com.zym.springboot_rabbitmq.work;
  2. import org.springframework.amqp.rabbit.annotation.Queue;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class WorkQueneCutomer {
  7. @RabbitListener(queuesToDeclare = @Queue("work"))
  8. public void Cutomer1(String message){
  9. System.out.println("消费者1消费了:"+message+"这条消息");
  10. }
  11. @RabbitListener(queuesToDeclare = @Queue("work"))
  12. public void Cutomer2(String message){
  13. System.out.println("消费者2消费了:"+message+"这条消息");
  14. }
  15. }

执行一次生产者,查看后台输出与管理页面:

管理页面:
4.png

后台输出页面:
5.png

如果要实现能者多劳的模式,需要额外的配置:

  1. # 在我们的配置文件中,添加如下配置:
  2. listener:
  3. simple:
  4. prefetch: 1 #每个消费者每次只消费一个

添加完成后我们让消费者1每执行一个睡2s,Thread.sleep(2000);

查看后台:
6.png

可以看到,实现了“能者多劳”!

3、fanout(广播模式)

生产者:

  1. @Test
  2. public void testFanOut(){
  3. rabbitTemplate.convertAndSend("logs","","hello,fanout");
  4. }

没有消费者的情况下,执行多少次也不会创建该交换机

消费者:

  1. package com.zym.springboot_rabbitmq.fanout;
  2. import org.springframework.amqp.rabbit.annotation.Exchange;
  3. import org.springframework.amqp.rabbit.annotation.Queue;
  4. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class FanOutCutomer {
  9. @RabbitListener(bindings = {
  10. @QueueBinding(
  11. value = @Queue, //不写队列名称表示为临时队列
  12. exchange = @Exchange(value = "logs",type = "fanout") //绑定的交换机与交换机类型
  13. )
  14. })
  15. public void Cutomer1(String message){
  16. System.out.println("消费者1消费了:"+message+"这条消息");
  17. }
  18. @RabbitListener(bindings = {
  19. @QueueBinding(
  20. value = @Queue, //不写队列名称表示为临时队列
  21. exchange = @Exchange(value = "logs",type = "fanout") //绑定的交换机与交换机类型
  22. )
  23. })
  24. public void Cutomer2(String message){
  25. System.out.println("消费者2消费了:"+message+"这条消息");
  26. }
  27. }

运行生产者,查看管理页面与后台输出:

管理页面:
7.png

后台页面:
8.png

4、Routing的Direct模型

生产者:

  1. @Test
  2. public void testRouteDirect(){
  3. rabbitTemplate.convertAndSend("logs_direct","info","hello,info");
  4. rabbitTemplate.convertAndSend("logs_direct","warn","hello,warn");
  5. rabbitTemplate.convertAndSend("logs_direct","error","hello,error");
  6. }

消费者:

  1. package com.zym.springboot_rabbitmq.direct;
  2. import org.springframework.amqp.rabbit.annotation.Exchange;
  3. import org.springframework.amqp.rabbit.annotation.Queue;
  4. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class DirectCutomer {
  9. @RabbitListener(bindings = {
  10. @QueueBinding(
  11. value = @Queue,
  12. exchange = @Exchange(value = "logs_direct",type = "direct"),
  13. key = {"info","warn"}
  14. )
  15. })
  16. public void DirectCutomer1(String message){
  17. System.out.println("消费者1消费了"+message+"这条消息");
  18. }
  19. @RabbitListener(bindings = {
  20. @QueueBinding(
  21. value = @Queue,
  22. exchange = @Exchange(value = "logs_direct",type = "direct"),
  23. key = {"error"}
  24. )
  25. })
  26. public void DirectCutomer2(String message){
  27. System.out.println("消费者2消费了"+message+"这条消息");
  28. }
  29. }

我们可以看到,消费者1可以收到RoutingKey为info、warn的消息,消费者2只能收到error的消息

运行生产者,查看管理页面与后台输出:

管理页面:
9.png

后台输出:
10.png

5、Routing的Topic模型

生产者:

  1. @Test
  2. public void testRouteTopic(){
  3. rabbitTemplate.convertAndSend("logs_topic","user.add.info","hello,user.add.info");
  4. rabbitTemplate.convertAndSend("logs_topic","user.update.info","hello,user.update.info");
  5. rabbitTemplate.convertAndSend("logs_topic","user.add.detail","hello,user.add.info");
  6. rabbitTemplate.convertAndSend("logs_topic","mail.info.detail","hello,user.add.info");
  7. rabbitTemplate.convertAndSend("logs_topic","test.mail.info.detail","hello,user.add.info");
  8. }

消费者:

  1. package com.zym.springboot_rabbitmq.topic;
  2. import org.springframework.amqp.rabbit.annotation.Exchange;
  3. import org.springframework.amqp.rabbit.annotation.Queue;
  4. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class TopicCutomer {
  9. @RabbitListener(bindings = {
  10. @QueueBinding(
  11. value = @Queue,
  12. exchange = @Exchange(value = "logs_topic",type = "topic"),
  13. key = "user.add.*"
  14. )
  15. })
  16. public void TopicCutomer1(String message){
  17. System.out.println("消费者1消费了"+message+"这条消息");
  18. }
  19. @RabbitListener(bindings = {
  20. @QueueBinding(
  21. value = @Queue,
  22. exchange = @Exchange(value = "logs_topic",type = "topic"),
  23. key = "user.*.*"
  24. )
  25. })
  26. public void TopicCutomer2(String message){
  27. System.out.println("消费者2消费了"+message+"这条消息");
  28. }
  29. @RabbitListener(bindings = {
  30. @QueueBinding(
  31. value = @Queue,
  32. exchange = @Exchange(value = "logs_topic",type = "topic"),
  33. key = "*.*.detail"
  34. )
  35. })
  36. public void TopicCutomer3(String message){
  37. System.out.println("消费者3消费了"+message+"这条消息");
  38. }
  39. @RabbitListener(bindings = {
  40. @QueueBinding(
  41. value = @Queue,
  42. exchange = @Exchange(value = "logs_topic",type = "topic"),
  43. key = "#.detail"
  44. )
  45. })
  46. public void TopicCutomer4(String message){
  47. System.out.println("消费者4消费了"+message+"这条消息");
  48. }
  49. }
  1. 消费者1user.add.*
  2. 消费者2user.*.*
  3. 消费者3:*.*.detail
  4. 消费者4:#.detail

管理页面:
11.png

后台输出:
12.png
Demo的码云地址:https://gitee.com/zym213/SpringBoot_RabbitMQ_Demo.git