- rabbitmq是放入queue之前逻辑复杂(所有连接到queue的都消费)
kafka是从queue(partition)消费逻辑复杂(消费者组来决定哪些节点消费哪)
Exchange
如果不指定exchange,使用默认exchange,默认exchange绑定 RoutingKey 与队列名称相同。
四种Exchange:fanout,direct,topic,header
direct:routing key 与 bind key 匹配
- fanout:会忽略rountingkey
-
routing key & binding key
routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
- binding key与routing key一样也是句点号“. ”分隔的字符串
- binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
RabbitMQ 6种模式
RabbitMQ Tutorials — RabbitMQ工作队列模式(默认exchange)
发布订阅模式(fanout exchange分发)
路由模式(direct exchange路由)
主题模式(topic exchange模糊路由)
rpc模式
发送端
同步确认模式
```java //将channel设置为confirm模式 channel.confirmSelect();
//发送数据 for(int i=0;i<batchCount;i++){ channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); }
//同步等待 if(!channel.waitForConfirms()){ System.out.println(“send message failed.”); }
<a name="Ami5Z"></a>
## 异步确认模式【ConfirmListener】
不推荐?推荐使用单条的消息确认模式。
```java
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
//异步回调
channel.addConfirmListener(new ConfirmListener() {
//deliveryTag:消息编号,多个发送者会重复有并发问题
//multiple:是否连带前面的确认
//回调和主方法不在一个线程执行
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
while (true) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
confirmSet.add(nextSeqNo);
}
exchange路由确认【ReturnCallback】【mandatory】
【注意】exchange路由不到queue(并不是网络问题,重试没有意义,需要人工介入),ConfirmCallback也会返回ack=true。同时也会调用ReturnCallback(当mandatory设置为true时)
- ConfirmCallback 只确认是否正确到达 Exchange 中
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
- mandatory【没有queue】:
- 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);
- 当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。
- immediate【没有消费者】:
- true:当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
RabbitMQ之mandatory和immediate_朱小厮的博客-CSDN博客
RabbitTemplate发送消息确认:correlation
- 设置PublisherConfirmType & PulisherReturns
- 发送设置correlationData(设置id)
- ConfirmCallback收到correlationData
消费端
消费端确认 & 重回队列
//手动发送ack
//deliveryTag是自增的,第二参数是是否批量ack
channel.basicAck(envelope.getDeliveryTag(),false);
//第三个参数是是否重回队列,一般不用。
channel.basicNack(envelope.getDeliveryTag() , false , true);
消费端限流qos
当消费端有一定数量的消息未被ack时,RabbitMQ不会推送新消息。
例如,有两个消费者,MQ一股脑将消息推送给一个消费者。另一个没事做。
首先是在非自动确认的前提下。autoAck设置为false。
// void basicQos(int prefetchSize, int prefetchCount, boolean global)
//允许通过总数prefetchSize=0 不限制(rabbitmq没实现)
//prefetchCount每次最大发送
//global=false 只设置到当前consumer,否则设置到整个Channel(rabbitmq没实现)
channel.basicQos(0, 1, false);
//autoAck=false
channel.basicConsume(queueName, false, new MyConsumer(channel));
basicReject / basicNack / basicRecover区别
//第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。 channel.BasicReject(result.DeliveryTag, false);
//【批量】退回或删除, //参数2: 是否批量确认 true是/false否 (也就是只一条) //参数3:是否requeue channel.BasicNack(result.DeliveryTag, true, true);
//参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。 //【不单单是拒绝的问题,是请求MQ重新发送,发送给谁由参数决定】 channel.BasicRecover(true);
rabbitmq basicReject / basicNack / basicRecover区别_t0m的专栏-CSDN博客_basicnack
Queue
死信队列
设置死信队列
// 这就是一个普通的交换机 和 队列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//这个agruments属性,要设置到声明队列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
//要进行死信队列的声明:
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
最终test_dlx_queue中的死信都发送到dlx.queue中了。
死信消息
- 消息被拒绝basic.reject、basic.nack并且requeue=false
- ttl过期
-
Spring RabbitMQ
消费端
SimpleMessageListenerContainer
手动确认
-
消费端限流
MessaListenerAdapter【自定义 队列-执行方法】
MessageConverter
@RabbitListener
配合配置