1.引入依赖,配置基本信息
使用RabbitMQ 1、引入amqp场景;RabbitAutoConfiguration就会自动生效 2、给容器中自动配置了 RabbitTemplate AmgpAdmin~CachingConnectionFactory RabbitMessagingTemplate; 所有的属性都是spring..rabbitmq @ConfigurationProperties(prefix “spring.rabbitmq”) public class Rabbitproperties 3、 给配置文件中配置spring,rabbitmq信息 4、@EnableRabbit:@EnabLeXxxxx;开启功能 5、监听消息:使用@RabbitListener;必须有@EnableRabbit @RabbitListener:类+方法上(监听哪些队列即可) @RabbitHandler:标在方法上(重载区分不同的消息)
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.virtual-host=/
@SpringBootApplication@EnableRabbitpublic class RedissonDemoApplication {public static void main(String[] args) {SpringApplication.run(RedissonDemoApplication.class, args);}}
2.创建交换机
@RunWith(SpringRunner.class)@SpringBootTest@Slf4jpublic class RedissonDemoApplicationTests {@AutowiredAmqpAdmin amqpAdmin;@Testpublic void createExchange() {//创建交换机DirectExchange directExchange = new DirectExchange("hello_java_exchange", true, false);amqpAdmin.declareExchange(directExchange);log.info("Exchange created[{}]", directExchange.getName());}@Testpublic void createQueue() {//创建队列Queue queue = new Queue("hello_java_queue", true, false, false);amqpAdmin.declareQueue(queue);log.info("queue created[{}]", "hello_java_exchang");}@Testpublic void createBinding() {//创建绑定//String destination【目的地】,//DestinationType destinationType【目的地类型】,//String exchange【交换机】//String routingKey【路由键】//Map<String,.Object>.arguments【自定义参数】)》//将exchange指定的交换机和destination目的地进行绑定,使用routingKey作为指定的路由键Binding binding = new Binding("hello_java_queue", Binding.DestinationType.QUEUE, "hello_java_exchange", "hello.java", null);amqpAdmin.declareBinding(binding);log.info("binding created[{}]", "hello_java_binding");}}
发送消息
@Configurationpublic class RabbitConfig {// 序列化json@Beanpublic MessageConverter messageConverter(ConnectionFactory connectionFactory) {return new Jackson2JsonMessageConverter();}}
@RunWith(SpringRunner.class)@SpringBootTest@Slf4jpublic class RedissonDemoApplicationTests {@AutowiredAmqpAdmin amqpAdmin;@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void sendMessageTest() {Hello hello = new Hello();hello.setName("hello");hello.setMessage("hello world");rabbitTemplate.convertAndSend("hello_java_exchange","hello.java", hello);}@Datastatic class Hello {private String name;private String message;}}
接收消息
queues: 声明需要监听的所有队列 org,springframework.amgp.core.Message
参数可以写一下类型 1、Message message:原生消息详细信息。头+体 2、T<发送的消息的类型>OrderReturnReasonEntity content; 3、ChanneL channel:当前传输数据的通道 Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息 防止重复消费 场景: 1)、订单服务启动多个;同一个消息,只能有一个客户端收到 2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息
1.@RabbitListener标注在方法上直接接收
@RabbitListener(queues = {“hello_java_queue”}) //此处为监听的队列
_public void _test(Message message){
System.out.println(message);
}
2.@RabbitListener搭配@RabbitHandler
@RabbitListener标注到类上 @RabbitHandler标注到类上
@RestController@RabbitListener(queues = {"hello_java_queue"})public class RedissonController {/*** queues:声明需要监听的所有队列* <p>* org.springframework.amgp.core.Message* 参数可以写一下类型*/@RabbitHandlerpublic void test(Message message, hello_Test content, Channel channel) {System.out.println("接收到消息"+content);byte[] body = message.getBody();MessageProperties messageProperties = message.getMessageProperties();System.out.println("消息处理完成..." +content.getName());}@RabbitHandlerpublic void test2(Message message, hello_Test content, Channel channel) {System.out.println("接收到的消息..." +content);}}
开启可靠投递
#开启发送端确认
_spring.rabbitmq.publisher-confirms=_true #开启发送端消息抵达队列的确认
_spring.rabbitmq.publisher-returns=_true #只要抵达队列,以异步发送优先回调我们这个returns-confirm
_spring.rabbitmq.template.mandatory=_true
#手动ack消息
_spring.rabbitmq.listener.simple.acknowledge-mode=_manual
@Configurationpublic class RabbitConfig {@AutowiredRabbitTemplate rabbitTemplate;/*** 序列化改为json 这个单独配置 不然会引起循环依赖*/@Beanpublic MessageConverter messageConverter(ConnectionFactory connectionFactory) {return new Jackson2JsonMessageConverter();}/*** 定制RabbitTemplate* 1、spring.rabbitmq.publisher-confirms=true* 2、设置确认回调*/@PostConstruct //RabbitConfig对象创建完成后执行这个方法public void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 当前消息的唯一关联数据(这个是消息的为id)* @param ack 消息是否成功收到* @param cause 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm...correlationData[" + correlationData + "]==>ack[" + ack + "]==>cause[" + cause + "]");}});}}
@RabbitHandlerpublic void test(Message message, hello_Test content, Channel channel) {System.out.println("接收到消息"+content);byte[] body = message.getBody();MessageProperties messageProperties = message.getMessageProperties();System.out.println("消息处理完成..." +content.getName());}
