MultiConsumerOneQueue: 一个队列多个消费者,则会表现出消息在消费者之间的轮询发送

生产者

  1. import com.rabbitmq.client.BuiltinExchangeType;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 类说明:direct类型交换器的生产者
  9. * 案例:ZJJ_RabbitMQ_2019/11/03_13:17:17_fk6sz
  10. */
  11. public class DirectProducer {
  12. //声明交换器名字
  13. public final static String EXCHANGE_NAME = "direct_logs";
  14. public static void main(String[] args)
  15. throws IOException, TimeoutException {
  16. //创建连接、连接到RabbitMQ
  17. ConnectionFactory connectionFactory = new ConnectionFactory();
  18. //设置下连接工厂的连接地址(使用默认端口5672)
  19. connectionFactory.setHost("zjj101");
  20. /*设置登陆用户名和密码*/
  21. connectionFactory.setUsername("guest");
  22. connectionFactory.setPassword("guest");
  23. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  24. connectionFactory.setVirtualHost("/");
  25. // connectionFactory.setHost("localhost");
  26. //创建连接
  27. Connection connection = connectionFactory.newConnection();
  28. //创建信道
  29. Channel channel = connection.createChannel();
  30. //在信道中设置交换器 ,交换器是枚举
  31. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  32. //申明队列(放在消费者中去做)
  33. //申明路由键\消息体
  34. String[] routeKeys = {"king", "mark", "james"};
  35. for (int i = 0; i < 6; i++) {
  36. String routeKey = routeKeys[i % 3];
  37. String msg = "Hello,RabbitMQ" + (i + 1);
  38. //发布消息
  39. channel.basicPublish(EXCHANGE_NAME, routeKey, null, msg.getBytes());
  40. System.out.println("Sent:" + routeKey + ":" + msg);
  41. }
  42. channel.close();
  43. connection.close();
  44. }
  45. }

输出:

  1. Sent:king:Hello,RabbitMQ1
  2. Sent:mark:Hello,RabbitMQ2
  3. Sent:james:Hello,RabbitMQ3
  4. Sent:king:Hello,RabbitMQ4
  5. Sent:mark:Hello,RabbitMQ5
  6. Sent:james:Hello,RabbitMQ6

消费者

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. /**
  5. * 类说明:一个队列绑定多个消费者,则会表现出消息在消费者之间的轮询发送。
  6. * 案例:ZJJ_RabbitMQ_2019/11/03_13:19:31_krcss
  7. */
  8. public class MultiConsumerOneQueue {
  9. private static class ConsumerWorker implements Runnable {
  10. final Connection connection;
  11. final String queueName;
  12. public ConsumerWorker(Connection connection, String queueName) {
  13. this.connection = connection;
  14. this.queueName = queueName;
  15. }
  16. public void run() {
  17. try {
  18. /*创建一个信道,意味着每个线程单独一个信道*/
  19. final Channel channel = connection.createChannel();
  20. //信道设置交换器类型(direct)
  21. channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  22. /*声明一个队列,rabbitmq,如果队列已存在,不会重复创建*/
  23. channel.queueDeclare(queueName, false, false, false, null);
  24. //消费者名字,打印输出用
  25. final String consumerName = Thread.currentThread().getName();
  26. /*队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定*/
  27. String[] routekeys = {"king", "mark", "james"};
  28. for (String routekey : routekeys) {
  29. channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME,
  30. routekey);
  31. }
  32. System.out.println(" [" + consumerName + "] Waiting for messages:");
  33. // 创建队列消费者
  34. final Consumer consumerA = new DefaultConsumer(channel) {
  35. @Override
  36. public void handleDelivery(String consumerTag,
  37. Envelope envelope,
  38. AMQP.BasicProperties
  39. properties,
  40. byte[] body)
  41. throws IOException {
  42. String message =
  43. new String(body, "UTF-8");
  44. System.out.println(consumerName
  45. + " Received " + envelope.getRoutingKey()
  46. + ":'" + message + "'");
  47. }
  48. };
  49. channel.basicConsume(queueName, true, consumerA);
  50. } catch (Exception e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. }
  55. public static void main(String[] argv) throws IOException,
  56. InterruptedException, TimeoutException {
  57. //连接工厂
  58. ConnectionFactory factory = new ConnectionFactory();
  59. //连接rabbitMq的地址
  60. factory.setHost("zjj101");
  61. /*设置登陆用户名和密码*/
  62. factory.setUsername("guest");
  63. factory.setPassword("guest");
  64. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  65. factory.setVirtualHost("/");
  66. // 打开连接和创建频道,与发送端一样
  67. Connection connection = factory.newConnection();
  68. //3个线程,线程之间共享队列,一个队列多个消费者
  69. String queueName = "focusAll";
  70. for (int i = 0; i < 3; i++) {
  71. /*将队列名作为参数,传递给每个线程*/
  72. Thread worker = new Thread(new ConsumerWorker(connection, queueName));
  73. worker.start();
  74. }
  75. }
  76. }

输出:

  1. [Thread-2] Waiting for messages:
  2. [Thread-1] Waiting for messages:
  3. [Thread-0] Waiting for messages:
  4. Thread-2 Received king:'Hello,RabbitMQ1'
  5. Thread-2 Received king:'Hello,RabbitMQ4'
  6. Thread-0 Received mark:'Hello,RabbitMQ2'
  7. Thread-0 Received mark:'Hello,RabbitMQ5'
  8. Thread-1 Received james:'Hello,RabbitMQ3'
  9. Thread-1 Received james:'Hello,RabbitMQ6'