1.添加依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
</dependencies>
<!--打包jar-->
<build>
<finalName>service-common-crm</finalName>
<plugins>
<!--spring-boot-maven-plugin-->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<!--解决打包出来的jar文件中没有主清单属性问题-->
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
2.添加RabbitUtils
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitUtils {
private static ConnectionFactory connectionFactory = new ConnectionFactory();
static {
connectionFactory.setHost("124.71.7.200");
connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
connectionFactory.setUsername("azhi");
connectionFactory.setPassword("2021888");
connectionFactory.setVirtualHost("vh_test");
}
public static Connection getConnection() {
Connection conn = null;
try {
conn = connectionFactory.newConnection();
return conn;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
3.添加RabbitConstant
public class RabbitConstant {
public static final String QUEUE_HELLOWORLD = "helloworld";
public static final String QUEUE_SMS = "sms";
public static final String EXCHANGE_WEATHER = "weather";
public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
public static final String QUEUE_BAIDU = "baidu";
public static final String QUEUE_SINA = "sina";
public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
}
4.添加消费者并启动消费者
import com.baiqi.rabbitmq.utils.RabbitConstant;
import com.baiqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//获取TCP长连接
Connection conn = RabbitUtils.getConnection();
//创建通信“通道”,相当于TCP中的虚拟连接
Channel channel = conn.createChannel();
//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
//从MQ服务器中获取数据
//创建一个消息消费者
//第一个参数:队列名
//第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
//第三个参数要传入DefaultConsumer的实现类
channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new Reciver(channel));
}
}
class Reciver extends DefaultConsumer {
private Channel channel;
//重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
public Reciver(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消费者接收到的消息:" + message);
System.out.println("消息的TagId:" + envelope.getDeliveryTag());
//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
5.添加生产者并启动生产者
import com.baiqi.rabbitmq.utils.RabbitConstant;
import com.baiqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//获取TCP长连接
Connection conn = RabbitUtils.getConnection();
//创建通信“通道”,相当于TCP中的虚拟连接
Channel channel = conn.createChannel();
//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
String message = "azhi888";
//四个参数
//exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到
//队列名称
//额外的设置属性
//最后一个参数是要传递的消息字节数组
channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null, message.getBytes());
channel.close();
conn.close();
System.out.println("===发送成功===");
}
}
运行结果: