测试目的

用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者。我们将介绍 Java API 中的一些细节。在下图中,“ P”是我们的生产者“ C”是我们的消费者中间的框是一个队列-RabbitMQ 代表使用者保留的消息缓冲区
03 HelloWorld - 图1

pom依赖

  1. <!--指定 jdk 编译版本-->
  2. <build>
  3. <plugins>
  4. <plugin>
  5. <groupId>org.apache.maven.plugins</groupId>
  6. <artifactId>maven-compiler-plugin</artifactId>
  7. <configuration>
  8. <source>8</source>
  9. <target>8</target>
  10. </configuration>
  11. </plugin>
  12. </plugins>
  13. </build>
  14. <dependencies>
  15. <!--rabbitmq 依赖客户端-->
  16. <dependency>
  17. <groupId>com.rabbitmq</groupId>
  18. <artifactId>amqp-client</artifactId>
  19. <version>5.8.0</version>
  20. </dependency>
  21. <!--操作文件流的一个依赖-->
  22. <dependency>
  23. <groupId>commons-io</groupId>
  24. <artifactId>commons-io</artifactId>
  25. <version>2.6</version>
  26. </dependency>
  27. </dependencies>

消息生产者

  1. public class Producer {
  2. private final static String QUEUE_NAME = "hello";
  3. public static void main(String[] args) throws Exception {
  4. // 创建一个连接工厂
  5. ConnectionFactory factory = new ConnectionFactory();
  6. factory.setHost("192.168.109.141");
  7. factory.setUsername("admin");
  8. factory.setPassword("123");
  9. // channel 实现了自动 close 接口 自动关闭 不需要显示关闭
  10. try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
  11. /*
  12. * 生成一个队列
  13. * 1.队列名称
  14. * 2.队列里面的消息是否持久化 默认消息存储在内存中
  15. * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
  16. * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
  17. * 5.其他参数
  18. */
  19. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  20. String message = "hello world";
  21. /*
  22. * 发送一个消息
  23. * 1.发送到那个交换机
  24. * 2.路由的 key 是哪个
  25. * 3.其他的参数信息
  26. * 4.发送消息的消息体
  27. */
  28. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  29. System.out.println("消息发送完毕");
  30. }
  31. }
  32. }

消息消费者

  1. public class Consumer {
  2. private final static String QUEUE_NAME = "hello";
  3. public static void main(String[] args) throws Exception {
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost("192.168.109.141");
  6. factory.setUsername("admin");
  7. factory.setPassword("123");
  8. Connection connection = factory.newConnection();
  9. Channel channel = connection.createChannel();
  10. System.out.println("等待接收消息 ");
  11. // 推送的消息如何进行消费的接口回调
  12. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  13. String message = new String(delivery.getBody());
  14. System.out.println(message);
  15. };
  16. // 取消消费的一个回调接口 如在消费的时候队列被删除掉了
  17. CancelCallback cancelCallback = (consumerTag) -> {
  18. System.out.println("消息消费被中断");
  19. };
  20. /*
  21. 消费者消费消息
  22. 1.消费哪个队列
  23. 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
  24. 3.消费者未成功消费的回调
  25. 4.消费者取消消费的回调
  26. */
  27. channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
  28. }
  29. }

测试

运行消费者, 然后运行生产者, 消费者可以接受到生产者的消息