1、实现步骤

1:jdk1.8
2:构建一个maven工程
3:导入rabbitmq的maven依赖
4:启动rabbitmq-server服务
5:定义生产者
6:定义消费者
7:观察消息的在rabbitmq-server服务中的过程

2、构建一个maven工程

image.png

3、导入rabbitmq的maven依赖

3.1、Java原生依赖

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.10.0</version>
  5. </dependency>

3.2、spring依赖

  1. <dependency>
  2. <groupId>org.springframework.amqp</groupId>
  3. <artifactId>spring-amqp</artifactId>
  4. <version>2.2.5.RELEASE</version></dependency><dependency>
  5. <groupId>org.springframework.amqp</groupId>
  6. <artifactId>spring-rabbit</artifactId>
  7. <version>2.2.5.RELEASE</version>
  8. </dependency>

3.3、springboot依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

上面根据自己的项目环境进行选择即可。

:::info 番外:rabbitmq和spring同属一个公司开放的产品,所以他们的支持也是非常完善,这也是为什么推荐使用rabbitmq的一个原因。 :::

4、启动rabbitmq-server服务

  1. systemctl start rabbitmq-server或者docker start myrabbit

5、定义生产者

  1. package com.theory.simple;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. public class Producer {
  8. public static void main(String[] args) {
  9. // 所有的中间件技术都是支持基于tcp/ip协议基础之上构建新型的协议规范,只不过RabbitMQ遵循的是amqp
  10. // IP port
  11. // 1. 创建连接工厂
  12. ConnectionFactory connectionFactory = new ConnectionFactory();
  13. // 2. 设置连接属性
  14. connectionFactory.setHost("192.168.222.111"); // ip地址
  15. connectionFactory.setPort(5672); // 端口号
  16. connectionFactory.setUsername("admin"); // 账号
  17. connectionFactory.setPassword("admin"); // 密码
  18. connectionFactory.setVirtualHost("/"); //路由地址
  19. Connection connection = null;
  20. Channel channel = null;
  21. try {
  22. // 3. 从连接工厂中获取连接 Connection
  23. connection = connectionFactory.newConnection("生产者");
  24. // 4. 通过连接获取通道Channel
  25. channel = connection.createChannel();
  26. // 5. 通过通道创建交换机,声明队列,绑定关系,如有key,发送消息,接收消息
  27. String queueName = "queue1";
  28. /**
  29. * @params1: 队列的名称,如果队列不存在,则会创建
  30. * @params2: 是否要持久化 durable=false所谓的持久化消息就是是否存盘,如果为false即非持久化,true是持久化 非持久化也会存盘,但是重启服务器会丢失
  31. * @params3: 排他性,是否是独占队列,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
  32. * @params4: 是否自动删除,随着消费者消息完毕消息以后是否把队列自动删除
  33. * @params5: 携带附属参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
  34. */
  35. channel.queueDeclare(queueName,false,false,false,null);
  36. // 6. 准备发送消息内容
  37. String message = "Hello theory";
  38. // 7. 发送消息给队列queue
  39. /**
  40. * @params1: 交换机exchange
  41. * @params2: 队列、路由key
  42. * @params3: 消息的状态控制,即属性
  43. * @params4: 消息主题,即发送消息的内容
  44. */
  45. // 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定存在一个默认的交换机
  46. channel.basicPublish("",queueName,null,message.getBytes());
  47. System.out.println("消息发送成功!!!");
  48. } catch (IOException e) {
  49. e.printStackTrace();
  50. } catch (TimeoutException e) {
  51. e.printStackTrace();
  52. } finally {
  53. // 8. 关闭通道
  54. if (channel != null && channel.isOpen()) {
  55. try {
  56. channel.close();
  57. } catch (IOException e) {
  58. e.printStackTrace();
  59. } catch (TimeoutException e) {
  60. e.printStackTrace();
  61. }
  62. }
  63. // 9. 释放连接
  64. if (connection != null && connection.isOpen()) {
  65. try {
  66. connection.close();
  67. } catch (IOException e) {
  68. e.printStackTrace();
  69. }
  70. }
  71. }
  72. }
  73. }

1:执行发送,这个时候可以在web控制台查看到这个队列queue的信息
RabbitMQ入门案例 - Simple 简单模式 - 图2
RabbitMQ入门案例 - Simple 简单模式 - 图3
2:我们可以进行对队列的消息进行预览和测试如下:
RabbitMQ入门案例 - Simple 简单模式 - 图4
3:进行预览和获取消息进行测试
RabbitMQ入门案例 - Simple 简单模式 - 图5

6、定义消费者

  1. package com.theory.routing;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class Consumer {
  6. public static void main(String[] args) {
  7. // 所有的中间件技术都是支持基于tcp/ip协议基础之上构建新型的协议规范,只不过RabbitMQ遵循的是amqp
  8. // IP port
  9. // 1. 创建连接工厂,并设置参数
  10. ConnectionFactory connectionFactory = new ConnectionFactory();
  11. connectionFactory.setHost("192.168.222.111");
  12. connectionFactory.setPort(5672);
  13. connectionFactory.setUsername("admin");
  14. connectionFactory.setPassword("admin");
  15. connectionFactory.setVirtualHost("/");
  16. Connection connection = null;
  17. Channel channel = null;
  18. try {
  19. // 2. 创建连接 Connection
  20. connection = connectionFactory.newConnection("生产者");
  21. // 3. 通过连接获取通道Channel
  22. channel = connection.createChannel();
  23. // 4. 通过通道创建交换机,声明队列,绑定关系,如有key,发送消息,接收消息
  24. channel.basicConsume("queue1", true, new DeliverCallback() {
  25. @Override
  26. public void handle(String s, Delivery delivery) throws IOException {
  27. System.out.println("收到消息是" + new String(delivery.getBody(), "utf-8"));
  28. }
  29. }, new CancelCallback() {
  30. @Override
  31. public void handle(String s) throws IOException {
  32. System.out.println("接收失败了...");
  33. }
  34. }
  35. );
  36. System.out.println("开始接收消息");
  37. System.in.read();
  38. } catch (IOException e) {
  39. e.printStackTrace();
  40. } catch (TimeoutException e) {
  41. e.printStackTrace();
  42. } finally {
  43. // 7. 关闭通道
  44. if (channel != null && channel.isOpen()) {
  45. try {
  46. channel.close();
  47. } catch (IOException e) {
  48. e.printStackTrace();
  49. } catch (TimeoutException e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. // 8. 关闭通道
  54. if (connection != null && connection.isOpen()) {
  55. try {
  56. connection.close();
  57. } catch (IOException e) {
  58. e.printStackTrace();
  59. }
  60. }
  61. }
  62. }
  63. }

7、观察消息的在rabbitmq-server服务中的过程