• RabbitMQ
  • ActiveMQ
  • RocketMQ(阿里开源)
  • Kafka
  • TubeMQ(腾讯开源)

RabboitMQ

搭建RabbitMQ服务器

  1. 克隆docker-bash:rabbitmq
  2. 设置ip
  1. ./ip-static
  2. ip:192.168.64.140
  1. 下载rabbitmq镜像
  1. docker pull rabbitmq:management
  2. #或者从 code 下载 rabbit-image.gz
  3. #上传到服务器,然后执行镜像导入
  4. docker load -i rabbit-image.gz
  1. 准备配置文件,配置管理员用户名和密码

    • 关闭防火墙
  1. systemctl stop firewalld
  2. systemctl disable firewalld
  • 重启docker服务
  1. systemctl restart docker
  • 添加配置文件
  1. mkdir /etc/rabbitmq
  2. vim /etc/rabbitmq/rabbitmq.conf
  • 新增管理员用户名密码配置
  1. default_user = admin
  2. default_pass = admin
  1. 启动rabbitmq服务器
  1. docker run -d --name rabbit \
  2. -p 5672:5672 \
  3. -p 15672:15672 \
  4. -v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
  5. -e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
  6. rabbitmq:management
  1. 访问控制台[http://192.168.64.140:15672](http://192.168.64.140:15672),用户名密码为admin

简单消费者和生产者简单模式

简单模式中只有一个生产者和一个消费者,生产者发送消息,消费者接收消息

创建连接

  1. package m1;
  2. import com.rabbitmq.client.ConnectionFactory;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * @Author: 一拳超人
  7. * @Date: 2021/10/21 17:16
  8. */
  9. public class Connection {
  10. public static com.rabbitmq.client.Connection connectionInfo() throws IOException, TimeoutException {
  11. ConnectionFactory connectionFactory = new ConnectionFactory();
  12. connectionFactory.setHost("192.168.64.140");
  13. connectionFactory.setPort(5672);
  14. connectionFactory.setUsername("admin");
  15. connectionFactory.setPassword("admin");
  16. return connectionFactory.newConnection();
  17. }
  18. }

创建生产者

  1. package m1;
  2. import com.rabbitmq.client.Channel;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * RabbitMQ生产者
  7. *
  8. * @Author: 一拳超人
  9. * @Date: 2021/10/21 16:46
  10. */
  11. public class Producer {
  12. public static void main(String[] args) throws IOException, TimeoutException {
  13. //创建连接
  14. com.rabbitmq.client.Connection connection = m1.Connection.connectionInfo();
  15. //通信通道
  16. Channel channel = connection.createChannel();
  17. /*
  18. 2.在服务器中创建队列hello world
  19. 参数说明:
  20. 1.队列名
  21. 2.是否为持久队列
  22. 3.是否为排他(独占)队列(不会被多个消费者独占)
  23. 4.是否自动删除
  24. 5.队列得其他参数属性
  25. */
  26. channel.queueDeclare("HelloWorld", false, false, false, null);
  27. /*
  28. 3.向hello word队列发送消息
  29. 参数说明:
  30. 1.交换机,空串表示默认交换机
  31. 2.队列名
  32. 3.消息属性
  33. 4.消息正文
  34. */
  35. channel.basicPublish("", "HelloWorld", null, "万年的马钊".getBytes());
  36. channel.close();
  37. }
  38. }

创建生产者

  1. package m1;
  2. import com.rabbitmq.client.Channel;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * RabbitMQ生产者
  7. *
  8. * @Author: 一拳超人
  9. * @Date: 2021/10/21 16:46
  10. */
  11. public class Producer {
  12. public static void main(String[] args) throws IOException, TimeoutException {
  13. //创建连接
  14. com.rabbitmq.client.Connection connection = m1.Connection.connectionInfo();
  15. //通信通道
  16. Channel channel = connection.createChannel();
  17. /*
  18. 2.在服务器中创建队列hello world
  19. 参数说明:
  20. 1.队列名
  21. 2.是否为持久队列
  22. 3.是否为排他(独占)队列(不会被多个消费者独占)
  23. 4.是否自动删除
  24. 5.队列得其他参数属性
  25. */
  26. channel.queueDeclare("HelloWorld", false, false, false, null);
  27. /*
  28. 3.向hello word队列发送消息
  29. 参数说明:
  30. 1.交换机,空串表示默认交换机
  31. 2.队列名
  32. 3.消息属性
  33. 4.消息正文
  34. */
  35. channel.basicPublish("", "HelloWorld", null, "万年的马钊".getBytes());
  36. channel.close();
  37. }
  38. }

工作模式

工作模式由一个生产者多个消费者构成,一个生产者发送消息,多个消费者之间轮询接受消息,且每个消费者获取的消息唯一,多个消费者之间共用一个队列.

合理收发消息

合理收发消息,需要让服务器知道消费者是否处理完消息,

ACK

使用ACK(Ackonw led)发送消息回执,用以向服务器发送通知,告知服务器一条消息已经处理完毕,服务器可以通过ack知道消息是否处理完毕,消费者使用basicConsume方法接收消息时,第二个参数为是否手动ack,true为自动确认,false为手动确认.

手动确认需要在回调对象中设置消息回执,使用basicAck(DeliveryTag,multiple)方法.

方法参数:

  • DeliveryTag:long类型,回执
  • mutiple:布尔类型,表示是否确认全部消息

QOS

qos可以设置每次抓取消息的数量,在抓取的消息处理完成前不会在抓取消息,在手动ack模式下才有效.

设置方式channel.basicQos(1);参数:prefetchCount – 服务器将传递的最大消息数,如果没有限制则为 0.

  1. package m2;
  2. import com.rabbitmq.client.CancelCallback;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.DeliverCallback;
  5. import m1.Connection;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /**
  9. * @Author: 一拳超人
  10. * @Date: 2021/10/22 9:37
  11. */
  12. public class Consumer {
  13. public static void main(String[] args) throws IOException, TimeoutException {
  14. //创建连接和通信通道
  15. Channel channel = Connection.connectionInfo().createChannel();
  16. //创建队列
  17. channel.queueDeclare("HelloWorld", false, false, false, null);
  18. //回调对象
  19. DeliverCallback deliverCallback = (consumer, message) -> {
  20. String s = new String(message.getBody());
  21. System.out.println("收到" + s);
  22. //处理消息,遍历字符串,遇到"."则暂停一秒,模拟耗时消息
  23. for (int i = 0; i < s.length(); i++) {
  24. if ('.' == s.charAt(i)) {
  25. try {
  26. Thread.sleep(1000);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }
  32. System.out.println("消息处理完成");
  33. /*
  34. 处理完消息手动发送回执
  35. 参数说明:
  36. 1.DeliveryTag:long类型,回执标签
  37. 2.multiple:布尔类型,是否确认之前的收到的所有消息
  38. */
  39. channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
  40. };
  41. CancelCallback cancelCallback = consumerTag -> {
  42. };
  43. channel.basicQos(1);
  44. /*
  45. 消费消息
  46. 参数:
  47. 1.队列名
  48. 2.autoAck,true为自动确认,false为手动确认
  49. */
  50. channel.basicConsume("HelloWorld", false, deliverCallback, cancelCallback);
  51. }
  52. }

消息持久化

消息的持久化是指当消息从交换机发送到队列之后,被消费者消费之前,服务器突然宕机重启,消息仍然存在。消息持久化的前提是队列持久化,假如队列不是持久化,那么消息的持久化毫无意义。

  1. 队列持久化
  2. 消息数据持久化

设置持久化队列

channel.queueDeclare("HelloWorld", true, false, false, null);

参数:

  1. 队列名
  2. 是否为持久化队列
  3. 是否为排他队列
  4. 是否自动删除(服务器将在队列不使用时删除)

将一个队列设置为持久队列只需要将第二个参数设置为true

消息持久化

channel.basicPublish("", "HelloWorld", MessageProperties.PERSISTENT_BASIC, s.getBytes(StandardCharsets.UTF_8));

其中MessageProperties.PERSISTENT_BASIC为持久化参数

发布订阅模式

将一条消息发送给所有消费者,同一条消息所有的消费者都可以收到.

交换机

  1. Direct 直连
  2. Fanout 扇出,扇形
  3. Topic 主题

Bus配置刷新

  1. 修改2,3,4,9添加依赖
  2. Bus
  3. Rabbitmq
  4. binder-rabbit
  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-bus</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-amqp</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>org.springframework.cloud</groupId>
  11. <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
  12. </dependency>
  1. 09添加依赖actuator
  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-actuator</artifactId>
  4. </dependency>
  1. 修改09的yml配置
  2. 暴漏bus-refresh:
  3. rabbitmq连接配置
  1. server:
  2. port: 6001
  3. spring:
  4. application:
  5. name: config-server
  6. cloud:
  7. config:
  8. server:
  9. git:
  10. uri: https://gitee.com/G318705208/microservice-learning
  11. search-paths: /SpringCloud01/config
  12. username: gqc318705208@163.com
  13. password: Gqc19981120.
  14. rabbitmq:
  15. host: 192.168.64.140
  16. port: 5672
  17. username: admin
  18. password: admin
  19. eureka:
  20. client:
  21. service-url:
  22. defaultZone: http://eureka1:2001/eureka,http://eureka2:2002/eureka
  23. management:
  24. endpoints:
  25. web:
  26. exposure:
  27. include: bus-refresh
  1. 修改2,3,4的yml配置,修改config目录的三个文件并提交
  2. rabbitmq连接配置
  • Bus发送刷新指令,其他模块接受指令并执行刷新操作,RabbitMQ模式为主题模式.
  • 链路日志发送至rabbitmq,zipkin接受链路日志,RabbitMQ模式为简单模式

订单削峰

将订单信息发送至RabbitMQ

  1. 新建配置类用于新建队列

Queue为org.springframework.amqp.core.Queue包下的类,用于封装队列参数,RabbitMQ的自动配置类会自动发现Queue实例,并根据其中的参数连接服务器创建队列

  1. import org.springframework.amqp.core.Queue;
  2. @Configuration
  3. public class RabbitMQConfig {
  4. /**
  5. * 新建Queue队列,用于封装队列参数
  6. * RabbitMQ的自动配置类会自动发现Queue实例,
  7. * 根据其中的参数连接服务器创建队列
  8. */
  9. @Bean
  10. public Queue orderQueue() {
  11. return new Queue("orderQueue", true, false, false);
  12. }
  13. }
  1. 注入AmqpTemplate实例使用此实例将订单信息发送至RabbitMQ

使用amqpTamplate实例的convertAndSend方法将订单信息发送至消息队列,此方法可以转自动将数据转换为byte[]数组并发送.第一个参数为队列名称,第二个参数为要发送的消息信息

  1. @Service
  2. public class OrderServiceImpl implements OrderService {
  3. /**
  4. * 在RabbitAutoConfiguration中自动创建了AmqpTemplate实例
  5. */
  6. @Autowired
  7. private AmqpTemplate amqpTemplate;
  8. public String saveOrder(PdOrder pdOrder) throws Exception {
  9. //转换(自动将数据转换为byte[]数组)并发送
  10. amqpTemplate.convertAndSend("orderQueue", pdOrder);
  11. }

从RabbitMQ中取出消息并进行处理

  1. 新建消息处理类

    • @RabbitListener注解可以自动注册成为消费者,自动开始从队列接收消息,自动将消息发送至@RabbitHandler注解标注方法中
    • @RabbitHandler用于指定处理消息的方法,配和@RabbitListener注解使用,且一个类中只能有一个
    • orderService实例为订单处理接口,用于处理订单信息
  1. @Component
  2. @RabbitListener(queues = "orderQueue")
  3. public class OrderConsumer {
  4. @Autowired
  5. private OrderService orderService;
  6. //@RabbitHandler配和@RabbitListener注解使用,用于指定处理消息的方法,且一个类中只能有一个
  7. @RabbitHandler
  8. public void receive(PdOrder pdOrder) throws Exception {
  9. orderService.saveOrder(pdOrder);
  10. System.out.println("------------订单已保存----------");
  11. }
  12. }
  1. 处理订单信息
  1. @Service
  2. public class OrderServiceImpl implements OrderService {
  3. @Autowired
  4. PdOrderMapper pdOrderMapper;
  5. @Autowired
  6. PdCartItemMapper pdCartItemMapper;
  7. @Autowired
  8. PdItemMapper pdItemMapper;
  9. @Autowired
  10. PdItemParamItemMapper pdItemParamItemMapper;
  11. @Autowired
  12. PdShippingMapper pdShippingMapper;
  13. @Autowired
  14. PdOrderItemMapper pdOrderItemMapper;
  15. public String saveOrder(PdOrder pdOrder) throws Exception {
  16. String orderId = pdOrder.getOrderId();
  17. PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());
  18. pdOrder.setShippingName(pdShipping.getReceiverName());
  19. pdOrder.setShippingCode(pdShipping.getReceiverAddress());
  20. pdOrder.setStatus(1);//
  21. pdOrder.setPaymentType(1);
  22. pdOrder.setPostFee(10D);
  23. pdOrder.setCreateTime(new Date());
  24. double payment = 0;
  25. List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());
  26. for (ItemVO itemVO : itemVOs) {
  27. PdOrderItem pdOrderItem = new PdOrderItem();
  28. String id = generateId();
  29. //String id="2";
  30. pdOrderItem.setId(id);
  31. pdOrderItem.setOrderId(orderId);
  32. pdOrderItem.setItemId("" + itemVO.getPdItem().getId());
  33. pdOrderItem.setTitle(itemVO.getPdItem().getTitle());
  34. pdOrderItem.setPrice(itemVO.getPdItem().getPrice());
  35. pdOrderItem.setNum(itemVO.getPdCartItem().getNum());
  36. payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();
  37. pdOrderItemMapper.insert(pdOrderItem);
  38. }
  39. pdOrder.setPayment(payment);
  40. pdOrderMapper.insert(pdOrder);
  41. return orderId;
  42. }
  43. }

SpringBoot整合RabbitMQ

  1. 创建模块导入依赖

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.springframework.boot</groupId>
    4. <artifactId>spring-boot-starter-amqp</artifactId>
    5. </dependency>
    6. <dependency>
    7. <groupId>org.springframework.boot</groupId>
    8. <artifactId>spring-boot-starter-test</artifactId>
    9. <scope>test</scope>
    10. </dependency>
    11. <dependency>
    12. <groupId>org.springframework.amqp</groupId>
    13. <artifactId>spring-rabbit-test</artifactId>
    14. <scope>test</scope>
    15. </dependency>
    16. </dependencies>
  1. 配置RabbitMQ连接配置
    1. spring:
    2. rabbitmq:
    3. host: 192.168.64.140
    4. port: 5672
    5. username: admin
    6. password: admin
    7. virtual-host: mq1

简单模式

新建一个HelloWorld队列,此队列为非持久队列,非独占,不自动删除,

new Queue("HelloWorld", false);等同于new Queue("HelloWorld", false, false, false);

new Queue("HelloWorld")默认为持久队列,只需要设置一个false,就可以将此队列设置为非持久队列

  1. @Bean
  2. public Queue HelloWorld() {
  3. return new Queue("HelloWorld", false);
  4. }

创建消息发送者

  1. @Component
  2. public class Provider {
  3. @Autowired
  4. private AmqpTemplate amqpTemplate;
  5. public void send() {
  6. amqpTemplate.convertAndSend("HelloWorld", "HelloWorld");
  7. }
  8. }

创建消息消费者

  1. @Slf4j
  2. @Component
  3. public class Consumer {
  4. @RabbitListener(queues = "HelloWorld")
  5. public void receive(String str) {
  6. log.info("收到消息:{}", str);
  7. System.out.println(str);
  8. }
  9. }

工作模式(资源竞争)

创建队列:

  1. @Bean
  2. public Queue HelloWorld() {
  3. return new Queue("task_queue");
  4. }

创建生产者:

  1. @Component
  2. public class Provider {
  3. @Autowired
  4. private AmqpTemplate amqpTemplate;
  5. public void send() {
  6. while (true){
  7. System.out.print("请输入消息:");
  8. String s = new Scanner(System.in).nextLine();
  9. amqpTemplate.convertAndSend("task_queue", s);
  10. }
  11. }
  12. }

创建两个消费者:

  1. @Slf4j
  2. @Component
  3. public class Consumer {
  4. @RabbitListener(queues = "task_queue")
  5. public void receive1(String str) {
  6. log.info("----消费者1收到消息:{}----", str);
  7. }
  8. @RabbitListener(queues = "task_queue")
  9. public void receive2(String str) {
  10. log.info("----消费者2收到消息:{}----", str);
  11. }
  12. }
  1. 消息合理分发

    1. autoAck=false,关闭自动确认回执模式.
      SpringBot封装的api默认为手动确认回执模式,SpringBoot会自动发送回执.

    2. 每次抓取消息数量
      设置qos=1,每次只抓取一条消息
      SpringBoot默认每次抓取250条消息

  1. spring:
  2. rabbitmq:
  3. host: 192.168.64.140
  4. port: 5672
  5. username: admin
  6. password: admin
  7. listener:
  8. simple:
  9. prefetch: 1 #设置每次抓取一条消息
  1. 消息持久化

    1. 队列持久化
      new Queue(队列名),默认为持久化队列,new Queue(队列名,false)为非持久化队列

    2. 消息数据持久化
      使用amqpTemplate.convertAndSend发送的消息默认为持久化的消息
      amqpTemplate.convertAndSend(队列名,消息,消息预处理对象),消息与处理对象中可以获取消息属性,可以把持久化属性改为非持久,可以发送非持久消息

发布订阅模式(资源共享)

发布订阅模式需要使用fanout交换机

创建交换机

  1. @Bean
  2. public FanoutExchange fanoutExchange() {
  3. //非持久化和不自动删除,默认为持久化和自动删除
  4. return new FanoutExchange("logs",false,false);
  5. }

创建消息提供者

amqpTemplate.convertAndSend("交换机名","队列名或路由键","消息");,三个参数依次为:

  1. 交换机名
  2. 队列名或路由键
  3. 消息
  1. @Component
  2. public class Provider {
  3. @Autowired
  4. private AmqpTemplate amqpTemplate;
  5. public void send() {
  6. while (true) {
  7. System.out.print("请输入消息:");
  8. String s = new Scanner(System.in).nextLine();
  9. amqpTemplate.convertAndSend("logs", "", s);
  10. }
  11. }
  12. }

创建消息消费者

  1. @Slf4j
  2. @Component
  3. public class Consumer {
  4. @RabbitListener(bindings = @QueueBinding(
  5. //随机命名队列,参数为false,true,true--->非持久,独占,自动删除
  6. value = @Queue,
  7. //绑定交换机名字,declare="false"表示不创建交换机,只是引用存在的交换机
  8. exchange = @Exchange(name = "logs", declare = "false")
  9. ))
  10. public void receive1(String str) {
  11. log.info("----消费者1收到消息:{}----", str);
  12. }
  13. @RabbitListener(bindings = @QueueBinding(value = @Queue, exchange = @Exchange(name = "logs", declare = "false")))
  14. public void receive2(String str) {
  15. log.info("----消费者2收到消息:{}----", str);
  16. }
  17. }

路由模式

路由模式需要使用direct交换机

创建direct交换机

  1. @Bean
  2. public DirectExchange directExchange() {
  3. return new DirectExchange("direct_logs", false, false);
  4. }

创建消息生产者

  1. @Component
  2. public class Provider {
  3. @Autowired
  4. private AmqpTemplate amqpTemplate;
  5. public void send() {
  6. while (true) {
  7. System.out.print("请输入消息:");
  8. String s = new Scanner(System.in).nextLine();
  9. System.out.print("请输入路由键:");
  10. String k = new Scanner(System.in).nextLine();
  11. amqpTemplate.convertAndSend("direct_logs", k, s);
  12. }
  13. }
  14. }

创建消息消费者

  1. @Slf4j
  2. @Component
  3. public class Consumer {
  4. @RabbitListener(bindings = @QueueBinding(
  5. //随机命名队列,参数为false,true,true--->非持久,独占,自动删除
  6. value = @Queue,
  7. //绑定交换机名字,declare="false"表示不创建交换机,只是引用存在的交换机
  8. exchange = @Exchange(name = "direct_logs", declare = "false"),
  9. //绑定路由键
  10. key = {"error"}
  11. ))
  12. public void receive1(String str) {
  13. log.info("----消费者1收到消息:{}----", str);
  14. }
  15. @RabbitListener(bindings = @QueueBinding(
  16. value = @Queue,
  17. exchange = @Exchange(name = "direct_logs", declare = "false"),
  18. key = {"info", "error", "waring"}
  19. ))
  20. public void receive2(String str) {
  21. log.info("----消费者2收到消息:{}----", str);
  22. }
  23. }

主题模式

主题模式需要使用topic交换机

topic交换机的路由键格式为*.*.*,支持正则表达式,*代表任意字符,#代表任意字符且任意个值

创建topic交换机

  1. @Bean
  2. public TopicExchange topicExchange() {
  3. return new TopicExchange("topic_logs", false, false);
  4. }

创建消息生产者

  1. @Component
  2. public class Provider {
  3. @Autowired
  4. private AmqpTemplate amqpTemplate;
  5. public void send() {
  6. while (true) {
  7. System.out.print("请输入消息:");
  8. String s = new Scanner(System.in).nextLine();
  9. System.out.print("请输入路由键:");
  10. String k = new Scanner(System.in).nextLine();
  11. amqpTemplate.convertAndSend("topic_logs", k, s);
  12. }
  13. }
  14. }

创建消息消费者

  1. @Slf4j
  2. @Component
  3. public class Consumer {
  4. @RabbitListener(bindings = @QueueBinding(
  5. //随机命名队列,参数为false,true,true--->非持久,独占,自动删除
  6. value = @Queue,
  7. //绑定交换机名字,declare="false"表示不创建交换机,只是引用存在的交换机
  8. exchange = @Exchange(name = "topic_logs", declare = "false"),
  9. //绑定路由键
  10. key = {"*.orange.*"}
  11. ))
  12. public void receive1(String str) {
  13. log.info("----消费者1收到消息:{}----", str);
  14. }
  15. @RabbitListener(bindings = @QueueBinding(
  16. value = @Queue,
  17. exchange = @Exchange(name = "topic_logs", declare = "false"),
  18. key = {"*.*.rabbit", "info.*", "lazy.#"}
  19. ))
  20. public void receive2(String str) {
  21. log.info("----消费者2收到消息:{}----", str);
  22. }
  23. }

自定义随机命名队列

使用@RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(name = "topic_logs", declare = "false"),中value=@Queue方式绑定的对列为spring随机创建的队列,如下所示就是使用spring创建的随机队列

  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue,
  3. exchange = @Exchange(name = "topic_logs", declare = "false"),
  4. key = {"*.*.rabbit", "info.*", "lazy.#"}
  5. ))

自定义随机队列

  1. /**
  2. * 放入Spring容器的对象键值对
  3. * key---randomQueue
  4. * value--Queue实例
  5. *
  6. * @return 随机命名的队列
  7. */
  8. @Bean
  9. public Queue randomQueue() {
  10. return new Queue(UUID.randomUUID().toString(), false, true, true);
  11. }

使用自定义的随机队列

  1. @RabbitListener(bindings = @QueueBinding(
  2. //随机命名队列,参数为false,true,true--->非持久,独占,自动删除
  3. value = @Queue(name = "#{randomQueue.name}", declare = "false"),
  4. //绑定交换机名字,declare="false"表示不创建交换机,只是引用存在的交换机
  5. exchange = @Exchange(name = "topic_logs", declare = "false"),
  6. //绑定路由键
  7. key = {"*.orange.*"}
  8. ))
  9. public void receive1(String str) {
  10. log.info("----消费者1收到消息:{}----", str);
  11. }

其中#{}为 SPEL——Spring Exception Language
可以直接访问Spring容器中的对象

${}为 OGNL——Object Graph Navigation Language
Struts2中提供的一种表达式语言

#{randomQueue.name}可以获取创建的随机队列名