1 搭建工程

1.1 新建Maven工程

  • 略。

1.2 添加依赖

  • pom.xml
  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.10.0</version>
  5. </dependency>

2 编写生产者

  • Producer.java
  1. package com.xudaxian;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.HashMap;
  8. import java.util.concurrent.TimeoutException;
  9. /**
  10. * 生产者
  11. *
  12. * @version 1.0
  13. * @since 2021-02-04 10:40
  14. */
  15. public class Producer {
  16. public static String QUEUE_NAME = "simple_queue";
  17. public static void main(String[] args) throws IOException, TimeoutException {
  18. //创建连接工厂
  19. ConnectionFactory connectionFactory = new ConnectionFactory();
  20. //主机地址,默认是localhost
  21. connectionFactory.setHost("192.168.18.120");
  22. //连接端口
  23. connectionFactory.setPort(5672);
  24. //虚拟主机的名称,默认为/
  25. connectionFactory.setVirtualHost("/xudaxian");
  26. //连接的用户名,默认为guest
  27. connectionFactory.setUsername("xudaxian");
  28. //连接的密码:默认为guest
  29. connectionFactory.setPassword("123456");
  30. //创建连接
  31. Connection connection = connectionFactory.newConnection();
  32. //创建信道
  33. Channel channel = connection.createChannel();
  34. //声明创建队列
  35. //参数1:队列名称
  36. //参数2:是否持久化队列
  37. //参数3:是否独占本地连接
  38. //参数4:是否在不使用的时候自动删除队列
  39. //参数5:队列的其他参数
  40. channel.queueDeclare(QUEUE_NAME, true, false, false, new HashMap<>());
  41. //定义要发送的消息
  42. String message = "你好啊,RabbitMQ";
  43. channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
  44. System.out.println("已经发送消息:" + message);
  45. //释放资源
  46. channel.close();
  47. connection.close();
  48. connectionFactory.clone();
  49. }
  50. }
  • 执行完上述的代码之后,可以登录到RabbitMQ的管理控制台,可以发现队列已经自动被创建,而且队列中已经消息了。

生产者创建了队列.png

查看生产者推送到队列中的消息.png

3 编写消费者

  • Consumer.java
  1. package com.xudaxian;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.nio.charset.StandardCharsets;
  5. import java.util.HashMap;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 消费者
  9. *
  10. * @version 1.0
  11. * @since 2021-02-04 11:08
  12. */
  13. public class Consumer {
  14. public static String QUEUE_NAME = "simple_queue";
  15. public static void main(String[] args) throws IOException, TimeoutException {
  16. //创建连接工厂
  17. ConnectionFactory connectionFactory = new ConnectionFactory();
  18. //主机地址,默认是localhost
  19. connectionFactory.setHost("192.168.18.120");
  20. //连接端口
  21. connectionFactory.setPort(5672);
  22. //虚拟主机的名称,默认为/
  23. connectionFactory.setVirtualHost("/xudaxian");
  24. //连接的用户名,默认为guest
  25. connectionFactory.setUsername("xudaxian");
  26. //连接的密码:默认为guest
  27. connectionFactory.setPassword("123456");
  28. //创建连接
  29. Connection connection = connectionFactory.newConnection();
  30. //创建信道
  31. Channel channel = connection.createChannel();
  32. //声明创建队列
  33. //参数1:队列名称
  34. //参数2:是否持久化队列
  35. //参数3:是否独占本地连接
  36. //参数4:是否在不使用的时候自动删除队列
  37. //参数5:队列的其他参数
  38. channel.queueDeclare(QUEUE_NAME, true, false, false, new HashMap<>());
  39. //监听消息的消费者
  40. DefaultConsumer consumer = new DefaultConsumer(channel) {
  41. /**
  42. * @param consumerTag 消费者标签,在channel.basicConsume的时候可以指定
  43. * @param envelope 消息包内容,可以从其中获取消息的id,消息routing key,交换机、消息和重试标记(收到消息失败后是否需要重新发送)
  44. * @param properties 消息属性
  45. * @param body 消息
  46. * @throws IOException IO异常
  47. */
  48. @Override
  49. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  50. System.out.println("路由key:" + envelope.getRoutingKey());
  51. System.out.println("交换机:" + envelope.getExchange());
  52. System.out.println("消息id:" + envelope.getDeliveryTag());
  53. System.out.println("接收到的消息:" + new String(body, StandardCharsets.UTF_8));
  54. System.out.println();
  55. System.out.println("============================================");
  56. System.out.println();
  57. }
  58. };
  59. //监听消息
  60. //参数1:队列名称
  61. //参数2:是否自动确认,设置为true表示消息接收到消息自动向MQ回复接收到了,MQ收到消息后会删除消息,设置为false,需要手动确认
  62. //参数3:监听消息的消费者
  63. channel.basicConsume(QUEUE_NAME, true, consumer);
  64. //不需要释放资源,应该保持一直监听消息
  65. // channel.close();
  66. // connection.close();
  67. // connectionFactory.clone();
  68. }
  69. }

4 小结

  • 上述入门案例中使用的是如下的简单模式:

RabbitMQ入门小结.png

  • 在上图的模型中,有以下的概念:
    • P:生产者,也就是要发送消息的程序。
    • C:消费者,消息的接受者,会一直等待消息的到来。
    • Queue:消息队列,图中红色的部分。类似一个邮箱,可以缓存消息。生产者向其中投递消息,消费者从其中取出消息。
  • 在RabbitMQ中消费者是一定要到某个消息队列中获取消息的。