1.引入依赖,配置基本信息

使用RabbitMQ 1、引入amqp场景;RabbitAutoConfiguration就会自动生效 2、给容器中自动配置了 RabbitTemplate AmgpAdmin~CachingConnectionFactory RabbitMessagingTemplate; 所有的属性都是spring..rabbitmq @ConfigurationProperties(prefix “spring.rabbitmq”) public class Rabbitproperties 3、 给配置文件中配置spring,rabbitmq信息 4、@EnableRabbit:@EnabLeXxxxx;开启功能 5、监听消息:使用@RabbitListener;必须有@EnableRabbit @RabbitListener:类+方法上(监听哪些队列即可) @RabbitHandler:标在方法上(重载区分不同的消息)

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  1. spring.rabbitmq.host=localhost
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.virtual-host=/
  1. @SpringBootApplication
  2. @EnableRabbit
  3. public class RedissonDemoApplication {
  4. public static void main(String[] args) {
  5. SpringApplication.run(RedissonDemoApplication.class, args);
  6. }
  7. }

2.创建交换机

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. @Slf4j
  4. public class RedissonDemoApplicationTests {
  5. @Autowired
  6. AmqpAdmin amqpAdmin;
  7. @Test
  8. public void createExchange() {
  9. //创建交换机
  10. DirectExchange directExchange = new DirectExchange("hello_java_exchange", true, false);
  11. amqpAdmin.declareExchange(directExchange);
  12. log.info("Exchange created[{}]", directExchange.getName());
  13. }
  14. @Test
  15. public void createQueue() {
  16. //创建队列
  17. Queue queue = new Queue("hello_java_queue", true, false, false);
  18. amqpAdmin.declareQueue(queue);
  19. log.info("queue created[{}]", "hello_java_exchang");
  20. }
  21. @Test
  22. public void createBinding() {
  23. //创建绑定
  24. //String destination【目的地】,
  25. //DestinationType destinationType【目的地类型】,
  26. //String exchange【交换机】
  27. //String routingKey【路由键】
  28. //Map<String,.Object>.arguments【自定义参数】)》
  29. //将exchange指定的交换机和destination目的地进行绑定,使用routingKey作为指定的路由键
  30. Binding binding = new Binding("hello_java_queue", Binding.DestinationType.QUEUE, "hello_java_exchange", "hello.java", null);
  31. amqpAdmin.declareBinding(binding);
  32. log.info("binding created[{}]", "hello_java_binding");
  33. }
  34. }

发送消息

  1. @Configuration
  2. public class RabbitConfig {
  3. // 序列化json
  4. @Bean
  5. public MessageConverter messageConverter(ConnectionFactory connectionFactory) {
  6. return new Jackson2JsonMessageConverter();
  7. }
  8. }
  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. @Slf4j
  4. public class RedissonDemoApplicationTests {
  5. @Autowired
  6. AmqpAdmin amqpAdmin;
  7. @Autowired
  8. RabbitTemplate rabbitTemplate;
  9. @Test
  10. public void sendMessageTest() {
  11. Hello hello = new Hello();
  12. hello.setName("hello");
  13. hello.setMessage("hello world");
  14. rabbitTemplate.convertAndSend("hello_java_exchange","hello.java", hello);
  15. }
  16. @Data
  17. static class Hello {
  18. private String name;
  19. private String message;
  20. }
  21. }

接收消息

queues: 声明需要监听的所有队列 org,springframework.amgp.core.Message

参数可以写一下类型 1、Message message:原生消息详细信息。头+体 2、T<发送的消息的类型>OrderReturnReasonEntity content; 3、ChanneL channel:当前传输数据的通道 Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息 防止重复消费 场景: 1)、订单服务启动多个;同一个消息,只能有一个客户端收到 2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息

1.@RabbitListener标注在方法上直接接收

@RabbitListener(queues = {“hello_java_queue”}) //此处为监听的队列
_public void _test(Message message){
System.out.println(message);
}

2.@RabbitListener搭配@RabbitHandler

@RabbitListener标注到类上 @RabbitHandler标注到类上

  1. @RestController
  2. @RabbitListener(queues = {"hello_java_queue"})
  3. public class RedissonController {
  4. /**
  5. * queues:声明需要监听的所有队列
  6. * <p>
  7. * org.springframework.amgp.core.Message
  8. * 参数可以写一下类型
  9. */
  10. @RabbitHandler
  11. public void test(Message message, hello_Test content, Channel channel) {
  12. System.out.println("接收到消息"+content);
  13. byte[] body = message.getBody();
  14. MessageProperties messageProperties = message.getMessageProperties();
  15. System.out.println("消息处理完成..." +content.getName());
  16. }
  17. @RabbitHandler
  18. public void test2(Message message, hello_Test content, Channel channel) {
  19. System.out.println("接收到的消息..." +content);
  20. }
  21. }

开启可靠投递

#开启发送端确认
_spring.rabbitmq.publisher-confirms=_true
#开启发送端消息抵达队列的确认
_spring.rabbitmq.publisher-returns=_true
#只要抵达队列,以异步发送优先回调我们这个returns-confirm
_spring.rabbitmq.template.mandatory=_true

#手动ack消息
_spring.rabbitmq.listener.simple.acknowledge-mode=_manual

  1. @Configuration
  2. public class RabbitConfig {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. /**
  6. * 序列化改为json 这个单独配置 不然会引起循环依赖
  7. */
  8. @Bean
  9. public MessageConverter messageConverter(ConnectionFactory connectionFactory) {
  10. return new Jackson2JsonMessageConverter();
  11. }
  12. /**
  13. * 定制RabbitTemplate
  14. * 1、spring.rabbitmq.publisher-confirms=true
  15. * 2、设置确认回调
  16. */
  17. @PostConstruct //RabbitConfig对象创建完成后执行这个方法
  18. public void initRabbitTemplate() {
  19. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  20. /**
  21. *
  22. * @param correlationData 当前消息的唯一关联数据(这个是消息的为id)
  23. * @param ack 消息是否成功收到
  24. * @param cause 失败的原因
  25. */
  26. @Override
  27. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  28. System.out.println("confirm...correlationData[" + correlationData + "]==>ack[" + ack + "]==>cause[" + cause + "]");
  29. }
  30. });
  31. }
  32. }
  1. @RabbitHandler
  2. public void test(Message message, hello_Test content, Channel channel) {
  3. System.out.println("接收到消息"+content);
  4. byte[] body = message.getBody();
  5. MessageProperties messageProperties = message.getMessageProperties();
  6. System.out.println("消息处理完成..." +content.getName());
  7. }