工作队列(Work Queues)
实现
轮询分发
生产者
public class Producer {
private static Logger log = LoggerFactory.getLogger(Producer.class);
public static void main(String[] args) {
// 所有中间件都是基于tcp/ip协议,rabbitmq基于AMQP协议
// 需要ip:port
// 步骤
// 1. 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//配置相关配置项
connectionFactory.setHost("112.74.175.76");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//声明连接和通道
Connection connection = null;
Channel channel = null;
try {
// 2. 创建连接 Connection
connection = connectionFactory.newConnection("生产者-1");
// 3. 通过连接获取通道Chanel
channel = connection.createChannel();
// 4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息
//声明队列
String queueName = "q1";
/**
* @param 队列名称
* @param 是否持久化
* @param 是否独占队列(排他性,一般不设置为排他)
* @param 是否自动删除(最后一个消费者消费结束是否自动删除,一般不会自动删除)
* @param 附加参数map
*
*/
channel.queueDeclare(queueName, true, false, false, null);
// 5. 准备消息
String msg = null;
for (int i = 1; i <= 20; i++) {
msg = "WORK:POLLING:" + i;
// 6. 发送消息到queue
/**
* @param 交换机(如果没有指定交换机,则第二个参数为队列名)
* @param 队列名称
* @param 消息持久化
* @param 发送内容
*/
channel.basicPublish("", queueName, null, msg.getBytes());
}
log.info("消息发送成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7. 关闭通道
if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
// 8. 关闭连接
if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
消费者1
public class WorkPolling1 {
private static Logger log = LoggerFactory.getLogger(WorkPolling1.class);
public static void main(String[] args) {
// 所有中间件都是基于tcp/ip协议,rabbitmq基于AMQP协议
// 需要ip:port
// 步骤
// 1. 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//配置相关配置项
connectionFactory.setHost("112.74.175.76");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//声明连接和通道
Connection connection = null;
Channel channel = null;
try {
// 2. 创建连接 Connection
connection = connectionFactory.newConnection("消费者-Work1");
// 3. 通过连接获取通道Chanel
channel = connection.createChannel();
// 4. 声明队列,如果不存在会自动创建
//同一时刻服务器只会推送一条消息给消费者
String queueName = "q1";
//5. 定义接收消息的回调
Channel callbackChannel = channel;
/**
* @param 队列名称
* @param 应答机制(自动应答可能会造成死循环)
* @param 响应函数
*
*/
callbackChannel.basicConsume(queueName, false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
log.info("work1 消息接收成功:{}", new String(delivery.getBody(), StandardCharsets.UTF_8));
// 模拟服务器性能
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
log.error("接受消息失败:{}", s);
}
});
System.out.println("work1开始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7. 关闭通道
if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
// 8. 关闭连接
if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
消费者2
public class WorkPolling2 {
private static Logger log = LoggerFactory.getLogger(WorkPolling2.class);
public static void main(String[] args) {
// 所有中间件都是基于tcp/ip协议,rabbitmq基于AMQP协议
// 需要ip:port
// 步骤
// 1. 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//配置相关配置项
connectionFactory.setHost("112.74.175.76");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//声明连接和通道
Connection connection = null;
Channel channel = null;
try {
// 2. 创建连接 Connection
connection = connectionFactory.newConnection("消费者-Work2");
// 3. 通过连接获取通道Chanel
channel = connection.createChannel();
// 4. 声明队列,如果不存在会自动创建
//同一时刻服务器只会推送一条消息给消费者
String queueName = "q1";
//5. 定义接收消息的回调
Channel callbackChannel = channel;
/**
* @param 队列名称
* @param 应答机制(自动应答可能会造成死循环)
* @param 响应函数
*
*/
callbackChannel.basicConsume(queueName, false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
log.info("work2 消息接收成功:{}", new String(delivery.getBody(), StandardCharsets.UTF_8));
// 模拟服务器性能
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
log.error("接受消息失败:{}", s);
}
});
System.out.println("work2开始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7. 关闭通道
if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
// 8. 关闭连接
if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
结果
先启动work1 work2 再进行消息投递
查看消费的消息规律
轮询分发机制对于多个消费者而言是进行轮流消费,不会因为服务器资源有限而产生不对等性
但是应答机制需要配置为自动应答才会生效,但是实际使用过程中不会这样进行配置
公平分发
公平分发区别于轮询分发的区别点在于,没有自动应答机制,需要设置手动应答和指定指标
//制定qos指标默认为null,即为轮询分发: qos=1 一次性取出多少条数据进行消费
//根据服务器进行设置 最初可以保持默认值
callbackChannel.basicQos(1);
//设置为false
callbackChannel.basicConsume(queueName, false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
log.info("work1 消息接收成功:{}", new String(delivery.getBody(), StandardCharsets.UTF_8));
// 模拟服务器性能
try {
Thread.sleep(1000);
//手动应答
callbackChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
log.error("接受消息失败:{}", s);
}
});
生产者
public class Producer {
private static Logger log = LoggerFactory.getLogger(Producer.class);
public static void main(String[] args) {
// 所有中间件都是基于tcp/ip协议,rabbitmq基于AMQP协议
// 需要ip:port
// 步骤
// 1. 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//配置相关配置项
connectionFactory.setHost("112.74.175.76");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//声明连接和通道
Connection connection = null;
Channel channel = null;
try {
// 2. 创建连接 Connection
connection = connectionFactory.newConnection("生产者-1");
// 3. 通过连接获取通道Chanel
channel = connection.createChannel();
// 4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息
//声明队列
String queueName = "q1";
/**
* @param 队列名称
* @param 是否持久化
* @param 是否独占队列(排他性,一般不设置为排他)
* @param 是否自动删除(最后一个消费者消费结束是否自动删除,一般不会自动删除)
* @param 附加参数map
*
*/
channel.queueDeclare(queueName, true, false, false, null);
// 5. 准备消息
String msg = null;
for (int i = 1; i <= 20; i++) {
msg = "WORK:POLLING:" + i;
// 6. 发送消息到queue
/**
* @param 交换机(如果没有指定交换机,则第二个参数为队列名)
* @param 队列名称
* @param 消息持久化
* @param 发送内容
*/
channel.basicPublish("", queueName, null, msg.getBytes());
}
log.info("消息发送成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7. 关闭通道
if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
// 8. 关闭连接
if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
消费者1
public class WorkFair1 {
private static Logger log = LoggerFactory.getLogger(WorkFair1.class);
public static void main(String[] args) {
// 所有中间件都是基于tcp/ip协议,rabbitmq基于AMQP协议
// 需要ip:port
// 步骤
// 1. 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//配置相关配置项
connectionFactory.setHost("112.74.175.76");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//声明连接和通道
Connection connection = null;
Channel channel = null;
try {
// 2. 创建连接 Connection
connection = connectionFactory.newConnection("消费者-Work1");
// 3. 通过连接获取通道Chanel
channel = connection.createChannel();
// 4. 声明队列,如果不存在会自动创建
//同一时刻服务器只会推送一条消息给消费者
String queueName = "q1";
//5. 定义接收消息的回调
Channel callbackChannel = channel;
//制定qos指标默认为null,即为轮询分发: qos=1 一次性取出多少条数据进行消费
callbackChannel.basicQos(1);
/**
* @param 队列名称
* @param 应答机制(自动应答可能会造成死循环)
* @param 响应函数
*
*/
callbackChannel.basicConsume(queueName, false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
log.info("work1 消息接收成功:{}", new String(delivery.getBody(), StandardCharsets.UTF_8));
// 模拟服务器性能
try {
Thread.sleep(1000);
//手动应答
callbackChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
log.error("接受消息失败:{}", s);
}
});
System.out.println("work1开始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7. 关闭通道
if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
// 8. 关闭连接
if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
消费者2
public class WorkFair2 {
private static Logger log = LoggerFactory.getLogger(WorkFair2.class);
public static void main(String[] args) {
// 所有中间件都是基于tcp/ip协议,rabbitmq基于AMQP协议
// 需要ip:port
// 步骤
// 1. 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//配置相关配置项
connectionFactory.setHost("112.74.175.76");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//声明连接和通道
Connection connection = null;
Channel channel = null;
try {
// 2. 创建连接 Connection
connection = connectionFactory.newConnection("消费者-Work2");
// 3. 通过连接获取通道Chanel
channel = connection.createChannel();
// 4. 声明队列,如果不存在会自动创建
//同一时刻服务器只会推送一条消息给消费者
String queueName = "q1";
//5. 定义接收消息的回调
Channel callbackChannel = channel;
//制定qos指标默认为null,即为轮询分发: qos=1 一次性取出多少条数据进行消费
callbackChannel.basicQos(1);
/**
* @param 队列名称
* @param 应答机制(自动应答可能会造成死循环)
* @param 响应函数
*
*/
callbackChannel.basicConsume(queueName, false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
log.info("work2 消息接收成功:{}", new String(delivery.getBody(), StandardCharsets.UTF_8));
// 模拟服务器性能
try {
Thread.sleep(200);
//手动应答
callbackChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
log.error("接受消息失败:{}", s);
}
});
System.out.println("work2开始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7. 关闭通道
if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
// 8. 关闭连接
if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}