DirectProducer: direct 类型交换器的生产者
MultiChannelConsumer: 一个连接下允许有多个信道
一个连接,我们可以使用多线程的方式模拟多个信道进行通讯。这样可以做到多路复用。

生产者

  1. import com.rabbitmq.client.BuiltinExchangeType;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 类说明:direct类型交换器的生产者
  9. * 案例:ZJJ_RabbitMQ_2019/11/03_13:17:17_fk6sz
  10. */
  11. public class DirectProducer {
  12. //声明交换器名字
  13. public final static String EXCHANGE_NAME = "direct_logs";
  14. public static void main(String[] args)
  15. throws IOException, TimeoutException {
  16. //创建连接、连接到RabbitMQ
  17. ConnectionFactory connectionFactory = new ConnectionFactory();
  18. //设置下连接工厂的连接地址(使用默认端口5672)
  19. connectionFactory.setHost("zjj101");
  20. /*设置登陆用户名和密码*/
  21. connectionFactory.setUsername("guest");
  22. connectionFactory.setPassword("guest");
  23. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  24. connectionFactory.setVirtualHost("/");
  25. // connectionFactory.setHost("localhost");
  26. //创建连接
  27. Connection connection = connectionFactory.newConnection();
  28. //创建信道
  29. Channel channel = connection.createChannel();
  30. //在信道中设置交换器 ,交换器是枚举
  31. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  32. //申明队列(放在消费者中去做)
  33. //申明路由键\消息体
  34. String[] routeKeys = {"king", "mark", "james"};
  35. for (int i = 0; i < 6; i++) {
  36. String routeKey = routeKeys[i % 3];
  37. String msg = "Hello,RabbitMQ" + (i + 1);
  38. //发布消息
  39. channel.basicPublish(EXCHANGE_NAME, routeKey, null, msg.getBytes());
  40. System.out.println("Sent:" + routeKey + ":" + msg);
  41. }
  42. channel.close();
  43. connection.close();
  44. }
  45. }

输出:

  1. Sent:king:Hello,RabbitMQ1
  2. Sent:mark:Hello,RabbitMQ2
  3. Sent:james:Hello,RabbitMQ3
  4. Sent:king:Hello,RabbitMQ4
  5. Sent:mark:Hello,RabbitMQ5
  6. Sent:james:Hello,RabbitMQ6

消费者

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. /**
  5. * 类说明:一个连接多个信道
  6. * 使用了多线程的支持
  7. * 演示消费者使用一个连接两个信道
  8. * 案例:ZJJ_RabbitMQ_2019/11/03_13:19:03_2vcj5
  9. */
  10. public class MultiChannelConsumer {
  11. private static class ConsumerWorker implements Runnable {
  12. final Connection connection;
  13. public ConsumerWorker(Connection connection) {
  14. this.connection = connection;
  15. }
  16. public void run() {
  17. try {
  18. /*创建一个信道,意味着每个线程单独一个信道*/
  19. Channel channel = connection.createChannel();
  20. //信道设置交换器类型(direct)
  21. channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  22. // 声明一个随机队列
  23. String queueName = channel.queueDeclare().getQueue();
  24. //String queueName = "queue-king"; // 同一个队列
  25. //消费者名字,打印输出用
  26. final String consumerName = Thread.currentThread().getName() + "-all";
  27. /*队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定*/
  28. String[] routekeys = {"king", "mark", "james"};
  29. for (String routekey : routekeys) {
  30. channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME,
  31. routekey);
  32. }
  33. System.out.println("[" + consumerName + "] Waiting for messages:");
  34. // 创建队列消费者
  35. final Consumer consumerA = new DefaultConsumer(channel) {
  36. @Override
  37. public void handleDelivery(String consumerTag,
  38. Envelope envelope,
  39. AMQP.BasicProperties
  40. properties,
  41. byte[] body)
  42. throws IOException {
  43. String message =
  44. new String(body, "UTF-8");
  45. System.out.println(consumerName
  46. + " Received " + envelope.getRoutingKey()
  47. + ":'" + message + "'");
  48. }
  49. };
  50. channel.basicConsume(queueName, true, consumerA);
  51. } catch (Exception e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. }
  56. public static void main(String[] argv) throws IOException,
  57. InterruptedException, TimeoutException {
  58. //连接工厂
  59. ConnectionFactory factory = new ConnectionFactory();
  60. //连接rabbitMq的地址
  61. factory.setHost("zjj101");
  62. /*设置登陆用户名和密码*/
  63. factory.setUsername("guest");
  64. factory.setPassword("guest");
  65. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  66. factory.setVirtualHost("/");
  67. // 打开连接和创建频道,与发送端一样
  68. Connection connection = factory.newConnection();
  69. //一个连接多个信道
  70. for (int i = 0; i < 2; i++) {
  71. /*将连接作为参数,传递给每个线程*/
  72. Thread worker = new Thread(new ConsumerWorker(connection));
  73. worker.start();
  74. }
  75. }
  76. }

输出:

  1. [Thread-0-all] Waiting for messages:
  2. [Thread-1-all] Waiting for messages:
  3. Thread-1-all Received king:'Hello,RabbitMQ1'
  4. Thread-1-all Received mark:'Hello,RabbitMQ2'
  5. Thread-1-all Received james:'Hello,RabbitMQ3'
  6. Thread-0-all Received king:'Hello,RabbitMQ1'
  7. Thread-0-all Received mark:'Hello,RabbitMQ2'
  8. Thread-0-all Received james:'Hello,RabbitMQ3'
  9. Thread-0-all Received king:'Hello,RabbitMQ4'
  10. Thread-0-all Received mark:'Hello,RabbitMQ5'
  11. Thread-0-all Received james:'Hello,RabbitMQ6'
  12. Thread-1-all Received king:'Hello,RabbitMQ4'
  13. Thread-1-all Received mark:'Hello,RabbitMQ5'
  14. Thread-1-all Received james:'Hello,RabbitMQ6'