1.1MQ概述

MQ简称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
image.png
小结:

  • MQ,消息队列,存储消息的中间件
  • 分布式系统通信的两种方式:直接远程调用 和 借助第三方 完成间接通信
  • 发送方称为生产者,接收方称为消费者

1.2 MQ的优势和劣势

优势:

  • 应用解耦:提高系统的容错性和可维护性

image.png
image.png

  • 异步提速:提升用户体验和系统吞吐量

image.png
image.png

  • 削峰填谷:提高系统稳定性

image.png
image.png
image.png

劣势:

  • 系统可用性降低
  • 系统复杂性提高
  • 一致性问题

image.png

小结

image.png

1.3 常见的 MQ 产品

image.png

1.4 RabbitMQ 简介

AMQP协议

image.png

RabbitMQ

image.png
image.png

image.png

JMS

image.png

小结

image.png

2.1 RabbitMQ快速入门:简单模式

简单模式
image.png

  1. 创建一个工程rabbitmq,在这个工程下面创建2个模块:rabbitmq-producer,rabbitmq-consumer。
  2. 修改两个pom.xml文件 ```xml com.rabbitmq amqp-client 5.14.2

org.apache.maven.plugins maven-compiler-plugin 3.10.1 1.8 1.8

  1. 3. 生产者代码
  2. ```java
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /**
  9. * 发送消息
  10. */
  11. public class Producer_HelloWorld {
  12. public static void main(String[] args) throws IOException, TimeoutException {
  13. // 1.创建连接工厂
  14. ConnectionFactory factory = new ConnectionFactory();
  15. // 2.设置参数
  16. factory.setHost("192.168.133.128"); // ip 默认值为 localhost
  17. factory.setPort(5672); // 端口 默认值 5672
  18. factory.setVirtualHost("/"); // 虚拟机 默认值/
  19. factory.setUsername("xg"); // 用户名 默认值 guest
  20. factory.setPassword("167219"); // 密码 默认值 guest
  21. // 3.创建连接 Connection
  22. Connection connection = factory.newConnection();
  23. // 4.创建Channel
  24. Channel channel = connection.createChannel();
  25. // 5.HelloWorld方式,不需要交换机。创建队列Queue
  26. // 声明一个队列
  27. /**
  28. * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments</>)
  29. * 参数:
  30. * 1. queue:队列名称
  31. * 2. durable:是否持久化,当mq重启后,还在
  32. * 3. exclusive:
  33. * * 是否独占。只能有一个消费者监听这个队列
  34. * * 当Connection关闭时,是否删除队列
  35. * 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
  36. * 5. arguments:参数
  37. */
  38. // 如果没有一个名字叫做hello_world的队列,则会创建该队列;如果有则不会创建
  39. channel.queueDeclare("hello_world", true, false, false, null);
  40. // 6.发送消息
  41. /**
  42. * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
  43. * 参数:
  44. * 1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
  45. * 2. routingKey:路由名称
  46. * 3. props:配置信息
  47. * 4. body:发送的消息数据(字节)
  48. */
  49. String body = "hello rabbitmq~~~";
  50. channel.basicPublish("", "hello_world", null, body.getBytes());
  51. // 7.释放资源
  52. channel.close();
  53. connection.close();
  54. }
  55. }

运行后,访问rabbitmq客户端:
image.png

  1. 消费者代码 ```java import com.rabbitmq.client.*;

import java.io.IOException; import java.util.concurrent.TimeoutException;

public class Consumer_HelloWorld { public static void main(String[] args) throws IOException, TimeoutException { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory();

  1. // 2.设置参数
  2. factory.setHost("192.168.133.128"); // ip 默认值为 localhost
  3. factory.setPort(5672); // 端口 默认值 5672
  4. factory.setVirtualHost("/"); // 虚拟机 默认值/
  5. factory.setUsername("xg"); // 用户名 默认值 guest
  6. factory.setPassword("167219"); // 密码 默认值 guest
  7. // 3.创建连接 Connection
  8. Connection connection = factory.newConnection();
  9. // 4.创建Channel
  10. Channel channel = connection.createChannel();
  11. // 5.HelloWorld方式,不需要交换机。创建队列Queue
  12. // 声明一个队列
  13. /**
  14. * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments</>)
  15. * 参数:
  16. * 1. queue:队列名称
  17. * 2. durable:是否持久化,当mq重启后,还在
  18. * 3. exclusive:
  19. * * 是否独占。只能有一个消费者监听这个队列
  20. * * 当Connection关闭时,是否删除队列
  21. * 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
  22. * 5. arguments:参数
  23. */
  24. // 如果没有一个名字叫做hello_world的队列,则会创建该队列;如果有则不会创建
  25. channel.queueDeclare("hello_world", true, false, false, null);
  26. // 6.消费(接收)消息
  27. /**
  28. * basicConsume(String queue, boolean autoAck, Consumer callback)
  29. * 参数:
  30. * 1. queue:队列名称
  31. * 2. autoAck:是否自动确认
  32. * 3. callback:回调对象
  33. */
  34. // 接收消息(匿名内部类方式,重写方法)
  35. Consumer consumer = new DefaultConsumer(channel) {
  36. /**
  37. * 回调方法,当收到消息后,会自动执行该方法
  38. * 参数:
  39. * 1. consumerTag:消息标识
  40. * 2. envelope:获取一些信息,交换机,路由key...
  41. * 3. properties:配置信息
  42. * 4. body:真实数据
  43. */
  44. @Override
  45. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  46. System.out.println("consumerTag: " + consumerTag);
  47. System.out.println("Exchange: " + envelope.getExchange());
  48. System.out.println("RoutingKey: " + envelope.getRoutingKey());
  49. System.out.println("properties: " + properties);
  50. System.out.println("body: " + new String(body));
  51. }
  52. };
  53. channel.basicConsume("hello_world", true, consumer);
  54. // 消费者要进行监听,不能关闭资源
  55. }

} ``` 运行后,访问rabbitmq客户端:
image.png

2.2 RabbitMQ 的工作模式

Work queues 工作队列模式

image.png
2、代码与简单模式类似
image.png
image.png

3、小结
image.png

Pub/Sub 订阅模式

image.png

Routing 模式

image.png

2、代码
3、小结
Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

Topics 通配符模式—功能最强大

image.png
*代表一个单词
#代表一个或多个单词

2、代码
3、小结
Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置 routing key 的时候可以使用通配符,显得更加灵活。

工作模式总结