MQ 又是什么呢?是 Message Queue 的首字母缩写,也就是说 RabbitMQ 是一款开源的消息队列系统。
RabbitMQ 的主要特点在于健壮性好、易于使用、高性能、高并发、集群易扩展,以及强大的开源社区支持。反正就是很牛逼的样子。
前置条件

安装 Erlang

就像跑npm先装个node.js一样.

安装 Erlang

Erlang 下载地址如下:
https://erlang.org/download/otp_versions_tree.html

安装 RabbitMQ

Erlang 安装成功后,就可以安装 RabbitMQ 了。下载地址如下所示:
https://www.rabbitmq.com/install-windows.html
找到下图中的位置,选择红色框中的文件进行下载。
image.png
image.png

成功后,就可以将 RabbitMQ 作为 Windows 服务启动,可以从“开始”菜单管理 RabbitMQ Windows 服务。

image.png
点击「RabbitMQ Command Prompt (sbin dir)」,进入命令行,输入 rabbitmqctl.bat status 可确认 RabbitMQ 的启动状态。
image.png
可以看到 RabbitMQ 一些状态信息:

  • 进程 ID,也就是 PID 为 6224
  • 操作系统为 Windows
  • 当前的版本号为 3.9.14
  • Erlang 的配置信息
  • 命令行界面看起来不够优雅,因此我们可以输入以下命令来启用客户端管理 UI 插件:

· rabbitmq-plugins enable rabbitmq_management
· 看到以下信息就可以确认插件启用成功了。
image.png

  • 在浏览器地址栏输入 http://localhost:15672
  • 可以进入管理端界面,如下图所示:
  • image.png
  • 默认账号密码都是guest后面可以自己添加删除账户.

前置运行环境配置好之后就可以设定rabbitMQ的相关配置参数,我直接用的win端本地端口和地址了
这些参数写入application.properties就行了.
#rabbitmq相关配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port
=5672
spring.rabbitmq.username
=admin
spring.rabbitmq.password
=123456

在 Java 中使用 RabbitMQ

添加依赖.如果不使用springboot,可以添加如下依赖

com.rabbitmq
amqp-client
5.9.0

如果使用springboot可以添加下列依赖
<_dependency>
<_groupId_>_
org.springframework.boot</_groupId>
<_artifactId_>_
spring-boot-starter-amqp</_artifactId>
_

简单demo.
设定生产者类send和消费者类receive
Send代码:

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import org.apache.logging.log4j.message.Message;
  5. import java.io.IOException;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.concurrent.TimeoutException;
  8. public class Send {
  9. private final static String QUEUE_NAME ="LOVE";
  10. public static void main(String[] args)throws IOException,TimeoutException {
  11. ConnectionFactory factory = new ConnectionFactory();
  12. try (Connection connection = factory.newConnection();
  13. Channel channel = connection.createChannel()) {
  14. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  15. String message = "小巷,我喜欢你。";
  16. channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
  17. System.out.println(" [王二] 发送 '" + message + "'");
  18. }
  19. }
  20. }

basicPublish() 方法用于发布消息:
第一个参数为交换机(exchange),当前场景不需要,因此设置为空字符串;
第二个参数为路由关键字(routingKey),暂时使用队列名填充;
第三个参数为消息的其他参数(BasicProperties),暂时不配置;
第四个参数为消息的主体,这里为 UTF-8 格式的字节数组,可以有效地杜绝中文乱码。

生产者类有了,接下来新建消费者类receive

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. import org.joda.time.DateTime;
  5. public class Receive {
  6. private final static String QUEUE_NAME = "LOVE";
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. ConnectionFactory factory = new ConnectionFactory();
  9. Connection connection = factory.newConnection();
  10. Channel channel = connection.createChannel();
  11. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  12. System.out.println("等待接收消息");
  13. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  14. String message = new String(delivery.getBody(), "UTF-8");
  15. System.out.println(" [小巷] 接收到的消息 '" + message + "'");
  16. };
  17. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
  18. }
  19. }

1.创建通道的代码和生产者差不多,只不过没有使用 try-with-resource 语句来自动关闭连接和通道,因为我们希望消费者能够一直保持连接,直到我们强制关闭它。
2. 在接收消息的时候,必须设置队列名称,通过 queueDeclare() 方法设置。
3. 由于 RabbitMQ 将会通过异步的方式向我们推送消息,因此我们需要提供了一个回调,该回调将对消息进行缓冲,直到我们做好准备接收它们为止。
· (回调:在缺少函数类型的参数的面向对象的程序语言中,例如Java,回调可以用传递抽象类或接口来模拟。回调的接收者会调用抽象类或接口的方法,这些方法由调用者提供实现。这样的对象通常是一些回调函数的集合,同时可能包含它所需要的数据。这种方法在实现某些设计模式)时比较有用,例如访问者模式观察者模式策略模式
)
basicConsume() 方法用于接收消息:
第一个参数为队列名(queue),和生产者相匹配(LOVE)。
第二个参数为 autoAck,如果为 true 的话,表明服务器要一次性交付消息。怎么理解这个概念呢?小伙伴们可以在运行消费者类 receive 类之前,先多次运行生产者类 send,向队列中发送多个消息,等到消费者类启动后,你就会看到多条消息一次性接收到了,就像下面这样。
image.png
第三个参数为 DeliverCallback,也就是消息的回调函数。
第四个参数为 CancelCallback,我暂时没搞清楚是干嘛的。
在消息发送的过程中,也可以使用 RabbitMQ 的管理面板查看到消息的走势图,如下所示。
image.png