分发机制
参考:https://rabbitmq.com/getstarted.html
一定要掌握前五种
简单模式(Simple)
导入依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
定义生产者
/**
* 生产者
*
* @author zukxu
* CreateTime: 2021/5/18 0018 15:32
*/
public class Producer {
private static Logger log = LoggerFactory.getLogger(Producer.class);
public static void main(String[] args) {
// 所有中间件都是基于tcp/ip协议,rabbitmq基于AMQP协议
// 需要ip:port
// 步骤
// 1. 创建连接工厂
log.info("1. 创建连接工厂");
ConnectionFactory connectionFactory = new ConnectionFactory();
//配置相关配置项
connectionFactory.setHost("112.74.175.76");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//声明连接和通道
Connection connection = null;
Channel channel = null;
try {
// 2. 创建连接 Connection
log.info("2. 创建连接 Connection");
connection = connectionFactory.newConnection("生产者-1");
// 3. 通过连接获取通道Chanel
log.info("3. 通过连接获取通道Chanel");
channel = connection.createChannel();
// 4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息
//声明队列
log.info("通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息");
String queueName = "队列-1";
/**
* @param 队列名称
* @param 是否持久化
* @param 是否独占队列(排他性,一般不设置为排他)
* @param 是否自动删除(最后一个消费者消费结束是否自动删除,一般不会自动删除)
* @param 附加参数map
*
*/
channel.queueDeclare(queueName, false, false, false, null);
// 5. 准备消息
log.info("准备消息");
String msg = "Hello World";
// 6. 发送消息到queue
log.info("6. 发送消息到queue");
/**
* @param 交换机
* @param 队列名称
* @param 消息持久化
* @param 发送内容
*/
channel.basicPublish("", queueName, null, msg.getBytes());
log.info("消息发送成功:{}",msg);
} catch (Exception e) {
e.printStackTrace();
} finally {
log.info("关闭连接");
// 7. 关闭通道
if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
// 8. 关闭连接
if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
- Rabbitmq
定义消费者
public class Consumer {
private static Logger log = LoggerFactory.getLogger(Consumer.class);
public static void main(String[] args) {
// 所有中间件都是基于tcp/ip协议,rabbitmq基于AMQP协议
// 需要ip:port
// 步骤
// 1. 创建连接工厂
log.info("1. 创建连接工厂");
ConnectionFactory connectionFactory = new ConnectionFactory();
//配置相关配置项
connectionFactory.setHost("112.74.175.76");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//声明连接和通道
Connection connection = null;
Channel channel = null;
try {
// 2. 创建连接 Connection
log.info("2. 创建连接 Connection");
connection = connectionFactory.newConnection("消费者-1");
// 3. 通过连接获取通道Chanel
log.info("3. 通过连接获取通道Chanel");
channel = connection.createChannel();
// 4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息
//声明队列
log.info("通过通道接收消息");
String queueName = "队列-1";
log.info("参数传递队列名称:{}", queueName);
channel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
log.info("消息接收成功:{}", new String(delivery.getBody(), StandardCharsets.UTF_8));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
log.error("接受消息失败:{}", s);
}
});
System.out.println("开始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
log.info("关闭连接");
// 7. 关闭通道
if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
// 8. 关闭连接
if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
查看消费过程
持久化和非持久化队列的区别就是 服务器重启和消息消费之后队列是否还存在
而且非持久化也会进行存盘,但是重启服务器之后会消失,持久化不会消失
- 生成连接
- 生成通道
- 生成队列