依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version></dependency>
Producer
public class Producer {public static void main(String[] args) throws Exception {// 假数据框架 FackerFaker faker = new Faker(Locale.CHINA);ConnectionFactory connectionFactory = null;Connection connection = null;Channel channel = null;try {// 1. 创建 ConnectionFactoryconnectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");// 2. 通过 ConnectionFactory 创建 Connectconnection = connectionFactory.newConnection();// 3. 通过 Connect 创建一个 Channelchannel = connection.createChannel();// 4. 通过 Channel 发送 msgString exchange = "test";String routingKey = "test01";AMQP.BasicProperties basicProperties = null;for (int i = 0; i < 10000; i++) {String msg = faker.name().fullName();byte[] msgBody = msg.getBytes(StandardCharsets.UTF_8);channel.basicPublish(exchange, // exchangeroutingKey, // routingKeybasicProperties, // propertiesmsgBody // body);}} finally {// 5. 关闭 Channel 和 Connectionif (channel != null) {channel.close();}if (connection != null) {connection.close();}}}}
Confirm Listener
在 channel 上开启确认模式
channel.confirmSelect();
在 channel 上添加监听
// 监听回应channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("ack!");}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("no ack");}});
- 监听成功和失败的返回结果,根据具体的结果对消息进行重新发送,或者记录日志
Return Listener
- 接受路由不可达的响应
- routingKey 不符合
- 在发送时候需要设置
mandatory为true- true 则监听器会接受到路由不可达的消息
- 如果为默认的 false,则 broker 端会自动删除该消息
// 方法声明void basicPublish(String exchange,String routingKey,boolean mandatory,BasicProperties props,byte[] body)throws IOException;
- 添加 returnListener
channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body) throws IOException {System.out.println("不可达");}});
Consumer
public class FanoutConsumer {public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = null;Connection connection = null;Channel channel = null;try {// 1. 创建 ConnectionFactoryconnectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");// 2. 通过 ConnectionFactory 创建 Connectconnection = connectionFactory.newConnection();// 3. 通过 Connect 创建一个 Channelchannel = connection.createChannel();// 4. 声明 channel 属性String exchangeName = "test";// 设置 exchange 的属性 fanoutString exchangeType = "fanout";// fanout 属性下,routingKey 是无所谓的String routingKey = "wtf";String queueName = "test_fanout_queue";// queue 声明channel.queueDeclare(queueName,false, // durable 队列是否持久化,重启可用false, // exclusive 是否独占,仅一个连接可用false, // autoDelete 没有东西和该 queue 绑定,则会自动删除null); // arguments// exchange 声明channel.exchangeDeclare(exchangeName,exchangeType,true, // durable 是否持久化消息false, // autoDeletefalse, // internalnull // arguments);// 绑定 queue 和 exchangechannel.queueBind(queueName, exchangeName, routingKey);// 创建消费者// 》》》》 新版本通过重写 DefaultConsumer#handleDelivery 进行数据消费MyConsumer consumer = new MyConsumer(channel);// 绑定 channel 和 Consumerchannel.basicConsume(queueName,true, // 是否 autoAckconsumer);// 消费消息while(true) {}} finally {// 5. 关闭 Channel 和 Connectionif (channel != null) {channel.close();}if (connection != null) {connection.close();}}}}
class MyConsumer extends DefaultConsumer {private Channel channel;public MyConsumer(Channel channel){super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.err.println("-----------consume message----------");System.err.println("consumerTag: " + consumerTag);System.err.println("envelope: " + envelope);System.err.println("properties: " + properties);System.err.println("body: " + new String(body));// 手动 ack// channel.basicAck(envelope.getDeliveryTag(), false);}}
autoAck
autoAck=true,即 自动确认模式,一旦 rabbitmq 将消息分发给消费者,就会直接删除该消息- 如果消费者拿到消息却没有做处理,此时杀死正在执行的消费者,会丢失正在处理的消息
autoAck=false, 即 手动模式,一旦消费者挂掉,但是没有手动ack,则该消息会交付给其他消费者- 当消费者发送消息应答(ack),则 rabbitmq 会删除该消息
限流 qos
- qos 服务质量保证。如果一定瞬目的消息未被确认前,不进行消费新的消息
前提,要求 no_ask = fasle(关闭自动签收)
channel.basicConsume(queueName,false, // 关闭自动签收consumer);
channel 设置
basicQos```java channel.basicQos(0,10,false);
/**
* Request specific "quality of service" settings.** These settings impose limits on the amount of data the server* will deliver to consumers before requiring acknowledgements.* Thus they provide a means of consumer-initiated flow control.* @see com.rabbitmq.client.AMQP.Basic.Qos* @param prefetchSize maximum amount of content (measured in* octets) that the server will deliver, 0 if unlimited* 消息体大小限制,0表示不限制* @param prefetchCount maximum number of messages that the server* will deliver, 0 if unlimited* 同时给 consumer 同时推送的最大消息数量* 如果该数量的消息没有 ack,则 consumer 会 block, until 消息 ack* @param global true if the settings should be applied to the* entire channel rather than each consumer* 该 Qos 设置是否 channel 全局启用?* false 表示仅在当前 Consumer 启用, 即作用在绑定的 consumer 上* @throws java.io.IOException if an error is encountered
*/ void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
<a name="jSfgn"></a>### Nack 和 重回队列- nack 表示告诉 broker 消费失败- **ackAck 肯定是 false**- 重回队列是为了对没有处理成功的消息,讲消息重新投递到 broker 的尾端- **一般都会设置 false(默认 false)**```java// 消费端消费消息时,给 broker 的响应channel.basicNack(envelope.getDeliveryTag(),false, // autoAckfalse); // 重回队列,建议 false/*** Reject one or several received messages.** Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.* @see com.rabbitmq.client.AMQP.Basic.Nack* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}* @param multiple true to reject all messages up to and including* the supplied delivery tag; false to reject just the supplied* delivery tag.* @param requeue true if the rejected message(s) should be requeued rather* than discarded/dead-lettered* @throws java.io.IOException if an error is encountered*/void basicNack(long deliveryTag, boolean multiple, boolean requeue)throws IOException;
TTL
- time to live, 生存时间
执行消息的TTL
- 设置消息的
expiration属性- 过期时间内未被消费会被 broker 自动删除
// 额外参数AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2) // 持久化.contentEncoding("UTF-8") // 编码格式.expiration("10000") // ******** 过期时间 ***********.headers(new HashMap<String, Object>(){ // 自定义参数{put("key01", "value1");put("key02", "value2");}}).build();
- 过期时间内未被消费会被 broker 自动删除
channel.basicPublish("normal_exchange", // exchange"routingKey", // routing keytrue , // mandatory 开启 return,监听是否可达properties, // 设置 proeprtiesmsg.getBytes());
设置队列为过期队列
- 从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除
- 队列创建时设置
arguments中的项x-message-ttl

Exchange
- 接受消息,根据路由键(routing key)转发消息所绑定的队列
- 属性
- name: 交换机名称
- type: 交换机类型
directtopicfanoutheaders
- durability: 是否需要持久化,server 重启的时候是否保留
- auto delete: 当最后一个绑定到 exchange 的队列删除后,是否自动删除该 exchange
- internal: 当前 exchange 是否用于 rabbitmq 内部使用,默认为 false
- 基本就是 false
- argument:扩展参数,用于扩展 amqp 协议自定义化使用
direct exchange
- 和 queue 之间的 binding 的 routingkey 名称需要全部命中才会转发
topic exchange
- 所有关心 routing key 中指定的 topic 的 queue 都会转发
- 其实就是 Message 用指定规则的 routingKey,Consumer 端进行 routingKey 的模糊匹配
- 可以使用通配符进行模糊匹配
#匹配一个或者多个词*匹配一个词demo.*能够匹配 demo.a, demo.wtf,无法匹配 demo.a.b#.demo可以匹配 a.b.c.d.demo
fanout exchange
- 不处理 routing key, 只是简单将队列绑定到 exchange 上
- 发送到 exchange 的消息都会转发到与 exchange 绑定的所有队列上
- 类似广播
- fanout exchange 的速度是最快的
Queue
- 消息队列,实际存储消息数据
- 属性
- durability: 是否持久化, durable 是, transient 否
- auto delete : yes 则表示当最后一个监听被移除后,该 queue 会自动删除
Message
- server 和 client 之间传送的数据
- 由 properties 和 payload(body) 组成
- 常用属性
- dilivery mode
- haders(自定义属性)
其他属性
- correlation_id、reply_to、expiration, message_id
- timestamp、type、user_id、appid、cluster_id
// com.rabbitmq.client.AMQP.BasicPropertiespublic static class BasicProperties extends AMQBasicProperties {private String contentType;private String contentEncoding;private Map<String, Object> headers;private Integer deliveryMode;private Integer priority;private String correlationId;private String replyTo;private String expiration;private String messageId;private Date timestamp;private String type;private String userId;private String appId;private String clusterId;}
生产端设置 properties 即可
AMQP.BasicProperties basicProperties =new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding(StandardCharsets.UTF_8.toString()).expiration("20000")// 进行设置.build();
virtual host
- 虚拟地址,用于逻辑隔离,最上层的消息路由,类似 redis 不同的 db
- 一个 virtual host 里面可以有若干个 exchange 和 queue
- 一个 virtual host 里面不能有相同名称的 exchange 或 queue
死信队列
- Dead-Letter-Exchange
利用 DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新 publish 到另一个 Exchange,这个 Exchange 就是DLX
进入的时机
消息被拒绝(basic.reject/basic.nack) 且 requeue=false(重回队列false)
- 消息 ttl 过期
- 队列达到最大长度
设置死信队列
// 死信队列的 exchange 是个普通 exchangechannel.exchangeDeclare("dlx.exchange", // exhcangeName"topic", // typetrue, // durablefalse, // autoDeletenull // arguments);// 对应的 queue 也是一个普通的 queuechannel.queueDeclare("dlx.queue", // queueNametrue, // durablefalse, // exclusivefalse, // autoDeletenull); // arguments// binding////////////////////////////////////////////////////////注意 routing key 可以接受所有消息/////////////////////////////////////////////////////channel.queueBind("dlx.queue", "dlx.exchange", "#");
- 注意死信队列的 routingKey 是
#
绑定死信队列
- 给需要死信队列的队列加上属性
arguments.put("x-dead-letter-exchange", "dlx.exchange");```java String exchangeName = “normal_exchange”; String exchangeType = “topic”; String queueName = “normal_queue”;
// 声明一个普通的 exchange channel.exchangeDeclare( exchangeName, exchangeType, true, // durable false, // autoDelete null // argument );
Map
// 声明一个普通的 queue
channel.queueDeclare(
queueName,
true, // durable
false, // exclusive 独占模式
false, // autoDelete
arguments // 设置该普通队列拥有死信队列
);
channel.queueBind(queueName, exchangeName, “routingKey”);
```
