工作队列
工作队列又称为任务队列,其主要思想是避免立即执行资源密集型任务,而将这些任务分发给多个线程,在后台运行的工作中将分发的任务最后执行完毕。当我们有多个工作线程的时候,这些工作线程将一起处理这些任务。
轮询分发消息
在默认情况下,假设我们有三个消费者,也就是工作线程,那么这三个工作线程从队列中获取消息的能力(几率)是一样的,会依次从队列中获取消息并执行相应的任务。
抽取工具类
由于我们创建发送线程和工作线程,都需要获取RabbitMQ
的连接,而这一部分是重复的
因此我们可以将这一部分抽取出来称为公共的工具部分
public class MQUtil {
public static Channel getChannel() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 获取连接
Connection connection = connectionFactory.newConnection();
// 获取发送信息的通道
return connection.createChannel();
}
}
启动两个工作线程
我们可以将某个代码重复运行两次来实现启动两个工作线程,但个人感觉不太优雅(
所以采用多线程的方式来启动两个工作线程
/**
* Created By Intellij IDEA
*
* @author ssssheep
* @package com.ctgu.sheep.two
* @datetime 2022/9/16 星期五
*/
public class Worker1 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = MQUtil.getChannel();
for(int i = 1;i <= 3;i++) {
WorkThread workThread = new WorkThread(i + "", channel);
workThread.start();
}
}
}
class WorkThread extends Thread {
public static final String QUEUE_NAME = "hello";
public String name;
public Channel channel;
public WorkThread(String name, Channel channel) {
this.name = name;
this.channel = channel;
}
@Override
public void run() {
System.out.println("工作线程" + name + "等待接受消息...");
try {
channel.basicConsume(QUEUE_NAME, true, (consumerTag, message) -> {
System.out.println("工作线程" + consumerTag + "接收到消息: " + new String(message.getBody()));
}, consumerTag -> {
System.out.println("工作线程:" + consumerTag + "取消消费的逻辑");
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
启动一个发送线程
/**
* Created By Intellij IDEA
*
* @author ssssheep
* @package com.ctgu.sheep.two
* @datetime 2022/9/16 星期五
*/
public class Producer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = MQUtil.getChannel();
/**
* 1. 队列名称
* 2. 是否持久化
* 3. 是否独占队列
* 4. 是否在消费完成后自动删除队列
* 5. 队列其他参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for(int i = 1;i <= 20;i++) {
String message = "hello rabbitmq" + i;
/**
* 1. 交换机名称
* 2. 队列名称
* 3. 消息其他属性
* 4. 消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("生产者发送消息:" + message);
// Thread.sleep(1500);
}
}
}
结果
我们一共启动了三个工作线程和一个发送线程,三个工作线程会依次获取到消息并打印在控制台中
消息应答
概念
同计算机网络中数据的传输一样,在消息队列中,我们同样需要对收到的数据进行ACK确认,从而告诉消息队列:消息已经被正常处理掉了。
消费者完成一个任务可能需要一段时间。如果其中的一个消费者处理一个长的任务时,中途因为某种原因宕机。假设RabbitMQ一旦向消费者传递一条信息后就将其标记为删除,那么在这种情况下,消息就会出现丢失的情况。
为了保证消息在发送过程中不会丢失,我们需要引入消息应答机制。消息应答就是:消费者在接收到消息并且处理消息之后,通知rabbitmq此条消息已经被处理,可以进行删除了。
自动应答
顾名思义,消息发送后就会自动进行 确认。根据上述分析可知,自动应答会导致消息出现丢失的情况。
- 一般在高吞吐量和数据传输安全性方面做权衡
- 自动应答模式没有对传递的消息数量进行限制
- 这种模式适用于消费者可以高效并且以某种速率能够处理这些消息的情况下使用
消息应答的方式
消息应答一般分为下面两种
- 肯定确认
- 否定确认
其中,肯定确认使用channel.basicAck()
,否定确认有两种,分别是:
channel.basicNack()
channel.basicReject()
:通知mq,此消息直接被拒绝处理了Multiple的解释
在手动应答时,我们可以选择是否批量应答并且减少网络拥堵上图中,8号消息为队头消息
消息自动重新入队
基于上述的消息应答机制,如果消费者由于某些原因失去了连接,导致没有对消息进行ACK确认,RabbitMQ会因为没有收到确认消息而了解到消息并没有被完全处理,从而对其重新排队。如果此时其他的消费者可以进行处理,就可以将其分发给另外的消费者,从而确保消息不会被丢失。
手动消息应答代码
消费者在消费消息的时候,需要设置
autoAck
为false
持久化
概念
默认情况下RabbitMQ退出或者由于某些原因崩溃时,它将忽视暂存的队列和其中的消息。
为了保证消息不会被丢失,我们需要将队列和消息都标记为持久化
如果先前某个队列不是持久化的,那么就需要先对其进行删除,在重新创建的时候再标记为持久化队列
队列实现持久化
消息实现持久化
不公平分发
RabbitMQ
默认是轮训分发,也就是每个消费者依次获取到消息并进行处理,但是当消费者消费能力存在很大的差异时,这种分发策略就不是很好
比如说我们有两个消费者在处理任务,消费者1处理任务的速度很快, 但是消费者2处理任务的速度却很慢,如果还是使用轮训分发的策略,就会导致消费者1在很大一部分时间内都是处于空闲状态。在这种情况下,我们就应该使用不公平分发的策略。
为了避免上述这种情况,我们可以设置参数channel.basicQos();
,从而使用不公平分发的策略
相当于消费者通知消息队列,我每次只能处理一条消息,多的消息你去分发给别人