1 搭建工程
1.1 新建Maven工程
1.2 添加依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
2 编写生产者
package com.xudaxian;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
/**
* 生产者
*
* @version 1.0
* @since 2021-02-04 10:40
*/
public class Producer {
public static String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址,默认是localhost
connectionFactory.setHost("192.168.18.120");
//连接端口
connectionFactory.setPort(5672);
//虚拟主机的名称,默认为/
connectionFactory.setVirtualHost("/xudaxian");
//连接的用户名,默认为guest
connectionFactory.setUsername("xudaxian");
//连接的密码:默认为guest
connectionFactory.setPassword("123456");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//声明创建队列
//参数1:队列名称
//参数2:是否持久化队列
//参数3:是否独占本地连接
//参数4:是否在不使用的时候自动删除队列
//参数5:队列的其他参数
channel.queueDeclare(QUEUE_NAME, true, false, false, new HashMap<>());
//定义要发送的消息
String message = "你好啊,RabbitMQ";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("已经发送消息:" + message);
//释放资源
channel.close();
connection.close();
connectionFactory.clone();
}
}
- 执行完上述的代码之后,可以登录到RabbitMQ的管理控制台,可以发现队列已经自动被创建,而且队列中已经消息了。
3 编写消费者
package com.xudaxian;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*
* @version 1.0
* @since 2021-02-04 11:08
*/
public class Consumer {
public static String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址,默认是localhost
connectionFactory.setHost("192.168.18.120");
//连接端口
connectionFactory.setPort(5672);
//虚拟主机的名称,默认为/
connectionFactory.setVirtualHost("/xudaxian");
//连接的用户名,默认为guest
connectionFactory.setUsername("xudaxian");
//连接的密码:默认为guest
connectionFactory.setPassword("123456");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//声明创建队列
//参数1:队列名称
//参数2:是否持久化队列
//参数3:是否独占本地连接
//参数4:是否在不使用的时候自动删除队列
//参数5:队列的其他参数
channel.queueDeclare(QUEUE_NAME, true, false, false, new HashMap<>());
//监听消息的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* @param consumerTag 消费者标签,在channel.basicConsume的时候可以指定
* @param envelope 消息包内容,可以从其中获取消息的id,消息routing key,交换机、消息和重试标记(收到消息失败后是否需要重新发送)
* @param properties 消息属性
* @param body 消息
* @throws IOException IO异常
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key:" + envelope.getRoutingKey());
System.out.println("交换机:" + envelope.getExchange());
System.out.println("消息id:" + envelope.getDeliveryTag());
System.out.println("接收到的消息:" + new String(body, StandardCharsets.UTF_8));
System.out.println();
System.out.println("============================================");
System.out.println();
}
};
//监听消息
//参数1:队列名称
//参数2:是否自动确认,设置为true表示消息接收到消息自动向MQ回复接收到了,MQ收到消息后会删除消息,设置为false,需要手动确认
//参数3:监听消息的消费者
channel.basicConsume(QUEUE_NAME, true, consumer);
//不需要释放资源,应该保持一直监听消息
// channel.close();
// connection.close();
// connectionFactory.clone();
}
}
4 小结
- 在上图的模型中,有以下的概念:
- P:生产者,也就是要发送消息的程序。
- C:消费者,消息的接受者,会一直等待消息的到来。
- Queue:消息队列,图中红色的部分。类似一个邮箱,可以缓存消息。生产者向其中投递消息,消费者从其中取出消息。
- 在RabbitMQ中消费者是一定要到某个消息队列中获取消息的。