一、Work queues 工作队列模式

1、模式说明

image.png
Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于任务过重或任务较多情况下使用工作队列可以提高任务处理速度。
多个消费者对提供者 提供消息是轮询的进行消费【你一个我一个】

2、代码实现

work queues 与入门程序的简单模式的代码是一样得,可以完全复制,并多复制一个消费者进行消费同时对消费消息的测试

2.1 生产端

  1. public class ProviderWorkQueues {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. // 1.获取连接工厂
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. // 2.设置参数
  6. // 设置地址,默认localhost
  7. connectionFactory.setHost("192.168.163.10");
  8. // 设置端口 默认5672
  9. connectionFactory.setPort(5672);
  10. // 设置用户名 默认guest
  11. connectionFactory.setUsername("admin");
  12. // 设置密码 默认 guest
  13. connectionFactory.setPassword("admin123");
  14. // 3.创建连接
  15. Connection connection = connectionFactory.newConnection();
  16. // 4.创建channel
  17. Channel channel = connection.createChannel();
  18. // 修改的代码------------------------------------------------------------------------------------
  19. // 5.创建队列 Queue
  20. /*
  21. queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
  22. 参数说明:
  23. 1. queue: 队列名称
  24. 2. durable:是否持久化,当mq重启之后,他还在
  25. 3. exclusive: 通常设置为false
  26. - 是否独占,只能有一个消费者来监听队列
  27. - 当connection关闭时 是否删除队列
  28. 4. autoDelete:是否自动删除,当没有consumer时,自动删除掉
  29. 5. arguments:参数信息
  30. */
  31. // 如果没有一个名字叫 hello_world的队列,则会创建,如果有则不会创建
  32. channel.queueDeclare("work_queues", false, false, false, null);
  33. // 6.发送消息
  34. /*
  35. basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
  36. 基础出版() 参数说明:
  37. exchange:交换机的名称,简单模式下交换机会使用默认的,使用""设置为默认
  38. routingKey:路由名称
  39. props:参数信息
  40. body:发送的消息信息
  41. */
  42. // 发送一百条消息
  43. for (int i = 1; i <= 100; i++) {
  44. String body = i + "Hello World ...";
  45. channel.basicPublish("","work_queues",null,body.getBytes());
  46. }
  47. //--------------------------------------------------------------------------------------------------
  48. // 7.释放资源
  49. channel.close();
  50. connection.close();
  51. }
  52. }

2.2 消费端

复制两份

  1. public class ConsumerWorkQueues {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. // 1.创建连接工厂
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. // 2.设置参数
  6. connectionFactory.setPassword("admin123");
  7. connectionFactory.setUsername("admin");
  8. connectionFactory.setHost("192.168.163.10");
  9. connectionFactory.setPort(5672);
  10. // 3.获取连接
  11. Connection connection = connectionFactory.newConnection();
  12. // 4.创建channel
  13. Channel channel = connection.createChannel();
  14. // 5.接收消息
  15. /*
  16. basicConsume(String queue, boolean autoAck, Consumer callback)
  17. 方法参数说明:
  18. 1. queue:队列名称
  19. 2. autoAck: 是否自动确认
  20. 3. callback: 回调对象
  21. */
  22. // 创建回调对象,参数是 channel
  23. DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
  24. /**
  25. * 回调方法 当收到消息后 会自动执行该方法
  26. *
  27. * @param consumerTag 消息标识 标签
  28. * @param envelope 获取一些信息,交换机 路由key ...
  29. * @param properties 配置属性
  30. * @param body 真实数据
  31. * @throws IOException ioexception
  32. */
  33. @Override
  34. public void handleDelivery(String consumerTag, Envelope envelope,
  35. AMQP.BasicProperties properties, byte[] body)
  36. throws IOException {
  37. super.handleDelivery(consumerTag, envelope, properties, body);
  38. /* System.out.println("consumerTag: " +consumerTag);
  39. System.out.println("Exchange: " + envelope.getExchange());
  40. System.out.println("RoutingKey: " + envelope.getRoutingKey());
  41. System.out.println("properties: " + properties); */
  42. System.out.println("body: " + new String(body));
  43. }
  44. };
  45. channel.basicConsume("work_queues",true,defaultConsumer);
  46. //消费者不要关闭资源,要保持一直监听
  47. }
  48. }

2.3 测试结果

轮询效果

image.png
image.png

3、小结

  1. 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争关系。

  2. Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可。

  3. 一条消息,只能被一个消费者所消费

二、Pub/Sub 订阅模式

1、模式说明

image.png

x 就是交换机

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接收者,会一直等待消息到来
  • Queue:消息队列,接收消息、缓存消息
  • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

2、代码实现

2.1 步骤:

  1. 创建连接工厂
  2. 设置参数
  3. 创建连接Connection
  4. 创建Channel
  5. 创建交换机
  6. 创建队列
  7. 绑定队列和交换机
  8. 发送消息
  9. 释放资源