一、认识消息队列
1.1 什么是消息队列
消息:应用中传递的数据,可以是文本,对象等待。
消息队列:是一种应用间的通信方式,用于上下游传递消息,FIFO先入先出,消息发送上下游只需要依赖mq不用依赖其他服务。
1.2 消息队列的作用
- **峰
假如在一个订单系统中当应用服务器处理数据最大峰值是每秒一万单,当高峰期超过一万单就比较吃力,那么就可以使用消息队列,将一部分订单缓冲到队列中,后面再处理,让用户下单后十几秒后才收到下单成功的操作。
- **
当服务中A系统调用B系统,B需要花费很长的时间才能执行完成,一般情况下需要等待B执行完成后通知A,或者A定时查询B是否执行完成,使用MQ就可以让A监听B,只要B完成后发生一条消息被A监听到,这样A就能及时得到异步处理的结果;或者当A调用B系统,B系统整体逻辑与A没有关联,这个时候A系统可以将发送给B系统的任务发送到队列中,可以继续执行剩下的事情而不是等待。
- **
假如一个创建订单的过程需要调用订单系统,支付系统,库存系统等等;如果每个系统都是耦合在一起,任何一个子系统发生故障或者雪崩,整个下单就会中断,如果将各个子系统分开中间用MQ连接,当中间某个系统故障也会把下单进行下去,任务会存在队列中,故障被修复后会拉取队列中的任务,提高系统可用性。
1.3 mq的分类
| RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
|---|---|---|---|---|
| 公司/社区 | Rabbit | Apache | 阿里 | Apache |
| 开发语言 | Erlang | Java | Java | Scala&Java |
| 协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
| 可用性 | 高 | 一般 | 高 | 高 |
| 单机吞吐量 | 一般 | 差 | 高 | 非常高 |
| 消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
| 消息可靠性 | 高 | 一般 | 高 | 一般 |
| 社区活跃度 | 高 | 中 | 中 | 高 |
二、rabbitMQ简介
官网传送门
RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站,一个快递员帮你传递快件。RabbitMQ与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
2.1 rabbitMQ四大概念
生产者
交换机
rabbitmq中重要的组件,接收来自生产者发送的消息;将消息推送到队列中,交换机必须知道如果处理这些消息,是将消息推送到单个队列还是多个队列,还是丢弃,这个由交换机的类型决定。
队列
队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。
消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
2.2 rabbitMQ工作原理
生产端生产消息通过连接把消息传到信道里面,信道里的消息再传给交换机,交换机根据routing key将消息路由到不同的队列中,队列收到消息后就将消息通过连接传到消费端的信道里,然后消费者就收到消息了。
Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer和broker之间的TCP连接
Channel:如果每一次访问 RabbitMQ 都建立一个Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id 帮助客户端和message broker 识别 channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:直连direct (point-to-point), 主题topic (publish-subscribe) and 广播fanout (multicast)
Queue:消息最终被送到这里等待consumer取走
Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据
2.3 docker 安装rabbitMQ
官方指导 带有management表示带有控制台
docker run -di --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
三、简单入门
集成springboot的 pom依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
3.1 hello world
使用amqp 最基本的生产消费
生产者
package com.rem.rabbitmq.ee.Aquick;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/*** 生产者** @author Rem* @date 2021-12-12*/public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("106.14.72.13");factory.setPort(5672);factory.setUsername("rem");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();try {/***创建队列**queue 队列名称*durable 队列里的消息是否持久化 (服务重启是否存在) 默认在内存中,*exclusive 如果我们声明一个独占队列(仅限于此连接),则为 true (只能有一个消费者)* autoDelete 如果我们声明一个自动删除队列,则为 true(服务器将在不再使用时将其删除)*arguments 队列的其他属性(构造参数)*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "hello world";/***发送消息**exchange 发送到哪个交换机 默认交换机 用空表示*routingKey 路由键*props 消息的其他属性 -路由标头等* message 消息内容*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("消息发送完毕");} finally {channel.close();connection.close();}}}
消费者
package com.rem.rabbitmq.ee.Aquick;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;/*** 消费者** @author Rem* @date 2021-12-12*/public class Consumer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("106.14.72.13");factory.setPort(5672);factory.setUsername("rem");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();try {//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> {System.err.println("接收到的消息:" + new String(message.getBody()));};//取消消费回到 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> {System.err.println("消费消息被中断");};/*** queue – 队列的名称* autoAck – 如果服务器应该考虑消息一旦发送就确认为真; 如果服务器应该期待显式确认,则为 false* DeliverCallback – 传递消息时的回调* cancelCallback – 消费者取消时的回调*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);} finally {channel.close();connection.close();}}}
3.2 多个消费轮训接收消息
构造一个公共类
/*** 获取信道** @return* @throws Exception*/public static Channel getChannel() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("106.14.72.13");factory.setPort(5672);factory.setUsername("rem");factory.setPassword("123456");connection = factory.newConnection();channel = connection.createChannel();return channel;}/*** 关闭连接** @throws Exception*/public static void close() throws Exception {if (channel.isOpen()) {channel.close();}if (connection.isOpen()) {connection.close();}}public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.err.println(channel.isOpen());Connection connection = channel.getConnection();System.err.println(connection);close();}
生产者
package com.rem.rabbitmq.ee.Brotation;import com.rabbitmq.client.Channel;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.Scanner;/*** 分发消息* 两个consumer 轮训接收消息** @author Rem* @date 2021-12-23*/public class RotationProducer {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明队列channel.queueDeclare(RabbitMqUtils.QUEUE_ROTATION, false, false, false, null);/***发送消息**exchange 发送到哪个交换机 默认交换机 用空表示*routingKey 路由键*props 消息的其他属性 -路由标头等* message 消息内容*/Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();channel.basicPublish("", RabbitMqUtils.QUEUE_ROTATION, null, message.getBytes());System.err.println("发送消息完毕" + message);}}}
消费者1
package com.rem.rabbitmq.ee.Brotation;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;/*** 消费者01** @author Rem* @date 2021-12-23*/public class Consumer01 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("接收到的消息:" + new String(message.getBody()));};//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> {System.out.println("消费消息被中断");};System.out.println("消费者01 等待消费");channel.basicConsume(RabbitMqUtils.QUEUE_ROTATION, true, deliverCallback, cancelCallback);}}
消费者2
package com.rem.rabbitmq.ee.Brotation;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;/*** 消费者02** @author Rem* @date 2021-12-23*/public class Consumer02 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("接收到的消息:" + new String(message.getBody()));};//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> {System.out.println("消费消息被中断");};System.out.println("消费者02 等待消费");channel.basicConsume(RabbitMqUtils.QUEUE_ROTATION, true, deliverCallback, cancelCallback);}}
3.3 消息应答ack
package com.rem.rabbitmq.ee.Cack;import com.rabbitmq.client.Channel;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.Scanner;/*** 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。* RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。* 在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。* 为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制,* 消息应答就是:消费者在接收到消息并且处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了。* <p>* <p>* 消息应答类型:* 1) 手动应答* ********消费者处理完业务逻辑,手动返回ack(通知)告诉队列处理完了,队列进而删除消息。* 2) 自动应答* ********不在乎消费者对消息处理是否成功,都会告诉队列删除消息。如果处理消息失败,实现自动补偿(队列投递过去 重新处理)* 这种模式需要在高吞吐量和数据传输安全性方面做权衡,* ** a.channel或者连接关闭 消息会丢失* ** b. 消息过载,并且对消息数量没有限制(或者消费者来不及处理) 消息积压,导致内存耗尽 消费者内存被系统杀死* 所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用* <p>* <p>* 消息应答类型方式:* A.Channel.basicAck(用于肯定确认) RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了* B.Channel.basicNack(用于否定确认)* C.Channel.basicReject(用于否定确认) 与Channel.basicNack相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了* <p>* <p>* Multiple: 批量处理* true代表批量应答channel上未应答的消息 包含还未应答的消息都会被确认收到应答消息* false 会应答当前tag的消息 之前的消息不会应答* <p>* <p>* 消息自动重新入队* 如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,* RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。* <p>* 轮训消费消息时,即使其中有消费者在等待,也会均匀消费 当关闭其中一个消费者时会将为消费的消息转给另外一个消费者* <p>* <p>** @author Rem* @date 2021-12-23*/public class AckProducer {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明队列channel.queueDeclare(RabbitMqUtils.QUEUE_ACK, false, false, false, null);System.err.println("开始发送消息...");Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();//发送消息channel.basicPublish("", RabbitMqUtils.QUEUE_ACK, null, message.getBytes());System.err.println("发送消息完毕" + message);}}}
package com.rem.rabbitmq.ee.Cack;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.concurrent.TimeUnit;/**** @author Rem* @date 2021-12-23*/public class Consumer03 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("接收到的消息:" + new String(message.getBody()));try {//休眠10sTimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}/*** DeliveryTag – 来自收到的AMQP.Basic.GetOk或AMQP.Basic.Deliver的标签* 多个 – true 确认所有消息,包括提供的交付标签; false 仅确认提供的交付标签*/channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> {System.out.println("消费消息被中断");};System.out.println("消费者03 等待消费");//手动应答boolean autoAck = false;channel.basicConsume(RabbitMqUtils.QUEUE_ACK, autoAck, deliverCallback, cancelCallback);}}
package com.rem.rabbitmq.ee.Cack;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.concurrent.TimeUnit;/*** @author Rem* @date 2021-12-23*/public class Consumer04 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("接收到的消息:" + new String(message.getBody()));try {//休眠1sTimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}/*** DeliveryTag – 来自收到的AMQP.Basic.GetOk或AMQP.Basic.Deliver的标签* 多个 – true 确认所有消息,包括提供的交付标签; false 仅确认提供的交付标签*/channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> {System.out.println("消费消息被中断");};System.out.println("消费者04 等待消费");//手动应答boolean autoAck = false;channel.basicConsume(RabbitMqUtils.QUEUE_ACK, autoAck, deliverCallback, cancelCallback);}}
3.4 队列持久化
package com.rem.rabbitmq.ee.Ddurable;import com.rabbitmq.client.Channel;import com.rabbitmq.client.MessageProperties;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.Scanner;/*** 队列持久化 重启mq服务队列依旧存在* <p>* <p>* 如果队列已经持久化 再次创建就会报error* channel error; protocol method: #method<channel.close>(reply-code=406,* reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'QUEUE_ACK' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)** @author Rem* @date 2021-12-23*/public class DurableProducer {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();/***创建队列**queue 队列名称*durable 队列里的消息是否持久化 (服务重启是否存在) 默认在内存中, true 持久化*exclusive 如果我们声明一个独占队列(仅限于此连接),则为 true*autoDelete 如果我们声明一个自动删除队列,则为 true(服务器将在不再使用时将其删除)*arguments 队列的其他属性(构造参数)*/channel.queueDeclare(RabbitMqUtils.QUEUE_DURABLE, true, false, false, null);System.err.println("开始发送消息...");Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();/***发送消息**exchange 发送到哪个交换机 默认交换机 用空表示*routingKey 路由键*props 消息的其他属性 -路由标头等**MessageProperties.PERSISTENT_TEXT_PLAIN: 消息持久化***message 消息内容*/channel.basicPublish("", RabbitMqUtils.QUEUE_DURABLE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());System.err.println("发送消息完毕" + message);}}}
package com.rem.rabbitmq.ee.Ddurable;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;/*** @author Rem* @date 2021-12-23*/public class Consumer05 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("接收到的消息:" + new String(message.getBody()));};//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> {System.out.println("消费消息被中断");};System.out.println("消费者05 等待消费");//手动应答boolean autoAck = true;channel.basicConsume(RabbitMqUtils.QUEUE_DURABLE, autoAck, deliverCallback, cancelCallback);}}
3.5 不公平分发
package com.rem.rabbitmq.ee.Eunfair;import com.rabbitmq.client.Channel;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.Scanner;/*** 不公平分发* 在消费者方 设置 channel.basicQos(1); 消费消息则不是轮训分发* <p>* <p>* basicQos 设置的是缓存区大小 0 到 65535 即unack的消息数量* 虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的RAM消耗* <p>* 消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,* 不同的负载该值取值也不同100到300范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为1是最保守的。* 当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的** @author Rem* @date 2021-12-23*/public class UnfairProducer {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明队列channel.queueDeclare(RabbitMqUtils.QUEUE_UNFAIR, false, false, false, null);System.err.println("开始发送消息...");Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();//发送消息channel.basicPublish("", RabbitMqUtils.QUEUE_UNFAIR, null, message.getBytes());System.err.println("发送消息完毕" + message);}}}
package com.rem.rabbitmq.ee.Eunfair;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.concurrent.TimeUnit;/*** @author Rem* @date 2021-12-23*/public class Consumer06 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("接收到的消息:" + new String(message.getBody()));try {//休眠10sTimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}/*** DeliveryTag – 来自收到的AMQP.Basic.GetOk或AMQP.Basic.Deliver的标签* 多个 – true 确认所有消息,包括提供的交付标签; false 仅确认提供的交付标签*/channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};//取消消费回到 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> {System.out.println("消费消息被中断");};System.out.println("消费者06 等待消费");/*** 不公平分发 预取值 0 到 65535*/channel.basicQos(1);//手动应答boolean autoAck = false;channel.basicConsume(RabbitMqUtils.QUEUE_UNFAIR, autoAck, deliverCallback, cancelCallback);}}
package com.rem.rabbitmq.ee.Eunfair;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.concurrent.TimeUnit;/*** @author Rem* @date 2021-12-23*/public class Consumer07 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("接收到的消息:" + new String(message.getBody()));try {//休眠1sTimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}/*** DeliveryTag – 来自收到的AMQP.Basic.GetOk或AMQP.Basic.Deliver的标签* 多个 – true 确认所有消息,包括提供的交付标签; false 仅确认提供的交付标签*/channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> {System.out.println("消费消息被中断");};System.out.println("消费者07 等待消费");/*** 不公平分发*/// channel.basicQos(1);//手动应答boolean autoAck = false;channel.basicConsume(RabbitMqUtils.QUEUE_UNFAIR, autoAck, deliverCallback, cancelCallback);}}
四、发布确认
单个发布确认
package com.rem.rabbitmq.ee.Fconfirm;import com.rabbitmq.client.Channel;import com.rem.rabbitmq.ee.RabbitMqUtils;/*** 发布确认* 1.要求队列必须持久胡* 2.要求队列里的消息必须持久化* 3.发布确认* <p>* <p>* 发布确认原理* 所有在该信道上面发布的消息都将会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),* 这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出* <p>* <p>* 单个发布确认 (同步) 也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发 发布速度特别的慢* 同步等待确认,简单,但吞吐量非常有限。* waitForConfirms和waitForConfirmsOrDie的区别是异常后 waitForConfirmsOrDie信道被关闭** @author Rem* @date 2021-12-24*/public class SingleConfirmProducer {private final static int MESSAGE_COUNT = 1005;public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel();) {//声明队列channel.queueDeclare(RabbitMqUtils.QUEUE_CONFIRM, true, false, false, null);/*** 启用发布确认*/channel.confirmSelect();System.err.println("开始发送消息...");long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + ":confirm";channel.basicPublish("", RabbitMqUtils.QUEUE_CONFIRM, null, message.getBytes());/*** 等到自上次调用以来发布的所有消息都被代理确认或确认。请注意,在非确认通道上调用时,waitForConfirms 会抛出 IllegalStateException。** return 是否所有消息都被确认(并且没有消息被确认)服务端返回false或超时时间内未返回,生产者可以消息重发*/// channel.waitForConfirmsOrDie(3000);boolean flag = channel.waitForConfirms();if (flag) {System.out.println(i + "消息发送成功");} else {System.out.println(i + "消息发送失败");}}long end = System.currentTimeMillis();System.err.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");} finally {RabbitMqUtils.close();}}}
批量发布确认
package com.rem.rabbitmq.ee.Fconfirm;import com.rabbitmq.client.Channel;import com.rem.rabbitmq.ee.RabbitMqUtils;/*** 发布确认* 1.要求队列必须持久胡* 2.要求队列里的消息必须持久化* 3.发布确认* <p>* <p>* 发布确认原理* 所有在该信道上面发布的消息都将会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),* 这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出* <p>* 批量确认 每发100个确认一次 这种方案仍然是同步的,也一样阻塞消息的发布* 批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题。** @author Rem* @date 2021-12-24*/public class BatchConfirmProducer {private final static int MESSAGE_COUNT = 1005;public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel();) {//声明队列channel.queueDeclare(RabbitMqUtils.QUEUE_CONFIRM, true, false, false, null);/*** 启用发布确认*/channel.confirmSelect();System.err.println("开始发送消息...");//批量确认消息大小int batchSize = 100;//未确认消息个数int outstandingMessageCount = 0;long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + ":confirm";channel.basicPublish("", RabbitMqUtils.QUEUE_CONFIRM, null, message.getBytes());/*** 等到自上次调用以来发布的所有消息都被代理确认或确认。请注意,在非确认通道上调用时,waitForConfirms 会抛出 IllegalStateException。** return 是否所有消息都被确认(并且没有消息被确认)服务端返回false或超时时间内未返回,生产者可以消息重发*/outstandingMessageCount++;if (outstandingMessageCount == batchSize) {channel.waitForConfirms();outstandingMessageCount = 0;System.out.println(i + "消息批量" + batchSize + "发送成功");}}/*** 最后未被确认的 再次确认*/if (outstandingMessageCount > 0) {channel.waitForConfirms();System.out.println("last 消息发送成功");}long end = System.currentTimeMillis();System.err.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");} finally {RabbitMqUtils.close();}}}
异步发布确认
package com.rem.rabbitmq.ee.Fconfirm;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConfirmCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.concurrent.ConcurrentNavigableMap;import java.util.concurrent.ConcurrentSkipListMap;/*** 发布确认* 1.要求队列必须持久胡* 2.要求队列里的消息必须持久化* 3.发布确认* <p>* <p>* 发布确认原理* 所有在该信道上面发布的消息都将会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),* 这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出* <p>* <p>* 异步确认发布 是通过函数回调来保证是否投递成功 可靠和效率都最高,实现比较复杂 性价比最高* confirm模式最大的好处在于他是异步的* 当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息** @author Rem* @date 2021-12-24*/public class AsyncConfirmProducer {private final static int MESSAGE_COUNT = 1005;public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel();) {//声明队列channel.queueDeclare(RabbitMqUtils.QUEUE_CONFIRM, true, false, false, null);/*** 启用发布确认*/channel.confirmSelect();System.err.println("开始发送消息...");/*** 线程安全有序的哈希表,适用于高并发* 1.轻松的将序列号与消息关联* 2.轻松批量删除条目 只要给到序号* 3.支持高并发**/ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();/*** 发布确认回调 消息确认成功* deliveryTag 消息的标记* multiple 是否为批量*/ConfirmCallback ackCallback = (deliveryTag, multiple) -> {ConcurrentNavigableMap<Long, String> confirmed = null;if (multiple) {//返回的是小于等于当前序列号的确认消息 是一个mapconfirmed = outstandingConfirms.headMap(deliveryTag);} else {outstandingConfirms.remove(deliveryTag);}if (confirmed != null) {confirmed.clear();}System.out.println("成功的消息" + deliveryTag);};/*** 发布确认回调 消息确认失败*/ConfirmCallback nackCallback = (deliveryTag, multiple) -> {System.out.println("未确认的编号" + deliveryTag + "消息:" + outstandingConfirms.get(deliveryTag));};/*** 消息监听器*/channel.addConfirmListener(ackCallback, nackCallback);long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + ":confirm";channel.basicPublish("", RabbitMqUtils.QUEUE_CONFIRM, null, message.getBytes());outstandingConfirms.put(channel.getNextPublishSeqNo(), message);}long end = System.currentTimeMillis();System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");} finally {RabbitMqUtils.close();}}}
五、认识rabbitMQ交换机
广播交换机
package com.rem.rabbitmq.ee.Gexchanges.fanout;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.Scanner;/*** 交换机-广播模式* 将接收到的所有消息广播到它知道的所有队列中** @author Rem* @date 2021-12-24*/public class FanoutExchangeProducer {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();/*** 声明一个交换机* exchange: 交换机的名称* type:交换机的类型**/channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT);System.err.println("开始发送消息...");Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();//发送消息channel.basicPublish(RabbitMqUtils.EXCHANGE_FANOUT, "", null, message.getBytes());System.err.println("发送消息完毕" + message);}}}
package com.rem.rabbitmq.ee.Gexchanges.fanout;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;/*** @author Rem* @date 2021-12-23*/public class Consumer08 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//在生产者已经创建过//channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT);/*** 创建一个临时队列 队列名称随机* 当消费者断开和该队列的连接时,队列自动删除*/String queue = channel.queueDeclare().getQueue();/*** 将临时队列绑定到交换机*/channel.queueBind(queue, RabbitMqUtils.EXCHANGE_FANOUT, "");//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息:" + new String(message.getBody()));//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");System.out.println("消费者08 等待消费");channel.basicConsume(queue, true, deliverCallback, cancelCallback);}}
package com.rem.rabbitmq.ee.Gexchanges.fanout;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;/*** @author Rem* @date 2021-12-23*/public class Consumer09 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();/*** 创建一个临时队列 队列名称随机* 当消费者断开和该队列的连接时,队列自动删除*/String queue = channel.queueDeclare().getQueue();/*** 将临时队列绑定到交换机*/channel.queueBind(queue, RabbitMqUtils.EXCHANGE_FANOUT, "");//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息:" + new String(message.getBody()));//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");System.out.println("消费者09 等待消费");channel.basicConsume(queue, true, deliverCallback, cancelCallback);}}
直连交换机
package com.rem.rabbitmq.ee.Gexchanges.direct;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.HashMap;import java.util.Map;import java.util.Scanner;/*** 交换机-直连模式* 将消息发送到绑定的routingkey的队列中去 没有绑定的消息会被丢弃** @author Rem* @date 2021-12-24*/public class DirectExchangeProducer {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();/*** 声明一个交换机* exchange: 交换机的名称* type:交换机的类型**/channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT);System.err.println("开始发送消息...");Scanner scanner = new Scanner(System.in);/*** 创建多个bindingKey* 没有与routingKey绑定的消费者 消息发出后会丢失*/Map<String, String> routingKeyMap = new HashMap<>();routingKeyMap.put("1", "green");routingKeyMap.put("2", "blue");routingKeyMap.put("3", "red");routingKeyMap.put("4", "pink");while (scanner.hasNext()) {String message = scanner.next();String routingKey;for (Map.Entry<String, String> entry : routingKeyMap.entrySet()) {String key = entry.getKey();if (message.equals(key)) {routingKey = entry.getValue();//发送消息channel.basicPublish(RabbitMqUtils.EXCHANGE_DIRECT, routingKey, null, message.getBytes());System.err.println("发送消息完毕" + message);}}}}}
package com.rem.rabbitmq.ee.Gexchanges.direct;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;/*** @author Rem* @date 2021-12-23*/public class Consumer10 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();/*** 创建一个临时队列 队列名称随机* 当消费者断开和该队列的连接时,队列自动删除*/String queue = channel.queueDeclare().getQueue();/*** 将临时队列绑定到交换机*/channel.queueBind(queue, RabbitMqUtils.EXCHANGE_DIRECT, "green");channel.queueBind(queue, RabbitMqUtils.EXCHANGE_DIRECT, "red");//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息:" + new String(message.getBody()));//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");System.out.println("消费者10 等待消费");channel.basicConsume(queue, true, deliverCallback, cancelCallback);}}
package com.rem.rabbitmq.ee.Gexchanges.direct;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;/*** @author Rem* @date 2021-12-23*/public class Consumer11 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();/*** 创建一个临时队列 队列名称随机* 当消费者断开和该队列的连接时,队列自动删除*/String queue = channel.queueDeclare().getQueue();/*** 将临时队列绑定到交换机*/channel.queueBind(queue, RabbitMqUtils.EXCHANGE_DIRECT, "blue");//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息:" + new String(message.getBody()));//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");System.out.println("消费者11 等待消费");channel.basicConsume(queue, true, deliverCallback, cancelCallback);}}
主题交换机
package com.rem.rabbitmq.ee.Gexchanges.topic;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rem.rabbitmq.ee.RabbitMqUtils;import java.util.HashMap;import java.util.Map;import java.util.Scanner;/*** 交换机-主题模式 必须是一个单词列表,以点号分隔开 最多不能超过255个字节* routingkey 匹配规则* *(星号)可以代替一个单词* #(井号)可以替代零个或多个单词* <p>* <p>* 将消息发送到绑定的routingkey的队列中去 没有绑定的消息会被丢弃** @author Rem* @date 2021-12-24*/public class TopicExchangeProducer {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();/*** 声明一个交换机* exchange: 交换机的名称* type:交换机的类型**/channel.exchangeDeclare(RabbitMqUtils.EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC);System.err.println("开始发送消息...");Scanner scanner = new Scanner(System.in);/*** 创建多个bindingKey* 没有与routingKey绑定的消费者 消息发出后会丢失*/Map<String, String> routingKeyMap = new HashMap<>();routingKeyMap.put("1", "green.blue.red.pink");routingKeyMap.put("2", "green.blue.red");routingKeyMap.put("3", "green.blue");routingKeyMap.put("4", "green");routingKeyMap.put("5", "blue.red.pink");routingKeyMap.put("6", "blue.red");routingKeyMap.put("7", "blue");routingKeyMap.put("8", "red.pink");routingKeyMap.put("9", "pink");while (scanner.hasNext()) {String message = scanner.next();String routingKey;for (Map.Entry<String, String> entry : routingKeyMap.entrySet()) {String key = entry.getKey();if (message.equals(key)) {routingKey = entry.getValue();//发送消息channel.basicPublish(RabbitMqUtils.EXCHANGE_TOPIC, routingKey, null, message.getBytes());System.err.println("发送消息完毕" + message);}}}}}
package com.rem.rabbitmq.ee.Gexchanges.topic;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;/*** @author Rem* @date 2021-12-23*/public class Consumer12 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();/*** 创建一个临时队列 队列名称随机* 当消费者断开和该队列的连接时,队列自动删除*/String queue = channel.queueDeclare().getQueue();/*** 将临时队列绑定到交换机*/channel.queueBind(queue, RabbitMqUtils.EXCHANGE_TOPIC, "*.blue");channel.queueBind(queue, RabbitMqUtils.EXCHANGE_TOPIC, "#.red");//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息:" + new String(message.getBody()));//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");System.out.println("消费者12 等待消费");channel.basicConsume(queue, true, deliverCallback, cancelCallback);}}
package com.rem.rabbitmq.ee.Gexchanges.topic;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;/*** @author Rem* @date 2021-12-23*/public class Consumer13 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();/*** 创建一个临时队列 队列名称随机* 当消费者断开和该队列的连接时,队列自动删除*/String queue = channel.queueDeclare().getQueue();/*** 将临时队列绑定到交换机*/channel.queueBind(queue, RabbitMqUtils.EXCHANGE_TOPIC, "green.*");channel.queueBind(queue, RabbitMqUtils.EXCHANGE_TOPIC, "#.blue.#");//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息:" + new String(message.getBody()));//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");System.out.println("消费者13 等待消费");channel.basicConsume(queue, true, deliverCallback, cancelCallback);}}
package com.rem.rabbitmq.ee.Gexchanges.topic;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rem.rabbitmq.ee.RabbitMqUtils;/*** @author Rem* @date 2021-12-23*/public class Consumer14 {public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();/*** 创建一个临时队列 队列名称随机* 当消费者断开和该队列的连接时,队列自动删除*/String queue = channel.queueDeclare().getQueue();/*** 将临时队列绑定到交换机*/channel.queueBind(queue, RabbitMqUtils.EXCHANGE_TOPIC, "pink");//消费回调DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息:" + new String(message.getBody()));//取消消费回调 如果在消费的时候队列被删除了CancelCallback cancelCallback = consumerTag -> System.out.println("消费消息被中断");System.out.println("消费者14 等待消费");channel.basicConsume(queue, true, deliverCallback, cancelCallback);}}
