0.官方文档
https://www.rabbitmq.com/getstarted.html
最好的学习还是看官方文档,
下面的课程是我结合官方文档加上自己在使用rabbitmq的工作中总结出来的心得体会!
1.什么是消息队列
1.1.MQ概述
MQ全称Message Queue,中文名称消息队列。顾名思义,它就是一个队列,简单来说就是一个应用程序A将数据丢到一个队列中,由另一个应用程序B从队列中拿到这个数据,再去做一些其他的业务操作。我们把应用程序A叫做生产者,应用程序B叫做消费者,它们之间传输的数据称作消息。
1.2.MQ的使用场景
1、解耦
假设系统A通过调用接口推送数据给B、C、D,如果后续系统E也需要被推送、或者B不再需要被推送呢?那我们就需要修改系统A的代码,加上给E推送数据的逻辑,去掉给B推送数据的逻辑。显然A系统和其他系统严重耦合,在这个场景中,如果使用 MQ,通过发布订阅模型,就可以实现A和其他系统的解耦。A 产生一条数据,发送到 MQ ,哪个系统需要就去订阅消费,如果某个系统不再需要,取消对消息的订阅即可。
2、异步
将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
3、削峰限流
商城秒杀活动我们都不陌生,对于商城系统,可能在0点左右会有个短暂的高峰期,但其余时间的并发量也没那么高,假如我们的后台系统直接操作数据库,平时可能没什么问题,但如果突然有很高的的并发量进来,就会因为MySQL并发量过大导致系统瘫痪。
如果使用MQ,请求会短期积压在MQ中,后台系统从MQ中分批拉取消息,从而保证数据库不会被压垮。等高峰期一过,系统就会将MQ中积压的消息慢慢解决掉。这就是MQ的”削峰限流”作用。
1.3.引入MQ对系统的影响
1、系统的可用性降低
系统的可用性是指系统服务不中断运行的时间占实际运行时间的比例。高可用就是指系统服务不中断运行时间占实际运行时间的比例大。
系统引入的外部依赖越多,越容易挂掉,所以引入MQ后系统的可用性可能会降低。
2、系统的复杂度增加
引入MQ以后要多考虑很多方面的问题,比如怎么保证消息的可靠传输、怎么保证消息的幂等性等,提高了系统的复杂度。
1.2.RabbitMQ简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
2.RabbitMQ之快速应用
2.1.理解五大核心
- 生产者
- 交换机
- 绑定key(生产者发送的数据要到那个队列去,最终由绑定key决定)
- 队列
- 消费者
重要总结:
1.生产者将数据发送到交换机,并告通过绑定key,告知交换机这些数据要进入那些队列,即:发送消息需要参数 交换机和绑定key;
2.消费者需要明确知道自己要消费那个交换机通过那个绑定Key发送到那个队列的数据,即:消费消息需要参数 交换机、绑定key、队列;
备注:消息队列最重要的就是学习如何放入消息,如何取出消息,上面两句总结是学习与理解RabbitMQ的重要思想,一定要多理解思考。
2.2.安装RabbitMQ
详见博客:
docker容器安装(推荐)
https://www.cnblogs.com/newAndHui/p/14969320.html
直接在Linux系统上安装:
https://www.cnblogs.com/newAndHui/p/14914862.html
2.3.Hello World代码案例
生产者
package demo01;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Copyright (C) XXXXXXXXXXX科技股份技有限公司
* @Author: lidongping
* @Date: 2021-06-30 10:40
* @Description:
*/
public class Producer {
private final static String HOST = "192.168.25.129";
private final static Integer PORT = 5672;
private final static String USER_NAME = "admin";
private final static String PASSWORD = "123456";
private final static String QUEUE_NAME = "test_queue_name";
public static void main(String[] args) throws Exception {
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USER_NAME);
factory.setPassword(PASSWORD);
// channel 实现了自动 close 接口 自动关闭 不需要显示关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
/**
* 定义一个队列
* 1.队列名称 queue the name of the queue
* 2.队列里面的消息是否持久化 默认消息存储在内存中 durable true if we are declaring a durable queue (the queue will survive a server restart)
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 exclusive true if we are declaring an exclusive queue (restricted to this connection)
* 4.是否自动删除 最后一个消费者消费以后 该队列是否自动删除 true 自动删除 autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* 5.其他参数 arguments other properties (construction arguments) for the queue
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello world03";
/**
* 发送一个消息
* 1.发送到那个交换机,不填写表示发送到默认交换机 exchange the exchange to publish the message to
* 2.路由的 key 是哪个 routingKey the routing key
* 3.其他的参数信息 props other properties for the message - routing headers etc
* 4.发送消息的消息体 the message body
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送完毕");
}
}
消费者
package demo01;
import com.rabbitmq.client.*;
/**
* @Copyright (C) XXXXXXXXXXX科技股份技有限公司
* @Author: lidongping
* @Date: 2021-06-30 10:55
* @Description:
*/
public class Consumer {
private final static String HOST = "192.168.25.129";
private final static Integer PORT = 5672;
private final static String USER_NAME = "admin";
private final static String PASSWORD = "123456";
private final static String QUEUE_NAME = "test_queue_name";
public static void main(String[] args) throws Exception {
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USER_NAME);
factory.setPassword(PASSWORD);
// channel 实现了自动 close 接口 自动关闭 不需要显示关闭
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//队列不存在时会报错 no queue 'test_queue_name' in vhost '/', class-id=60, method-id=20
/**
* 定义一个队列
* 1.队列名称 queue the name of the queue
* 2.队列里面的消息是否持久化 默认消息存储在内存中 durable true if we are declaring a durable queue (the queue will survive a server restart)
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 exclusive true if we are declaring an exclusive queue (restricted to this connection)
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* 5.其他参数 arguments other properties (construction arguments) for the queue
*/
System.out.println("等待接收消息.........");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
String exchange = delivery.getEnvelope().getExchange();
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("接收到消息:" + message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("-->消息消费被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列 queue the name of the queue
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 autoAck true if the server should consider messages
* 3.消费者成功消费的回调 deliverCallback callback when a message is delivered
* 4.消费取消的回调 cancelCallback callback when the consumer is cancelled
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
代码优化抽取一个获取channel的工具类,方便后面写代码
package common;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Copyright (C) XXXXXXXXXXX科技股份技有限公司
* @Author: lidongping
* @Date: 2021-06-30 12:02
* @Description:
*/
public class RabbitMqUtils {
private final static String HOST = "192.168.25.129";
private final static Integer PORT = 5672;
private final static String USER_NAME = "admin";
private final static String PASSWORD = "123456";
/**
* 得到一个连接的 channel
*
* @return
* @throws Exception
*/
public static Channel getChannel() throws Exception {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USER_NAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
return channel;
}
}
2.4.交换机hello world
详细的关于交换机的理论知识点,我们后面后详细介绍,
现在我们先写一个hello world案例,或许你就明白他的作用了
对交换机的理解非常重要,下面的案例代码非常简单,但是理解非常重要
特别是这幅图,我再次把它请出来,以表示它的重要性
生产者 交换机 绑定/路由key 消费者,这几个术语一定要200%的理解;
生产者通过是把消息放入交换机,通过路由key 到指定的 队列 ;
上两句换就是学习rabbitmq的核心,理解了后面都只是api的调用而已,非常简单。
废话不多说直接上代码
生产者
package demo02Exchange;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import common.RabbitMqUtils;
import java.util.Scanner;
/**
* @author 姿势帝-博客园
* @address https://www.cnblogs.com/newAndHui/
* @WeChat 851298348
* @create 07/10 5:08
* @description <p>
* 交换机demo案例理解
* </p>
*/
public class Producer {
private final static String EXCHANGE_NAME = "test_exchange_name";
private final static String ROUTING_KEY = "test_routing_key_name";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明一个direct交换机
/**
* 参数说明
*1.交换机名称 exchange the name of the exchange
* 2.交换机类型 the exchange type DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//从控制台当中接受信息
Scanner scanner = new Scanner(System.in);
System.out.println("请输入消息:");
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
System.out.println("发送消息完成:" + message);
}
}
}
消费者
package demo02Exchange;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import common.RabbitMqUtils;
/**
* @Copyright (C) XXXXXXXXXXX科技股份技有限公司
* @Author: lidongping
* @Date: 2021-06-30 13:05
* @Description:
*/
public class Consumer {
private final static String EXCHANGE_NAME = "test_exchange_name9";
private final static String ROUTING_KEY = "test_routing_key_name";
private static String QUEUE_NAME = "test_queue_name2";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 如果交换机存在,该行代码可以不写,交换机不存在在绑定时会报错 no exchange 'test_exchange_name9' in vhost '/'
//channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 如果队列存在,该行代码可以不写,队列不存在会报错 no queue 'test_queue_name' in vhost '/'
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 这个绑定非常重要,只需要绑定一次
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
System.out.println("消息监听中........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String exchange = delivery.getEnvelope().getExchange();
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("exchange=" + exchange + " ,routingKey=" + routingKey + " ,接收到的消息:" + message);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
2.5RabbitMQ整合Springboot
案例高度结合实际开发
模拟的是保存用户后,将用户名放入队列,
通过队列监听消费消息
队列通过配置文件动态监听
废话不多说直接上代码
项目结果如图:
为了更加接近实际项目,项目中有很多架构上的代码
下面只贴出有关rabbitmq的代码,方便大家直接拷贝到直接的项目中
当然大家也可以直接下载课程演示中的完整代码
整合MQ步骤
步骤一:引入rabbitmq包
implementation group: ‘org.springframework.boot’, name: ‘spring-boot-starter-amqp’, version: ‘2.3.12.RELEASE’
步骤二:application.yml中引入mq配置
# 配置端口
server:
port: 8080
servlet:
context-path: /api
logging:
config: classpath:logback.xml
level:
com.ldp.rabbitmq.sys.mapper: DEBUG
spring:
# 配置数据源
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/mp-data?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
username: root
password: admin
type: com.alibaba.druid.pool.DruidDataSource
rabbitmq:
host: 192.168.25.129
port: 5672
username: admin
password: 123456
virtual-host: /
# mybatis-plus相关配置
mybatis-plus:
# xml扫描,多个目录用逗号或者分号分隔(告诉 Mapper 所对应的 XML 文件位置)
mapper-locations: classpath:mapper/*.xml
# 以下配置均有默认值,可以不设置
global-config:
db-config:
#主键类型 AUTO:"数据库ID自增" INPUT:"用户输入ID",ID_WORKER:"全局唯一ID (数字类型唯一ID)", UUID:"全局唯一ID UUID";
id-type: auto
#字段策略 IGNORED:"忽略判断" NOT_NULL:"非 NULL 判断") NOT_EMPTY:"非空判断"
field-strategy: NOT_EMPTY
#数据库类型
db-type: ORACLE
configuration:
# 是否开启自动驼峰命名规则映射:从数据库列名到Java属性驼峰命名的类似映射
map-underscore-to-camel-case: true
# 如果查询结果中包含空值的列,则 MyBatis 在映射的时候,不会映射这个字段
call-setters-on-nulls: true
# 这个配置会将执行的sql打印出来,在开发或测试的时候可以用
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
rabbitmq:
config:
exchange: test2_exchange_name
listerQueues: test2_routing_key_name #,如无监听队列不填写 这里默认key与队列名称一样,这也是生产上的常用设计(简化)
routingKey: test2_routing_key_name
#自定义参数
tc:
mq-max-count: 1000 #队列积压数
executor:
corePoolSize: 30
maxPoolSize : 40
queueCapacity: 1000
步骤三:编写RabbitMqConfiguration.java配置文件
package com.ldp.rabbitmq.common.config;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Copyright (C) XXXXXXXXXX科技有限公司
* @Author: lidongping
* @Date: 2021/6/24 13:44
* @Description:
*/
@Configuration
@Slf4j
public class RabbitMqConfiguration {
@Value("${rabbitmq.config.exchange}")
private String exchangeName;
@Value("${rabbitmq.config.listerQueues}")
private String queueNames;
@Autowired
private ConfigurableApplicationContext context;
@Bean
public DirectExchange directExchange() {
return new DirectExchange(exchangeName, true, false);
}
/**
* 注册绑定
*/
@Bean
public void createQueueAndBind() {
if (StrUtil.isEmpty(queueNames)) {
log.info("无监听队列............");
return;
}
String[] queueList = queueNames.split(",");
for (String queueName : queueList) {
// 队列进入容器
Queue beanQueue = new Queue(queueName, true);
context.getBeanFactory().registerSingleton(queueName, beanQueue);
log.info("注册队列{},到容器,{}", queueName, beanQueue);
// 绑定进入容器
String routingKey = queueName;
String beanBindingName = queueName + "BindExchange";
Binding beanBinding = BindingBuilder.bind(beanQueue).to(directExchange()).with(routingKey);
context.getBeanFactory().registerSingleton(beanBindingName, beanBinding);
log.info("队列名称{},绑定key{},到容器,{}", queueName, routingKey, beanBinding);
}
}
}
步骤四:编写生产者
package com.ldp.rabbitmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
* @author 姿势帝-博客园
* @address https://www.cnblogs.com/newAndHui/
* @WeChat 851298348
* @create 07/10 9:51
* @description
*/
@Slf4j
@Service
public class RabbitProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${rabbitmq.config.exchange}")
private String exchangeName;
@Value("${rabbitmq.config.routingKey}")
private String routingKey;
public void convertAndSend(String message) {
log.info("进入队列:exchangeName={},routingKey={},message={}", exchangeName, routingKey, message);
// 保存到队列
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
}
}
步骤五:编写消费者
package com.ldp.rabbitmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class RabbitMqConsumer {
@Value(value = "${rabbitmq.config.listerQueues}")
public String[] queueNames;
@Bean
public String[] queueNames() {
return queueNames;
}
@RabbitListener(queues = {"#{queueNames}"})
public void accept(Message message) {
String exchange = message.getMessageProperties().getReceivedExchange();
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
String queue = message.getMessageProperties().getConsumerQueue();
// 队列中取值
String param = new String(message.getBody());
log.info("接收到消息 exchange= {} ,routingKey = {},queue = {}, param = {} ", exchange, routingKey, queue, param);
// TODO 调用业务方法处理
log.info("调用业务方法处理中....");
}
}
步骤六:业务中使用
步骤七:测试
以上是MQ相关的全部代码,其他代码请下载项目演示
启动项目访问:http://127.0.0.1:8080/api/sysUser/save?name=zhangsan
请求日志:
2021-07-10 22:46:03.850 - Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-07-10 22:46:03.850 - Initializing Servlet 'dispatcherServlet'
2021-07-10 22:46:03.867 - Completed initialization in 17 ms
2021-07-10 22:46:03.925 - ContentType: null
2021-07-10 22:46:03.925 - 请求地址: http://127.0.0.1:8080/api/sysUser/save
2021-07-10 22:46:03.925 - 请求方法: GET
Creating a new SqlSession
SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6a9d7329] was not registered for synchronization because synchronization is not active
JDBC Connection [com.mysql.cj.jdbc.ConnectionImpl@41beca26] will not be managed by Spring
==> Preparing: INSERT INTO sys_user ( name, create_time, update_time ) VALUES ( ?, ?, ? )
==> Parameters: zhangsan(String), null, null
<== Updates: 1
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6a9d7329]
2021-07-10 22:46:04.372 - 进入队列:exchangeName=test2_exchange_name,routingKey=test2_routing_key_name,message={"id":49,"name":"zhangsan"}
2021-07-10 22:46:04.395 - 接收到消息 exchange= test2_exchange_name ,routingKey = test2_routing_key_name,queue = test2_routing_key_name, param = {"id":49,"name":"zhangsan"}
2021-07-10 22:46:04.395 - 调用业务方法处理中....
2021-07-10 22:46:04.812 - 响应结果: {"message":"操作成功","code":100,"data":"zhangsan"}
2021-07-10 22:46:04.812 - HTTP状态: 200
2021-07-10 22:46:04.813 - 处理时长: 887毫秒
到这里,如果项目时间紧,你可以开始使用rabbitmq了,常规的mq使用基本就这样
不过有时间的话,还是建议大家还是认真把后面学完(不要是什么都是半吊子),
后面是重点讲知识点,rabbitmq的一写特性等
3.rabbitMq常用知识点
3.1.工作队列
官方文档:https://www.rabbitmq.com/tutorials/tutorial-two-java.html
工作队列模式:一个生产者,一个消息队列,多个消费者,同样也称为点对点模式。
工作队列优点:有多个消费者,这样就不会因为处理耗时的任务导致MQ不可用。
这种模式也是生产中常用的一种模式:
生产者:
public class Producer {
private final static String EXCHANGE_NAME = "test_exchange_name";
private final static String ROUTING_KEY = "test_routing_key_name";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 声明一个direct交换机
/**
* 参数说明
*1.交换机名称 exchange the name of the exchange
* 2.交换机类型 the exchange type DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//从控制台当中接受信息
Scanner scanner = new Scanner(System.in);
System.out.println("请输入消息:");
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
System.out.println("发送消息完成:" + message);
}
}
}
消费者01:
public class Consumer01 {
private final static String EXCHANGE_NAME = "test_exchange_name";
private final static String ROUTING_KEY = "test_routing_key_name";
private static String QUEUE_NAME = "test_queue_name";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
System.out.println("Consumer01消息监听中........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01接收到的消息:" + message);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
3.2.消息应答ACK机制
通俗的讲就是:服务端推送一个消息给你(消费者),你处理完成后要告诉服务端处理的结果(成功还是失败)。
3.2.1.概念理解
Message acknowledgment(消息应答)执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。
一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。
在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。
但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。
为了确保消息不会丢失,RabbitMQ支持消息应答。
消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。
这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。
没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。
消息应答是默认打开的。我们通过显示的设置autoAsk=true关闭这种机制。
现即自动应答开,一旦我们完成任务,消费者会自动发送应答。
通知RabbitMQ消息已被处理,可以从内存删除。
如果消费者因宕机或链接失败等原因没有发送ACK,则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。
3.2.2.自动应答
消息发送后立即被认为已经处理成功,
这种模式需要在高吞吐量和数据传输安全性方面做权衡,
因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失 了,
当然另一方面这种模式消费者接收过多的消息,没有对传递的消息数量进行限制,
当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使
得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以
某种速率能够处理这些消息的情况下使用,
或者建立专门处理该类消息的线程池进行异步处理(这是生产上常用的做法,这句话很值钱,是经验总结而非理论知识)。
3.2.3.代码演示
这里模拟2个消费者,通过线程睡眠模拟不同的消息处理速度。
生产者:与之前的代码一样
消费者:
public class Consumer {
private final static String EXCHANGE_NAME = "test_exchange_name";
private final static String ROUTING_KEY = "test_routing_key_name";
private static String QUEUE_NAME = "test_queue_name";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
System.out.println("Consumer消息监听中........... ");
// 收到消息后回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
System.out.println("Consumer接收到的消息:" + message + " ,deliveryTag=" + deliveryTag);
// 模拟睡眠2秒钟
sleepDemo(2);
// 这里模拟message长度小于2时应答成功,等于2应答失败重新进入队列(注意这里会死循环),大于2小于5应答失败不在处理,其他取消
if (message.length() < 2) {
/**
* 参数说明
* 1.deliveryTag 消息标记,可以理解为消息id the tag from the received
* 2.multiple 是否批量应答
*/
System.out.println("message长度小于2时应答成功:basicAck");
channel.basicAck(deliveryTag, false);
} else if (message.length() == 2) {
/**
* 参数说明
* 前两个参数与basicAck含义一样
* 3.true 表示重新进入队列
*/
System.out.println("message长度等于2时应答失败,重新进入队列-死循环:basicNack");
channel.basicNack(deliveryTag, false, true);
} else if (message.length() > 2 && message.length() < 5) {
// 参数含义与 basicNack 含义一样,只是少一个参数
System.out.println("message长度大于2小于5时应答失败,不在进入队列:basicReject");
channel.basicReject(deliveryTag, false);
} else {
// 取消消费者对队列的订阅关系
System.out.println("message>=5时应答失败,取消队列:basicCancel");
channel.basicCancel(consumerTag);
}
};
// 消息拒绝时回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("------------------取消消费:consumerTag=" + consumerTag);
};
// 表示非自动应答,即手动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
线程睡眠包装方法
/**
* 线程睡眠包装方法
* @param second
*/
public static void sleepDemo(long second) {
try {
Thread.sleep(second * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
3.3.RabbitMQ持久化
如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),
这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。当然还是会有一些小概率事件会导致消息丢失。
确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。
1.队列持久化
// 队列持久化
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
2.消息持久化
// MessageProperties.PERSISTENT_TEXT_PLAIN 表示队列持久化
_channel.basicPublish(**_EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN**, message.getBytes());
3.4.不公平分发-预取值
意思就是那个消费者消费得快就把消息推送给谁,这个在很多负载均衡策略中都是这样的,也是合理的,正所谓能者多劳!
实现方式:
// 不公平分发策略
int prefetchCount = 5;
channel.basicQos(prefetchCount);
4.发布确认-重点
通俗的理解就是:消息发布后,等rabbitmq服务器收到后告知,服务已收到.
(其实只要在架构设计上先入库,在通过队列驱动流程,都不会使用发布确认,这句话很值钱)
不过大家还是认真理解一下,下面的发布确认代码
单笔确认
package demo07Confirm;
import com.rabbitmq.client.Channel;
import common.RabbitMqUtils;
import java.util.UUID;
/**
* @author 姿势帝-博客园
* @address https://www.cnblogs.com/newAndHui/
* @WeChat 851298348
* @create 07/11 4:39
* @description <p>
* 单个发布确认
* 吞吐量:每秒处理约 数百个
* 当前实验中: 发布10000个单独确认消息,耗时6173ms
* 常规项目都能满足
* </p>
*/
public class Producer01Single {
/**
* 单个消息发送确认
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
long begin = System.currentTimeMillis();
int count = 10000;
for (int i = 0; i < count; i++) {
String message = "发送第:" + i + "条消息.";
channel.basicPublish("", queueName, null, message.getBytes());
//服务端返回 false 或超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitForConfirms();
if (flag) {
//System.out.println(message + "成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + count + "个单独确认消息,耗时" + (end - begin) + "ms");
}
}
批量确认
package demo07Confirm;
import com.rabbitmq.client.Channel;
import common.RabbitMqUtils;
import java.util.UUID;
/**
* @author 姿势帝-博客园
* @address https://www.cnblogs.com/newAndHui/
* @WeChat 851298348
* @create 07/11 4:39
* @description <p>
* 批量发布确认
* 吞吐量:每秒处理约 数百个
* 当前实验中: 发布10000个 批量 确认消息,耗时1293ms
* 常规项目都能满足
* </p>
*/
public class Producer02Batch {
/**
* 批量消息发送确认
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
long begin = System.currentTimeMillis();
// 总共发布条数
int count = 10000;
// 每批次确认条数
int confirmCount = 100;
// 当前批次累计条数
int currentCount = 0;
for (int i = 0; i < count; i++) {
String message = "发送第:" + i + "条消息.";
channel.basicPublish("", queueName, null, message.getBytes());
currentCount++;
//服务端返回 false 或超时时间内未返回,生产者可以消息重发
if (confirmCount == currentCount) {
channel.waitForConfirms();
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + count + "个 批量 确认消息,耗时" + (end - begin) + "ms");
}
}
异步确认
package demo07Confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import common.RabbitMqUtils;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
/**
* @author 姿势帝-博客园
* @address https://www.cnblogs.com/newAndHui/
* @WeChat 851298348
* @create 07/11 4:39
* @description <p>
* 异步发布确认
* 吞吐量:每秒处理约 数百个
* 当前实验中: 发布10000个异步确认消息,耗时922ms
* 常规项目都能满足
* </p>
*/
public class Producer03Async {
/**
* 异步消息发送确认
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
/**
* 确认收到消息的一个回调,参数说明
* 1.sequenceNumber:消息id
* 2.multiple: true 可以确认小于等于当前序列号的消息 异步确认,false 确认当前序列号消息
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) { // 异步确认
//返回的是小于等于当前序列号的未确认消息 是一个 map
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
//清除该部分未确认消息
confirmed.clear();
} else {
//只清除当前序列号的消息
outstandingConfirms.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("发布的消息" + message + "未被确认,序列号" + sequenceNumber);
};
/**
* 添加一个异步确认的监听器
* 1.ackCallback : 确认收到消息的回调
* 2.nackCallback : 未收到消息的回调
*/
channel.addConfirmListener(ackCallback, nackCallback);
// 总共发布条数
int count = 10000;
long begin = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
String message = "发送第:" + i + "条消息.";
/**
* channel.getNextPublishSeqNo()获取下一个消息的序列号
* 通过序列号与消息体进行一个关联
* 全部都是未确认的消息体
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" + count + "个异步确认消息,耗时" + (end - begin) + "ms");
}
}
三种确认方式比对
单个 当前实验中: 发布10000个单独确认消息,耗时6173ms
批量 当前实验中: 发布10000个 批量 确认消息,耗时1293ms
异步 当前实验中: 发布10000个异步确认消息,耗时922ms
如果非要做发布确认建议使用异步发布确认
5.交换机
5.1.交换机的概念
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来
自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消
息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
5.2.默认交换机
5.3.交换机的类型
5.3.1.扇出(fanout)
Fanout 是将接收到的所有消息广播到与它绑定的所有队列中,与routingKey无关
实际案例:
生产者
/**
* 应用场景:
* 收单订单后,
* 1.系统做订单的下一步处理(如绑定,发货等)
* 2.将订单信息通知到其他合作商(如下游通知等)
* </p>
*/
public class Producer {
private final static String EXCHANGE_NAME = "order_exchange_fanout";
private final static String ROUTING_KEY = "order_routing_key";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 定义交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner sc = new Scanner(System.in);
System.out.println("模拟订单号进入队列,请输入订单号:");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes("UTF-8"));
System.out.println("订单号:" + message + " 已入队列");
}
}
}
发货消费者
/**
* 监听代发货的订单号
*/
public class ConsumerDelivery {
private final static String EXCHANGE_NAME = "order_exchange_fanout";
private final static String ROUTING_KEY = "order_routing_key";
private final static String QUEUE_NAME = "order_delivery_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 定义队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
System.out.println("等待接收待发货订单号.........");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
String exchange = delivery.getEnvelope().getExchange();
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("待发货订单号:" + message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("-->消息消费被中断");
};
/**
* 消费者消费消息
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
订单通知消费者
/**
* 监听待通知的订单号
*/
public class ConsumerNotify {
private final static String EXCHANGE_NAME = "order_exchange_fanout";
private final static String ROUTING_KEY = "order_routing_key";
private final static String QUEUE_NAME = "order_notify_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 定义队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
System.out.println("等待接收待通知订单号.........");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
String exchange = delivery.getEnvelope().getExchange();
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("待通知订单号:" + message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("-->消息消费被中断");
};
/**
* 消费者消费消息
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
测试:
在上面的测试中任意修改routingKey都可以收到消息.
重要结论:只要是fanout类型的交互机,routingKey是无效的,即任何rountingKey都是可以接收到消息的;
5.3.2.直接(direct)
处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。<br />实际案例:<br />![图片.png](https://cdn.nlark.com/yuque/0/2021/png/1628584/1627357107016-f86d18b9-1378-4751-8684-31f5fa9a4b22.png#clientId=u97edecca-41b5-4&from=paste&height=402&id=u6f4fcd53&name=%E5%9B%BE%E7%89%87.png&originHeight=402&originWidth=779&originalType=binary&ratio=1&size=26613&status=done&style=none&taskId=u0c91580c-1aa9-4fa8-a769-3abd4c6452b&width=779)<br />生产者
/**
* 应用场景:
* 收单后订单处理流程为:进入库存队列-->进入发货队列--->资金变动队列
*/
public class Producer {
private final static String EXCHANGE_NAME = "order_exchange_direct";
private static Map<String, String> ROUTING_KEYS = new HashMap<>();
static {
ROUTING_KEYS.put("1", "delivery_routing_key");
ROUTING_KEYS.put("2", "stock_routing_key");
ROUTING_KEYS.put("3", "account_routing_key");
}
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 定义交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
Scanner sc = new Scanner(System.in);
// 输入格式为 路由编号|订单号 如: 1|N1001
System.out.println("模拟不同业务数据绑定不同的key,请输入:");
while (sc.hasNext()) {
String input = sc.nextLine();
String[] split = input.split(",");
String routingKey = ROUTING_KEYS.get(split[0]);
String message = split[1];
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println("订单号:" + message + " 已入队列,routingKey=" + routingKey);
}
}
}
库存消费者
/**
* 监听待处理的库存订单号
*/
public class ConsumerStock {
private final static String EXCHANGE_NAME = "order_exchange_direct";
private final static String ROUTING_KEY = "stock_routing_key";
private final static String QUEUE_NAME = "stock_notify_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 定义队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
System.out.println("监听待处理的库存订单号.........");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
String exchange = delivery.getEnvelope().getExchange();
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("待处理的库存订单号:" + message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("-->消息消费被中断");
};
/**
* 消费者消费消息
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
发货消费者
/**
* 监听代发货的订单号
*/
public class ConsumerDelivery {
private final static String EXCHANGE_NAME = "order_exchange_direct";
private final static String ROUTING_KEY = "delivery_routing_key";
private final static String QUEUE_NAME = "order_delivery_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 定义队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
System.out.println("等待接收待发货订单号.........");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
String exchange = delivery.getEnvelope().getExchange();
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("待发货订单号:" + message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("-->消息消费被中断");
};
/**
* 消费者消费消息
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
5.3.3.主题(topic)
一个队列可以通过*或#监听多个routingKey.
实际案例:
生产者
/**
* 主题交换机
* 应用场景:
* 一个队列接收多个routingKey的消息
* 重点1:
* routing_key :命名规范,它必须是一个单词列表,以点号分隔开,
* 如: id.order.product.routing.key 是合法的, 可以匹配绑定的routing_key=id.order.product.routing.key
* 如: id.*.phone.routing.key 是合法的,*(星号)可以代替一个单词, 可以匹配绑定的routing_key=id.number.phone.routing.key
* 如: id.#.phone.routing.key 是合法的,#(井号)可以替代零个或多个单词, 可以匹配绑定的routing_key=id.number.phone.routing.key 和 id.number.notify.phone.routing.key
* 重点2:
* 当一个队列绑定键是#,那么这个队列将接收所有数据,类似fanout 了
* 当一个队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了
*/
public class Producer {
private final static String EXCHANGE_NAME = "order_exchange_topic";
private static Map<String, String> ROUTING_KEYS = new HashMap<>();
static {
ROUTING_KEYS.put("1", "id.order.product.routing.key");
ROUTING_KEYS.put("2", "id.number.phone.routing.key");
ROUTING_KEYS.put("3", "id.number.notify.phone.routing.key");
}
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 定义交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
Scanner sc = new Scanner(System.in);
// 输入格式为 路由编号|订单号 如: 1|N1001
System.out.println("模拟不同业务数据绑定不同的key,请输入:");
while (sc.hasNext()) {
String input = sc.nextLine();
String[] split = input.split(",");
String routingKey = ROUTING_KEYS.get(split[0]);
String message = split[1];
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println("订单号:" + message + " 已入队列,routingKey=" + routingKey);
}
}
}
消费者1
**
* 监听队列
*/
public class Consumer01 {
private final static String EXCHANGE_NAME = "order_exchange_topic";
private final static String ROUTING_KEY = "id.order.product.routing.key";
private final static String QUEUE_NAME = "order01_topic_delivery_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 定义队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
System.out.println("监听消息,当前绑定的routing key=" + ROUTING_KEY);
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
String exchange = delivery.getEnvelope().getExchange();
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("exchange:" + exchange);
System.out.println("实际routingKey:" + routingKey);
System.out.println("message:" + message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("-->消息消费被中断");
};
/**
* 消费者消费消息
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
消费者2
修改:消费者1代码
private final static String ROUTING_KEY = “id.order.product.routing.key”;
改为:
private final static String ROUTING_KEY = “id.*.phone.routing.key”;
消费者3
修改:消费者1代码
private final static String ROUTING_KEY = “id.order.product.routing.key”;
改为:
private final static String ROUTING_KEY = “id.#.phone.routing.key”;
测试
5.3.4.标题(headers)
这个不是通过routingKey进行匹配,而是一个map参数匹配的,个人觉得有点复杂,不好理解,一般生产上用得也少,这里只是简单学习一下.
实际案例
生产者
/**
* headers交换机
* 应用场景:
*/
public class Producer {
private final static String EXCHANGE_NAME = "order_exchange_headers";
public static Map<String, AMQP.BasicProperties> propsMap = new HashMap<>(3);
/**
* 消息header数据里有一个特殊值”x-match”,它有两个值
* all: 默认值。一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机
* any: 一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机
*
* 队列01:绑定交换机参数是:name=product,type=report,x-match=all,接收到消息1
* 队列02: 绑定交换机参数是:name=product,type=report,x-match=any,接收到消息1,2
* 队列03:绑定交换机参数是:name=user,type=wait,x-match=all,接收到消息4
*
* 消息1: 发送交换机的头参数是:name=product,type=report
* 消息2: 发送交换机的头参数是:name=product
* 消息3: 发送交换机的头参数是:type=reply,消息丢失没有可以满足条件的队列
* 消息4: 发送交换机的头参数是:name=user,type=wait
* */
static {
// 属性参数1
Map<String, Object> headers1 = new HashMap<>(3);
headers1.put("name", "product");
headers1.put("type", "report");
propsMap.put("1", new AMQP.BasicProperties.Builder().headers(headers1).build());
// 属性参数2
Map<String, Object> headers2 = new HashMap<>(3);
headers2.put("name", "product");
propsMap.put("2", new AMQP.BasicProperties.Builder().headers(headers2).build());
// 属性参数3
Map<String, Object> headers3 = new HashMap<>(3);
headers3.put("type", "reply");
propsMap.put("3", new AMQP.BasicProperties.Builder().headers(headers3).build());
// 属性参数4
Map<String, Object> headers4 = new HashMap<>(3);
headers4.put("name", "user");
headers4.put("type", "wait");
propsMap.put("4", new AMQP.BasicProperties.Builder().headers(headers4).build());
}
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 定义交换机
channel.exchangeDeclare(EXCHANGE_NAME, "headers");
Scanner sc = new Scanner(System.in);
// 格式:参数序号,消息,例如: 1,N001
System.out.println("请输入队列消息和参数:");
while (sc.hasNext()) {
String input = sc.nextLine();
String[] split = input.split(",");
AMQP.BasicProperties properties = propsMap.get(split[0]);
String message = split[1];
channel.basicPublish(EXCHANGE_NAME, "", properties, message.getBytes("UTF-8"));
System.out.println("订单号:" + message + " 已入队列,headers=" + properties.getHeaders());
}
}
}
消费者1
/**
* 监听队列
*/
public class Consumer01 {
private final static String EXCHANGE_NAME = "order_exchange_headers";
private final static String QUEUE_NAME = "order01_headers_delivery_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 定义队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 队列01:绑定交换机参数是:name=product,type=report,x-match=all,接收到消息1
Map<String, Object> headers = new HashMap<>(3);
headers.put("name", "product");
headers.put("type", "report");
headers.put("x-match", "all");
// 绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", headers);
System.out.println("Consumer01监听消息,当前绑定的headers=" + headers);
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
Map<String, Object> headersP = delivery.getProperties().getHeaders();
System.out.println("message:" + message);
System.out.println("headersP:" + headersP);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("-->消息消费被中断");
};
//消费者消费消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
消费者2
/**
* 监听队列
*/
public class Consumer02 {
private final static String EXCHANGE_NAME = "order_exchange_headers";
private final static String QUEUE_NAME = "order02_headers_delivery_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 定义队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 队列02: 绑定交换机参数是:name=product,type=reply,x-match=any,接收到消息1,2,3
Map<String, Object> headers = new HashMap<>(3);
headers.put("name", "product");
headers.put("type", "reply");
headers.put("x-match", "any");
// 绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", headers);
System.out.println("Consumer02监听消息,当前绑定的headers=" + headers);
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
Map<String, Object> headersP = delivery.getProperties().getHeaders();
System.out.println("message:" + message);
System.out.println("headersP:" + headersP);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("-->消息消费被中断");
};
//消费者消费消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
消费者3
/**
* 监听队列
*/
public class Consumer03 {
private final static String EXCHANGE_NAME = "order_exchange_headers";
private final static String QUEUE_NAME = "order01_headers_delivery_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 定义队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 队列03:绑定交换机参数是:name=user,type=wait,x-match=all,接收到消息4
Map<String, Object> headers = new HashMap<>(3);
headers.put("name", "user");
headers.put("type", "wait");
headers.put("x-match", "all");
// 绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", headers);
System.out.println("Consumer03监听消息,当前绑定的headers=" + headers);
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
Map<String, Object> headersP = delivery.getProperties().getHeaders();
System.out.println("message:" + message);
System.out.println("headersP:" + headersP);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("-->消息消费被中断");
};
//消费者消费消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
5.4.临时队列
定义临时队列:
_// 声明一个临时队列
_String queueName = channel.queueDeclare().getQueue();
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明一个临时队列
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
System.out.println("Consumer消息监听中........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer接收到的消息:" + message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
6.死信队列
案例演示
生产者
/**
* 死信队列演示
* 应用场景:
* 订单处理完成后,通知下游,当前下游一直通知失败,即该消息消费不了或者超时.
*
* 产生死信队列的几种情况
* 1. 消息 TTL 过期
* 2. 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
* 3. 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
*/
public class Producer {
/**
* 正常业务
*/
private final static String EXCHANGE_NAME = "Ldp_order_exchange";
private final static String ROUTING_KEY = "Ldp_order_routing_key";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 定义交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//设置消息的 TTL 时间 5秒
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
Scanner sc = new Scanner(System.in);
System.out.println("模拟订单号进入队列,请输入订单号:");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes("UTF-8"));
System.out.println("订单号:" + message + " 已入队列");
}
}
}
普通消费者
/**
* 监听代发货的订单号
*/
public class ConsumerNotify {
/**
* 正常业务
*/
private final static String EXCHANGE_NAME = "Ldp_order_exchange";
private final static String ROUTING_KEY = "Ldp_order_routing_key";
private final static String QUEUE_NAME = "Ldp_order_notify_queue";
/**
* 异常业务:才有私信队列处理
*/
private static final String DEAD_EXCHANGE_NAME = "Ldp_order_dead_exchange";
private final static String DEAD_ROUTING_KEY = "Ldp_order_dead_routing_key";
private final static String DEAD_QUEUE_NAME = "Ldp_order_dead_notify_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 定义交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明死信队列 与 绑定
channel.queueDeclare(DEAD_QUEUE_NAME, false, false, false, null);
channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_ROUTING_KEY);
/**
* 正常队列 正常队列绑定死信队列信息
* 1.x-dead-letter-exchange:正常队列设置死信交换机 参数 key 是固定值
* 2.正常队列设置死信 routing-key 参数 key 是固定值
* 3.x-message-ttl 设置队列里消息的ttl的时间30s
*/
Map<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
params.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
// params.put("x-message-ttl", 5 * 1000);
channel.queueDeclare(QUEUE_NAME, false, false, false, params);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
System.out.println("消息监听.........");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
String exchange = delivery.getEnvelope().getExchange();
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("订单号:" + message);
};
//消费者消费消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag) -> {
});
}
}
死信队列消费者
/**
* 监听死信队列
*/
public class ConsumerDead {
/**
* 异常业务:死信队列处理
*/
private static final String DEAD_EXCHANGE_NAME = "Ldp_order_dead_exchange";
private final static String DEAD_ROUTING_KEY = "Ldp_order_dead_routing_key";
private final static String DEAD_QUEUE_NAME = "Ldp_order_dead_notify_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明死信队列 与 绑定
channel.queueDeclare(DEAD_QUEUE_NAME, false, false, false, null);
channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_ROUTING_KEY);
System.out.println("监听中.........");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
String exchange = delivery.getEnvelope().getExchange();
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("message:" + message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("-->消息消费被中断");
};
/**
* 消费者消费消息
*/
channel.basicConsume(DEAD_QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
测试
测试的时候 关闭普通的消费者,让消息过期,进入死信队列.
7.延迟队列
使用场景
1.用户下单后,10分钟检查用户是否支付,如果没有支付就关闭订单;
2.退款申请提交后,等5分钟查询是否退款成功;
等
实现方案
上一节学习了死信队列,结合死信队列可以实现这业务逻辑.
也就是说集合消息过期时间 或队列过期时间实现
消息过期ttl设置
队列过期ttl设置
延迟队列插件配置
下载地址:
https://www.rabbitmq.com/community-plugins.html
进入页面后如下:
将下载的:rabbitmq_delayed_message_exchange-3.8.0.ez 文件上传到路径:
/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins
然后执行命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
如果安装成功:
测试
构建简单的spingboot项目
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.demo.rabbitmq</groupId>
<artifactId>rabbitmq-springboot</artifactId>
<version>v1</version>
<name>rabbitmq-springboot</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
application.properties配置文件
server.servlet.context-path=/api
spring.rabbitmq.host=192.168.5.185
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
重要代码如下
RabbitDelayedConfig配置对象
@Configuration
public class RabbitDelayedConfig {
private final static String EXCHANGE_NAME = "Ldp_delayed_exchange";
private final static String QUEUE_NAME = "Ldp_delayed_queue";
private final static String ROUTING_KEY = "Ldp_delayed_routing_key";
@Bean
public Queue delayedQueue() {
return new Queue(QUEUE_NAME);
}
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange
delayedExchange) {
return BindingBuilder.bind(queue).to(delayedExchange).with(ROUTING_KEY).noargs();
}
}
生产者
@RestController
public class TestController {
@Autowired
private RabbitTemplate rabbitTemplate;
private final static String EXCHANGE_NAME = "Ldp_delayed_exchange";
private final static String ROUTING_KEY = "Ldp_delayed_routing_key";
/**
* http://localhost:8080/api/send/3/m3
*
* @return
*/
@GetMapping("/send/{delayTime}/{message}")
public Object send(@PathVariable Integer delayTime, @PathVariable String message) {
MessagePostProcessor messagePostProcessor = correlationData -> {
correlationData.getMessageProperties().setDelay(delayTime * 1000);
return correlationData;
};
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, messagePostProcessor);
return "send success,message=" + message + ",delayTime=" + delayTime + " s";
}
}
消费者
@Component
public class DelayedConsumer {
private final static String QUEUE_NAME = "Ldp_delayed_queue";
@RabbitListener(queues = QUEUE_NAME)
public void receiveDelayedQueue(Message message) {
String msg = new String(message.getBody());
String exchange = message.getMessageProperties().getReceivedExchange();
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
System.out.println("收到消息:exchange=" + exchange);
System.out.println("收到消息:routingKey=" + routingKey);
System.out.println("收到消息:msg=" + msg);
}
}
测试方式
http://localhost:8080/api/send/30/m30 // 延迟30秒
http://localhost:8080/api/send/2/m2 // 延迟2秒
从结果可以看出延迟2秒的虽然后入队列,但是先收到.
8.RabbitMQ集群
这里以集群2台为例
1.主机名称
vim /etc/hostname
2.配置各个节点的 hosts 文件
vim /etc/hosts
192.168.5.61 node1
192.168.5.185 node2
3.cookie文件使用的是同一个值
在 node1 上执行远程操作命令
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
4.启动 RabbitMQ 服务,顺带启动 Erlang 虚拟机和 RbbitMQ 应用服务(在每台节点上分别执行以下命令)
rabbitmq-server -detached
5.在节点 2 执行(意思就是把节点2加到节点1里面)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
6.如果是集群更多的节点重复步骤5
7.集群状态
rabbitmqctl cluster_status
8.需要重新设置用户
创建账号
rabbitmqctl add_user admin 123456
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
rabbitmqctl set_permissions -p “/“ admin “.“ “.“ “.*”
注意:
rabbitmqctl stop 会将Erlang 虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ 服务
访问任意节点客户端:
9.其他知识点
1.日志配置
https://blog.csdn.net/zhishidi/article/details/119112307
2.优先级队列
3.惰性队列
4.发布确认(与springboot整合)
5.备份交换机