1.步骤
- 创建连接工厂
- 获取连接connection
- 通过连接获取通道
- 通过创建交换机,声明队列,绑定关系,路由key,发送消息 和接受消息
- 准备消息内容
- 发送消息给队列
- 关闭连接通道
2.生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private static final String QUEUE_NAME = "LHM";
public static void main(String[] args) {
//所有的中间件的技术都是基于 tcp/ip 协议基础上构建的新型协议规范 不过rabbitmq 遵循的amqp
// ip port
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("39.102.67.107");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("/");
//2.获取连接connection
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection("生产者");
//3.通过连接获取通道
channel = connection.createChannel();
//4.通过创建交换机,声明队列,绑定关系,路由key,发送消息 和接受消息
/**
* @param1:队列的名称
* @param2:是否要持久化 durable = false 所谓的持久化消息是否存盘,如果false,非持久化 true 是持久化
* @param3:排他性 是否是独占队列
* @param4:是否自动删除,随着最后一个消费者发送消息完毕是否把队列删除
* @param5:携带一些附属参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//5.准备消息内容
String message = "hello";
//6.发送消息给队列
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
//7.关闭连接通道
if (channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
3.消费者代码
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class Consumer {
private static final String QUEUE_NAME = "LHM";
public static void main(String[] args) {
//所有的中间件的技术都是基于 tcp/ip 协议基础上构建的新型协议规范 不过rabbitmq 遵循的amqp
// ip port
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("39.102.67.107");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("/");
//2.获取连接connection
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection("生产者");
//3.通过连接获取通道
channel = connection.createChannel();
//4.通过创建交换机,声明队列,绑定关系,路由key,发送消息 和接受消息
channel.basicConsume(QUEUE_NAME, true, (consumerTag, message) -> {
System.out.println("收到的消息为:" + new String(message.getBody(), StandardCharsets.UTF_8));
}, consumerTag -> {
System.out.println("消息接受失败了");
});
System.out.println("开启接受消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
//7.关闭连接通道
if (channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}