使用 DirectProducer 作为生产者,NormalConsumer 作为消费者,消费者绑定一个队列

生产者

direct 类型交换器的生产者

  1. import com.rabbitmq.client.BuiltinExchangeType;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 类说明:direct类型交换器的生产者
  9. * 案例:ZJJ_RabbitMQ_2019/11/03_13:17:17_fk6sz
  10. */
  11. public class DirectProducer {
  12. //声明交换器名字
  13. public final static String EXCHANGE_NAME = "direct_logs";
  14. public static void main(String[] args)
  15. throws IOException, TimeoutException {
  16. //创建连接、连接到RabbitMQ
  17. ConnectionFactory connectionFactory = new ConnectionFactory();
  18. //设置下连接工厂的连接地址(使用默认端口5672)
  19. connectionFactory.setHost("zjj101");
  20. /*设置登陆用户名和密码*/
  21. connectionFactory.setUsername("guest");
  22. connectionFactory.setPassword("guest");
  23. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  24. connectionFactory.setVirtualHost("/");
  25. // connectionFactory.setHost("localhost");
  26. //创建连接
  27. Connection connection = connectionFactory.newConnection();
  28. //创建信道
  29. Channel channel = connection.createChannel();
  30. //在信道中设置交换器 ,交换器是枚举
  31. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  32. //申明队列(放在消费者中去做)
  33. //申明路由键\消息体
  34. String[] routeKeys = {"king", "mark", "james"};
  35. for (int i = 0; i < 6; i++) {
  36. String routeKey = routeKeys[i % 3];
  37. String msg = "Hello,RabbitMQ" + (i + 1);
  38. //发布消息
  39. channel.basicPublish(EXCHANGE_NAME, routeKey, null, msg.getBytes());
  40. System.out.println("Sent:" + routeKey + ":" + msg);
  41. }
  42. channel.close();
  43. connection.close();
  44. }
  45. }

消费者

普通的消费者

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. /**
  5. * 类说明:普通的消费者
  6. */
  7. public class NormalConsumer {
  8. public static void main(String[] argv)
  9. throws IOException, TimeoutException {
  10. //创建连接、连接到RabbitMQ
  11. ConnectionFactory connectionFactory = new ConnectionFactory();
  12. //设置下连接工厂的连接地址(使用默认端口5672)
  13. connectionFactory.setHost("zjj101");
  14. /*设置登陆用户名和密码*/
  15. connectionFactory.setUsername("guest");
  16. connectionFactory.setPassword("guest");
  17. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  18. connectionFactory.setVirtualHost("/");
  19. //创建连接
  20. Connection connection = connectionFactory.newConnection();
  21. //创建信道
  22. Channel channel = connection.createChannel();
  23. //在信道中设置交换器
  24. channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  25. //申明队列(放在消费者中去做)
  26. String queueName = "queue-king";
  27. channel.queueDeclare(queueName, false, false, false, null);
  28. //绑定:将队列(queuq-king)与交换器通过 路由键 绑定(king)
  29. String routeKey = "king";
  30. channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME, routeKey);
  31. System.out.println("等待接收消息 ......");
  32. //申明一个消费者
  33. final Consumer consumer = new DefaultConsumer(channel) {
  34. /**
  35. *
  36. * @param s
  37. * @param envelope
  38. * @param basicProperties
  39. * @param bytes 是生产者发送过来的消息
  40. * @throws IOException
  41. */
  42. @Override
  43. public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
  44. String message = new String(bytes, "UTF-8");
  45. System.out.println("收到routingKey为:[" + envelope.getRoutingKey() + "]的消息,消息内容是: " + message);
  46. }
  47. };
  48. //消息者正是开始在指定队列上消费。(queue-king)
  49. /*
  50. * 参数1: 队列名称
  51. * 参数2:自动确认参数,如果是true则是自动确认,意思是我每一次消费了,
  52. * 我需要给RabbitMQ确认一下我已经消费完了
  53. * 参数3:消费者
  54. */
  55. channel.basicConsume(queueName, true, consumer);
  56. }
  57. }

输出结果:

  1. 等待接收消息 ......
  2. 收到routingKey为:[king]的消息,消息内容是: Hello,RabbitMQ1
  3. 收到routingKey为:[king]的消息,消息内容是: Hello,RabbitMQ4