image.png

简单模式

pom引入

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>junit</groupId>
  8. <artifactId>junit</artifactId>
  9. <version>4.12</version>
  10. <scope>test</scope>
  11. </dependency>
  12. </dependencies>

application.yml

  1. server:
  2. port: 8080
  3. spring:
  4. rabbitmq:
  5. host: 192.168.135.141
  6. port: 5672
  7. virtual-host: /
  8. username: guest
  9. password: guest
  10. # 消息发送失败回调
  11. # publisher-returns: true
  12. # # 消息发送确认到交互机,选择交互模式
  13. # publisher-confirm-type: correlated
  14. listener:
  15. simple:
  16. # 消息传递(ack是否手动确认,none:不确认,auto:自动确认,manual:手动确认)
  17. acknowledge-mode: manual
  18. # # 接收的消费队列数(并发)
  19. # concurrency: 5
  20. # # 最大并发数
  21. # max-concurrency: 10

代码讲解

第一部分

提供者

  1. ConnectionFactory factory = new ConnectionFactory();
  2. // 主机地址;默认为 localhost
  3. factory.setHost("192.168.135.143");
  4. // 虚拟主机名称;默认为 /
  5. factory.setVirtualHost("/itcast");
  6. // 连接用户名;默认为guest
  7. factory.setUsername("hikktn");
  8. // 连接密码;默认为guest
  9. factory.setPassword("hikktn");

springboot使用的是配置文件,读取application.yml文件
代码也不需要程序员编写。

第二部分

  1. /*
  2. name : 队列名称
  3. durable : 持久化
  4. exclusive : 是否自动连接
  5. autoDelete : 是否自动清除空闲队列
  6. arguments : 队列的其他配置
  7. */
  8. Channel channel = channel.queueDeclare(QUEUE_NAME, true, false, false, null);

和下面的代码一样

  1. @Bean("simple_queue")
  2. public Queue createCommonQueue() {
  3. /*
  4. name : 队列名称
  5. durable : 持久化
  6. exclusive : 是否自动连接
  7. autoDelete : 是否自动清除空闲队列
  8. arguments : 队列的其他配置
  9. */
  10. return new Queue(SIMPLE_QUEUE, true, false, false, null);
  11. }

第三部分

  1. String message = "你好,rabbitmq!";
  2. /**
  3. * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
  4. * 参数2:路由key,简单模式可以传递队列名称
  5. * 参数3:消息其它属性
  6. * 参数4:消息内容
  7. */
  8. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

和下面的代码一样
rabbitTemplate.convertAndSend(RabbitMQConfig.SIMPLE_QUEUE,”simple模式,发送了消息”);

第四部分

消费者

  1. ConnectionFactory factory = new ConnectionFactory();
  2. // 主机地址;默认为 localhost
  3. factory.setHost("192.168.135.143");
  4. // 虚拟主机名称;默认为 /
  5. factory.setVirtualHost("/itcast");
  6. // 连接用户名;默认为guest
  7. factory.setUsername("hikktn");
  8. // 连接密码;默认为guest
  9. factory.setPassword("hikktn");

springboot使用的是配置文件,读取application.yml文件
代码也不需要程序员编写。

第五部分

  1. DefaultConsumer consumer = new DefaultConsumer(channel){
  2. @Override
  3. /**
  4. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
  5. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
  6. * properties 属性信息
  7. * body 消息
  8. */
  9. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  10. //路由key
  11. System.out.println("路由key为:" + envelope.getRoutingKey());
  12. //交换机
  13. System.out.println("交换机为:" + envelope.getExchange());
  14. //消息id
  15. System.out.println("消息id为:" + envelope.getDeliveryTag());
  16. //收到的消息
  17. System.out.println("接收到的消息为:" + new String(body, "utf-8"));
  18. }
  19. };
  20. //监听消息
  21. /**
  22. * 参数1:队列名称
  23. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
  24. * 参数3:消息接收到后回调
  25. */
  26. channel.basicConsume(QUEUE_NAME, true, consumer);

和下面的代码一样,注意,这里测试后发生格式编码错误org.springframework.amqp.AmqpException: No method found for class ,懒得弄,就用最简单的方式解决。

  1. @Component
  2. @RabbitListener(queues = "simple_queue")
  3. public class SimpleListener {
  4. @RabbitHandler
  5. public void receive(Message message, Channel channel) {
  6. String messageRec = new String(message.getBody());
  7. System.out.println("simple模式接收到了消息:"+messageRec);
  8. try {
  9. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  10. } catch (IOException e) {
  11. System.out.println("报错了------------------"+e.getMessage());
  12. }
  13. }
  14. }

以上就是大致的代码逻辑,后续安装我总结的图片,就可以清晰的编写其他模式的代码。

简单模式代码实现

配置类

  1. package com.hikktn.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. /**
  7. * @ClassName RabbitMQConfig
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/9 14:49
  11. * @Version 1.0
  12. */
  13. @Configuration
  14. public class RabbitMQConfig {
  15. // 普通模式
  16. public static final String SIMPLE_QUEUE = "simple_queue";
  17. // 简单模式
  18. @Bean("simple_queue")
  19. public Queue createCommonQueue() {
  20. /*
  21. name : 队列名称
  22. durable : 持久化
  23. exclusive : 是否自动连接
  24. autoDelete : 是否自动清除空闲队列
  25. arguments : 队列的其他配置
  26. */
  27. return new Queue(SIMPLE_QUEUE, true, false, false, null);
  28. }
  29. }

消费者

  1. package com.hikktn.listener;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. /**
  8. * @ClassName SimpleListener
  9. * @Description TODO
  10. * @Author lisonglin
  11. * @Date 2021/4/9 21:20
  12. * @Version 1.0
  13. */
  14. @Component
  15. public class SimpleListener {
  16. @RabbitListener(queues = "simple_queue")
  17. public void receive(Message message, Channel channel) {
  18. String messageRec = new String(message.getBody());
  19. System.out.println("simple模式接收到了消息:"+messageRec);
  20. try {
  21. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  22. } catch (IOException e) {
  23. System.out.println("报错了------------------"+e.getMessage());
  24. }
  25. }
  26. }

生产者

  1. package com.hikktn.producer;
  2. import com.hikktn.config.RabbitMQConfig;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @ClassName Sender
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/9 16:54
  11. * @Version 1.0
  12. */
  13. @Component
  14. public class Sender {
  15. @Autowired
  16. private RabbitTemplate rabbitTemplate;
  17. /**
  18. * 简单模式 (生产者)
  19. * @param msg
  20. */
  21. public void sendSimpleTest(String msg) {
  22. rabbitTemplate.convertAndSend(RabbitMQConfig.SIMPLE_QUEUE, msg);
  23. }
  24. }

控制器

  1. package com.hikktn.controller;
  2. import com.hikktn.producer.Sender;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. /**
  7. * @ClassName ConsumerController
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/9 19:27
  11. * @Version 1.0
  12. */
  13. @RestController
  14. public class ConsumerController {
  15. @Autowired
  16. private Sender sender;
  17. @GetMapping("/getSimple")
  18. public String sendSimple(){
  19. sender.sendSimpleTest("simple模式,发送了消息");
  20. return "ok";
  21. }
  22. }

测试
image.png
image.png

工作模式

配置类

  1. package com.hikktn.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. /**
  7. * @ClassName RabbitMQConfig
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/9 14:49
  11. * @Version 1.0
  12. */
  13. @Configuration
  14. public class RabbitMQConfig {
  15. // 工作模式
  16. public static final String WORK_QUEUE_NAME = "work_queue";
  17. // 工作模式
  18. @Bean("work_queue")
  19. public Queue createWorkQueue() {
  20. return new Queue(WORK_QUEUE_NAME, true, false, false, null);
  21. }
  22. }

生产者

  1. package com.hikktn.producer;
  2. import com.hikktn.config.RabbitMQConfig;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @ClassName Sender
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/9 16:54
  11. * @Version 1.0
  12. */
  13. @Component
  14. public class Sender {
  15. @Autowired
  16. private RabbitTemplate rabbitTemplate;
  17. /**
  18. * 生产者 (工作模式)
  19. * @param msg
  20. */
  21. public void sendWorkTest(String msg) {
  22. for (int i = 0; i < 10; i++) {
  23. rabbitTemplate.convertAndSend(RabbitMQConfig.WORK_QUEUE_NAME, msg + i);
  24. }
  25. }
  26. }

消费者1

  1. package com.hikktn.listener;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. /**
  8. * @ClassName WorkListener
  9. * @Description TODO
  10. * @Author lisonglin
  11. * @Date 2021/4/9 16:13
  12. * @Version 1.0
  13. */
  14. @Component
  15. public class WorkListener1 {
  16. @RabbitListener(queues = "work_queue")
  17. public void receive(Message message, Channel channel,String msg) throws IOException {
  18. // System.out.println("收到了"+msg);
  19. channel.basicQos(1);
  20. String messageRec = new String(message.getBody());
  21. System.out.println("work模式1接收到了消息:"+messageRec);
  22. try {
  23. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  24. } catch (IOException e) {
  25. System.out.println("报错了------------------"+e.getMessage());
  26. }
  27. }
  28. }

消费者2

  1. package com.hikktn.listener;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. /**
  8. * @ClassName WorkListener
  9. * @Description TODO
  10. * @Author lisonglin
  11. * @Date 2021/4/9 16:13
  12. * @Version 1.0
  13. */
  14. @Component
  15. public class WorkListener2 {
  16. @RabbitListener(queues = "work_queue")
  17. public void receive(Message message, Channel channel) throws IOException {
  18. channel.basicQos(1);
  19. String messageRec = new String(message.getBody());
  20. System.out.println("work模式2接收到了消息:"+messageRec);
  21. try {
  22. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  23. } catch (IOException e) {
  24. System.out.println("报错了------------------"+e.getMessage());
  25. }
  26. }
  27. }

控制器

  1. package com.hikktn.controller;
  2. import com.hikktn.producer.Sender;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. /**
  7. * @ClassName ConsumerController
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/9 19:27
  11. * @Version 1.0
  12. */
  13. @RestController
  14. public class ConsumerController {
  15. @Autowired
  16. private Sender sender;
  17. @GetMapping("/getWork")
  18. public String sendWork(){
  19. sender.sendWorkTest("work模式,发送了消息");
  20. return "ok";
  21. }
  22. }

测试
image.png
image.png

订阅模式

配置类

  1. package com.hikktn.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. /**
  7. * @ClassName RabbitMQConfig
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/9 14:49
  11. * @Version 1.0
  12. */
  13. @Configuration
  14. public class RabbitMQConfig {
  15. // 订阅模式
  16. public static final String FANOUT_QUEUE_NAME = "publish_fanout_queue_1";
  17. public static final String FANOUT_QUEUE_NAME1 = "publish_fanout_queue_2";
  18. public static final String TEST_FANOUT_EXCHANGE = "fount_exchange";
  19. // 订阅模式
  20. @Bean("publish_fanout_queue_1")
  21. public Queue createFountQueue1() {
  22. return new Queue(FANOUT_QUEUE_NAME, true, false, false, null);
  23. }
  24. @Bean("publish_fanout_queue_2")
  25. public Queue createFountQueue2() {
  26. return new Queue(FANOUT_QUEUE_NAME1, true, false, false, null);
  27. }
  28. // 声明交换机
  29. @Bean("fount_exchange")
  30. public FanoutExchange createFanoutExchange() {
  31. return ExchangeBuilder.fanoutExchange(TEST_FANOUT_EXCHANGE).durable(true).build();
  32. }
  33. //队列与交换机进行绑定
  34. @Bean
  35. public Binding bindingQueueAndFanoutExchange1(@Qualifier("publish_fanout_queue_1") Queue queue, @Qualifier("fount_exchange") FanoutExchange exchange) {
  36. return BindingBuilder.bind(queue).to(exchange);
  37. }
  38. //队列与交换机进行绑定
  39. @Bean
  40. public Binding bindingQueueAndFanoutExchange2(@Qualifier("publish_fanout_queue_2") Queue queue, @Qualifier("fount_exchange") FanoutExchange exchange) {
  41. return BindingBuilder.bind(queue).to(exchange);
  42. }
  43. }

生产者

  1. package com.hikktn.producer;
  2. import com.hikktn.config.RabbitMQConfig;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @ClassName Sender
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/9 16:54
  11. * @Version 1.0
  12. */
  13. @Component
  14. public class Sender {
  15. @Autowired
  16. private RabbitTemplate rabbitTemplate;
  17. /**
  18. * 生产者1 (订阅模式)
  19. * @param msg
  20. */
  21. public void sendFanoutTest1(String msg) {
  22. for (int i = 0; i < 5; i++) {
  23. rabbitTemplate.convertAndSend(RabbitMQConfig.TEST_FANOUT_EXCHANGE,"", msg);
  24. }
  25. }
  26. /**
  27. * 生产者2 (订阅模式)
  28. * @param msg
  29. */
  30. public void sendFanoutTest2(String msg) {
  31. for (int i = 0; i < 5; i++) {
  32. rabbitTemplate.convertAndSend(RabbitMQConfig.TEST_FANOUT_EXCHANGE,"", msg);
  33. }
  34. }
  35. }

消费者1

  1. package com.hikktn.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * @ClassName FanoutListener1
  6. * @Description TODO
  7. * @Author lisonglin
  8. * @Date 2021/4/9 21:23
  9. * @Version 1.0
  10. */
  11. @Component
  12. public class FanoutListener1 {
  13. @RabbitListener(queues = "publish_fanout_queue_1")
  14. public void receive(String message){
  15. System.out.println("消费者1接收到的消息为:" + message);
  16. }
  17. }

消费者2

  1. package com.hikktn.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * @ClassName FanoutListener2
  6. * @Description TODO
  7. * @Author lisonglin
  8. * @Date 2021/4/9 21:28
  9. * @Version 1.0
  10. */
  11. @Component
  12. public class FanoutListener2 {
  13. @RabbitListener(queues = "publish_fanout_queue_2")
  14. public void receive(String message){
  15. System.out.println("消费者1接收到的消息为:" + message);
  16. }
  17. }

控制器

  1. package com.hikktn.controller;
  2. import com.hikktn.producer.Sender;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. /**
  7. * @ClassName ConsumerController
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/9 19:27
  11. * @Version 1.0
  12. */
  13. @RestController
  14. public class ConsumerController {
  15. @Autowired
  16. private Sender sender;
  17. @GetMapping("/getFount")
  18. public String sendFount(){
  19. sender.sendFanoutTest1("fount模式1,发送了消息");
  20. sender.sendFanoutTest2("fount模式2,发送了消息");
  21. return "ok";
  22. }
  23. }

测试
image.png
image.png

路由模式

配置类

  1. package com.hikktn.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. /**
  7. * @ClassName RabbitMQConfig
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/9 14:49
  11. * @Version 1.0
  12. */
  13. @Configuration
  14. public class RabbitMQConfig {
  15. // 路由模式
  16. public static final String DIRECT_QUEUE_NAME_1 = "routing_direct_queue_1";
  17. public static final String DIRECT_QUEUE_NAME_2 = "routing_direct_queue_2";
  18. public static final String TEST_DIRECT_EXCHANGE = "direct_exchange";
  19. private static final String DIRECT_ROUTING_KEY_INSERT = "insert";
  20. private static final String DIRECT_ROUTING_KEY_UPDATE = "update";
  21. // 路由模式
  22. @Bean("routing_direct_queue_1")
  23. public Queue createRoutingQueue1() {
  24. return new Queue(DIRECT_QUEUE_NAME_1, true, false, false, null);
  25. }
  26. @Bean("routing_direct_queue_2")
  27. public Queue createRoutingQueue2() {
  28. return new Queue(DIRECT_QUEUE_NAME_2, true, false, false, null);
  29. }
  30. // 声明交换机
  31. @Bean("direct_exchange")
  32. public DirectExchange createDirectExchange() {
  33. return ExchangeBuilder.directExchange(TEST_DIRECT_EXCHANGE).durable(true).build();
  34. }
  35. //队列与交换机进行绑定
  36. @Bean
  37. public Binding bindingQueueAndDirectExchange1(@Qualifier("routing_direct_queue_1") Queue queue, @Qualifier("direct_exchange") DirectExchange exchange) {
  38. return BindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTING_KEY_INSERT);
  39. }
  40. @Bean
  41. public Binding bindingQueueAndDirectExchange2(@Qualifier("routing_direct_queue_2") Queue queue, @Qualifier("direct_exchange") DirectExchange exchange) {
  42. return BindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTING_KEY_UPDATE);
  43. }
  44. }

生产者

  1. package com.hikktn.producer;
  2. import com.hikktn.config.RabbitMQConfig;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @ClassName Sender
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/9 16:54
  11. * @Version 1.0
  12. */
  13. @Component
  14. public class Sender {
  15. @Autowired
  16. private RabbitTemplate rabbitTemplate;
  17. /**
  18. * 生产者1 (路由模式)
  19. * @param routingKey
  20. * @param msg
  21. */
  22. public void sendDirectTest1(String routingKey,String msg) {
  23. rabbitTemplate.convertAndSend(RabbitMQConfig.TEST_DIRECT_EXCHANGE,routingKey, msg);
  24. }
  25. /**
  26. * 生产者2 (路由模式)
  27. * @param routingKey
  28. * @param msg
  29. */
  30. public void sendDirectTest2(String routingKey,String msg) {
  31. rabbitTemplate.convertAndSend(RabbitMQConfig.TEST_DIRECT_EXCHANGE,routingKey, msg);
  32. }
  33. }

消费者1

  1. package com.hikktn.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * @ClassName DirectListener1
  6. * @Description TODO
  7. * @Author lisonglin
  8. * @Date 2021/4/9 21:55
  9. * @Version 1.0
  10. */
  11. @Component
  12. public class DirectListener1 {
  13. @RabbitListener(queues = "routing_direct_queue_1")
  14. public void receive(String message){
  15. System.out.println("消费者1接收到的消息为:" + message);
  16. }
  17. }

消费者2

  1. package com.hikktn.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * @ClassName DirectListener1
  6. * @Description TODO
  7. * @Author lisonglin
  8. * @Date 2021/4/9 21:55
  9. * @Version 1.0
  10. */
  11. @Component
  12. public class DirectListener2 {
  13. @RabbitListener(queues = "routing_direct_queue_2")
  14. public void receive(String message){
  15. System.out.println("消费者1接收到的消息为:" + message);
  16. }
  17. }

控制器

  1. package com.hikktn.controller;
  2. import com.hikktn.producer.Sender;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. /**
  7. * @ClassName ConsumerController
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/9 19:27
  11. * @Version 1.0
  12. */
  13. @RestController
  14. public class ConsumerController {
  15. @Autowired
  16. private Sender sender;
  17. @GetMapping("/getDirect")
  18. public String sendDirect(){
  19. sender.sendDirectTest1("insert","direct模式1,发送了消息");
  20. sender.sendDirectTest2("update","direct模式2,发送了消息");
  21. return "ok";
  22. }
  23. }

测试
image.png
image.png

通配符模式

配置类

  1. package com.hikktn.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. /**
  7. * @ClassName RabbitMQConfig
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/9 14:49
  11. * @Version 1.0
  12. */
  13. @Configuration
  14. public class RabbitMQConfig {
  15. // 通配符模式
  16. public static final String TOPIC_EXCHANGE_NAME = "topic_exchange";
  17. private static final String TOPIC_QUEUE_NAME_1 = "topic_queue_1";
  18. private static final String TOPIC_QUEUE_NAME_2 = "topic_queue_2";
  19. private static final String TOPIC_ROUTING_KEY_EMAIL = "*@qq.com";
  20. private static final String TOPIC_ROUTING_KEY_SMS = "sms.#";
  21. // 通配符模式
  22. @Bean("topic_queue_1")
  23. public Queue createTopicQueue1() {
  24. return QueueBuilder.durable(TOPIC_QUEUE_NAME_1).build();
  25. }
  26. @Bean("topic_queue_2")
  27. public Queue createTopicQueue2() {
  28. return QueueBuilder.durable(TOPIC_QUEUE_NAME_2).build();
  29. }
  30. // 声明交换机
  31. @Bean("topic_exchange")
  32. public Exchange createTopicExchange() {
  33. return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).durable(true).build();
  34. }
  35. // 绑定队列和交换机
  36. @Bean
  37. public Binding bindingQueueAndTopicExchange1(@Qualifier("topic_queue_1") Queue queue, @Qualifier("topic_exchange") Exchange exchange) {
  38. return BindingBuilder.bind(queue).to(exchange).with(TOPIC_ROUTING_KEY_EMAIL).noargs();
  39. }
  40. @Bean
  41. public Binding bindingQueueAndTopicExchange2(@Qualifier("topic_queue_2") Queue queue, @Qualifier("topic_exchange") Exchange exchange) {
  42. return BindingBuilder.bind(queue).to(exchange).with(TOPIC_ROUTING_KEY_SMS).noargs();
  43. }
  44. }

生产者

  1. package com.hikktn.producer;
  2. import com.hikktn.config.RabbitMQConfig;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @ClassName Sender
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/9 16:54
  11. * @Version 1.0
  12. */
  13. @Component
  14. public class Sender {
  15. @Autowired
  16. private RabbitTemplate rabbitTemplate;
  17. /**
  18. * 生产者1 (通配符模式)
  19. * @param routingKey
  20. * @param msg
  21. */
  22. public void sendTopicTest1(String routingKey,String msg) {
  23. rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_NAME,routingKey,msg);
  24. }
  25. /**
  26. * 生产者2 (通配符模式)
  27. * @param routingKey
  28. * @param msg
  29. */
  30. public void sendTopicTest2(String routingKey,String msg) {
  31. rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_NAME,routingKey,msg);
  32. }
  33. }

消费者

  1. package com.hikktn.listener;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import java.io.IOException;
  8. /**
  9. * @ClassName MyListener
  10. * @Description TODO
  11. * @Author lisonglin
  12. * @Date 2021/4/9 14:51
  13. * @Version 1.0
  14. */
  15. @Component
  16. public class TopicListener {
  17. /**
  18. * 监听某个队列的消息
  19. *
  20. * @param message 接收到的消息
  21. */
  22. @RabbitListener(queues = "topic_queue_1")
  23. public void receive1(Message message, Channel channel, String msg) {
  24. System.out.println("消费者1接收到的消息为:" + msg);
  25. String messageRec = new String(message.getBody());
  26. System.out.println("消费者1接收到的消息为:" + messageRec);
  27. try {
  28. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  29. } catch (IOException e) {
  30. System.out.println("报错了------------------"+e.getMessage());
  31. }
  32. }
  33. @RabbitListener(queues = "topic_queue_2")
  34. public void receive2(Message message, Channel channel) {
  35. String messageRec = new String(message.getBody());
  36. System.out.println("消费者1接收到的消息为:" + messageRec);
  37. try {
  38. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  39. } catch (IOException e) {
  40. System.out.println("报错了------------------" + e.getMessage());
  41. }
  42. }
  43. }

控制器

  1. package com.hikktn.controller;
  2. import com.hikktn.producer.Sender;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. /**
  7. * @ClassName ConsumerController
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/9 19:27
  11. * @Version 1.0
  12. */
  13. @RestController
  14. public class ConsumerController {
  15. @Autowired
  16. private Sender sender;
  17. @GetMapping("/getTopic")
  18. public String sendTopic(){
  19. sender.sendTopicTest1("1213457107@qq.com","topic模式1,发送给1213457107@qq.com消息");
  20. sender.sendTopicTest2("sms.@qq.com","topic模式2,发送给sms.@qq.com消息");
  21. return "ok";
  22. }
  23. }

测试
image.png
image.png