1.循环分发
使用任务队列的一个优势在于容易并行处理。如果积压了大量的工作,
我们只需要添加更多的工作者(上文中的Worker.java中的概念),这样很容易扩展。
默认情况下,RabbitMQ是轮流发送消息给下一个消费者,平均每个消费者接收到的消息数量是相等的。
这种分发消息的方式叫做循环分发。你可以试一下开3个或更多工作者的情况。
2.消息确认
其中的参数prefetchCount表示:maximum number of messages that the server will deliver其中的参数 prefetchCount表示:maximum number of messages that the server will deliver
3.消息持久化
如果关闭RabbitMQ服务或者RabbitMQ服务崩溃了,RabbitMQ就会丢掉所有的队列和消息: 除非你告诉它不要这样。
要确保RabbitMQ服务关闭或崩溃后消息不会丢失,要做两件事情:持久化队列、持久化消息。
首先,我们要确保RabbitMQ永远不会丢失我们的队列。怎么做呢?在声明队列的时候,指定durable参数为true。
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
尽管上面的代码没有错,但是它不会按所想的那样将队列持久化:因为之前我们已经将hello这个队列设置了不持久化,RabbitMQ不允许重新定义已经存在的队列,否则就会报错。
但是,我们有一个快速的解决办法:声明另外一个队列就行了,只要不叫hello,比如task_queue:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
现在,我们已经确保队列不会丢失了,那么如何将消息持久化呢:将 MessageProperties的值设置为PERSISTENT_TEXT_PLAIN。
4.公平分发
循环消息分发并不是我们想要的。比如,有两个工作者,当奇数消
息(如上文中的"1..."、"3..."、"5..."、"7...")很耗时而偶数消息(
如上文中的"2."、"4."、"6."、"8.")很简单的时候,其中一个工作者
就会一直很忙而另一个工作者就会闲。然而RabbitMQ对这些一概
不知,它只是在轮流平均地发消息。
这种情况的发生是因为,RabbitMQ 只是当消息进入队列时就分发
出去,而没有查看每个工作者未返回确认信息的数量。
为了改变这种情况,我们可以使用basicQos方法,并将参数
prefetchCount设为1。这样做,工作者就会告诉RabbitMQ:不要同
时发送多个消息给我,每次只发1个,当我处理完这个消息并给你确
认信息后,你再发给我下一个消息。这时候,RabbitMQ就不会轮流
平均发送消息了,而是寻找闲着的工作者。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
以下是示例代码(第一个消费者)接受的时候我给他设了休眠时间
package com.demo.task;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Worker {
//定义队列名字
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost( "127.0.0.1" );
//创建通信连接
Connection connection = connectionFactory.newConnection();
//建立通道
Channel chanel = connection.createChannel();
// 声明队列【参数说明:参数一:队列名称,参数二:
// 是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
chanel.queueDeclare( QUEUE_NAME, false, false, false, null );
// 第一种获取消息的方式 持续消息获取使用:basic.consume;单个消息获取使用:basic.get
chanel.basicQos( 1 );
Consumer consumer = new DefaultConsumer( chanel ) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String( body, "UTF-8" );
System.out.println( " [x] Received '" + message + "'" );
try {
doWork( message );
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println( " [x] Done" );
}
}
};
// queue 所订阅的队列 autoAck 是否开启自动应答,默认是开启的,如果需要手动应答应该设置为false
// callback接收到消息之后执行的回调方法
chanel.basicConsume( QUEUE_NAME, true, consumer );
// 第二种消息获取 单个消息获取采用GetResponse
// @Param队列名称 Boolean autoAck 是否自动确认
// while (true) {
// GetResponse getResponse = chanel.basicGet( QUEUE_NAME, false );
// String message = new String( getResponse.getBody(), "UTF-8" );
// System.out.println( message );
// UInt64 deliveryTag, 结果是否为多条数据
// chanel.basicAck( getResponse.getEnvelope().getDeliveryTag(), true );
// }
//
}
private static void doWork(String task) throws InterruptedException {
Thread.sleep( 6666666 );
}
}
第二个消费者
package com.demo.task;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Worker {
//定义队列名字
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost( "127.0.0.1" );
//创建通信连接
Connection connection = connectionFactory.newConnection();
//建立通道
Channel chanel = connection.createChannel();
// 声明队列【参数说明:参数一:队列名称,参数二:
// 是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
chanel.queueDeclare( QUEUE_NAME, false, false, false, null );
// 第一种获取消息的方式 持续消息获取使用:basic.consume;单个消息获取使用:basic.get
chanel.basicQos( 1 );
Consumer consumer = new DefaultConsumer( chanel ) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String( body, "UTF-8" );
System.out.println( " [x] Received '" + message + "'" );
try {
doWork( message );
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println( " [x] Done" );
}
}
};
// queue 所订阅的队列 autoAck 是否开启自动应答,默认是开启的,如果需要手动应答应该设置为false
// callback接收到消息之后执行的回调方法
chanel.basicConsume( QUEUE_NAME, true, consumer );
// 第二种消息获取 单个消息获取采用GetResponse
// @Param队列名称 Boolean autoAck 是否自动确认
// while (true) {
// GetResponse getResponse = chanel.basicGet( QUEUE_NAME, false );
// String message = new String( getResponse.getBody(), "UTF-8" );
// System.out.println( message );
// UInt64 deliveryTag, 结果是否为多条数据
// chanel.basicAck( getResponse.getEnvelope().getDeliveryTag(), true );
// }
//
}
private static void doWork(String task) throws InterruptedException {
Thread.sleep( 6666666 );
// for (char ch : task.toCharArray()) {
// if (ch == '.') Thread.sleep( 600000 );
// }
}
}
先看Worker的接受情况
[x] Received '1221'
----------
ConsumerMQ的接受情况
接收到的消息99999999
接收到的消息99999999
接收到的消息99999999
接收到的消息99999999
接收到的消息99999999
接收到的消息99999999
- 原因:在worker接受消息的时候进行休眠了,所以模拟暂忙状态,所以ConsumerMQ处于接受状态