这里 rabbitmq安装就省略了。
注意下载和配置 erlang 就行
简介
RabbitMQ 是基于 AMQP 协议的一种消息队列,有的时候我们称之为消息中间件。
AMQP,即(Advanced Message Queuing Protocol),高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
几种常见的消息模型
下边我们来看看 rabbitmq 的几种消息模型
创建maven项目,导入相关依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
直连模式
这种模式下 是一个点对点的模式。
- P:生产者,也就是要发送消息的一方
- C:消费者,接受消息的一方
- queue:图中红色方块区域,存放消息的队列,先进先出。生产者发送消息到队列,消费者从队列中拿消息。
这个模式下只能有一个 生产者和消费者。
无论在何种模式下
无论角色是生产者还是消费者,在发送和获取消息的时候,都要先获取一个 Connection 连接。
然后通过连接,创建一个 channel 对象,通过channel 对象来绑定一个 queue,然后进行消息的发送和获取。
所以实际上应该是这样的。
关于 channel 这个概念
我们完全可以使用 Connection 就能完成信道的工作,为什么还要引入信道呢?
试想这样一个场景,一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection,也就是多个 TCP 连接。
然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。
RabbitMQ 采用类似 NIO(Non-blocking I/O)的做法,选择 TCP 连接复用,不仅可以减少性能开销,同时也便于管理。
每个线程把持一个信道,所以信道复用了 Connection 的 TCP 连接。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的 Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源
感谢链接 https://www.cnblogs.com/eleven24/p/10326718.html
代码实现
生产者端:
public class Provider {
//生产消息
public static void main(String[] args) throws IOException, TimeoutException {
//首先这里肯定要先去 rabbitmq去创建一个链接
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置 rabbitmq 的地址和端口号
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
//连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置访问虚拟机的用户名和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("toor");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//因为在 rabbitmq 中我们都是通过 通道(channel)来获取和发送消息的
Channel channel = connection.createChannel();
//通道去绑定对应的消息队列。
//这个队列会在不存在的情况下,自动创建一个队列。
channel.queueDeclare("hello", //参数名
true, //队列是否持久化,持久化意味着重启后,队列仍然保存,但是队列中的数据会丢失,
// 如果没有持久化,重启后队列会跟着消失, 。
// 我们这里没必要,所以可以设置为 false,在项目中应该设为 true
false, //是否独占队列,true 意味着只能自己这个连接可用这个队列,其它连接不能
false, //在消息完成之后是否自动删除队列(注意!这里是在消费者完全断开连接之后才进行删除的)
null);//附加参数
channel.basicPublish("", //要发给交换机的名称,因为我们这里是简单模式
"hello", //要发给队列的名称
MessageProperties.PERSISTENT_TEXT_PLAIN,//传递消息额外设置,我们可以在这里设置消息的持久化 MessageProperties.PERSISTENT_TEXT_PLAIN
"hello rabbitmq".getBytes() );//消息的具体内容,必须是byte数组
//关闭channel
channel.close();
//关闭连接
connection.close();
}
}
因为以后也要重复获取和关闭连接,我们这里索性抽取一个 util 工具类
public class RabbitMqUtil {
private static ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
//首先这里肯定要先去 rabbitmq去创建一个链接
//设置 rabbitmq 的地址和端口号
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
//连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置访问虚拟机的用户名和密码
connectionFactory.setUsername("root");
connectionFactory.setPassword("toor");
}
/**
* 获取连接
* @return
*/
public static Connection getConnection() {
//获取连接对象
Connection connection = null;
try {
connection = connectionFactory.newConnection();
//因为在 rabbitmq 中我们都是通过 通道(channel)来获取和发送消息的
return connection;
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
/**
* 关闭连接
* @param channel
* @param connection
* @return
*/
public static boolean close(Channel channel, Connection connection) {
try {
//关闭channel
if (channel != null) channel.close();
//关闭连接
if (connection != null) connection.close();
return true;
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return false;
}
}
消费者端:
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicConsume(
"hello", //消费哪个队列的消息
true,//开启消息的确认机制
new DefaultConsumer(channel){//消息消费时的回调接口
//处理消息的回调方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//前三个参数,咱们先不管,看看最后一个参数
//见名知意
String s = new String(body);
System.out.println("消费消息:::::" + s);
}
});
System.out.println("channel 准备关闭");
}
}