DirectProducer: direct 类型交换器的生产者
NormalConsumer: 普通的消费者
MultiBindConsumer: 队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定
对比:单个绑定的消费者只能收到指定的消息,多重绑定的的消费者可以收到所有的消息

生产者

  1. import com.rabbitmq.client.BuiltinExchangeType;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 类说明:direct类型交换器的生产者
  9. */
  10. public class DirectProducer {
  11. //声明交换器名字
  12. public final static String EXCHANGE_NAME = "direct_logs";
  13. public static void main(String[] args)
  14. throws IOException, TimeoutException {
  15. //创建连接、连接到RabbitMQ
  16. ConnectionFactory connectionFactory = new ConnectionFactory();
  17. //设置下连接工厂的连接地址(使用默认端口5672)
  18. connectionFactory.setHost("zjj101");
  19. /*设置登陆用户名和密码*/
  20. connectionFactory.setUsername("guest");
  21. connectionFactory.setPassword("guest");
  22. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  23. connectionFactory.setVirtualHost("/");
  24. // connectionFactory.setHost("localhost");
  25. //创建连接
  26. Connection connection = connectionFactory.newConnection();
  27. //创建信道
  28. Channel channel = connection.createChannel();
  29. //在信道中设置交换器 ,交换器是枚举
  30. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  31. //申明队列(放在消费者中去做)
  32. //申明路由键\消息体
  33. String[] routeKeys = {"king", "mark", "james"};
  34. for (int i = 0; i < 6; i++) {
  35. String routeKey = routeKeys[i % 3];
  36. String msg = "Hello,RabbitMQ" + (i + 1);
  37. //发布消息
  38. channel.basicPublish(EXCHANGE_NAME, routeKey, null, msg.getBytes());
  39. System.out.println("Sent:" + routeKey + ":" + msg);
  40. }
  41. channel.close();
  42. connection.close();
  43. }
  44. }

消费者1

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. /**
  5. * 类说明:队列和交换器的多重绑定
  6. * 多个路由键的绑定
  7. * 案例:ZJJ_RabbitMQ_2019/11/03_13:17:45_y9y2f
  8. */
  9. public class MultiBindConsumer {
  10. public static void main(String[] argv) throws IOException, TimeoutException {
  11. //连接工厂
  12. ConnectionFactory factory = new ConnectionFactory();
  13. //连接rabbitMq的地址
  14. factory.setHost("zjj101");
  15. /*设置登陆用户名和密码*/
  16. factory.setUsername("guest");
  17. factory.setPassword("guest");
  18. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  19. factory.setVirtualHost("/");
  20. // 打开连接和创建频道,与发送端一样
  21. Connection connection = factory.newConnection();
  22. //创建一个信道
  23. final Channel channel = connection.createChannel();
  24. //信道设置交换器类型(direct)
  25. channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  26. //声明一个随机队列,每一次生成的队列都是不同的.
  27. String queueName = channel.queueDeclare().getQueue();
  28. /*队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定
  29. * 做法就是for循环执行多次方法
  30. * */
  31. String[] routekeys = {"king", "mark", "james"};
  32. for (String routekey : routekeys) {
  33. channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME,
  34. routekey);
  35. }
  36. System.out.println(" [*] 等待消息:");
  37. // 创建队列消费者
  38. final Consumer consumerA = new DefaultConsumer(channel) {
  39. @Override
  40. public void handleDelivery(String consumerTag,
  41. Envelope envelope,
  42. AMQP.BasicProperties
  43. properties,
  44. byte[] body)
  45. throws IOException {
  46. String message = new String(body, "UTF-8");
  47. System.out.println(" 接收到routingKey为 "
  48. + envelope.getRoutingKey() + ":'" + message
  49. + "'");
  50. }
  51. };
  52. channel.basicConsume(queueName, true, consumerA);
  53. }
  54. }

输出:

  1. [*] 等待消息:
  2. 接收到routingKey king:'Hello,RabbitMQ1'
  3. 接收到routingKey mark:'Hello,RabbitMQ2'
  4. 接收到routingKey james:'Hello,RabbitMQ3'
  5. 接收到routingKey king:'Hello,RabbitMQ4'
  6. 接收到routingKey mark:'Hello,RabbitMQ5'
  7. 接收到routingKey james:'Hello,RabbitMQ6'

消费者2

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. /**
  5. * 类说明:普通的消费者
  6. */
  7. public class NormalConsumer {
  8. public static void main(String[] argv)
  9. throws IOException, TimeoutException {
  10. //创建连接、连接到RabbitMQ
  11. ConnectionFactory connectionFactory = new ConnectionFactory();
  12. //设置下连接工厂的连接地址(使用默认端口5672)
  13. connectionFactory.setHost("zjj101");
  14. /*设置登陆用户名和密码*/
  15. connectionFactory.setUsername("guest");
  16. connectionFactory.setPassword("guest");
  17. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  18. connectionFactory.setVirtualHost("/");
  19. //创建连接
  20. Connection connection = connectionFactory.newConnection();
  21. //创建信道
  22. Channel channel = connection.createChannel();
  23. //在信道中设置交换器
  24. channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  25. //申明队列(放在消费者中去做)
  26. String queueName = "queue-king";
  27. channel.queueDeclare(queueName, false, false, false, null);
  28. //绑定:将队列(queuq-king)与交换器通过 路由键 绑定(king)
  29. String routeKey = "king";
  30. channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME, routeKey);
  31. System.out.println("等待接收消息 ......");
  32. //申明一个消费者
  33. final Consumer consumer = new DefaultConsumer(channel) {
  34. /**
  35. *
  36. * @param s
  37. * @param envelope
  38. * @param basicProperties
  39. * @param bytes 是生产者发送过来的消息
  40. * @throws IOException
  41. */
  42. @Override
  43. public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
  44. String message = new String(bytes, "UTF-8");
  45. System.out.println("收到routingKey为:[" + envelope.getRoutingKey() + "]的消息,消息内容是: " + message);
  46. }
  47. };
  48. //消息者正是开始在指定队列上消费。(queue-king)
  49. /*
  50. * 参数1: 队列名称
  51. * 参数2:自动确认参数,如果是true则是自动确认,意思是我每一次消费了,
  52. * 我需要给RabbitMQ确认一下我已经消费完了
  53. * 参数3:消费者
  54. */
  55. channel.basicConsume(queueName, true, consumer);
  56. }
  57. }

输出:

  1. 等待接收消息 ......
  2. 收到routingKey为:[king]的消息,消息内容是: Hello,RabbitMQ1
  3. 收到routingKey为:[king]的消息,消息内容是: Hello,RabbitMQ4