RabbitMQ


安装 Docker
// 卸载系统之前的 dockersudo yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine// 安装 Docker-CEsudo yum install -y yum-utils device-mapper-persistent-data lvm2// 设置 docker repo 的 yum 源, 此处是阿里源sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo// 安装 docker, 以及docker-clisudo yum install docker-ce docker-ce-cli containerd.io// 启动 dockersudo systemctl start docker// 查看 docker 状态sudo systemctl status docker// 设置 docker 开机自启sudo systemctl enable docker// 重启dockersudo systemctl restart docker
Docker 安装 RabbitMQ
// 下载
docker pull rabbitmq:management
// 启动
docker run -d -p 5672:5672 -p 15672:15672 -p 25672:25672 --name rabbitmq rabbitmq:management
// 设置开机自启
docker update rabbitmq --restart=always
三种使用方式
使用代码
/* 生产者 */
// 建立连接
ConnectionFactory factory = new ConnectionFactory(); // 创建 RabbitMQ 连接工厂
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.0.111");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!"; // 消息
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
/****************************************************************************/
/* 消费者 */
// 建立连接、设置连接参数、创建连接、创建队列 略
channel.basicConsume(queueName, true, new DefaultConsumer(channel){ // 设置监听
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
使用 Spring AMQP
使用 Spring AMQP
- 发送流程
- 引入 AMQP 依赖
spring-boot-starter-amqp - 配置文件添加 mq 连接信息
- 注入 rabbitTemplate, 设置队列名称、消息, 调用 convertAndSend 发送消息
- 引入 AMQP 依赖
接收流程
- 引入 AMQP 依赖
spring-boot-starter-amqp - 配置文件添加 mq 连接信息
新建一个类使用 @Component 注解注入到 Spring 容器中, 编写方法 使用 @RabbitListener 指定队列名称, 编写方法接收消息
spring: rabbitmq: host: 192.168.0.111 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: guest # 用户名 password: guest # 密码// 消费者 @RabbitListener(queues = "simple.queue") // 绑定队列名称 public void listenSimpleQueueMessage(String msg) { // 传递过来是什么类型接受就是什么类型 // 处理业务逻辑 System.out.println("spring 消费者接收到消息:【" + msg + "】"); } // 生产者 public void testSimpleQueue() { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, spring amqp!"; // 发送消息 rabbitTemplate.convertAndSend(queueName, message); }基于注解声明队列和交换机
@RabbitListener(bindings = @QueueBinding( // 绑定关系 value = @Queue(name = "对"), // 队列名称 exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), // 交换机名称, 交换机类型 key = {"red", "blue"} // rk )) public void listenDirectQueue1(String msg){ System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】"); } // 生产者 public void testSendDirectExchange() { // 交换机名称 String exchangeName = "交换机名称"; // 消息 String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "red", message); }RabbitMQ 的优势
应用解耦(解除耦合)
- 引入 AMQP 依赖
- 发送流程
传统做法:每新增一项业务, 都要在原有的支付服务基础上去修改代码
- 接入MQ: 支付完成后发送消息不在关注后续业务, 后续新增业务订阅事件即可
异步处理(提升性能)
- 传统做法
- 串行: 支付成功 -> 订单业务 -> 仓储服务 -> 短信服务, 整体调用链路过长消耗性能
- 并行: 支付成功 -> 创建线程同时处理 剩余业务, 可以在串行上进一步提升性能
- 接入MQ: 支付成功 发送消息 直接返回, 不在等待后续业务完成
故障隔离
- 传统做法: 如果下游链路中某一个环节在使用中失败, 会导致整体业务不可用
- 接入MQ: 接入MQ: 支付成功 发送消息 直接返回, 不在等待后续业务完成, 出现问题的业务自己处理
流量消峰
RabbitMQ 的基本结构
RabbitMQ 消息模型
简单模型
- 一个生产者、一个队列、一个消费者

入门案例(生产者消费者都声明队列的原因是避免队列不存在)
工作队列(RabbitMQ 默认会做公平分发, 预取操作 先把消息平均发送给所有的 消费者)
广播模型(Fanout Exchange)
- 在 consumer 服务声明 Exchange、Queue、Binding
- 在 consumer 定义一个类, 添加 @configuration 注解, 并声明 FanoutExchange、Queue 和绑定关系对象 Bingding
- 在 consumer 服务声明两个消费者
- 添加两个方法, 分别监听绑定的队列
- 在 publisher 服务发送消息到 FanoutExchange
- 在 consumer 服务声明 Exchange、Queue、Binding
注: 交换机的作用
路由模型(Direct Exchange)
- 在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct 类型的 Exchange
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
- 注意
- 在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct 类型的 Exchange
通配符模型(Topic Exchange)
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符Routingkey一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如:item.insert#:匹配一个或多个词*:匹配不多不少恰好1个词
- 注意
- Topic交换机接收的消息RoutingKey必须是多个单词,以
**.**分割 - Topic交换机与队列绑定时的bindingKey可以指定通配符
- Topic交换机接收的消息RoutingKey必须是多个单词,以






