消息队列的作用

  1. 应用解耦

image.png

  1. 流量削峰

image.png

概述

  1. 消息队列主要有两种形式的目的地

队列:点对点消息通信
主题: 发布/订阅 消息通信

发布模式

  1. 点对点式:消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列消息只有唯一的发送者和接受者,但并不是说只能有一个接收者
  2. 发布订阅式:发送者(发布者发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,呢么就会在消息到达时同时收到消息

    消息代理的规范

  3. JMS(Java Message Service)JAVA消息服务:基于VM消息代理的规范。ActiveMQ、HornetMQ是MS实现

  4. AMQP (Advanced Message Queuing Protocol)高级消息队列协议,也是一个消息代理的规范,兼容MSRabbitMQ是AMQP的实现

    RabbitMQ简介

  5. RabbitMQ是个由erlang开发的AMQP(Advanved Message Queue Protoco的开源实现。

    核心概念

  6. Message:消息是不具名的,它油消息头和消息体组成。消息体是不透梨明购项消息头则由.系列的可选属性组,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

  7. Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
  8. Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型:direct(默认),fanout,,topic,和headers(基本不用),不同类型的Exchange转发消息的策略有所区别
  9. Queue:消息队列,用来保存消息真到发送给消费者。它是消息的容器,也是消息的终点。一个消息可一个或多个队列。消意一直在队列重面,等待消费署莲接到这个队列将其取走。
  10. Binding:绑定,用于消息队列和交换器之间的关联一个绑定就是基于路由键将交换器和消息队列连接起来的路宙规侧,所以可以将交换器理解成一个宙绑定构成的露宙表。Exchange和Queue的绑定可以是多对多的关系。
  11. Connection:网络连接,比如一个TCP连接。
  12. Channel信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成,因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,以引入了信道的概念,以复用一条TCP连接。
  13. Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
  14. Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时指定RabbitMQ默认的vhost是/。
  15. Broker:表示消息队列服务器实体

9.整合RabbitMq消息队列组件 - 图3
image.png

docker安装rabbitMq

  1. docker下拉镜像

    1. docker pull rabbitmq
  2. docker部署镜像容器

    1. docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitMq rabbitmq
  3. 设置开启后台管理

    1. # 进入容器中
    2. docker exec -it <rabbitmq容器id> bash
    3. # 执行
    4. rabbitmq-plugins enable rabbitmq_management
  4. 常见错误RabbitMQ Management:Management API returned status code 500

解决方法:

  1. 因为是使用docker 容器安装的,所有需要进入容器
  2. docker exec -it 容器名称 /bin/bash
  3. 进入目录
  4. cd /etc/rabbitmq/conf.d/
  5. 执行命令
  6. echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
  7. 退出容器
  8. exit
  9. 重启rabbitmq
  10. docker restart rabbitmq

访问出现此页面则代表成功
image.png

整合RabbitMQ

1.导入jar包

  1. <dependency>
  2. <groupId>org.springframework.amqp</groupId>
  3. <artifactId>spring-rabbit-test</artifactId>
  4. <scope>test</scope>
  5. </dependency>

完整的pom.xml文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.6.10</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.example</groupId>
  12. <artifactId>springboot-rabbitmq</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>springboot-rabbitmq</name>
  15. <description>springboot-rabbitmq</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-amqp</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-web</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.springframework.boot</groupId>
  30. <artifactId>spring-boot-devtools</artifactId>
  31. <scope>runtime</scope>
  32. <optional>true</optional>
  33. </dependency>
  34. <dependency>
  35. <groupId>org.projectlombok</groupId>
  36. <artifactId>lombok</artifactId>
  37. <optional>true</optional>
  38. </dependency>
  39. <dependency>
  40. <groupId>org.springframework.boot</groupId>
  41. <artifactId>spring-boot-starter-test</artifactId>
  42. <scope>test</scope>
  43. </dependency>
  44. <dependency>
  45. <groupId>org.springframework.amqp</groupId>
  46. <artifactId>spring-rabbit-test</artifactId>
  47. <scope>test</scope>
  48. </dependency>
  49. <dependency>
  50. <groupId>junit</groupId>
  51. <artifactId>junit</artifactId>
  52. <version>4.13.2</version>
  53. </dependency>
  54. </dependencies>
  55. <build>
  56. <plugins>
  57. <plugin>
  58. <groupId>org.springframework.boot</groupId>
  59. <artifactId>spring-boot-maven-plugin</artifactId>
  60. <configuration>
  61. <excludes>
  62. <exclude>
  63. <groupId>org.projectlombok</groupId>
  64. <artifactId>lombok</artifactId>
  65. </exclude>
  66. </excludes>
  67. </configuration>
  68. </plugin>
  69. </plugins>
  70. </build>
  71. </project>

2.配置rabbitMq参数

  1. #RabbitMQ主机
  2. spring.rabbitmq.host=192.168.87.128
  3. #RabbitMQ账号
  4. spring.rabbitmq.username=guest
  5. #RabbitMQ密码
  6. spring.rabbitmq.password=guest
  7. #RabbitMQ端口号
  8. spring.rabbitmq.port=5672
  9. #spring.rabbitmq.virtual-host=
  10. server.port=8082

3.开启RabbitMQ相关注释

  1. /***
  2. * description: 自动配置
  3. * 1.RabbitAutoConfiguration
  4. * 2.有自动配置了连接工厂ConnectionFactory
  5. * 3.RabbitProperties 封装了RabbitMQ的配置
  6. * 4. RabbitTemplate :给RabbitMQ发送和接收消息
  7. * 序列化
  8. * 5. AmqpAdmin : RabbitMQ系统管理组件
  9. * 创建和删除Queue,Exchange.Binding
  10. * 6.开启rabbitMq
  11. */
  12. @SpringBootApplication
  13. @EnableRabbit
  14. public class SpringbootRabbitmqApplication {
  15. public static void main(String[] args) {
  16. SpringApplication.run(SpringbootRabbitmqApplication.class, args);
  17. }
  18. }

4.配置json序列化

若为配置序列化,会按默认额度序列化存储数据,但是后台查看存储数据是无法看清数据

  1. /**
  2. * @description: json 序列化
  3. * @author: xiaYZ
  4. * @createDate: 2022/8/1
  5. */
  6. @Configuration
  7. public class MyAMQPConfig {
  8. @Bean
  9. public Jackson2JsonMessageConverter messageConverter(){
  10. return new Jackson2JsonMessageConverter();
  11. }
  12. }

5.RabbitMQ相关操作

5.1构建RabbitTemplate和AmqpAdmin

  1. RabbitTemplate rabbitTemplate;
  2. AmqpAdmin amqpAdmin;
  3. @Autowired
  4. public DemoRabbitMQ(RabbitTemplate rabbitTemplate,AmqpAdmin amqpAdmin){
  5. this.rabbitTemplate = rabbitTemplate;
  6. this.amqpAdmin = amqpAdmin;
  7. }

5.2点对点发送消息

如果没有相应的exchange和Queue需要创建否则会提示失败信息

  1. /***
  2. * description: 1.单播(点对点)
  3. *
  4. * version: 1.0
  5. * date: 2022/8/1 19:49
  6. * author: xiaYZ
  7. * iteration: 迭代说明
  8. * @param
  9. * @return
  10. */
  11. @GetMapping("contextLoads")
  12. public String contextLoads(){
  13. //Message需要自己构造一个;定义消息体内容和消息头
  14. //exchange交换机,routKey
  15. //rabbitTemplate.send(exchange,routeKey,message);
  16. HashMap<Object, Object> map = new HashMap<>(8);
  17. map.put("objects","对象");
  18. map.put("msg","这是一个对象");
  19. map.put("date", Arrays.asList(123,true,"对象"));
  20. //序列号之后再发送出去
  21. rabbitTemplate.convertAndSend("exchange.direct","top.news",map);
  22. return "发送成功";
  23. }

5.3 广播消息

  1. /****
  2. * description: sendMessage 广播消息
  3. * version: 1.0 ->
  4. * date: 2022/8/1 23:55
  5. * author: xiaYZ
  6. * iteration: 迭代说明
  7. * @param
  8. * @return java.lang.String
  9. */
  10. @ResponseBody
  11. @RequestMapping("sendMessage")
  12. public String sendMessage(){
  13. rabbitTemplate.convertAndSend("exchange.fanout","","hello world");
  14. return "发送成功";
  15. }

5.4获取队列消息

如果队列中所有消息都消耗完了,会提示空指针异常

  1. @GetMapping("receive")
  2. public String receive(){
  3. Object o = rabbitTemplate.receiveAndConvert("xiayz.news");
  4. System.out.println(o.getClass());
  5. System.out.println(o);
  6. return o.toString();
  7. }

5.5创建交换机和队列

注意:amqpAdmin是管理交换机exchange和队列Queue的组件

  1. /****
  2. * description: create
  3. * version: 1.0 ->
  4. * date: 2022/8/2 0:03
  5. * author: xiaYZ
  6. * iteration: 迭代说明
  7. * @param
  8. * @return java.lang.String
  9. */
  10. @ResponseBody
  11. @RequestMapping("create")
  12. public String create(){
  13. //创建交换机
  14. amqpAdmin.declareExchange(new DirectExchange("newExchange"));
  15. //创建队列--队列名称,是否持久化
  16. amqpAdmin.declareQueue(new Queue("newQueue",true));
  17. //绑定队列和交换机
  18. amqpAdmin.declareBinding(new Binding("newQueue", Binding.DestinationType.QUEUE,"newExchange","haha",null));
  19. return "创建成功,绑定成功";
  20. }

5.6监听队列消息

当监听队列收到消息时执行相应的方法

  1. @Service
  2. public class RabbitMQListen {
  3. @RabbitListener(queues = "xiayz.news")
  4. public void listen(Object object){
  5. System.out.println(object.toString());
  6. }
  7. }