MQ
是什么❓ message queue 消息队列。FIFO。跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种常见的上下游”逻辑解耦+物理解耦”的消息通信服务。使用MQ后,消息发送上游只需要依赖MQ,不需要依赖其他服务。
🤔为什么使用MQ
流量消峰
当高峰期下单操作次数过多无法处理时,使用消息队列做缓冲,把1s内下的订单分散成一段时间来处理。
应用解耦
以电商应用为例,应用中有订单系统、库存系统、物理系统、支付系统。用户创建订单后,耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,会造成下单操作异常。
转变为基于消息队列的方式后,系统间调用的问题会减少,一个系统发生故障,该系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。系统恢复后继续处理订单信息即可。用户感受不到物流系统的故障,提升系统可用性。
异步处理
服务之间调用异步时,如A调用B,B需要花费很长时间执行。一般通过①A过段时间调用B的查询api查询B的情况②A提供一个callback api,B执行完之后调用api通知A服务。
使用消息总线,A调用B服务后,只需要监听B处理完成的信息,B处理完成后发送信息给MQ,MQ将信息转发给A服务。A既不用循环调用B的查询api,也不用提供callback api。B也不用做这些操作,A也可以及时的得到异步处理成功的信息。
🗑分类
ActiveMQ
优点: 单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性。较低的概率丢失数据
缺点:高吞吐量场景较少使用
Kafka
为大数据而生的消息中间件。百万级TPS的吞吐量。分布式,一个数据多个副本,少数及其宕机不会导致不可用。消费者采用pull的方式获取消息,通过控制能保证所有消息消费且仅被消费一次,第三方kafka web管理界面kafka-manager.大数据领域的实时计算和日志采集被大规模使用。
缺点:单机超过64队列/分区,Load会发生明显的飙高,队列越多,load越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消息失败不支持重试。支持消息顺序,但是一台代理宕机后,就会产生消息乱序。
RocketMQ
优点:单机吞吐量十万级,分布式架构,消息0丢失,支持10亿级别的消息堆积。alibaba
缺点:支持java,c++。没有在mq核心中实现JMS接口。
RabbitMQ
AMQP advanced message queuing protocol 高级消息队列协议。可复用的企业消息系统。
Rabbit MQ
核心概念
生产者
交换机
一方面它接收来自生产者的消息,另一方面将消息推送到队列中。交换机必须明确知道如何处理它接收到的信息,是将这些消息推送到特定队列还是多个队列或者把消息丢弃,由交换机决定。
队列
消息流经RabbitMQ和应用程序,但只存在队列,队列仅受主机内存和磁盘限制的约束,本质上是大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。
消费者
等待接收消息的程序。同一个应用程序可能是生产者也可能是消费者。生产者消费者和消息中间件很多时候并不在同一机器。
Broker:接收和分发消息的应用。RabbitMQ Server就是Message Broker。
Virtual host:出于多租户和安全因素设计,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace。多个不同的用户使用同一个RabbitMQ server提供的服务时,划分出多个vhost,每个用户在自己的vhost仓库创建exchange/queue等。
Connection:publisher/consumer和broker之间的TCP连接。
Channel:每次访问Rabbitmq都建立一个connection,消息量大的时候建立Tcp connection的开销巨大,效率也低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯。AMQP method包含了channel id帮助客户端和message broker识别chennel,所以channel之间完全隔离,channl作为轻量级的connection极大减少了操作系统建立TCP connection的开销。
Exchange:message达到broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中。常用类型:direct(point-to-point),topic(publish-subscribe) and fanout(multicast)
Queue:消息最终被送到Queue等待consumer取走
Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key,Binding信息被保存到exchange的查询表中,用于message的分发依据。
下载与初步使用
rabbitmq-server-3.8.8-1.el7.noarch.rpm
GitHub:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.8.8
erlang-21.3.8.21-1.el7.x86_64.rpm
官网:https://www.erlang-solutions.com/downloads/
加速:https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-21.3.8.21-1.el7.x86_64.rpm
- 安装命令
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
- 添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on
- 查看服务状态
/sbin/service rabbitmq-server status
- 停止使用
/sbin/service rabbitmq-server stop
- 开启web管理插件
rabbitmq-plugins enable rabbitmq_management
- 访问地址 http://公网ip:15672/(阿里云服务器要在安全组配置15672端口
- 添加新的账户
abbitmqctl add_user admin 123
- 设置用户角色
rabbitmqctl set_user_tags admin administrator
hello world
package com.atcompany.rabbitmq.one;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 生产者代码*/public class Producer {//队列public static final String queue_name = "hello";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//工厂ip 连接rabbitmq的队列factory.setHost("121.40.98.13");factory.setUsername("admin");factory.setPassword("123");//创建连接Connection connection = factory.newConnection();//获取信道Channel channel = connection.createChannel();//采用默认交换机 信道直接连接队列/*** 队列名 string*boolean 队列里面的消息是否持久化(磁盘 默认情况消息存储在内存中* 队列是否只供一个消费者进行消费 是否进行消息共享 boolean* String 队列最后一个消费者断开连接后,队列是否自动删除* Map 其他参数*/channel.queueDeclare(queue_name,true,false,false,null);String msg = "hello_world";/*** 发送一个消息* void basicPublish(String var1, String var2, BasicProperties var3, byte[] var4) throws IOException;* string exchange 交换机* 路由key值 string* basciproperties 其他参数消息* byte[] 发送消息的消息体*///交换机暂定为"" 而非nullchannel.basicPublish("",queue_name,null,msg.getBytes());System.out.println("消息发送完毕");}}
package com.atcompany.rabbitmq.two;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 消费者 接收消息*/public class Consumer {//队列名称public static final String queue_name="hello";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("121.40.98.13");factory.setUsername("admin");factory.setPassword("123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// void handle(String var1, Delivery var2) throws IOException;//消息接收的回调函数DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println(new String(message.getBody()));};//取消消息的回调函数//void handle(String var1) throws IOException;CancelCallback cancelCallback = consumerTag->{System.out.println("消息消费被中断");};/*** String basicConsume(String var1, boolean var2, DeliverCallback var3, CancelCallback var4) throws IOException;* string 队列名称* boolean 消费者是否自动应答* DeliverCallback 消费者未成功消费的自动应答 @FunctionalInterface* CancelCallback 消费者取消消费的回调 @FunctionalInterface*/channel.basicConsume(queue_name,true,deliverCallback,cancelCallback);}}

可能会遇到的错误:https://blog.csdn.net/pang_ping/article/details/111227552
〓work queues
工作队列的主要思想 是避免立即执行资源密集型任务,不得不等待它完成。安排任务在之后执行,把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将轮询处理这些任务。
可能会遇到的错误:https://blog.csdn.net/kangguang/article/details/104555546
package com.atcompany.rabbitmq.two;import com.atcompany.rabbitmq.Utils.RabbitmqUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 工作线程,相当于消费者*/public class Work01 {public static final String queue_name = "hello";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitmqUtils.getChannel();DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("接收到的消息是:"+new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag)->{System.out.println("消费者取消消费接口回调逻辑");};System.out.println("c2等待接收消息...");channel.basicConsume(queue_name,true,deliverCallback,cancelCallback);}}
edit configuration->allow parallel run 开启两个或多个并行work01线程 模拟多个工作队列
package com.atcompany.rabbitmq.two;import com.atcompany.rabbitmq.Utils.RabbitmqUtils;import com.rabbitmq.client.Channel;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.TimeoutException;/*** Created by IntelliJ IDEA.* User: luna* Date: 2022/2/9* 生产者*/public class Task01 {public static final String queue_name = "hello";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitmqUtils.getChannel();//生产者初始化队列channel.queueDeclare(queue_name,false,false,false,null);System.out.println("请输入要发送的消息:");Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String msg = scanner.next();channel.basicPublish("",queue_name,null,msg.getBytes());System.out.println("发送消息完成:"+msg);}}}
✉️消息应答
消费者完成任务可能需要一段时间。Rabbitmq一旦向消费者传递一条消息,便立即将该消息标记为删除。这种情况下,突然有一个消费者宕机,将会丢失该消费者正在处理和后续发送给该消费者的消息,因为它无法接收。
为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制。❓消费者在接收到消息并处理该消息之后,告诉rabbitmq该消息已被处理,rabbitmq可以删除该消息。
自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡。
如果消息在接收到之前,消费者出现连接或channel关闭,消息会丢失。这种模式下消费者可以传递过载的消息,没有对传递的消息数量进行限制,虽然可能导致消费者由于接收太多而来不及处理消息,积压消息,最终耗尽内存,消费者线程被操作系统杀死。这种模式仅适用于消费者可以高效并以某种速率处理消息时。
手动应答
- Channel.basicAck 肯定确认
- Channel.basicNack 否定确认
Channel.basicReject 否定确认 ```java
//Multiple
void basicAck(Channel var1, long var2, boolean var4);
void basicNack(Channel var1, long var2);
void basicReject(Channel var1, long var2);
手动应答可以批量应答Multiple并且减少网络拥堵。<br />**Multiple**:true 批量应答Channel上未应答的消息。<br />false 只应答收到的消息。```javapackage autoAck;import com.atcompany.rabbitmq.Utils.RabbitmqUtils;import com.rabbitmq.client.Channel;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.TimeoutException;/*** Created by IntelliJ IDEA.* User: luna* Date: 2022/2/9* 生产者* 消息在手动应答时不丢失,丢失会放回队列重新消费*/public class Task01 {public static final String queue_name = "ack_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitmqUtils.getChannel();channel.queueDeclare(queue_name,false,false,false,null);Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String msg = scanner.next();channel.basicPublish("",queue_name,null,msg.getBytes());System.out.println("生产者发送信息:"+msg);}}}
C1休眠1s应答,c2休眠30s应答,随着消息的堆积,堆积的消息会重新入队,由队列选择转置c1处理。(但是我没演示出来这个情况
package autoAck;import com.atcompany.rabbitmq.Utils.RabbitmqUtils;import com.atcompany.rabbitmq.Utils.SleepUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** Created by IntelliJ IDEA.* User: luna* Date: 2022/2/9*/public class Work03 {public static final String queue_name = "ack_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitmqUtils.getChannel();System.out.println("c1等待消息处理,时间较短");DeliverCallback deliverCallback = (consumerTag,message)->{SleepUtils.sleep(1);System.out.println("接收到消息:"+new String(message.getBody(),"UTF-8"));/** void basicAck(long var1, boolean multiple) throws IOException;* 消息接收后要进行手动应答* long 消息的标记 tag* boolean 是否批量应答(批量应答当前接收之前未接收的消息 & 只应答当前消息*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = (consumerTag)->{};//关闭自动应答boolean autoAck=false;channel.basicConsume(queue_name,autoAck,deliverCallback,cancelCallback);}}
package autoAck;import com.atcompany.rabbitmq.Utils.RabbitmqUtils;import com.atcompany.rabbitmq.Utils.SleepUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** Created by IntelliJ IDEA.* User: luna* Date: 2022/2/9*/public class Work04 {public static final String queue_name = "ack_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitmqUtils.getChannel();System.out.println("c1等待消息处理,时间较长");DeliverCallback deliverCallback = (consumerTag, message)->{SleepUtils.sleep(30);System.out.println("接收到消息:"+new String(message.getBody(),"UTF-8"));/** void basicAck(long var1, boolean multiple) throws IOException;* 消息接收后要进行手动应答* long 消息的标记 tag* boolean 是否批量应答(批量应答当前接收之前未接收的消息 & 只应答当前消息*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);、};CancelCallback cancelCallback = (consumerTag)->{};//关闭自动应答boolean autoAck=false;channel.basicConsume(queue_name,autoAck,deliverCallback,cancelCallback);}}
消息自动重新入队
消费者由于某些原因失去连接(通道关闭、连接关闭、tcp连接丢失),导致其他消息未发送ack确认,rabbitmq将了解到消息未完全处理,会对其重新排队。如果此时其他消费者可以处理,将很快重新分发给另一个消费者。即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
Rabbitmq持久化
为了保证rabbitmq停机状态消息生产者发送的消息不丢失,需要将队列和消息都标记为持久化。
队列的持久化
//需要删除原队列 重新创建一个队列boolean durable = true;//在声明队列时设置队列持久化channel.queueDeclare(queue_name,durable,false,false,null);
持久化的队列在rabbitmq的管理中心的features属性显示 D (urable
消息的持久化
//生产者发送信息时设置消息持久化 MessageProperties.PERSISTENT_TEXT_PLAINchannel.basicPublish("",queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
消息的持久化是告诉rabbitmq将信息存储到磁盘。但是仍存在 消息刚准备存储到磁盘,还在缓存的一个间隔点,没有真正写入磁盘。持久性保证不强。
🔌不公平分发
rabbitmq采用的是轮询分发,但当两个处理任务速度不同的消费者处理任务时,处理速度快的消费者很大一部分时间处于空闲状态,处理速度慢的消费者一直在处理。为了避免这种情况,设置参数channel.basicQos(1);
//在消费者中设置int prefetchCount = 1;channel.basicQos(prefetchCount);
预取值prefetch
消息异步发送,任何时候channel上肯定不止一个消息。来自消费者的手动确认也是异步的。因此存在一个未确认的消息缓冲区,可以通过basic.qos方法来限制缓冲区的大小,避免缓冲区里面无限制的未确认消息问题。该值定义通道上允许的未处理消息的最大数量。一旦数量达到配置数量,rabbitmq将停止传递更多信息。消息应答和qos预取值对用户吞吐量影响重大,增加预期将提高向消费者传递消息的速度。虽然自动应答传输消息速率最佳,但这种情况下已传递但未处理的消息数量也会增加,从而增加了消费者的RAM消耗。
小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费大量信息没有确认会导致消费者连接节点的内存消耗变大,因此需要一个合适的预取值。
发布确认
开启发布确认的方法
//生产者开启发布确认channel.confirmSelect();
单个发布确认
同步发布确认,一条信息发布之后只有被确认发布,后续的信息才可以继续发布。发布速度特别慢。
package com.atcompany.rabbitmq.ConfirmSelect;//单个确认public static void publicMessageIndividually() throws IOException, TimeoutException, InterruptedException {Channel channel = RabbitmqUtils.getChannel();String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,true,false,false,null);channel.confirmSelect();//开启时间long beginTime = System.currentTimeMillis();for(int i=0;i<msg_count;i++){String msg = i+"";channel.basicPublish("",queueName,null,msg.getBytes());//单个消息 马上进行发布确认boolean flag = channel.waitForConfirms();if(flag){System.out.println("消息发送成功");}}long endTime = System.currentTimeMillis();System.out.println("发布"+msg_count+"个单独确认消息,耗时"+(endTime-beginTime));}output:发布1000个单独确认消息,耗时36932
批量确认发布
缺点:发生故障导致发布出现问题时,不知道哪个消息出现问题,必须将整个批处理保存在内存中,以记录重要的消息而后重新发布。
一百条信息的批量确认
public static void publicMessageBatch() throws IOException, TimeoutException, InterruptedException {Channel channel = RabbitmqUtils.getChannel();String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,true,false,false,null);channel.confirmSelect();//开启时间int batchSize = 1000;long beginTime = System.currentTimeMillis();//批量发送信息for(int i=0;i<msg_count;i++){String msg = i+"";channel.basicPublish("",queueName,null,msg.getBytes());//批量确认:batchsizeif(i%batchSize==0){//批量发布确认channel.waitForConfirms();}}long endTime = System.currentTimeMillis();System.out.println("发布一千条信息,耗时:"+(endTime-beginTime));}//output:发布一千条信息,耗时:274
异步确认发布
利用回调函数来达到消息可靠性传递,中间件通过回调函数保证是否投递成功。
public static void publicMessageAsync() throws IOException, TimeoutException {Channel channel = RabbitmqUtils.getChannel();String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,true,false,false,null);channel.confirmSelect();//开启时间long beginTime = System.currentTimeMillis();//批量发送信息//需要监听消息//消息确认成功回调函数ConfirmCallback ackCallback = (deliveryTag,multiple)->{System.out.println("确认的消息:"+deliveryTag);};/*** 消息标记 deliverTag* 是否为批量确认 multiple*///消息确认失败回调函数ConfirmCallback nackCallback = (deliverTag,multiple)->{System.out.println("未确认的消息:"+deliverTag);};channel.addConfirmListener(ackCallback,nackCallback);for(int i=0;i<msg_count;i++){String msg = i+"";channel.basicPublish("",queueName,null,msg.getBytes());//异步确认}long endTime = System.currentTimeMillis();System.out.println("异步发布一千条信息,耗时:"+(endTime-beginTime));}output:异步发布一千条信息,耗时:395
处理异步未发送信息
把未确认的消息放在一个基于内存的能被发布线程访问的队列。例如 ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。
发布一条消息,就在队列里记录。确认一条消息,就在队列里删除。队列中的剩余消息就是未确认消息。❓❓❓ 存疑。正常队列先进先出,如何确认消息时队首正好是该消息。有可能发送了许多消息后一点点确认,那么就无法正常删除了….?
使用ConcurrentSkipListMap来辅助异步发送
//异步发布确认public static void publicMessageAsync() throws IOException, TimeoutException {Channel channel = RabbitmqUtils.getChannel();String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,true,false,false,null);channel.confirmSelect();//适用于高并发 线程安全的哈希表ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();//开启时间long beginTime = System.currentTimeMillis();//批量发送信息//需要监听消息//消息确认成功回调函数ConfirmCallback ackCallback = (deliveryTag,multiple)->{if(multiple) {//channel.getNextPublishSeqNo() = deliverTag 消息投递序号System.out.println("确认的消息:" + deliveryTag);//headMap() 返回映射中严格小于tokey的键的视图//如果是批量传送信息 将会在该确定消息之前的消息都删除ConcurrentNavigableMap<Long, String> confirmdmap =map.headMap(deliveryTag);confirmdmap.clear();}else{map.remove(deliveryTag);}};/*** 消息标记 deliverTag* 是否为批量确认 multiple*///消息确认失败回调函数ConfirmCallback nackCallback = (deliverTag,multiple)->{String msg = map.get(deliverTag);System.out.println("未确认的消息:"+msg);System.out.println("未确认的消息tag"+deliverTag);};channel.addConfirmListener(ackCallback,nackCallback);for(int i=0;i<msg_count;i++){String msg = i+"";channel.basicPublish("",queueName,null,msg.getBytes());//异步确认//Long Stringmap.put(channel.getNextPublishSeqNo(),msg);}//一个线程监听消息,另一个线程发布消息long endTime = System.currentTimeMillis();System.out.println("异步发布一千条信息,耗时:"+(endTime-beginTime));}
速度对比
- 单独确认 简单 吞吐量有限
- 批量确认 一旦出问题很难判断是哪条信息
- 异步处理 实现困难
交换机
在上面的部分,交换机被设置为“”,假设工作队列背后每个任务都恰好交付给一个消费者(工作进程)。
将消息传达给多个消费者,这种模式称为 发布/订阅模式。
rabbitmq消息传递模型的核心思想:生产者生产的消息从不会直接发送到队列。生产者只能把消息发送给交换机。交换机一方面接收来自生产者的消息,另一方面将他们推入队列。交换机确切知道如何处理收到的消息:把消息放到特定队列还是丢弃,这由交换机的类型决定。类型
直接(direct 主题(topic 标题(headers 扇出(fanout无名exchange
默认交换,通过空字符串进行表示。channel.basicPublish("",queueName,null,msg.getBytes());
临时队列
队列名称随机,消费者断开与队列的连接时,队列就自动删除String queueName = channel.queueDeclare().getQueue();
绑定binding
Fanout
将接收到的所有消息广播到它知道的所有队列中。 ```java package com.atcompany.rabbitmq.fanout;
import com.atcompany.rabbitmq.Utils.RabbitmqUtils; import com.rabbitmq.client.Channel;
import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException;
/**
- Created by IntelliJ IDEA.
- User: luna
- Date: 2022/2/9
发消息给交换机 */ public class EmitLog {
public static final String exchange_name = “logs”;
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitmqUtils.getChannel();channel.exchangeDeclare(exchange_name,"fanout");Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String msg = scanner.next();channel.basicPublish(exchange_name,"",null,msg.getBytes("UTF-8"));System.out.println("生产者发送消息:"+msg); }
} }
```javapackage com.atcompany.rabbitmq.fanout;import com.atcompany.rabbitmq.Utils.RabbitmqUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** Created by IntelliJ IDEA.* User: luna* Date: 2022/2/9* 消息接收*/public class ReceiveLogs01 {public static final String exchange_name = "logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitmqUtils.getChannel();//扇出类型的交换机channel.exchangeDeclare(exchange_name,"fanout");//临时队列 队列名称随机,消费者断开与队列的连接时,队列就自动删除String queue = channel.queueDeclare().getQueue();//绑定队列和交换机 routekeychannel.queueBind(queue,exchange_name,"");System.out.println("ReceiveLogs01等待接收消息,把接收到的消息打印...");//消费者取消消息时回调接口DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("接收到的消息"+new String(message.getBody(),"UTF-8"));};channel.basicConsume(queue,true,deliverCallback,consumerTag->{});}}
package com.atcompany.rabbitmq.fanout;import com.atcompany.rabbitmq.Utils.RabbitmqUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** Created by IntelliJ IDEA.* User: luna* Date: 2022/2/9*/public class ReceiveLogs02 {public final static String exchange_name = "logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitmqUtils.getChannel();channel.exchangeDeclare(exchange_name,"fanout");String queue = channel.queueDeclare().getQueue();channel.queueBind(queue,exchange_name,"");System.out.println("ReceiveLogs02等待接收消息...");DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("接收到消息:"+new String(message.getBody(),"UTF-8"));};channel.basicConsume(queue,true,deliverCallback,consumerTag->{});}}
生产者发送的信息,两个消费者都可以接收处理。删除交换机和routeKey参数无关。
direct
当我们希望将日志消息写入磁盘的程序仅接受严重错误(errors),而不存储警告(warning)或信息(info)日志消息,避免浪费磁盘空间。fanout这种交换机类型灵活性不够高,它会进行无意识的广播。
direct交换机的消息发送到它绑定的routingKey中。
如果绑定的多个队列key都相同,即多重绑定,会和fanout表现类似,开始广播。
edit configurations->allow parallel run ✅
package com.atcompany.rabbitmq.direct;import com.atcompany.rabbitmq.Utils.RabbitmqUtils;import com.rabbitmq.client.Channel;import java.io.IOException;import java.util.Scanner;import java.util.concurrent.TimeoutException;/*** Created by IntelliJ IDEA.* User: luna* Date: 2022/2/9*/public class DirectLog {public static final String exchange_name = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitmqUtils.getChannel();Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String msg = scanner.next();channel.basicPublish(exchange_name,"warning",null,msg.getBytes("UTF-8"));System.out.println("生产者发送消息:"+msg);}}}
生产者所在队列不重要,更改生产者和消费者的routingKey可以实现向特定消费者发送信息。
package com.atcompany.rabbitmq.direct;import com.atcompany.rabbitmq.Utils.RabbitmqUtils;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** Created by IntelliJ IDEA.* User: luna* Date: 2022/2/9*/public class ReceiveLogsDirect01 {//command shift u 小写变大写public static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitmqUtils.getChannel();//扇出类型的交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//临时队列 队列名称随机,消费者断开与队列的连接时,队列就自动删除channel.queueDeclare("disk", false, false, false, null);//绑定队列和 交换机 routekeychannel.queueBind("disk", EXCHANGE_NAME, "error");System.out.println("ReceiveLogs03等待接收error,把接收到的消息打印...");DeliverCallback deliverCallback = (consumerTag, message)->{System.out.println("ReceiveLogsDirect03接收到消息:"+new String(message.getBody(),"UTF-8"));};channel.basicConsume("disk",true,deliverCallback,consumerTag->{});}}
Topics
direct交换机可以有选择性的接收日志,但仍存在局限性。如果想要接收info.base和info.advantage类型的日志,而一个队列只能接收info.base的消息,direct无法实现多种类型的日志接收。
topic交换机的routing_key必须是一个单词列表,以点号分开。单词列表最多不能超过255个字节。
匹配规则:*可以代替一个单词。#可以代替零个或多个单词。
- .orange.->中间为orange带三个单词的字符串
- ..rabbit->最后一个单词的rabbit的三个单词
- lazy.#->第一个单词是lazy的多个单词
- 当一个队列绑定键是#,那么该队列将接收所有数据,fanout
- 当队列绑定键中没有#和*出现,direct
由于点号连接,单词可以满足多个routing_key的要求,可以同时发送给多个队列。
package com.atcompany.rabbitmq.Topic;import com.atcompany.rabbitmq.Utils.RabbitmqUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** Created by IntelliJ IDEA.* User: luna* Date: 2022/2/9* 主题交换机及相关队列*/public class ReceiveLogsTopic01 {public static final String exchange_name = "topic_logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitmqUtils.getChannel();channel.exchangeDeclare(exchange_name,"topic");String queueName = "C2";channel.queueDeclare(queueName,false,false,false,null);channel.queueBind(queueName,exchange_name,"lazy.#");System.out.println("等待接收消息...");DeliverCallback deliverCallback = (consumerTag, message)->{System.out.println("ReceiveLogsTopic02接收到消息:"+new String(message.getBody(),"UTF-8"));System.out.println("接收队列:"+queueName+"绑定键:"+message.getEnvelope().getRoutingKey());};channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});}}
懒得赋值粘贴,我直接更改队列名,run parallel,并行一个线程。
package com.atcompany.rabbitmq.Topic;import com.atcompany.rabbitmq.Utils.RabbitmqUtils;import com.rabbitmq.client.Channel;import java.io.IOException;import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeoutException;/*** Created by IntelliJ IDEA.* User: luna* Date: 2022/2/9* 生产者*/public class EmitLogTopic {public static final String exchange_name = "topic_logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitmqUtils.getChannel();Map<String,String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");for(Map.Entry<String,String> stringEntry:bindingKeyMap.entrySet()){String routing_key = stringEntry.getKey();String msg = bindingKeyMap.get(routing_key);channel.basicPublish(exchange_name,routing_key,null,msg.getBytes("UTF-8"));System.out.println("生产者发送消息:"+msg);}}}//-----生产者发送生产者发送消息:是四个单词不匹配任何绑定会被丢弃生产者发送消息:不匹配任何绑定不会被任何队列接收到会被丢弃生产者发送消息:被队列 Q1Q2 接收到生产者发送消息:被队列 Q2 接收到生产者发送消息:被队列 Q1Q2 接收到生产者发送消息:被队列 Q1 接收到生产者发送消息:虽然满足两个绑定但只被队列 Q2 接收一次生产者发送消息:是四个单词但匹配 Q2
两个队列的情况:
等待接收消息…
ReceiveLogsDirect03接收到消息:被队列 Q1Q2 接收到
接收队列:C1绑定键:lazy.orange.elephant
ReceiveLogsDirect03接收到消息:被队列 Q1Q2 接收到
接收队列:C1绑定键:quick.orange.rabbit
ReceiveLogsDirect03接收到消息:被队列 Q1 接收到
接收队列:C1绑定键:quick.orange.fox
等待接收消息…
ReceiveLogsTopic02接收到消息:被队列 Q1Q2 接收到
接收队列:C2绑定键:lazy.orange.elephant
ReceiveLogsTopic02接收到消息:被队列 Q1Q2 接收到
接收队列:C2绑定键:quick.orange.rabbit
ReceiveLogsTopic02接收到消息:是四个单词但匹配 Q2
接收队列:C2绑定键:lazy.orange.male.rabbit
等待接收消息…
ReceiveLogsTopic02接收到消息:被队列 Q2 接收到
接收队列:C2绑定键:lazy.brown.fox
ReceiveLogsTopic02接收到消息:虽然满足两个绑定但只被队列 Q2 接收一次
接收队列:C2绑定键:lazy.pink.rabbit
死信队列
死信,无法被消费的消息。一般来说,producer将消息投递到broker或直接到queue中,consumer从queue中取出消息进行消费,某些时候由于特定原因导致queue中的某些消息无法被消费,若消息没有后续的处理,就变成了死信。
应用场景:保证订单业务的消息数据不丢失,需要使用Rabbitmq的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。或者:用户在商城下单成功并点击支付后在指定时间未支付时自动失效。
来源
- 消息TTL过期
- 队列达到最大长度
- 消息被拒绝(basic.reject或basic.nack)并且requeue=false(不能重排)

在写代码前记得删除之前案例已定义重名的交换机和队列。
package com.atcompany.rabbitmq.dead_letter;import com.atcompany.rabbitmq.Utils.RabbitmqUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.io.IOException;import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeoutException;/*** Created by IntelliJ IDEA.* User: luna* Date: 2022/2/10* 死信队列*/public class Consumer01 {//普通交换机public static final String normal_exchange = "normal_exchange";//死信交换机public static final String dead_exchange = "dead_exchange";public static void main(String[] args) throws IOException, TimeoutException {String dead_queue = "dead_queue";Channel channel = RabbitmqUtils.getChannel();channel.exchangeDeclare(normal_exchange,"direct");channel.exchangeDeclare(dead_exchange,"direct");channel.queueDeclare(dead_queue,false,false,false,null);channel.queueBind(dead_queue,dead_exchange,"lisi");Map<String,Object> arguments = new HashMap<>();// arguments.put("x-message-ttl",10000);//正常队列设置死信交换机arguments.put("x-dead-letter-exchange",dead_exchange);//正常队列设置死信routing_keyarguments.put("x-dead-letter-routing-key","lisi");//指定队列最大长度// arguments.put("x-max-length",6);String normal_queue = "normal_queue";channel.queueDeclare(normal_queue,false,false,false,arguments);channel.queueBind(normal_queue,normal_exchange,"zhangsan");System.out.println("等待接收消息...");DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("consumer01接受的消息:"+new String(message.getBody(),"UTF-8"));};channel.basicConsume(normal_queue,true,deliverCallback,consumerTag->{});}}
package com.atcompany.rabbitmq.dead_letter;import com.atcompany.rabbitmq.Utils.RabbitmqUtils;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** Created by IntelliJ IDEA.* User: luna* Date: 2022/2/10* 死信队列的生产者*/public class Producer {//普通交换机public static final String normal_exchange = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitmqUtils.getChannel();channel.exchangeDeclare(normal_exchange,"direct");//设置ttl 10000msAMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for(int i=0;i<11;i++){String msg = "info"+i;channel.basicPublish(normal_exchange,"zhangsan",properties,msg.getBytes());}}}
死信队列和正常队列一样,只不过名字是死信队列,接收死信队列的消息。
public class Consumer02 {private static final String DEAD_EXCHANGE = "dead_exchange";public static void main(String[] argv) throws Exception {Channel channel = RabbitmqUtils.getChannel();channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信队列消息........... ");DeliverCallback deliverCallback = (consumerTag, delivery) ->{String message = new String(delivery.getBody(), "UTF-8");System.out.println("Consumer02 接收死信队列的消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});}}//---------output等待接收死信队列消息...........Consumer02 接收死信队列的消息info0Consumer02 接收死信队列的消息info1Consumer02 接收死信队列的消息info2Consumer02 接收死信队列的消息info3Consumer02 接收死信队列的消息info4Consumer02 接收死信队列的消息info5Consumer02 接收死信队列的消息info6Consumer02 接收死信队列的消息info7Consumer02 接收死信队列的消息info8Consumer02 接收死信队列的消息info9Consumer02 接收死信队列的消息info10
演示成功。开启消费者01后关闭,开启生产者向消费者01发送数据发送失败,转向死信队列发送数据。
这种情况是 1. 超过ttl未处理而进入死信队列
- arguments.put(“x-max-length”,6); 这种情况指定队列最大长度,超过最大长度的消息会进入死信队列。此时生产者的basicProperties参数为null。而原重名队列需要删除重建。队列的Featrues属性会出现Lim标签。
演示被拒绝的消息如何进入死信队列:
DeliverCallback deliverCallback = (consumerTag,message)->{String msg = new String(message.getBody(),"UTF-8");if(msg.equals("info5"){System.out.println("consumer01拒绝的消息:"+msg);//拒绝 且不放回普通队列(falsechannel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else{System.out.println("consumer01接受的消息:"+msg); }channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//开启手动应答channel.basicConsume(normal_queue,false,deliverCallback,consumerTag->{});
延迟队列
延迟队列内部有序,延时队列的元素希望在指定时间到了以后或之前取出和处理。延时队列就是用来存放需要在指定时间(生产者指定)被处理的元素的队列。延迟队列基于死信队列的ttl过期的情况。
使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺在十天内没有上传过商品,则自动发送消息提醒
- 用户注册成功后三天未登录,进行短信提醒
- 用户发起退款,三天内没有得到处理则通知相关运营人员
- 预定会议后,需要在预定时间点前十分钟通知各个与会人员参加会议。
- …
队列TTL

自定义ttl的队列 消息可能不会按时过期。因为rabbitmq排队检查消息,如果第一条消息过期则丢到死信队列,如果第一条消息延时很长,而第二条消息延时时长很短,第二条消息不会优先执行。
[
](https://www.rabbitmq.com/community-plugins.html)
Rabbitmq 插件实现延迟队列
- 需要将插件放在rabbitmq的plugins文件夹下 /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
- rabbitmq-plugins enable rabbitmq_delayed_message_exchange 安装插件
- 重启rabbitmq systemctl restart rabbitmq-server


带插件的延时队列解决了之前的问题,当第二条消息的delayed_time小于第一条消息的delayed_time,会解决顺序问题,先到达延时时间的先被延迟队列消费。
一种延迟队列其实就是死信队列,ttl过期后消息转到死信队列消费。另一种延迟队列就是带插件的延迟队列,解决了死信问题的顺序问题。
发布确认高级
生产环境中不明原因导致rabbitmq服务器重启,生产者消息投递失败,导致消息丢失。即当rabbitmq集群不可用时,如何处理无法投递的消息。
回退消息
仅开启生产者确认机制的情况下,交换机接收到消息后,会直接跟消息生产者发送确认消息,如果发现该消息不可路由,消息会被直接丢弃。而此时生产者不知道消息被丢弃,需要设置mandatory参数将不可达目的地的消息返回给生产者。
备份交换机
mandatory参数和回退消息可以感知无法投递的消息,有机会在生产者的消息无法被投递时发现并处理。但有时不知道如何处理这些无法路由的消息,当生产者所在服务有多台机器,手动复制日志会麻烦且易出错,设置manddatory参数会增加生产者的复杂性。
备份交换机作为交换机的备胎,当交换机接收一个不可路由消息,会转发到备份交换机,由备份交换机进行转发和处理。备份交换机的类型是Fanout,这样可以把所有消息都投递到与其绑定的队列中,在备份交换机中绑定一个队列,所有原交换机无法路由的消息都会进入该队列。还可以创建报警队列用独立的消费者来检测和报警。
当备份交换机和生产者回退(mandatory)同时使用,消息会按照备份交换机的设置来,备份交换机优先级较高。
其他知识点
幂等性
用户对于同一操作发起的一次或多次请求结果一致,不会因为多次点击而产生副作用。
消息重复消费
使用全局ID或唯一标识符如时间戳、UUID、订单消费者MQ的id来判断,每次消费消息时用该id表示消息是否消费过。
海量订单生成的业务高峰期,生产端可能会重复发送消息,消费端需要实现幂等性。①唯一ID+指纹码机制。指纹码即一些自定义规则或时间戳加别的服务产生的唯一信息码,具有唯一性。查询语句判断id是否存在数据库中,优势就是实现简单,但是高并发状态单个数据库会有写入性能瓶颈,不过可以采用分库分表提升性能。②redis的原子性,执行setnx命令,天然具有幂等性。
优先级队列
用户在设定时间内未付款需要推送消息提醒。可以使用redis存放定时轮询,当订单量增多采用rabbitmq进行改造,并设定优先级。
Map<String,Object> map = new HashMap<>();//设置队列的优先级map.put("x-max-priority",10);channel.queueDeclare(QUEUE_NAME,false,false,false,map);for(int i=0;i<11;i++){if(i==5){//设置优先级为5AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();channel.basicPublish("",QUEUE_NAME,properties,message.getBytes());}else{channel.basicPublish("",QUEUE_NAME,null,message.getBytes());}}
惰性队列
尽可能将消息存入磁盘中,消费者消费到相应的消息时才会被加载到内存。设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(如消费者下线、宕机、由于维护而关闭等)致使长时间不能消费消息而堆积时,惰性队列很有必要,把消息存储在磁盘中,不会造成消息的堆积。
默认情况下生产者发送消息给RabbitMQ,队列消息尽可能存储在内存中,可以快速的将消息发送给消费者。即使是持久化的消息,被写入磁盘的同时也会在内存中驻留一份备份。当rabbitmq需要释放内存时,会将内存中的消息换页至磁盘中,这个操作会消费较长时间,也会阻塞队列的操作,进而无法接受新的消息。
惰性队列存在lazy default两种模式。lazy模式中,通过调用channel.queueDeclare()在参数中设置,也可以通过Policy的方式设置,如果一个队列同时用这两种方法设置,Policy方式具有更高的优先级。如果通过声明的方式改变已有队列的模式,只能先删除队列重新声明。
“x-queue-model”
Map<String,Object> map = new HashMap<>();map.put("x-queue-model","lazy");channel.queueDeclare(queue_name,false,false,false,map);
每发送1百万消息,每条消息占1kb,普通队列占用内存1.2GB,惰性队列占用内存1.5MB。
RabbitMQ集群
- 确保各个节点的 cookie 文件使用的是同一个值 在 node1 上执行远程操作命令 scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
- 启动 RabbitMQ 服务,顺带启动 Erlang 虚拟机和 RbbitMQ 应用服务(在三台节点上分别执行以 下命令) rabbitmq-server -detached
- rabbitmqctl stop_app (rabbitmqctl stop 会将Erlang 虚拟机关闭 rabbitmqctl stop_app 只关闭 RabbitMQ 服务)
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app(只启动应用服务)
- 查看集群状态 rabbitmqctl cluster_status
设置集群用户
- 创建账号 rabbitmqctl add_user admin 123
- 设置用户角色 rabbitmqctl set_user_tags admin administrator
- 设置用户权限 rabbitmqctl set_permissions -p “/“ admin “.“ “.“ “.*”
镜像队列
当RabbitMQ 集群中只有一个 Broker 节点,该节点的失效将导致整体服务的临时性不可用,也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但 是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一 个短暂却会产生问题的时间窗。通过 publisher confirm 机制能够确保客户端知道哪些消息己经存入磁盘,尽 管如此,一般不希望遇到因单点故障导致的服务不可用。
一般不希望遇到因单点故障导致的服务不可用。引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中 的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。Haproxy实现负载均衡
提供高可用性、负载均衡及基于TCP/HTTP应用的代理,支持虚拟主机。实现了一种事件驱动、单一进程模型,支持非常大的并发连接数。Federation Exchange
使用Federation解决不同地区的网络延迟问题。
rabbitmq-plugins enable rabbitmq_federation
- rabbitmq-plugins enable rabbitmq_federation_management
- 每台节点单独运行。
Federation Queue
联邦队列可以在多个Broker节点之间为单个队列提供负载均衡的功能。一个联邦队列可以连接一个或多个上游队列upstream queue,并从上游队列中获取消息满足本地消费者消费消息。shovel
可靠持续的从一个Broker的队列(源端)拉取数据并转发到另一个Broker的交换器(目的端destination).。负责连接源地和目的端、负责消息的读写、负责连接失败等问题的处理。
