1.pom引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置properties或yaml文件
spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
3.用RabbitTemplate测试
3.1测试单播模式
@SpringBootTest
class CacheApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 测试单播发消息(点对点模式)
*/
@Test
void test(){
//这种发送写法要自己构造一个Message:定义消息体内容和消息头
//rabbitTemplate.send(exchane,routingKey,message);
//object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq;
Map<String,String> map=new HashMap<>();
map.put("msg","这是一个消息");
rabbitTemplate.convertAndSend("exchange.direct","jf3q.news",map);
}
/**
* 接收消息
*/
@Test
void testreceive(){
Object o = rabbitTemplate.receiveAndConvert("jf3q.news");
System.out.println(o);
}
3.2修改成json序列化方式
只需要新建一个配置类就可以了。
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
3.3测试广播模式
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Book {
public String name;
public String author;
}
/**
* 接收消息
*/
@Test
void testreceive(){
Object o = rabbitTemplate.receiveAndConvert("jf3q.emps");
System.out.println(o);
}
/**
* 测试广播
*/
@Test
void testfanout(){
rabbitTemplate.convertAndSend("exchange.fanout","",new Book("三国演义","罗贯中"));
}
4.用@RabbitListener监听队列
需要在主程序中开启@EnableRabbit这个注解 ```java @EnableRabbit public class RabbitApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitApplication.class, args);
}
}
2. 然后就是在你要监听的方法上加 @RabbitListener(queues = "你要监听的队列")
以下是两种监听后接收信息的方法。
```java
@RabbitListener(queues = "jf3q.news")
public void receive(Book book){
System.out.println("书的信息:"+book);
}
@RabbitListener(queues = "jf3q")
public void receive2(Message message){
System.out.println(message.getBody());//获取消息体内容(字节内容对象)
System.out.println(message.getMessageProperties());//获取消息头信息
}
5.AmqpAdmin
RabbitMQ系统管理功能组件; 创建和删除queue exchange binding
@Autowired
AmqpAdmin amqpAdmin;
@Test
void testamqp(){
//创建一个Direct的交换器
//amqpAdmin.declareExchange(new DirectExchange("amqp.exchange"));
//创建一个永久的队列
/*amqpAdmin.declareQueue(new Queue("amqp.queue",true));*/
//创建一个绑定规则
amqpAdmin.declareBinding(new Binding("amqp.queue", Binding.DestinationType.QUEUE,"amqp.exchange","amqp.hh",null));
}