1 搭建工程
1.1 新建Maven工程
1.2 添加依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version></dependency>
2 编写生产者
package com.xudaxian;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.util.HashMap;import java.util.concurrent.TimeoutException;/** * 生产者 * * @version 1.0 * @since 2021-02-04 10:40 */public class Producer { public static String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //主机地址,默认是localhost connectionFactory.setHost("192.168.18.120"); //连接端口 connectionFactory.setPort(5672); //虚拟主机的名称,默认为/ connectionFactory.setVirtualHost("/xudaxian"); //连接的用户名,默认为guest connectionFactory.setUsername("xudaxian"); //连接的密码:默认为guest connectionFactory.setPassword("123456"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //声明创建队列 //参数1:队列名称 //参数2:是否持久化队列 //参数3:是否独占本地连接 //参数4:是否在不使用的时候自动删除队列 //参数5:队列的其他参数 channel.queueDeclare(QUEUE_NAME, true, false, false, new HashMap<>()); //定义要发送的消息 String message = "你好啊,RabbitMQ"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("已经发送消息:" + message); //释放资源 channel.close(); connection.close(); connectionFactory.clone(); }}
- 执行完上述的代码之后,可以登录到RabbitMQ的管理控制台,可以发现队列已经自动被创建,而且队列中已经消息了。


3 编写消费者
package com.xudaxian;import com.rabbitmq.client.*;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.util.HashMap;import java.util.concurrent.TimeoutException;/** * 消费者 * * @version 1.0 * @since 2021-02-04 11:08 */public class Consumer { public static String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //主机地址,默认是localhost connectionFactory.setHost("192.168.18.120"); //连接端口 connectionFactory.setPort(5672); //虚拟主机的名称,默认为/ connectionFactory.setVirtualHost("/xudaxian"); //连接的用户名,默认为guest connectionFactory.setUsername("xudaxian"); //连接的密码:默认为guest connectionFactory.setPassword("123456"); //创建连接 Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //声明创建队列 //参数1:队列名称 //参数2:是否持久化队列 //参数3:是否独占本地连接 //参数4:是否在不使用的时候自动删除队列 //参数5:队列的其他参数 channel.queueDeclare(QUEUE_NAME, true, false, false, new HashMap<>()); //监听消息的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { /** * @param consumerTag 消费者标签,在channel.basicConsume的时候可以指定 * @param envelope 消息包内容,可以从其中获取消息的id,消息routing key,交换机、消息和重试标记(收到消息失败后是否需要重新发送) * @param properties 消息属性 * @param body 消息 * @throws IOException IO异常 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("路由key:" + envelope.getRoutingKey()); System.out.println("交换机:" + envelope.getExchange()); System.out.println("消息id:" + envelope.getDeliveryTag()); System.out.println("接收到的消息:" + new String(body, StandardCharsets.UTF_8)); System.out.println(); System.out.println("============================================"); System.out.println(); } }; //监听消息 //参数1:队列名称 //参数2:是否自动确认,设置为true表示消息接收到消息自动向MQ回复接收到了,MQ收到消息后会删除消息,设置为false,需要手动确认 //参数3:监听消息的消费者 channel.basicConsume(QUEUE_NAME, true, consumer); //不需要释放资源,应该保持一直监听消息// channel.close();// connection.close();// connectionFactory.clone(); }}
4 小结

- 在上图的模型中,有以下的概念:
- P:生产者,也就是要发送消息的程序。
- C:消费者,消息的接受者,会一直等待消息的到来。
- Queue:消息队列,图中红色的部分。类似一个邮箱,可以缓存消息。生产者向其中投递消息,消费者从其中取出消息。
- 在RabbitMQ中消费者是一定要到某个消息队列中获取消息的。