工作队列(Work Queues)

image.png

实现

轮询分发

生产者

  1. public class Producer {
  2. private static Logger log = LoggerFactory.getLogger(Producer.class);
  3. public static void main(String[] args) {
  4. // 所有中间件都是基于tcp/ip协议,rabbitmq基于AMQP协议
  5. // 需要ip:port
  6. // 步骤
  7. // 1. 创建连接工厂
  8. ConnectionFactory connectionFactory = new ConnectionFactory();
  9. //配置相关配置项
  10. connectionFactory.setHost("112.74.175.76");
  11. connectionFactory.setPort(5672);
  12. connectionFactory.setUsername("admin");
  13. connectionFactory.setPassword("admin");
  14. connectionFactory.setVirtualHost("/");
  15. //声明连接和通道
  16. Connection connection = null;
  17. Channel channel = null;
  18. try {
  19. // 2. 创建连接 Connection
  20. connection = connectionFactory.newConnection("生产者-1");
  21. // 3. 通过连接获取通道Chanel
  22. channel = connection.createChannel();
  23. // 4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息
  24. //声明队列
  25. String queueName = "q1";
  26. /**
  27. * @param 队列名称
  28. * @param 是否持久化
  29. * @param 是否独占队列(排他性,一般不设置为排他)
  30. * @param 是否自动删除(最后一个消费者消费结束是否自动删除,一般不会自动删除)
  31. * @param 附加参数map
  32. *
  33. */
  34. channel.queueDeclare(queueName, true, false, false, null);
  35. // 5. 准备消息
  36. String msg = null;
  37. for (int i = 1; i <= 20; i++) {
  38. msg = "WORK:POLLING:" + i;
  39. // 6. 发送消息到queue
  40. /**
  41. * @param 交换机(如果没有指定交换机,则第二个参数为队列名)
  42. * @param 队列名称
  43. * @param 消息持久化
  44. * @param 发送内容
  45. */
  46. channel.basicPublish("", queueName, null, msg.getBytes());
  47. }
  48. log.info("消息发送成功");
  49. } catch (Exception e) {
  50. e.printStackTrace();
  51. } finally {
  52. // 7. 关闭通道
  53. if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {
  54. try {
  55. channel.close();
  56. } catch (IOException | TimeoutException e) {
  57. e.printStackTrace();
  58. }
  59. }
  60. // 8. 关闭连接
  61. if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
  62. try {
  63. connection.close();
  64. } catch (IOException e) {
  65. e.printStackTrace();
  66. }
  67. }
  68. }
  69. }
  70. }

消费者1

public class WorkPolling1 {
    private static Logger log = LoggerFactory.getLogger(WorkPolling1.class);

    public static void main(String[] args) {
        //    所有中间件都是基于tcp/ip协议,rabbitmq基于AMQP协议
        //    需要ip:port

        //    步骤
        //    1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //配置相关配置项
        connectionFactory.setHost("112.74.175.76");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        //声明连接和通道
        Connection connection = null;
        Channel channel = null;
        try {
            //     2. 创建连接 Connection
            connection = connectionFactory.newConnection("消费者-Work1");

            //    3. 通过连接获取通道Chanel
            channel = connection.createChannel();

            //    4. 声明队列,如果不存在会自动创建
            //同一时刻服务器只会推送一条消息给消费者
            String queueName = "q1";

            //5. 定义接收消息的回调
            Channel callbackChannel = channel;            
            /**
             * @param 队列名称
             * @param 应答机制(自动应答可能会造成死循环)
             * @param 响应函数
             *
             */
            callbackChannel.basicConsume(queueName, false, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    log.info("work1 消息接收成功:{}", new String(delivery.getBody(), StandardCharsets.UTF_8));
                    //    模拟服务器性能
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    log.error("接受消息失败:{}", s);
                }
            });

            System.out.println("work1开始接收消息");
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //    7. 关闭通道

            if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {

                try {
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }

            //    8. 关闭连接
            if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者2

public class WorkPolling2 {
    private static Logger log = LoggerFactory.getLogger(WorkPolling2.class);

    public static void main(String[] args) {
        //    所有中间件都是基于tcp/ip协议,rabbitmq基于AMQP协议
        //    需要ip:port

        //    步骤
        //    1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //配置相关配置项
        connectionFactory.setHost("112.74.175.76");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        //声明连接和通道
        Connection connection = null;
        Channel channel = null;
        try {
            //     2. 创建连接 Connection
            connection = connectionFactory.newConnection("消费者-Work2");

            //    3. 通过连接获取通道Chanel
            channel = connection.createChannel();

            //    4. 声明队列,如果不存在会自动创建
            //同一时刻服务器只会推送一条消息给消费者
            String queueName = "q1";

            //5. 定义接收消息的回调
            Channel callbackChannel = channel;            
            /**
             * @param 队列名称
             * @param 应答机制(自动应答可能会造成死循环)
             * @param 响应函数
             *
             */
            callbackChannel.basicConsume(queueName, false, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    log.info("work2 消息接收成功:{}", new String(delivery.getBody(), StandardCharsets.UTF_8));
                   //    模拟服务器性能
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    log.error("接受消息失败:{}", s);
                }
            });

            System.out.println("work2开始接收消息");
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //    7. 关闭通道

            if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {

                try {
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }

            //    8. 关闭连接
            if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

结果

先启动work1 work2 再进行消息投递
查看消费的消息规律
image.png
image.png

轮询分发机制对于多个消费者而言是进行轮流消费,不会因为服务器资源有限而产生不对等性
但是应答机制需要配置为自动应答才会生效,但是实际使用过程中不会这样进行配置

公平分发

公平分发区别于轮询分发的区别点在于,没有自动应答机制,需要设置手动应答和指定指标

 //制定qos指标默认为null,即为轮询分发: qos=1 一次性取出多少条数据进行消费
//根据服务器进行设置 最初可以保持默认值
callbackChannel.basicQos(1);

//设置为false
callbackChannel.basicConsume(queueName, false, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    log.info("work1 消息接收成功:{}", new String(delivery.getBody(), StandardCharsets.UTF_8));
                    //    模拟服务器性能
                    try {
                        Thread.sleep(1000);
                        //手动应答
                        callbackChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    log.error("接受消息失败:{}", s);
                }
});

生产者

public class Producer {
    private static Logger log = LoggerFactory.getLogger(Producer.class);

    public static void main(String[] args) {
        //    所有中间件都是基于tcp/ip协议,rabbitmq基于AMQP协议
        //    需要ip:port

        //    步骤
        //    1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //配置相关配置项
        connectionFactory.setHost("112.74.175.76");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        //声明连接和通道
        Connection connection = null;
        Channel channel = null;
        try {
            //     2. 创建连接 Connection
            connection = connectionFactory.newConnection("生产者-1");

            //    3. 通过连接获取通道Chanel
            channel = connection.createChannel();

            //    4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息
            //声明队列
            String queueName = "q1";
            /**
             * @param 队列名称
             * @param 是否持久化
             * @param 是否独占队列(排他性,一般不设置为排他)
             * @param 是否自动删除(最后一个消费者消费结束是否自动删除,一般不会自动删除)
             * @param 附加参数map
             *
             */
            channel.queueDeclare(queueName, true, false, false, null);

            //    5. 准备消息

            String msg = null;
            for (int i = 1; i <= 20; i++) {
                msg = "WORK:POLLING:" + i;

                //    6. 发送消息到queue
                /**
                 * @param 交换机(如果没有指定交换机,则第二个参数为队列名)
                 * @param 队列名称
                 * @param 消息持久化
                 * @param 发送内容
                 */
                channel.basicPublish("", queueName, null, msg.getBytes());
            }
            log.info("消息发送成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //    7. 关闭通道

            if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {

                try {
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }

            //    8. 关闭连接
            if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者1

public class WorkFair1 {
    private static Logger log = LoggerFactory.getLogger(WorkFair1.class);

    public static void main(String[] args) {
        //    所有中间件都是基于tcp/ip协议,rabbitmq基于AMQP协议
        //    需要ip:port

        //    步骤
        //    1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //配置相关配置项
        connectionFactory.setHost("112.74.175.76");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        //声明连接和通道
        Connection connection = null;
        Channel channel = null;
        try {
            //     2. 创建连接 Connection
            connection = connectionFactory.newConnection("消费者-Work1");

            //    3. 通过连接获取通道Chanel
            channel = connection.createChannel();

            //    4. 声明队列,如果不存在会自动创建
            //同一时刻服务器只会推送一条消息给消费者
            String queueName = "q1";

            //5. 定义接收消息的回调
            Channel callbackChannel = channel;
            //制定qos指标默认为null,即为轮询分发: qos=1 一次性取出多少条数据进行消费
            callbackChannel.basicQos(1);
            /**
             * @param 队列名称
             * @param 应答机制(自动应答可能会造成死循环)
             * @param 响应函数
             *
             */
            callbackChannel.basicConsume(queueName, false, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    log.info("work1 消息接收成功:{}", new String(delivery.getBody(), StandardCharsets.UTF_8));
                    //    模拟服务器性能
                    try {
                        Thread.sleep(1000);
                        //手动应答
                        callbackChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    log.error("接受消息失败:{}", s);
                }
            });

            System.out.println("work1开始接收消息");
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //    7. 关闭通道

            if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {

                try {
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }

            //    8. 关闭连接
            if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者2

public class WorkFair2 {
    private static Logger log = LoggerFactory.getLogger(WorkFair2.class);

    public static void main(String[] args) {
        //    所有中间件都是基于tcp/ip协议,rabbitmq基于AMQP协议
        //    需要ip:port

        //    步骤
        //    1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //配置相关配置项
        connectionFactory.setHost("112.74.175.76");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        //声明连接和通道
        Connection connection = null;
        Channel channel = null;
        try {
            //     2. 创建连接 Connection
            connection = connectionFactory.newConnection("消费者-Work2");

            //    3. 通过连接获取通道Chanel
            channel = connection.createChannel();

            //    4. 声明队列,如果不存在会自动创建
            //同一时刻服务器只会推送一条消息给消费者
            String queueName = "q1";

            //5. 定义接收消息的回调
            Channel callbackChannel = channel;
            //制定qos指标默认为null,即为轮询分发: qos=1 一次性取出多少条数据进行消费
            callbackChannel.basicQos(1);
            /**
             * @param 队列名称
             * @param 应答机制(自动应答可能会造成死循环)
             * @param 响应函数
             *
             */
            callbackChannel.basicConsume(queueName, false, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    log.info("work2 消息接收成功:{}", new String(delivery.getBody(), StandardCharsets.UTF_8));
                    //    模拟服务器性能
                    try {
                        Thread.sleep(200);
                        //手动应答
                        callbackChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    log.error("接受消息失败:{}", s);
                }
            });

            System.out.println("work2开始接收消息");
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //    7. 关闭通道

            if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {

                try {
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }

            //    8. 关闭连接
            if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

结果

image.png

image.png