消息队列中间件,是分布式系统中的重要组件
主要解决,异步处理,应用解耦,流量削峰等问题 从而实现高性能,高可用,可伸缩和最终一致性的架构
使用较多的消息队列产品:RabbitMQ,RocketMQ,ActiveMQ,ZeroMQ,Kafka等

异步处理

用户注册后,需要发送验证邮箱和手机验证码; 将注册信息写入数据库,发送验证邮件,发送手机,三个步骤全部完成后,返回给客户端
image.png

应用解耦

image.png
如果库存系统坏了,传统架构下订单系统也会出问题。耦合太高了。
使用MQ以后,下单的时候,库存系统不能正常运行,也不会影响下单,因为下单后,订单系统写入消息队 列就不再关心其他的后续操作了,实现了订单系统和库存系统的应用解耦。

流量削峰

image.png 用户的请求,服务器接收后,首先写入消息队列,如果超过队列的长度,就抛弃,甩一个秒杀结束 的页面

背景知识

AMQP

即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议
协议:数据在传输的过程中必须要遵守的规则
基于此协议的客户端可以与消息中间件传递消息并不受产品、开发语言等条件的限制

JMS

Java Message Server,Java消息服务应用程序接口, 一种规范,和JDBC担任的角色类似
是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消 息,进行异步通信

关系

JMS是定义了统一接口,统一消息操作;AMQP通过协议统一数据交互格式 JMS必须是java语言;AMQP只是协议,与语言无关

为什么用RabbitMQ

有强大的WEB管理页面
强大的社区支持,为技术进步提供动力
支持消息持久化、支持消息确认机制、灵活的任务分发机制等,支持功能非常丰富
集群扩展很容易,并且可以通过增加节点实现成倍的性能提升
总结:如果你希望使用一个可靠性高、功能强大、易于管理的消息队列系统那么就选择RabbitMQ,如果你想用一个性能高,但偶尔丢点数据不是很在乎可以使用kafka或者zeroMQ
kafka和zeroMQ的性能爆表,绝对可以压RabbitMQ一头!

RabbitMQ组件的功能

image.png
Broker:消息队列服务器实体
Virtual Host:虚拟主机 标识一批交换机、消息队列和相关对象,形成的整体
虚拟主机是共享相同的身份认证和加密环境的独立服务器域 每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制 vhost是AMQP概念的基础,RabbitMQ默认的vhost是 /,必须在链接时指定
Exchange:交换器(路由) 用来接收生产者发送的消息并将这些消息路由给服务器中的队列
Queue:消息队列。用来保存消息直到发送给消费者。 它是消息的容器,也是消息的终点。 一个消息可投入一个或多个队列。 消息一直在队列里面,等待消费者连接到这个队列将其取走。
Banding:绑定,用于消息队列和交换机之间的关联。
Channel:通道(信道)
多路复用连接中的一条独立的双向数据流通道。 信道是建立在真实的TCP连接内的虚拟链接
AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,都是通过信道完成的
因为对于操作系统来说,建立和销毁TCP连接都是非常昂贵的开销,所以引入了信道的概念,用来复用TCP连接。
Connection:网络连接,比如一个TCP连接。
Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Message:消息
消息是不具名的,它是由消息头和消息体组成。 消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由-键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。

启动后台管理插件

  1. rabbitmq-plugins enable rabbitmq_management

启动MQ相关命令

  1. [root@localhost opt]# systemctl start rabbitmq-server.service
  2. [root@localhost opt]# systemctl status rabbitmq-server.service
  3. [root@localhost opt]# systemctl restart rabbitmq-server.service
  4. [root@localhost opt]# systemctl stop rabbitmq-server.service
  5. 查看进程
  6. [root@localhost opt]# ps -ef | grep rabbitmq
  7. 默认端口15672

创建远程连接角色

  1. [root@localhost opt]# rabbitmqctl add_user laosun 123456
  2. [root@localhost opt]# rabbitmqctl set_user_tags laosun administrator
  3. [root@localhost opt]# rabbitmqctl set_permissions -p "/" laosun ".*" ".*" ".*"
  4. 查看用户列表和改密码
  5. [root@localhost opt]# rabbitmqctl list_users
  6. [root@localhost opt]# rabbitmqctl change_password laosun 123123

界面简介

image.png
image.png

Java连接RabbitMQ

  1. public class ConnectionUtil {
  2. public static Connection getConnection() throws Exception {
  3. ConnectionFactory connectionFactory = new ConnectionFactory();
  4. connectionFactory.setHost("192.168.42.131");
  5. connectionFactory.setPort(5672);
  6. connectionFactory.setVirtualHost("/lagou");
  7. connectionFactory.setUsername("jining");
  8. connectionFactory.setPassword("980909");
  9. Connection connection = connectionFactory.newConnection();
  10. return connection;
  11. }
  12. public static void main(String[] args) throws Exception {
  13. Connection connection = getConnection();
  14. System.out.println(connection);
  15. connection.close();
  16. }
  17. }

消息模式

RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此我们只学习前5种
image.png 1和2属于点对点 3、4、5属于发布订阅模式(一对多)
点对点模式:P2P(point to point)模式包含三个角色:消息队列(queue),发送者(sender),接收者(receiver)
每个消息发送到一个特定的队列中,接收者从中获得消息
队列中保留这些消息,直到他们被消费或超时

特点:

  1. 每个消息只有一个消费者,一旦消费,消息就不在队列中了
  2. 发送者和接收者之间没有依赖性,发送者发送完成,不管接收者是否运行,都不会影响消息发送到队列中(我给你发微信,不管你看不看手机,反正我发完了)
  3. 接收者成功接收消息之后需向对象应答成功(确认)
    如果希望发送的每个消息都会被成功处理,那需要P2P

发布订阅模式:publish(Pub)/subscribe(Sub)
pub/sub模式包含三个角色:交换机(exchange),发布者(publisher),订阅者 (subcriber) 多个发布者将消息发送交换机,系统将这些消息传递给多个订阅者

特点:

  1. 每个消息可以有多个订阅者
  2. 发布者和订阅者之间在时间上有依赖,对于某个交换机的订阅者,必须创建一个订阅
    后,才能消费发布者的消息
  3. 为了消费消息,订阅者必须保持运行状态;类似于看电视直播。
    如果希望发送的消息被多个消费者处理,可采用本模式

简单模式

  1. public class MessageSender {
  2. public static void main(String[] args) throws Exception {
  3. String msg = "Jining said 'Hello, Rabbit MQ'";
  4. Connection connection = ConnectionUtil.getConnection();
  5. Channel channel = connection.createChannel();
  6. //参数1 队列名 2 是否持久化 3 是否排外(能否给别的队列用) 4 是否自动删除 5 队列参数
  7. channel.queueDeclare("queue1", false, false, false, null);
  8. //1 交换机名称 简单模式没有交换机。 2 目标队列名称 3 设置消息属性 4 消息内容
  9. channel.basicPublish("", "queue1", null, msg.getBytes(StandardCharsets.UTF_8));
  10. System.out.println("发送了" + msg);
  11. channel.close();
  12. }
  13. }
  14. public class MessageReceiver {
  15. public static void main(String[] args) throws Exception {
  16. Connection connection = ConnectionUtil.getConnection();
  17. Channel channel = connection.createChannel();
  18. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  19. @Override
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. //1 收件人信息 2 信封(包裹上的快递标签) 3 协议配置 4
  22. System.out.println(new String(body));
  23. }
  24. };
  25. channel.basicConsume("queue1", true, defaultConsumer);
  26. }
  27. }

消息确认机制

image.png

修改成手动确认

  1. public class ReceiverACK {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  6. @Override
  7. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  8. //1 收件人信息 2 信封(包裹上的快递标签) 3 协议配置 4
  9. System.out.println(new String(body));
  10. // 1 收件人信息 2 是否同时确认多个消息
  11. channel.basicAck(envelope.getDeliveryTag(), false);
  12. }
  13. };
  14. channel.basicConsume("queue1", false, defaultConsumer);
  15. }
  16. }
  1. 主要就是最后一行。

工作队列模式

image.png

发送者

  1. public class Sender {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. //参数1 队列名 2 是否持久化 3 是否排外(能否给别的队列用) 4 是否自动删除 5 队列参数
  6. channel.queueDeclare("test_work_queue", false, false, false, null);
  7. //1 交换机名称 简单模式没有交换机。 2 目标队列名称 3 设置消息属性 4 消息内容
  8. for (int i=0; i<100; i++) {
  9. String msg = "羊肉串 ---> " + i;
  10. channel.basicPublish("", "test_work_queue", null, msg.getBytes(StandardCharsets.UTF_8));
  11. System.out.println("新鲜出炉" + msg);
  12. }
  13. channel.close();
  14. }
  15. }

接收者1

  1. public class Receiver1 {
  2. private static int i = 1;
  3. public static void main(String[] args) throws Exception {
  4. Connection connection = ConnectionUtil.getConnection();
  5. Channel channel = connection.createChannel();
  6. //队列不存在则创建,存在则获取
  7. channel.queueDeclare("test_work_queue", false, false, false, null);
  8. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  9. @Override
  10. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  11. //1 收件人信息 2 信封(包裹上的快递标签) 3 协议配置 4
  12. String msg = new String(body);
  13. System.out.printf("顾客1吃掉了" + msg + ", 总共吃%d串\n", i++);
  14. try {
  15. Thread.sleep(200);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. // 1 收件人信息 2 是否同时确认多个消息
  20. channel.basicAck(envelope.getDeliveryTag(), false);
  21. }
  22. };
  23. channel.basicConsume("test_work_queue", false, defaultConsumer);
  24. }
  25. }

接收者2

  1. public class Receiver2 {
  2. private static int i = 1;
  3. public static void main(String[] args) throws Exception {
  4. Connection connection = ConnectionUtil.getConnection();
  5. Channel channel = connection.createChannel();
  6. channel.queueDeclare("test_work_queue", false, false, false, null);
  7. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  8. @Override
  9. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  10. //1 收件人信息 2 信封(包裹上的快递标签) 3 协议配置 4
  11. String msg = new String(body);
  12. System.out.printf("顾客2吃掉了" + msg + ", 总共吃%d串\n", i++);
  13. try {
  14. Thread.sleep(900);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. // 1 收件人信息 2 是否同时确认多个消息
  19. channel.basicAck(envelope.getDeliveryTag(), false);
  20. }
  21. };
  22. channel.basicConsume("test_work_queue", false, defaultConsumer);
  23. }
  24. }

问题与解决

俩人消费的速度不一样可消费的消息数是一样的,1先消费完,无所事事的等2,这肯定不行。
官网给了解决方法

  1. channel.basicQos(1);

这句话的意思就是 不要给队列一次发送1条以上的信息。换句话说,消息没确认之前不要发下一条信息。
这必须配合手动确认机制。

发布订阅模式

image.png
P是生产者,X是交换机,红色的是队列。
举例来说,P是up主,X是哔站,红色的是人。
整个过程,必须先创立路由。但是路由没有储存消息的能力。

运行时的顺序

先生产者 创建路由
再消费者 绑定队列
最后再生产者 发送具体消息

代码

  1. public class Sender {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. //定义路由 1 名 2 模式。 fanout 不处理路由键(只需要把队列绑定到路由上,消息就会自动转发到队列就行了)
  6. channel.exchangeDeclare("test_exchange_fanout", "fanout");
  7. //1 交换机名称 简单模式没有交换机。 2 目标队列名称 3 设置消息属性 4 消息内容
  8. String msg = "hello everyone";
  9. channel.basicPublish("test_exchange_fanout", "", null, msg.getBytes(StandardCharsets.UTF_8));
  10. System.out.println("生产者: " + msg);
  11. channel.close();
  12. }
  13. }
  1. public class Receiver1 {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. //声明队列
  6. channel.queueDeclare("test_exchange_fanout_queue1", false, false, false, null);
  7. //绑定路由
  8. channel.queueBind("test_exchange_fanout_queue1", "test_exchange_fanout", "");
  9. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  10. @Override
  11. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  12. String msg = new String(body);
  13. System.out.println("消费者1: " + msg);
  14. channel.basicAck(envelope.getDeliveryTag(), false);
  15. }
  16. };
  17. channel.basicConsume("test_exchange_fanout_queue1", false, defaultConsumer);
  18. }
  19. }
  1. Receiver就是把队列换成2,改一下数字即可。

路由模式

image.png
direct就是定向。

  1. public class Sender {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. //direct 根据路由键定向分发消息
  6. channel.exchangeDeclare("test_exchange_direct", "direct");
  7. //1 交换机名称 简单模式没有交换机。 2 目标队列名称 3 设置消息属性 4 消息内容
  8. String msg = "用户注册,[userid=s101]";
  9. channel.basicPublish("test_exchange_direct", "select", null, msg.getBytes(StandardCharsets.UTF_8));
  10. System.out.println("生产者: " + msg);
  11. channel.close();
  12. }
  13. }
  14. public class Receiver1 {
  15. public static void main(String[] args) throws Exception {
  16. Connection connection = ConnectionUtil.getConnection();
  17. Channel channel = connection.createChannel();
  18. //声明队列
  19. channel.queueDeclare("test_exchange_direct_queue1", false, false, false, null);
  20. //绑定路由
  21. channel.queueBind("test_exchange_direct_queue1", "test_exchange_direct", "insert");
  22. channel.queueBind("test_exchange_direct_queue1", "test_exchange_direct", "update");
  23. channel.queueBind("test_exchange_direct_queue1", "test_exchange_direct", "delete");
  24. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  25. @Override
  26. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  27. String msg = new String(body);
  28. System.out.println("消费者1: " + msg);
  29. channel.basicAck(envelope.getDeliveryTag(), false);
  30. }
  31. };
  32. channel.basicConsume("test_exchange_direct_queue1", false, defaultConsumer);
  33. }
  34. }
  35. public class Receiver2 {
  36. public static void main(String[] args) throws Exception {
  37. Connection connection = ConnectionUtil.getConnection();
  38. Channel channel = connection.createChannel();
  39. //声明队列
  40. channel.queueDeclare("test_exchange_direct_queue2", false, false, false, null);
  41. //绑定路由
  42. channel.queueBind("test_exchange_direct_queue2", "test_exchange_direct", "select");
  43. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  44. @Override
  45. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  46. String msg = new String(body);
  47. System.out.println("消费者2: " + msg);
  48. channel.basicAck(envelope.getDeliveryTag(), false);
  49. }
  50. };
  51. channel.basicConsume("test_exchange_direct_queue2", false, defaultConsumer);
  52. }
  53. }

通配符模式

和定向基本相同,就是路由键可以模糊匹配。

  1. public class Sender {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. //direct 根据路由键定向分发消息
  6. channel.exchangeDeclare("test_exchange_topic", "topic");
  7. //1 交换机名称 简单模式没有交换机。 2 目标队列名称 3 设置消息属性 4 消息内容
  8. String msg = "用户注册,[userid=s101]";
  9. channel.basicPublish("test_exchange_topic", "product.price", null, msg.getBytes(StandardCharsets.UTF_8));
  10. System.out.println("生产者: " + msg);
  11. channel.close();
  12. }
  13. }
  14. public class Receiver1 {
  15. public static void main(String[] args) throws Exception {
  16. Connection connection = ConnectionUtil.getConnection();
  17. Channel channel = connection.createChannel();
  18. //声明队列
  19. channel.queueDeclare("test_exchange_topic_queue1", false, false, false, null);
  20. //绑定路由
  21. channel.queueBind("test_exchange_topic_queue1", "test_exchange_topic", "user.#");
  22. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  23. @Override
  24. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  25. String msg = new String(body);
  26. System.out.println("消费者1: " + msg);
  27. channel.basicAck(envelope.getDeliveryTag(), false);
  28. }
  29. };
  30. channel.basicConsume("test_exchange_topic_queue1", false, defaultConsumer);
  31. }
  32. }
  33. public class Receiver2 {
  34. public static void main(String[] args) throws Exception {
  35. Connection connection = ConnectionUtil.getConnection();
  36. Channel channel = connection.createChannel();
  37. //声明队列
  38. channel.queueDeclare("test_exchange_topic_queue2", false, false, false, null);
  39. //绑定路由
  40. channel.queueBind("test_exchange_topic_queue2", "test_exchange_topic", "product.#");
  41. channel.queueBind("test_exchange_topic_queue2", "test_exchange_topic", "order.#");
  42. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  43. @Override
  44. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  45. String msg = new String(body);
  46. System.out.println("消费者2: " + msg);
  47. channel.basicAck(envelope.getDeliveryTag(), false);
  48. }
  49. };
  50. channel.basicConsume("test_exchange_topic_queue2", false, defaultConsumer);
  51. }
  52. }

持久化

想要将消息持久化,那么路由和队列都要持久化才可以。

  1. public class Sender {
  2. public static void main(String[] args) throws Exception {
  3. Connection connection = ConnectionUtil.getConnection();
  4. Channel channel = connection.createChannel();
  5. //direct 根据路由键定向分发消息
  6. channel.exchangeDeclare("test_exchange_topic", "topic", true);
  7. //1 交换机名称 简单模式没有交换机。 2 目标队列名称 3 设置消息属性 4 消息内容
  8. String msg = "用户注册,[userid=s101]";
  9. channel.basicPublish("test_exchange_topic", "product.price",
  10. MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
  11. System.out.println("生产者: " + msg);
  12. channel.close();
  13. }
  14. }
  15. public class Receiver1 {
  16. public static void main(String[] args) throws Exception {
  17. Connection connection = ConnectionUtil.getConnection();
  18. Channel channel = connection.createChannel();
  19. //声明队列
  20. channel.queueDeclare("test_exchange_topic_queue1", true, false, false, null);
  21. //绑定路由
  22. channel.queueBind("test_exchange_topic_queue1", "test_exchange_topic", "user.#");
  23. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  24. @Override
  25. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  26. String msg = new String(body);
  27. System.out.println("消费者1: " + msg);
  28. channel.basicAck(envelope.getDeliveryTag(), false);
  29. }
  30. };
  31. channel.basicConsume("test_exchange_topic_queue1", false, defaultConsumer);
  32. }
  33. }
  34. public class Receiver2 {
  35. public static void main(String[] args) throws Exception {
  36. Connection connection = ConnectionUtil.getConnection();
  37. Channel channel = connection.createChannel();
  38. //声明队列
  39. channel.queueDeclare("test_exchange_topic_queue2", true, false, false, null);
  40. //绑定路由
  41. channel.queueBind("test_exchange_topic_queue2", "test_exchange_topic", "product.#");
  42. channel.queueBind("test_exchange_topic_queue2", "test_exchange_topic", "order.#");
  43. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
  44. @Override
  45. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  46. String msg = new String(body);
  47. System.out.println("消费者2: " + msg);
  48. channel.basicAck(envelope.getDeliveryTag(), false);
  49. }
  50. };
  51. channel.basicConsume("test_exchange_topic_queue2", false, defaultConsumer);
  52. }
  53. }

Spring整合

其实用的最多的就是最后的通配符模式。
记一下依赖和配置文件吧。

生产者

  1. 依赖
  2. <dependencies>
  3. <dependency>
  4. <groupId>org.springframework.amqp</groupId>
  5. <artifactId>spring-rabbit</artifactId>
  6. <version>2.0.1.RELEASE</version>
  7. </dependency>
  8. <dependency>
  9. <groupId>org.slf4j</groupId>
  10. <artifactId>slf4j-log4j12</artifactId>
  11. <version>1.7.25</version>
  12. <scope>compile</scope>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.apache.commons</groupId>
  16. <artifactId>commons-lang3</artifactId>
  17. <version>3.9</version>
  18. </dependency>
  19. </dependencies>
  1. 配置文件
  2. <?xml version="1.0" encoding="UTF-8"?>
  3. <beans xmlns="http://www.springframework.org/schema/beans"
  4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  6. xsi:schemaLocation="
  7. http://www.springframework.org/schema/beans
  8. http://www.springframework.org/schema/beans/spring-beans.xsd
  9. http://www.springframework.org/schema/rabbit
  10. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  11. <rabbit:connection-factory
  12. id="connectionFactory"
  13. host="192.168.42.131"
  14. port="5672"
  15. username="jining"
  16. password="980909"
  17. virtual-host="/lagou"
  18. />
  19. <!-- 2.配置队列 -->
  20. <rabbit:queue name="test_spring_queue_1"/>
  21. <!-- 3.配置rabbitAdmin:主要用于在Java代码中对队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 -->
  22. <rabbit:admin connection-factory="connectionFactory"/>
  23. <!-- 4.配置topic类型exchange;队列绑定到交换机 -->
  24. <rabbit:topic-exchange name="spring-topic-exchange">
  25. <rabbit:bindings>
  26. <rabbit:binding queue="test_spring_queue_1" pattern="msg.#"/>
  27. </rabbit:bindings>
  28. </rabbit:topic-exchange>
  29. <!-- 5. 配置消息对象json转换类 -->
  30. <bean id="jsonMessageConverter"
  31. class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
  32. <!-- 6. 配置RabbitTemplate(消息生产者) -->
  33. <rabbit:template id="rabbitTemplate"
  34. connection-factory="connectionFactory"
  35. exchange="spring-topic-exchange"
  36. message-converter="jsonMessageConverter"/>
  37. </beans>
  1. public class Sender {
  2. public static void main(String[] args) {
  3. ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
  4. RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
  5. HashMap<String, String> map = new HashMap<>();
  6. map.put("name", "alice");
  7. map.put("email", "alice@gmail.com");
  8. rabbitTemplate.convertAndSend("msg.user", map);
  9. context.close();
  10. }
  11. }

消费者

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  5. xmlns:context="http://www.springframework.org/schema/context"
  6. xsi:schemaLocation="
  7. http://www.springframework.org/schema/beans
  8. http://www.springframework.org/schema/beans/spring-beans.xsd
  9. http://www.springframework.org/schema/rabbit
  10. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
  11. http://www.springframework.org/schema/context
  12. http://www.springframework.org/schema/context/spring-context.xsd">
  13. <rabbit:connection-factory
  14. id="connectionFactory"
  15. host="192.168.42.131"
  16. port="5672"
  17. username="jining"
  18. password="980909"
  19. virtual-host="/lagou"
  20. />
  21. <!-- 2.配置队列 -->
  22. <rabbit:queue name="test_spring_queue_1"/>
  23. <!-- 3.配置rabbitAdmin:主要用于在Java代码中对队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 -->
  24. <rabbit:admin connection-factory="connectionFactory"/>
  25. <!-- 注册bean-->
  26. <context:component-scan base-package="listener"/>
  27. <!-- 配置监听-->
  28. <rabbit:listener-container connection-factory="connectionFactory">
  29. <rabbit:listener ref="consumer" queue-names="test_spring_queue_1"/>
  30. </rabbit:listener-container>
  31. </beans>
  1. @Component
  2. public class Consumer implements MessageListener {
  3. private static final ObjectMapper MAPPER = new ObjectMapper();
  4. @Override
  5. public void onMessage(Message message) {
  6. try {
  7. JsonNode jsonNode = MAPPER.readTree(message.getBody());
  8. String name = jsonNode.get("name").asText();
  9. String email = jsonNode.get("email").asText();
  10. System.out.println("队列中的消息: " + name + "的邮箱是" + email);
  11. } catch (IOException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. }

消息成功确认机制

如何保证消息成功发布?
事务、发布确认机制。
实际使用中主要是用发布确认机制,发布确认机制对性能影响比较小。

事务

AMQP协议提供的一种保证消息成功投递的方式,通过信道开启 transactional 模式 并利用信道 的三个方法来实现以事务方式发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递
channel.txSelect(): 开启事务
channel.txCommit() :提交事务
channel.txRollback() :回滚事务
当然,在spring里,这些都封装好了。

发布确认

比如我们一次发送十条消息,到第九条失败了,前八条都白瞎了。
发布确认则是失败什么重新补发什么。

配置文件,其实就改了几行,一是factory那里,二是模板那里多了bean

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  5. xsi:schemaLocation="
  6. http://www.springframework.org/schema/beans
  7. http://www.springframework.org/schema/beans/spring-beans.xsd
  8. http://www.springframework.org/schema/rabbit
  9. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  10. <rabbit:connection-factory
  11. id="connectionFactory"
  12. host="192.168.42.131"
  13. port="5672"
  14. username="jining"
  15. password="980909"
  16. virtual-host="/lagou"
  17. publisher-confirms="true"
  18. />
  19. <!-- 2.配置队列 -->
  20. <rabbit:queue name="test_spring_queue_1"/>
  21. <!-- 3.配置rabbitAdmin:主要用于在Java代码中对队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 -->
  22. <rabbit:admin connection-factory="connectionFactory"/>
  23. <!-- 4.配置topic类型exchange;队列绑定到交换机 -->
  24. <rabbit:topic-exchange name="spring-topic-exchange">
  25. <rabbit:bindings>
  26. <rabbit:binding queue="test_spring_queue_1" pattern="msg.#"/>
  27. </rabbit:bindings>
  28. </rabbit:topic-exchange>
  29. <!-- 5. 配置消息对象json转换类 -->
  30. <bean id="jsonMessageConverter"
  31. class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
  32. <!-- 6. 配置RabbitTemplate(消息生产者) -->
  33. <rabbit:template id="rabbitTemplate"
  34. connection-factory="connectionFactory"
  35. exchange="spring-topic-exchange"
  36. message-converter="jsonMessageConverter"
  37. confirm-callback="msgSendConfirmCallBack"
  38. />
  39. <bean id="msgSendConfirmCallBack" class="confirm.MsgSendConfirmCallBack"/>
  40. </beans>

消费端限流

不可能在生产端限流。

实现

首先修改配置文件(消费端)

  1. <!-- 配置监听 prefetch是一次放几条消息, 确认是手动-->
  2. <rabbit:listener-container connection-factory="connectionFactory" prefetch="2" acknowledge="manual">
  3. <rabbit:listener ref="consumer" queue-names="test_spring_queue_1"/>
  4. </rabbit:listener-container>
  1. 再改代码
  1. @Component
  2. public class Consumer extends AbstractAdaptableMessageListener {
  3. private static final ObjectMapper MAPPER = new ObjectMapper();
  4. @Override
  5. public void onMessage(Message message, Channel channel) throws Exception {
  6. try {
  7. // String str = new String(message.getBody());
  8. // 将message对象转换成json
  9. JsonNode jsonNode = MAPPER.readTree(message.getBody());
  10. String name = jsonNode.get("name").asText();
  11. String email = jsonNode.get("email").asText();
  12. System.out.println("从队列中获取:【"+name+"的邮箱是:"+email+"】");
  13. long deliveryTag =
  14. message.getMessageProperties().getDeliveryTag();
  15. //确认收到(参数1,参数2)
  16. /*
  17. 参数1:RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递
  18. 增的正整数,delivery_tag 的范围仅限于 Channel
  19. 参数2:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以
  20. 一次性确认 delivery_tag 小于等于传入值的所有消息
  21. */
  22. channel.basicAck(deliveryTag , true);
  23. Thread.sleep(3000);
  24. System.out.println("休息三秒然后再接收消息");
  25. } catch (Exception e){
  26. e.printStackTrace();
  27. }
  28. }
  29. }

过期时间

消息可以设置过期时间,过期后加入死信队列。
分类两种设置,一种是设置队列ttl,一种是设置消息ttl。第二种太细了,一般不用。

设置队列ttl

在生产者端进行修改。

  1. <rabbit:queue name="test_spring_queue_ttl" auto-declare="true">
  2. <rabbit:queue-arguments>
  3. <entry key="x-message-ttl" value-type="long" value="5000"/>
  4. </rabbit:queue-arguments>
  5. </rabbit:queue>
  6. <rabbit:admin connection-factory="connectionFactory"/>
  7. <!-- 4.配置topic类型exchange;队列绑定到交换机 -->
  8. <rabbit:topic-exchange name="spring-topic-exchange">
  9. <rabbit:bindings>
  10. <rabbit:binding queue="test_spring_queue_ttl" pattern="msg.#"/>
  11. </rabbit:bindings>
  12. </rabbit:topic-exchange>
  1. 测试之前删除已有的队列,之后运行生产者即可看到效果。

设置消息ttl

首先把队列的消息ttl相关配置还原

  1. <!-- 2.配置队列 -->
  2. <rabbit:queue name="test_spring_queue_ttl2"/>
  3. <!-- 3.配置rabbitAdmin:主要用于在Java代码中对队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 -->
  4. <rabbit:admin connection-factory="connectionFactory"/>
  5. <!-- 4.配置topic类型exchange;队列绑定到交换机 -->
  6. <rabbit:topic-exchange name="spring-topic-exchange">
  7. <rabbit:bindings>
  8. <rabbit:binding queue="test_spring_queue_ttl2" pattern="msg.#"/>
  9. </rabbit:bindings>
  10. </rabbit:topic-exchange>
  1. import org.springframework.amqp.core.Message;
  2. import org.springframework.amqp.core.MessageProperties;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.context.support.ClassPathXmlApplicationContext;
  5. import java.nio.charset.StandardCharsets;
  6. public class Sender2 {
  7. public static void main(String[] args) {
  8. ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
  9. RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
  10. MessageProperties messageProperties = new MessageProperties();
  11. messageProperties.setExpiration("6000");
  12. Message message = new Message("测试".getBytes(StandardCharsets.UTF_8), messageProperties);
  13. rabbitTemplate.convertAndSend("msg.user", message);
  14. context.close();
  15. }
  16. }
  1. 一定要注意代码里引用的包。<br />另外,如果队列配置和消息配置都存在的情况下,会采用时间短的那个。

死信队列

DLX(Dead Letter Exchanges)死信交换机/死信邮箱,当消息在队列中由于某些原因没有被及时 消费而变成死信(dead message)后,这些消息就会被分发到DLX交换机中。
而绑定DLX交换机 的队列,称之为:“死信队列”

消息没有被及时消费的原因:
消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
消息超时未消费
达到最大队列长度

image.png

核心就是修改配置文件,这里创建了一个新的。

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  5. xsi:schemaLocation="
  6. http://www.springframework.org/schema/beans
  7. http://www.springframework.org/schema/beans/spring-beans.xsd
  8. http://www.springframework.org/schema/rabbit
  9. http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  10. <rabbit:connection-factory
  11. id="connectionFactory"
  12. host="192.168.42.131"
  13. port="5672"
  14. username="jining"
  15. password="980909"
  16. virtual-host="/lagou"
  17. publisher-confirms="true"
  18. />
  19. <!-- 3.配置rabbitAdmin:主要用于在Java代码中对队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 -->
  20. <rabbit:admin connection-factory="connectionFactory"/>
  21. <!-- 6. 配置RabbitTemplate(消息生产者) -->
  22. <rabbit:template id="rabbitTemplate"
  23. connection-factory="connectionFactory"
  24. exchange="my_exchange"
  25. />
  26. <!-- dlx config begin-->
  27. <!-- 定义死信队列-->
  28. <rabbit:queue name="dlx_queue"/>
  29. <!--定向死信交换机-->
  30. <rabbit:direct-exchange name="dlx_exchange" >
  31. <rabbit:bindings>
  32. <rabbit:binding key="dlx_ttl" queue="dlx_queue"></rabbit:binding>
  33. <rabbit:binding key="dlx_max" queue="dlx_queue"></rabbit:binding>
  34. </rabbit:bindings>
  35. </rabbit:direct-exchange>
  36. <!-- 测试消息交换机-->
  37. <rabbit:direct-exchange name="my_exchange" >
  38. <rabbit:bindings>
  39. <rabbit:binding key="dlx_ttl" queue="test_ttl_queue">
  40. </rabbit:binding>
  41. <rabbit:binding key="dlx_max" queue="test_max_queue">
  42. </rabbit:binding>
  43. </rabbit:bindings>
  44. </rabbit:direct-exchange>
  45. <rabbit:queue name="test_ttl_queue">
  46. <rabbit:queue-arguments>
  47. <!--队列ttl6秒-->
  48. <entry key="x-message-ttl" value-type="long" value="6000"/>
  49. <!--超时 消息 投递给 死信交换机-->
  50. <entry key="x-dead-letter-exchange" value="dlx_exchange"/>
  51. </rabbit:queue-arguments>
  52. </rabbit:queue>
  53. <!--测试超长度的队列-->
  54. <rabbit:queue name="test_max_queue">
  55. <rabbit:queue-arguments>
  56. <!--队列ttl6秒-->
  57. <entry key="x-max-length" value-type="long" value="2"/>
  58. <!--超时 消息 投递给 死信交换机-->
  59. <entry key="x-dead-letter-exchange" value="dlx_exchange"/>
  60. </rabbit:queue-arguments>
  61. </rabbit:queue>
  62. </beans>
  1. public class SenderDLX {
  2. public static void main(String[] args) {
  3. ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer-dlx.xml");
  4. RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
  5. HashMap<String, String> map = new HashMap<>();
  6. map.put("name", "alice");
  7. map.put("email", "alice@gmail.com");
  8. // rabbitTemplate.convertAndSend("dlx_ttl", map);
  9. rabbitTemplate.convertAndSend("dlx_max", "测试长度1".getBytes());
  10. rabbitTemplate.convertAndSend("dlx_max", "测试长度2".getBytes());
  11. rabbitTemplate.convertAndSend("dlx_max", "测试长度3".getBytes());
  12. context.close();
  13. }
  14. }
  1. **注意,对于长度超出来说,会把旧的消息挤出去而不是新的。**

延迟队列

延迟队列:TTL + 死信队列的合体
死信队列只是一种特殊的队列,里面的消息仍然可以消费
在电商开发部分中,都会涉及到延时关闭订单,此时延迟队列正好可以解决这个问题

具体实现:只要改一下消费者消费的队列名就可以了。

集群

rabbitmq有3种模式,但集群模式是2种。详细如下:
单一模式:即单机情况不做集群,就单独运行一个rabbitmq而已。之前我们一直在用
普通模式:默认模式,以两个节点(A、B)为例来进行说明
当消息进入A节点的Queue后,consumer从B节点消费时,RabbitMQ会在A和B之间创建临时通道进行消息传输,把A中的消息实体取出并经过通过交给B发送给consumer
当A故障后,B就无法取到A节点中未消费的消息实体
如果做了消息持久化,那么得等A节点恢复,然后才可被消费
如果没有持久化的话,就会产生消息丢失的现象
镜像模式:非常经典的 mirror 镜像模式,保证 100% 数据不丢失。
高可靠性解决方案,主要就是实现数据的同步,一般来讲是 2 - 3 个节点实现数据同步
对于 100% 数据可靠性解决方案,一般是采用 3 个节点。
在实际工作中也是用得最多的,并且实现非常的简单,一般互联网大厂都会构建这种镜像集 群模式

还有主备模式,远程模式,多活模式等,不作介绍。

镜像模式准备

1 修改host文件,把两个机器改名成A B

  1. 127.0.0.1 A localhost localhost.localdomain localhost4
  2. localhost4.localdomain4
  3. ::1 A localhost localhost.localdomain localhost6
  4. localhost6.localdomain6
  5. 192.168.204.141 A
  6. 192.168.204.142 B

2 两台服务器的cookie文件要一致

  1. [root@A opt]# scp /var/lib/rabbitmq/.erlang.cookie 192.168.204.142:/var/lib/rabbitmq
  1. 3 关闭防火墙重启mq服务
  1. [root@A ~]# systemctl stop firewalld
  2. [root@A ~]# systemctl start rabbitmq-server
  1. 4 加入集群节点
  1. [root@B ~]# rabbitmqctl stop_app
  2. [root@B ~]# rabbitmqctl join_cluster rabbit@A
  3. [root@B ~]# rabbitmqctl start_app

5 查看集群状态

  1. [root@B ~]# rabbitmqctl cluster_status
  1. 6 为集群模式单独添加用户,哪个节点都行。关闭集群后原有的用户恢复正常。
  1. root@A ~]# rabbitmqctl add_user laosun 123123
  2. [root@A ~]# rabbitmqctl set_user_tags laosun administrator
  3. [root@A ~]# rabbitmqctl set_permissions -p "/" laosun ".*" ".*" ".*"

镜像模式

image.png

  1. [root@A ~]# rabbitmqctl set_policy xall "^" '{"ha-mode":"all"}'

HAProxy

虽然我们在程序中访问A服务器,可以实现消息的同步,虽然在同步,但都是A服务器在接收消 息,A太累
是否可以像Nginx一样,做负载均衡,A和B轮流接收消息,再镜像同步 ?

HAProxy工作在OSI的第四层和第七层,支持TCP与Http协议。只能做负载均衡。Nginx不仅仅是一款优秀的负载均衡器/反向代理软件,它同时也是功能强大的Web应用服务器。
性能上HA胜,功能性和便利性上Nginx胜
只做负载均衡可以用HAProxy。

KeepAlived

HAProxy也需要做高可用集群。
可以用KeepAlived实现。把多台设备的ip虚拟成一个ip对外开放。