依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
Producer
public class Producer {
public static void main(String[] args) throws Exception {
// 假数据框架 Facker
Faker faker = new Faker(Locale.CHINA);
ConnectionFactory connectionFactory = null;
Connection connection = null;
Channel channel = null;
try {
// 1. 创建 ConnectionFactory
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通过 ConnectionFactory 创建 Connect
connection = connectionFactory.newConnection();
// 3. 通过 Connect 创建一个 Channel
channel = connection.createChannel();
// 4. 通过 Channel 发送 msg
String exchange = "test";
String routingKey = "test01";
AMQP.BasicProperties basicProperties = null;
for (int i = 0; i < 10000; i++) {
String msg = faker.name().fullName();
byte[] msgBody = msg.getBytes(StandardCharsets.UTF_8);
channel.basicPublish(
exchange, // exchange
routingKey, // routingKey
basicProperties, // properties
msgBody // body
);
}
} finally {
// 5. 关闭 Channel 和 Connection
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
Confirm Listener
在 channel 上开启确认模式
channel.confirmSelect();
在 channel 上添加监听
// 监听回应
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("ack!");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("no ack");
}
});
- 监听成功和失败的返回结果,根据具体的结果对消息进行重新发送,或者记录日志
Return Listener
- 接受路由不可达的响应
- routingKey 不符合
- 在发送时候需要设置
mandatory
为true- true 则监听器会接受到路由不可达的消息
- 如果为默认的 false,则 broker 端会自动删除该消息
// 方法声明
void basicPublish(
String exchange,
String routingKey,
boolean mandatory,
BasicProperties props,
byte[] body)throws IOException;
- 添加 returnListener
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("不可达");
}
});
Consumer
public class FanoutConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = null;
Connection connection = null;
Channel channel = null;
try {
// 1. 创建 ConnectionFactory
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通过 ConnectionFactory 创建 Connect
connection = connectionFactory.newConnection();
// 3. 通过 Connect 创建一个 Channel
channel = connection.createChannel();
// 4. 声明 channel 属性
String exchangeName = "test";
// 设置 exchange 的属性 fanout
String exchangeType = "fanout";
// fanout 属性下,routingKey 是无所谓的
String routingKey = "wtf";
String queueName = "test_fanout_queue";
// queue 声明
channel.queueDeclare(
queueName,
false, // durable 队列是否持久化,重启可用
false, // exclusive 是否独占,仅一个连接可用
false, // autoDelete 没有东西和该 queue 绑定,则会自动删除
null); // arguments
// exchange 声明
channel.exchangeDeclare(
exchangeName,
exchangeType,
true, // durable 是否持久化消息
false, // autoDelete
false, // internal
null // arguments
);
// 绑定 queue 和 exchange
channel.queueBind(queueName, exchangeName, routingKey);
// 创建消费者
// 》》》》 新版本通过重写 DefaultConsumer#handleDelivery 进行数据消费
MyConsumer consumer = new MyConsumer(channel);
// 绑定 channel 和 Consumer
channel.basicConsume(
queueName,
true, // 是否 autoAck
consumer);
// 消费消息
while(true) {
}
} finally {
// 5. 关闭 Channel 和 Connection
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel){
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
// 手动 ack
// channel.basicAck(envelope.getDeliveryTag(), false);
}
}
autoAck
autoAck=true
,即 自动确认模式,一旦 rabbitmq 将消息分发给消费者,就会直接删除该消息- 如果消费者拿到消息却没有做处理,此时杀死正在执行的消费者,会丢失正在处理的消息
autoAck=false
, 即 手动模式,一旦消费者挂掉,但是没有手动ack,则该消息会交付给其他消费者- 当消费者发送消息应答(ack),则 rabbitmq 会删除该消息
限流 qos
- qos 服务质量保证。如果一定瞬目的消息未被确认前,不进行消费新的消息
前提,要求 no_ask = fasle(关闭自动签收)
channel.basicConsume(queueName,
false, // 关闭自动签收
consumer);
channel 设置
basicQos
```java channel.basicQos(0,10,
false);
/**
* Request specific "quality of service" settings.
*
* These settings impose limits on the amount of data the server
* will deliver to consumers before requiring acknowledgements.
* Thus they provide a means of consumer-initiated flow control.
* @see com.rabbitmq.client.AMQP.Basic.Qos
* @param prefetchSize maximum amount of content (measured in
* octets) that the server will deliver, 0 if unlimited
* 消息体大小限制,0表示不限制
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* 同时给 consumer 同时推送的最大消息数量
* 如果该数量的消息没有 ack,则 consumer 会 block, until 消息 ack
* @param global true if the settings should be applied to the
* entire channel rather than each consumer
* 该 Qos 设置是否 channel 全局启用?
* false 表示仅在当前 Consumer 启用, 即作用在绑定的 consumer 上
* @throws java.io.IOException if an error is encountered
*/ void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
<a name="jSfgn"></a>
### Nack 和 重回队列
- nack 表示告诉 broker 消费失败
- **ackAck 肯定是 false**
- 重回队列是为了对没有处理成功的消息,讲消息重新投递到 broker 的尾端
- **一般都会设置 false(默认 false)**
```java
// 消费端消费消息时,给 broker 的响应
channel.basicNack(envelope.getDeliveryTag(),
false, // autoAck
false); // 重回队列,建议 false
/**
* Reject one or several received messages.
*
* Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
* @see com.rabbitmq.client.AMQP.Basic.Nack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to reject all messages up to and including
* the supplied delivery tag; false to reject just the supplied
* delivery tag.
* @param requeue true if the rejected message(s) should be requeued rather
* than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
TTL
- time to live, 生存时间
执行消息的TTL
- 设置消息的
expiration
属性- 过期时间内未被消费会被 broker 自动删除
// 额外参数
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 持久化
.contentEncoding("UTF-8") // 编码格式
.expiration("10000") // ******** 过期时间 ***********
.headers(new HashMap<String, Object>(){ // 自定义参数
{
put("key01", "value1");
put("key02", "value2");
}
})
.build();
- 过期时间内未被消费会被 broker 自动删除
channel.basicPublish("normal_exchange", // exchange
"routingKey", // routing key
true , // mandatory 开启 return,监听是否可达
properties, // 设置 proeprties
msg.getBytes());
设置队列为过期队列
- 从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除
- 队列创建时设置
arguments
中的项x-message-ttl
Exchange
- 接受消息,根据路由键(routing key)转发消息所绑定的队列
- 属性
- name: 交换机名称
- type: 交换机类型
direct
topic
fanout
headers
- durability: 是否需要持久化,server 重启的时候是否保留
- auto delete: 当最后一个绑定到 exchange 的队列删除后,是否自动删除该 exchange
- internal: 当前 exchange 是否用于 rabbitmq 内部使用,默认为 false
- 基本就是 false
- argument:扩展参数,用于扩展 amqp 协议自定义化使用
direct exchange
- 和 queue 之间的 binding 的 routingkey 名称需要全部命中才会转发
topic exchange
- 所有关心 routing key 中指定的 topic 的 queue 都会转发
- 其实就是 Message 用指定规则的 routingKey,Consumer 端进行 routingKey 的模糊匹配
- 可以使用通配符进行模糊匹配
#
匹配一个或者多个词*
匹配一个词demo.*
能够匹配 demo.a, demo.wtf,无法匹配 demo.a.b#.demo
可以匹配 a.b.c.d.demo
fanout exchange
- 不处理 routing key, 只是简单将队列绑定到 exchange 上
- 发送到 exchange 的消息都会转发到与 exchange 绑定的所有队列上
- 类似广播
- fanout exchange 的速度是最快的
Queue
- 消息队列,实际存储消息数据
- 属性
- durability: 是否持久化, durable 是, transient 否
- auto delete : yes 则表示当最后一个监听被移除后,该 queue 会自动删除
Message
- server 和 client 之间传送的数据
- 由 properties 和 payload(body) 组成
- 常用属性
- dilivery mode
- haders(自定义属性)
其他属性
- correlation_id、reply_to、expiration, message_id
- timestamp、type、user_id、appid、cluster_id
// com.rabbitmq.client.AMQP.BasicProperties
public static class BasicProperties extends AMQBasicProperties {
private String contentType;
private String contentEncoding;
private Map<String, Object> headers;
private Integer deliveryMode;
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
}
生产端设置 properties 即可
AMQP.BasicProperties basicProperties =
new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding(StandardCharsets.UTF_8.toString())
.expiration("20000")
// 进行设置
.build();
virtual host
- 虚拟地址,用于逻辑隔离,最上层的消息路由,类似 redis 不同的 db
- 一个 virtual host 里面可以有若干个 exchange 和 queue
- 一个 virtual host 里面不能有相同名称的 exchange 或 queue
死信队列
- Dead-Letter-Exchange
利用 DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新 publish 到另一个 Exchange,这个 Exchange 就是DLX
进入的时机
消息被拒绝(basic.reject/basic.nack) 且 requeue=false(重回队列false)
- 消息 ttl 过期
- 队列达到最大长度
设置死信队列
// 死信队列的 exchange 是个普通 exchange
channel.exchangeDeclare(
"dlx.exchange", // exhcangeName
"topic", // type
true, // durable
false, // autoDelete
null // arguments
);
// 对应的 queue 也是一个普通的 queue
channel.queueDeclare(
"dlx.queue", // queueName
true, // durable
false, // exclusive
false, // autoDelete
null
); // arguments
// binding
//////////////////////////////////////////////////////
//注意 routing key 可以接受所有消息
/////////////////////////////////////////////////////
channel.queueBind("dlx.queue", "dlx.exchange", "#");
- 注意死信队列的 routingKey 是
#
绑定死信队列
- 给需要死信队列的队列加上属性
arguments.put("x-dead-letter-exchange", "dlx.exchange");
```java String exchangeName = “normal_exchange”; String exchangeType = “topic”; String queueName = “normal_queue”;
// 声明一个普通的 exchange channel.exchangeDeclare( exchangeName, exchangeType, true, // durable false, // autoDelete null // argument );
Map
// 声明一个普通的 queue
channel.queueDeclare(
queueName,
true, // durable
false, // exclusive 独占模式
false, // autoDelete
arguments // 设置该普通队列拥有死信队列
);
channel.queueBind(queueName, exchangeName, “routingKey”);
```