1.循环分发

  1. 使用任务队列的一个优势在于容易并行处理。如果积压了大量的工作,
  2. 我们只需要添加更多的工作者(上文中的Worker.java中的概念),这样很容易扩展。
  3. 默认情况下,RabbitMQ是轮流发送消息给下一个消费者,平均每个消费者接收到的消息数量是相等的。
  4. 这种分发消息的方式叫做循环分发。你可以试一下开3个或更多工作者的情况。

2.消息确认

        其中的参数prefetchCount表示:maximum number of messages that the server will deliver其中的参数                            prefetchCount表示:maximum number of messages that the server will deliver

3.消息持久化

    如果关闭RabbitMQ服务或者RabbitMQ服务崩溃了,RabbitMQ就会丢掉所有的队列和消息: 除非你告诉它不要这样。
    要确保RabbitMQ服务关闭或崩溃后消息不会丢失,要做两件事情:持久化队列、持久化消息。
    首先,我们要确保RabbitMQ永远不会丢失我们的队列。怎么做呢?在声明队列的时候,指定durable参数为true。
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
      尽管上面的代码没有错,但是它不会按所想的那样将队列持久化:因为之前我们已经将hello这个队列设置了不持久化,RabbitMQ不允许重新定义已经存在的队列,否则就会报错。
      但是,我们有一个快速的解决办法:声明另外一个队列就行了,只要不叫hello,比如task_queue:
      boolean durable = true;
      channel.queueDeclare("task_queue", durable, false, false, null);
      现在,我们已经确保队列不会丢失了,那么如何将消息持久化呢:将    MessageProperties的值设置为PERSISTENT_TEXT_PLAIN。

4.公平分发

        循环消息分发并不是我们想要的。比如,有两个工作者,当奇数消   
        息(如上文中的"1..."、"3..."、"5..."、"7...")很耗时而偶数消息( 
        如上文中的"2."、"4."、"6."、"8.")很简单的时候,其中一个工作者 
        就会一直很忙而另一个工作者就会闲。然而RabbitMQ对这些一概 
         不知,它只是在轮流平均地发消息。
       这种情况的发生是因为,RabbitMQ 只是当消息进入队列时就分发 
       出去,而没有查看每个工作者未返回确认信息的数量。
        为了改变这种情况,我们可以使用basicQos方法,并将参数 
       prefetchCount设为1。这样做,工作者就会告诉RabbitMQ:不要同 
      时发送多个消息给我,每次只发1个,当我处理完这个消息并给你确 
       认信息后,你再发给我下一个消息。这时候,RabbitMQ就不会轮流 
      平均发送消息了,而是寻找闲着的工作者。
      int prefetchCount = 1;
      channel.basicQos(prefetchCount);

以下是示例代码(第一个消费者)接受的时候我给他设了休眠时间
package com.demo.task;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
    //定义队列名字
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost( "127.0.0.1" );
        //创建通信连接
        Connection connection = connectionFactory.newConnection();
        //建立通道
        Channel chanel = connection.createChannel();
//      声明队列【参数说明:参数一:队列名称,参数二:
//      是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
        chanel.queueDeclare( QUEUE_NAME, false, false, false, null );
//      第一种获取消息的方式 持续消息获取使用:basic.consume;单个消息获取使用:basic.get
        chanel.basicQos( 1 );
        Consumer consumer = new DefaultConsumer( chanel ) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String( body, "UTF-8" );

                System.out.println( " [x] Received '" + message + "'" );
                try {
                    doWork( message );
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println( " [x] Done" );
                }
            }
        };
//      queue 所订阅的队列  autoAck 是否开启自动应答,默认是开启的,如果需要手动应答应该设置为false
//      callback接收到消息之后执行的回调方法
        chanel.basicConsume( QUEUE_NAME, true, consumer );
//      第二种消息获取 单个消息获取采用GetResponse
//      @Param队列名称  Boolean autoAck 是否自动确认
//        while (true) {
//            GetResponse getResponse = chanel.basicGet( QUEUE_NAME, false );
//            String message = new String( getResponse.getBody(), "UTF-8" );
//            System.out.println( message );
//            UInt64 deliveryTag,  结果是否为多条数据
//            chanel.basicAck( getResponse.getEnvelope().getDeliveryTag(), true );
//        }
//
    }

    private static void doWork(String task) throws InterruptedException {
        Thread.sleep( 6666666 );
    }
}

第二个消费者

package com.demo.task;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
    //定义队列名字
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost( "127.0.0.1" );
        //创建通信连接
        Connection connection = connectionFactory.newConnection();
        //建立通道
        Channel chanel = connection.createChannel();
//      声明队列【参数说明:参数一:队列名称,参数二:
//      是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
        chanel.queueDeclare( QUEUE_NAME, false, false, false, null );
//      第一种获取消息的方式 持续消息获取使用:basic.consume;单个消息获取使用:basic.get
        chanel.basicQos( 1 );
        Consumer consumer = new DefaultConsumer( chanel ) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String( body, "UTF-8" );

                System.out.println( " [x] Received '" + message + "'" );
                try {
                    doWork( message );
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println( " [x] Done" );
                }
            }
        };
//      queue 所订阅的队列  autoAck 是否开启自动应答,默认是开启的,如果需要手动应答应该设置为false
//      callback接收到消息之后执行的回调方法
        chanel.basicConsume( QUEUE_NAME, true, consumer );
//      第二种消息获取 单个消息获取采用GetResponse
//      @Param队列名称  Boolean autoAck 是否自动确认
//        while (true) {
//            GetResponse getResponse = chanel.basicGet( QUEUE_NAME, false );
//            String message = new String( getResponse.getBody(), "UTF-8" );
//            System.out.println( message );
//            UInt64 deliveryTag,  结果是否为多条数据
//            chanel.basicAck( getResponse.getEnvelope().getDeliveryTag(), true );
//        }
//
    }

    private static void doWork(String task) throws InterruptedException {
        Thread.sleep( 6666666 );
//        for (char ch : task.toCharArray()) {
//            if (ch == '.') Thread.sleep( 600000 );
//        }
    }
}
先看Worker的接受情况
 [x] Received '1221'
----------
ConsumerMQ的接受情况
接收到的消息99999999
接收到的消息99999999
接收到的消息99999999
接收到的消息99999999
接收到的消息99999999
接收到的消息99999999
  • 原因:在worker接受消息的时候进行休眠了,所以模拟暂忙状态,所以ConsumerMQ处于接受状态