介绍
:::tips
基础的消息队列模型包括的三个角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
-
使用
发布消息
@SpringBootTest
public class MyTest {
@Test
public void test() throws IOException, TimeoutException {
//构建连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
//设置连接参数:主机名、端口号、vhost、用户名、密码
factory.setHost("RabbitMQ服务的IP");
factory.setPort(RabbitMQ服务的端口号);
factory.setVirtualHost("/");
factory.setUsername("用户名");
factory.setPassword("密码");
//创建连接对象
Connection connection = factory.newConnection();
//创建连接通道
Channel channel = connection.createChannel();
//创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
//发送消息
String message = "这是一条测试消息";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("消息发送成功:" + message);
//关闭连接
channel.close();
connection.close();
}
}
订阅消息
@SpringBootTest
public class MyTest {
public void test() throws IOException, TimeoutException {
//构建连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
//设置连接参数:主机名、端口号、vhost、用户名、密码
factory.setHost("RabbitMQ服务的IP");
factory.setPort(RabbitMQ服务的端口号);
factory.setVirtualHost("/");
factory.setUsername("用户名");
factory.setPassword("密码");
//创建连接对象
Connection connection = factory.newConnection();
//创建连接通道
Channel channel = connection.createChannel();
//创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
//订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//处理消息
String message = new String(body);
System.out.println("接收到消息:" + message);
}
});
System.out.println("等待接收消息");
}
}