AMQP协议
AMQP(Advanced Message Queuing Protocol,高级消息队列协议),是进程之间传递异步消息的网络协议。
AMQP工作工程
发布者(Publisher)发布消息(Message),经过交换机(Exchange),交换机根据路由规则将收到消息分发给交换机绑定的队列(Queue),最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
RabbiMQ
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。而消息中间件作为分布式系统重要组件之一,可以解决应用耦合,异步消息,流量削峰等问题。
- 应用耦合
====》
- 异步消息
===》
- 流量削峰
===》
RabbitMQ使用场景:
- 排队算法 : 使用消息队列特性
- 秒杀活动 : 使用消息队列特性
- 消息分发 : 使用消息异步特性
- 异步处理 : 使用消息异步特性
- 数据同步 : 使用消息异步特性
- 处理耗时任务 : 使用消息异步特性
- 流量销峰
RabbitMQ架构
1.Virtual Host |
---|
虚拟主机。表示一批交换器,消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是/ |
2.Connection |
链接。指rabbitMQ服务器和服务建立的TCP链接。 |
3.Channel |
信道。1,Channel中文叫做信道,是TCP里面的虚拟链接。例如:电缆相当于TCP,信道是一个独立光纤束,一条TCP连接上创建多条信道是没有问题的。2,TCP一旦打开,就会创建AMQP信道。3,无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。 |
4.Message |
消息。消息是不具名的,它由消息头消息体组成。消息体是不透明的,而消息头则由一系列可选属性组成,这些属性包括:routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出消息可能持久性存储)等。 |
5.Publisher |
消息的生产者。也是一个向交换器发布消息的客户端应用程序。 |
6.Consumer |
消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。 |
7.Exchange |
交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 三种常用的交换器类型1. direct(发布与订阅 完全匹配)2. fanout(广播)3. topic(主题,规则匹配) |
8.Binding |
绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 |
9.Routing-key |
路由键。RabbitMQ决定消息该投递到哪个队列的规则。(也可以理解为队列的名称,路由键是key,队列是value)队列通过路由键绑定到交换器。消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配。如果相匹配,消息将会投递到该队列。如果不匹配,消息将会进入黑洞。 |
10.Queue |
消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取走。 |
11.Borker |
表示消息队列服务器实体。 |
12.交换器和队列的关系 |
交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟队列和交换器的路由键匹配,那么消息就会被路由到该绑定的队列中。 也就是说,消息到队列的过程中,消息首先会经过交换器,接下来交换器在通过路由键匹配分发消息到具体的队列中。 路由键可以理解为匹配的规则。 |
13.RabbitMQ为什么需要信道?为什么不是TCP直接通信? |
1. TCP的创建和销毁开销特别大。创建需要3次握手,销毁需要4次分手。2. 如果不用信道,那应用程序就会以TCP链接Rabbit,高峰时每秒成千上万条链接会造成资源巨大的浪费,而且操作系统每秒处理TCP链接数也是有限制的,必定造成性能瓶颈。3. 信道的原理是一条线程一条通道,多条线程多条通道同用一条TCP链接。一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。 |
RabbitMQ通信方式
- Hello World!:为了入门操
- Work queues:一个队列被多个消费者消费
- Publish/Subscribe:手动创建Exchange(FANOUT)
- Routing:手动创建Exchange(DIRECT)
- Topics:手动创建Exchange(TOPIC)
- RPC:RPC方式
- Publisher Confirms:保证消息可靠性
RabbitMQ 客户端操作
导入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
创建RabbitMQ工具类
public class RabbitMQUtil {
/**
* rabbitMQ 服务器ip地址
*/
private static final String RABBITMQ_HOST = "192.168.126.128";
/**
* rabbitMQ 服务器端口
*/
private static final int RABBITMQ_PORT = 5672;
/**
* rabbitMQ 虚拟服务器uri
*/
private static final String RABBIT_VIRTUAL_HOST = "/";
/**
* rabbitMQ 用户名
*/
private static final String RABBITMQ_USERNAME = "admin";
/**
* rabbitMQ 用户密码
*/
private static final String RABBITMQ_PASSWORD = "admin";
/**
* 获取一个connection
* @return Connection 对象
* @throws IOException IO异常
* @throws TimeoutException 超时异常
*/
public static Connection getConnection() throws IOException, TimeoutException {
// 创建ConnectionFactory 工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置rabbitMQ连接信息
connectionFactory.setHost(RABBITMQ_HOST);
connectionFactory.setPort(RABBITMQ_PORT);
connectionFactory.setVirtualHost(RABBIT_VIRTUAL_HOST);
connectionFactory.setUsername(RABBITMQ_USERNAME);
connectionFactory.setPassword(RABBITMQ_PASSWORD);
// 创建一个rabbitMQ 连接Connection 对象返回
return connectionFactory.newConnection();
}
}
HelloWord通信
采用默认的交换机Exchange( direct )
public class Publisher {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
try {
// 1. 获取连接 Connection
Connection connection = RabbitMQUtil.getConnection();
// 2. 获取信道 Channel
Channel channel = connection.createChannel();
//3. 创建队列 Queue
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4. 发布消息
String message = "hello world";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送成功");
//5 关闭资源
channel.close();
connection.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
public class Consumer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
try {
// 1 创建连接 Connection
Connection connection = RabbitMQUtil.getConnection();
// 2 构建信道 Channel
Channel channel = connection.createChannel();
// 3 构建队列(与Publisher构建的队列一样)。 在这里也声明了队列。因为可能会在Publisher之前启动Consumer,确保在消费消息之前队列是存在的
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4 消费消息
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("获取到消息:" + new String(body, StandardCharsets.UTF_8));
}
});
//5 关闭资源
channel.close();
connection.close();
} catch (IOException |
TimeoutException e) {
e.printStackTrace();
}
}
}
WorkQueue
- 一个队列中的一个消息,只会被一消费者消费
- 默认情况下,RabbitMQ队列会将消息以轮询的方式交给不同的消费者消费
消费者拿到消息后,需要给RabbitMQ一个ack,这样RabbitMQ就认为消费者拿到消息了
public class Publisher { private static final String QUEUE_NAME = "work"; public static void main(String[] args) { try { //1 获取连接 Connection connection = RabbitMQUtil.getConnection(); //2 构建信道Channel Channel channel = connection.createChannel(); //3 构建队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //4 发送消息 AMQP.BasicProperties prop = new AMQP.BasicProperties(); for (int i = 1; i <= 10; i++) { String message = "work queue—"+i; channel.basicPublish("",QUEUE_NAME,prop,message.getBytes()); } System.out.println("发送成功"); //5 关闭资源 channel.close(); connection.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } }
```java public class Consumer {
private static final String QUEUE_NAME = “work”;
public static void main(String[] args) {
try { //1 获取连接 Connection connection = RabbitMQUtil.getConnection(); //2 构建信道Channel Channel channel = connection.createChannel(); //3 构建队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //4 消费消息 //4.1 设置回调函数 // 设置信道流量。服务器将传递的最大消息数,如果无限制,则为0 channel.basicQos(1); DefaultConsumer callback = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("1号—获取到消息:"+new String(body, StandardCharsets.UTF_8)); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } // 设置手动Ack channel.basicAck(envelope.getDeliveryTag(),false); } }; // 消费消息,关闭自动Ack channel.basicConsume(QUEUE_NAME,false,callback); } catch (IOException | TimeoutException e) { e.printStackTrace(); }
} } //—————————————————————————————————————-
public class Consumer2 {
private static final String QUEUE_NAME = "work";
public static void main(String[] args) {
try {
//1 获取连接
Connection connection = RabbitMQUtil.getConnection();
//2 构建信道Channel
Channel channel = connection.createChannel();
//3 构建队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//4 消费消息
//4.1 设置回调函数
// 设置信道流量。服务器将传递的最大消息数,如果无限制,则为0
channel.basicQos(1);
DefaultConsumer callback1 = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("1号—获取到消息:"+new String(body, StandardCharsets.UTF_8));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 设置手动Ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 消费消息,关闭自动Ack
channel.basicConsume(QUEUE_NAME,false,callback1);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
<a name="BjOnt"></a>
#### Publish/Subscribe

- 生产者(Publisher),自行构建FANOUT类型的Exchange,并与队列进行绑定。
```java
public class Publisher {
private static final String EXCHANGE_NAME = "pubsub";
private static final String QUEUE_NAME1 = "publish1";
private static final String QUEUE_NAME2 = "publish2";
public static void main(String[] args) {
try {
//1 构建连接Connection
Connection connection = RabbitMQUtil.getConnection();
//2 构建信道Channel
Channel channel = connection.createChannel();
//3 构建交换机Exchange,制定交换机类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//4 构建队列
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
//5 绑定交换机和队列,绑定方式为直接绑定(根据queueName和exchangeName绑定,与routingKey无关
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "publish1");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "publish2");
//6 发送消息
String message = "Hello publish/subscribe";
channel.basicPublish(EXCHANGE_NAME, "publish1", null, message.getBytes());
channel.basicPublish(EXCHANGE_NAME, "publish2", null, message.getBytes());
//7 关闭资源
channel.close();
connection.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
public class Consumer1 {
private static final String QUEUE_NAME1 = "publish1";
public static void main(String[] args) {
try {
//1 构建连接
Connection connection = RabbitMQUtil.getConnection();
//2 构建信道
Channel channel = connection.createChannel();
//3 构建队列,保证队列存在
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
//4 获取消息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("获取到消息:"+new String(body,StandardCharsets.UTF_8));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_NAME1,false,callback);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
//----------------------------------------------------------------------------------
public class Consumer2 {
private static final String QUEUE_NAME2 = "publish2";
public static void main(String[] args) {
try {
//1 构建连接
Connection connection = RabbitMQUtil.getConnection();
//2 构建信道
Channel channel = connection.createChannel();
//3 构建队列,保证队列存在
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
//4 获取消息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("获取到消息:"+new String(body, StandardCharsets.UTF_8));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_NAME2,false,callback);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
Routing
生产者 Publisher:在绑定Exchange和Queue时,需要指定routingKey,同时在发送消息时, 也需要指定routingKey,只有在routingKey一致时,消息才会路由到指定的Queue。交换机类型为DIRECT。
public class Publisher { private static final String EXCHANGE_NAME = "routingKey"; private static final String QUEUE_NAME1 = "routingKey_queue1"; private static final String QUEUE_NAME2 = "routingKey_queue2"; public static void main(String[] args) { try { //1 获取连接 Connection connection = RabbitMQUtil.getConnection(); //2 构建信道 Channel Channel channel = connection.createChannel(); //3 构建交换机,指定类型为DIRECT channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //4 构建队列 channel.queueDeclare(QUEUE_NAME1, false, false, false, null); channel.queueDeclare(QUEUE_NAME2, false, false, false, null); //5 绑定Exchange和Queue channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "red"); channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "black"); channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "white"); //6 发送信息 channel.basicPublish(EXCHANGE_NAME, "red", null, "这是Red".getBytes()); channel.basicPublish(EXCHANGE_NAME, "blue", null, "这是blue".getBytes()); channel.basicPublish(EXCHANGE_NAME, "black", null, "这是black".getBytes()); channel.basicPublish(EXCHANGE_NAME, "white", null, "这是white".getBytes()); //7 关闭资源 channel.close(); connection.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } }
```java public class Consumer1 {
private static final String QUEUE_NAME1 = “routingKey_queue1”;
public static void main(String[] args) {
try { //1 获取连接 Connection connection = RabbitMQUtil.getConnection(); //2 构建信道 Channel channel = connection.createChannel(); //3 构建队列 channel.queueDeclare(QUEUE_NAME1, false, false, false, null); //4 获取消息 DefaultConsumer callback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("获取到消息" + new String(body, StandardCharsets.UTF_8)); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME1, false, callback); } catch (IOException | TimeoutException e) { e.printStackTrace(); }
} } //—————————————————————————————————————————- public class Consumer2 { private static final String QUEUE_NAME2 = “routingKey_queue2”;
public static void main(String[] args) {
try { //1 获取连接 Connection connection = RabbitMQUtil.getConnection(); //2 构建信道 Channel channel = connection.createChannel(); //3 构建队列 channel.queueDeclare(QUEUE_NAME2, false, false, false, null); //4 获取消息 DefaultConsumer callback = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("获取到消息" + new String(body, StandardCharsets.UTF_8)); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME2, false, callback); } catch (IOException | TimeoutException e) { e.printStackTrace(); }
}
}
<a name="pBZ2Y"></a>
#### Topics
<br />Publisher:TOPIC类型可以编写带有特殊意义的routingKey的绑定方式。交换机类型为TOPIC类型。<br />注意:topic类型交换机,在与queue绑定时,routingkey格式为:单词1.单词2.单词3……,其中*表示占位符,#表示通配符
```java
public class Publisher {
private static final String EXCHANGE_NAME = "topic";
private static final String QUEUE_NAME1 = "topic-queue1";
private static final String QUEUE_NAME2 = "topic-queue2";
public static void main(String[] args) {
try {
//1 获取连接Connection
Connection connection = RabbitMQUtil.getConnection();
//2 构建信道Channel
Channel channel = connection.createChannel();
//3 构建交换机,指定类型为topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//4 构建队列
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
//5 绑定交换机和队列
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"*.red.*");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"big.*.*");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"*.red.#");
//6 发送消息
channel.basicPublish(EXCHANGE_NAME,"big.red.dog",null,"big.red.dog".getBytes());
channel.basicPublish(EXCHANGE_NAME,"small.red.cat",null,"small.red.cat".getBytes());
channel.basicPublish(EXCHANGE_NAME,"big.blue.dog",null,"big.blue.dog".getBytes());
channel.basicPublish(EXCHANGE_NAME,"big.red.dog.pig",null,"big.red.dog".getBytes());
//7 关闭资源
channel.close();;
connection.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
public class Consumer1 {
private static final String QUEUE_NAME1 = "topic-queue1";
public static void main(String[] args) {
try {
//1 获取连接
Connection connection = RabbitMQUtil.getConnection();
//2 构建信道 Channel
Channel channel = connection.createChannel();
//3 构建队列
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
//4 接收消费信息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("获取到消息:" + new String(body, StandardCharsets.UTF_8));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME1, false, callback);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
//------------------------------------------------------------------------------------
public class Consumer2 {
private static final String QUEUE_NAME2 = "topic-queue2";
public static void main(String[] args) {
try {
//1 获取连接
Connection connection = RabbitMQUtil.getConnection();
//2 构建信道 Channel
Channel channel = connection.createChannel();
//3 构建队列
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
//4 接收消费信息
DefaultConsumer callback = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("获取到消息:" + new String(body, StandardCharsets.UTF_8));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME2, false, callback);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
RPC通信类型(了解)
因为两个服务在交互时,可以尽量做到Client和Server的解耦,通过RabbitMQ进行解耦操作需要让Client发送消息时,携带两个属性:
- replyTo:告知Server将相应信息放到哪个队列
- correlationId:告知Server发送相应消息时,需要携带位置标示来告知Client响应的信息
public class Client {
private static final String QUEUE_PUBLISH = "rpc-publish";
private static final String QUEUE_CONSUMER = "rpc-consumer";
public static void main(String[] args) {
try {
//1 获取连接
Connection connection = RabbitMQUtil.getConnection();
//2 获取信道
Channel channel = connection.createChannel();
//3 构建队列
channel.queueDeclare(QUEUE_PUBLISH, false, false, false, null);
channel.queueDeclare(QUEUE_CONSUMER, false, false, false, null);
//4 发布消息
String message = "hello RPC";
String correlationId = UUID.randomUUID().toString();
AMQP.BasicProperties prop = new AMQP.BasicProperties()
.builder()
.replyTo(QUEUE_CONSUMER)
.correlationId(correlationId)
.build();
channel.basicPublish("", QUEUE_PUBLISH, prop, message.getBytes());
System.out.println("消息发送成功");
channel.basicConsume(QUEUE_CONSUMER, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String correlationId1 = properties.getCorrelationId();
if (correlationId1 != null && correlationId1.equalsIgnoreCase(correlationId)) {
System.out.println("获取到服务:" + new String(body, StandardCharsets.UTF_8));
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
public class Server {
private static final String QUEUE_PUBLISH = "rpc-publish";
private static final String QUEUE_CONSUMER = "rpc-consumer";
public static void main(String[] args) {
try {
//1 获取连接
Connection connection = RabbitMQUtil.getConnection();
//2 获取信道
Channel channel = connection.createChannel();
//3 构建队列
channel.queueDeclare(QUEUE_PUBLISH, false, false, false, null);
channel.queueDeclare(QUEUE_CONSUMER, false, false, false, null);
//4 监听消息
DefaultConsumer callback = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("获取到消息:"+new String(body,StandardCharsets.UTF_8));
channel.basicAck(envelope.getDeliveryTag(),false);
String response = "这是服务端答复的消息";
String correlationId = properties.getCorrelationId();
String responseQueue = properties.getReplyTo();
AMQP.BasicProperties prop = new AMQP.BasicProperties()
.builder().correlationId(correlationId).build();
channel.basicPublish("",responseQueue,prop,response.getBytes());
}
};
channel.basicConsume(QUEUE_PUBLISH, false, callback);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
RabbitMQ交换机类型
direct类型
- direct交换器是RabbitMQ默认交换器。默认会进行公平调度,所有接受者依次从消息队列中获取值(一个队列多个消费者,采用类似轮询的方式)。
Publisher给哪个队列发消息,就一定是给哪个队列发送消息(routingkey,绑定交换机和队列)。对交换器绑定的其他队列没有任何影响。
fanout类型
扇形交换器,实际上做的事情就是广播,fanout会把消息发送给所有的绑定在当前交换器上的队列。且每个队列消息中第一个Consumer能收到消息。
topic类型
允许在路由键(RoutingKey)中出现匹配规则。路由键的写法和包写法相同。com.msb.xxxx.xxx格式。
在绑定时可以带有下面特殊符号,中间可以出现:
* : 代表一个单词(两个.之间内容)
# : 0个或多个字符
接收方依然是公平调度,同一个队列中内容轮换获取值。SpringBoot使用RabbitMQ
spring: rabbitmq: host: 192.168.126.128 port: 5672 username: admin password: admin virtual-host: /
```java @Configuration public class RabbitMQConfig {
private static final String EXCHANGE_NAME = “springboot-exchange”; private static final String QUEUE_NAME = “spingboot-queue”; private static final String ROUTING_KEY = “*.black.#”;
//获取交换机
@Bean public Exchange bootExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
}
// 获取队列
@Bean public Queue bootQueue() {
return QueueBuilder.nonDurable(QUEUE_NAME).build();
}
//绑定交换机和队列
@Bean public Binding getBinding(Exchange exchange, Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
} }
```java
@Component
public class Publisher {
private static final String EXCHANGE_NAME = "springboot-exchange";
private final RabbitTemplate rabbitTemplate;
@Autowired
public Publisher(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void publish() {
rabbitTemplate.convertAndSend(EXCHANGE_NAME, "big.black.dogAndCat", "messageWithProperties", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.out.println("发送" + new String(message.getBody(), StandardCharsets.UTF_8));
message.getMessageProperties().setCorrelationId("test");
return message;
}
});
System.out.println("消息发送成功");
}
}
@Component
public class Consumer {
private static final String QUEUE_NAME = "spingboot-queue";
@RabbitListener(queues = QUEUE_NAME)
public void consume(String msg, Channel channel, Message message) throws IOException {
System.out.println("获取到消息:" + msg);
System.out.println("获取到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
System.out.println("唯一标识:" + message.getMessageProperties().getCorrelationId());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
RabbitMQ保证消息的可靠性
保证消息送到Exchange —— Confirm机制
有三种策略:
- 单独发布消息:显著降低了消息发布速度,因为对消息的确认会阻碍所有后续消息的发布
- 批量发布消息:必单独发布消息快,但还是降低了消息的发布速度
- 异步回调
可以通过Confirm效果保证消息一定送达到Exchange,官方提供了三种方式,选择了对于效率影响最低的异步回调的效果
public static void main(String[] args) {
try {
//1 构建连接Connection
Connection connection = RabbitMQUtil.getConnection();
//2 构建信道Channel
Channel channel = connection.createChannel();
//3 构建交换机Exchange,制定交换机类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//4 构建队列
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
//5 绑定交换机和队列,绑定方式为直接绑定(根据queueName和exchangeName绑定,与routingKey无关
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "publish1");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "publish2");
// 开启confirms
channel.confirmSelect();
// 设置异步回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 消息发送到Exchange了,进行处理
System.out.println("消息发送成功");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// 消息发送Exchange失败,进行处理;比如重新发送等
System.out.println("消息发送失败");
}
});
//6 发送消息
String message = "Hello publish/subscribe";
channel.basicPublish(EXCHANGE_NAME, "publish1", null, message.getBytes());
channel.basicPublish(EXCHANGE_NAME, "publish2", null, message.getBytes());
//7 关闭资源
channel.close();
connection.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
保证消息可靠路由到Queue —— Return机制
为了保证Exchange上的消息一定可以送达到Queue,设置Return机制
public static void main(String[] args) {
try {
//1 获取连接
Connection connection = RabbitMQUtil.getConnection();
//2 构建信道 Channel
Channel channel = connection.createChannel();
//3 构建交换机,指定类型为DIRECT
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//4 构建队列
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
//5 绑定Exchange和Queue
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "red");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "black");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "white");
// 设置Return回调,没有路由到指定Queue时会执行回调函数
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息没有路由到Queue");
}
});
//6 发送信息,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调
channel.basicPublish(EXCHANGE_NAME, "red", true,null, "这是Red".getBytes());
channel.basicPublish(EXCHANGE_NAME, "blue", true,null, "这是blue".getBytes());
channel.basicPublish(EXCHANGE_NAME, "black", true,null, "这是black".getBytes());
channel.basicPublish(EXCHANGE_NAME, "white", true,null, "这是white".getBytes());
//7 关闭资源
channel.close();
connection.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
保证Queue消息持久化——DeliveryMode设置消息持久化
DeliveryMode设置为2代表持久化,如果设置为1,就代表不会持久化。注意:先设置queue持久化,在设置消息持久化
public static void main(String[] args) {
try {
//1 获取连接
Connection connection = RabbitMQUtil.getConnection();
//2 构建信道 Channel
Channel channel = connection.createChannel();
//3 构建交换机,指定类型为DIRECT
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//4 构建队列, durable设置true,设置队列持久化
channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
//5 绑定Exchange和Queue
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "red");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "black");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "white");
// 设置Return回调
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息没有路由到Queue");
}
});
// deliveryMode(2),设置消息持久化
AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().deliveryMode(2).build();
//6 发送信息,将basicPublish方法参数中的mandatory设置为true,即可开启Return机制,当消息没有路由到队列中时,就会执行return回调
channel.basicPublish(EXCHANGE_NAME, "red", true, prop, "这是Red".getBytes());
channel.basicPublish(EXCHANGE_NAME, "blue", true, prop, "这是blue".getBytes());
channel.basicPublish(EXCHANGE_NAME, "black", true, prop, "这是black".getBytes());
channel.basicPublish(EXCHANGE_NAME, "white", true, prop, "这是white".getBytes());
//7 关闭资源
channel.close();
connection.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
保证消费者可以正常消费消息
设置手动ack
Springboot设置消息可靠性
spring:
rabbitmq:
host: 192.168.126.128
port: 5672
username: admin
password: admin
virtual-host: /
publisher-confirm-type: CORRELATED #设置开启confirm回调
publisher-returns: true # 开启return回调
listener:
simple:
acknowledge-mode: MANUAL #开启手动ack
prefetch: 1
@Component
public class Publisher {
private static final String EXCHANGE_NAME = "springboot-exchange";
private final RabbitTemplate rabbitTemplate;
@Autowired
public Publisher(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void publish() {
// 设置Confirm回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
System.out.println("消息成功发送到交换机");
}else {
System.out.println("消息没有发送到交换机");
}
}
});
// 设置Return回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
String msg = new String(returned.getMessage().getBody());
System.out.println("消息:" + msg + "路由队列失败!!做补救操作!!");
}
});
rabbitTemplate.convertAndSend(EXCHANGE_NAME, "big.black.dogAndCat", "messageWithProperties", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.out.println("发送" + new String(message.getBody(), StandardCharsets.UTF_8));
// 设置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setCorrelationId("test");
return message;
}
});
System.out.println("消息发送成功");
}
}
RabbitMQ死信队列与延迟交换机
产生死信的情况:
- 消息被消费者拒绝,requeue设置为false,该消息变为死信
- 给消息设置生存时间
- 发送消息是设置消息的生存时间,如果该消息的生存时间到了还没有被消费,该消息变为死信
- 设置某个队列中所有消息的生存时间,如果生存时间到了还没被消费,消息变为死信
- 队列已达到消息的最大长度,在路由过来的消息变为死信
死信队列的应用:
- 基于死信队列在消息已满的情况下,消息也不会丢失;
-
实现死信队列
基于消费者进行reject或者nack实现死信效果
@Configuration public class DeadLetterConfig { private static final String NORMAL_EXCHANGE = "normal-exchange"; private static final String NORMAL_QUEUE = "normal-queue"; private static final String NORMAL_ROUTING_KEY = "normal.#"; private static final String DEAD_EXCHANGE = "dead-exchange"; private static final String DEAD_QUEUE = "dead-queue"; private static final String DEAD_ROUTING_KEY = "dead.#"; @Bean public Exchange normalExchange() { return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build(); } @Bean public Queue normalQueue() { return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.message").build(); } @Bean public Binding normalBinding(Exchange normalExchange, Queue normalQueue) { return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs(); } @Bean public Exchange deadExchange() { return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build(); } @Bean public Queue deadQueue() { return QueueBuilder.durable(DEAD_QUEUE).build(); } @Bean public Binding deadBinding(Exchange deadExchange, Queue deadQueue) { return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs(); } }
@Component public class Publisher2 { private static final String NORMAL_EXCHANGE = "normal-exchange"; private static final String DEAD_EXCHANGE = "dead-exchange"; @Autowired private RabbitTemplate rabbitTemplate; public void publish() { // 设置Confirm回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { System.out.println(cause); ReturnedMessage returned = correlationData.getReturned(); // 发送到Exchange失败,重新再发一次 rabbitTemplate.convertAndSend(returned.getExchange(), returned.getRoutingKey(), returned.getMessage(), message1 -> { // 开启消息持久化 message1.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message1; }); } } }); // 设置Return回调 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { // 路由到queue失败,重新再发一次 rabbitTemplate.convertAndSend(returned.getExchange(), returned.getRoutingKey(), returned.getMessage(), message1 -> { // 开启消息持久化 message1.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message1; }); } }); String message = "这是消息"; rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, "normal.message", message, message1 -> { // 开启消息持久化 message1.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message1; }); } }
@Component public class DeadListener { private static final String NORMAL_QUEUE = "normal-queue"; @RabbitListener(queues = NORMAL_QUEUE) public void consume(Message message, Channel channel) { System.out.println("normal队列中的消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); try { // consumer reject channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // consumer nack // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } catch (IOException e) { e.printStackTrace(); } } }
给消息设置生存时间
public void publishWithExpireTime() { String message = "这是有生存时间的消息"; rabbitTemplate.convertAndSend(NORMAL_EXCHANGE, "normal.message", message, message1 -> { // 开启消息持久化 message1.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置setExpiration(毫秒) message1.getMessageProperties().setExpiration("5000"); return message1; }); }
给队列中的消息设置生存时间
@Bean public Queue normalQueue(){ return QueueBuilder.durable(NORMAL_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey("dead.abc") .ttl(10000)// 设置ttl(毫秒) .build(); }
设置Queue中的消息最大长度
@Bean public Queue normalQueue(){ return QueueBuilder.durable(NORMAL_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey("dead.abc") .maxLength(1)// 设置maxLength .build(); }
延迟交换机
死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。
@Configuration public class DelayedConfig { public static final String DELAYED_EXCHANGE = "delayed-exchange"; public static final String DELAYED_QUEUE = "delayed-queue"; public static final String DELAYED_ROUTING_KEY = "delayed.#"; @Bean public Exchange delayedExchange(){ Map<String, Object> arguments = new HashMap<>(); // 设置延迟交换机为Topic路由规则 arguments.put("x-delayed-type","topic"); Exchange exchange = new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments); return exchange; } @Bean public Queue delayedQueue(){ return QueueBuilder.durable(DELAYED_QUEUE).build(); } @Bean public Binding delayedBinding(Queue delayedQueue,Exchange delayedExchange){ return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
public void publish(){ rabbitTemplate.convertAndSend(DelayExchangeConfig.DELAYED_EXCHANGE, "delayed.message", "这是消息", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 设置延迟时间 message.getMessageProperties().setDelay(3000); return message; } }); }
RabbiMQ集群