1、消息发送方(发布者)

1)添加maven依赖

  1. <!-- springboot rabbitmq 使用-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

2)配置RabbitMQ配置(application.properties)

  1. #RabbitMQ 服务配置,不写默认走本地ip
  2. spring.rabbitmq.host=192.168.0.3
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=guest
  5. spring.rabbitmq.password=guest

3)创建发送方法

  1. package com.example.provide.rabbitmq;
  2. import com.alibaba.fastjson.JSON;
  3. import com.example.provide.dto.UserDTO;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Component;
  9. import java.util.List;
  10. import static org.springframework.integration.jmx.JmxHeaders.OPERATION_NAME;
  11. /*
  12. * @auth yuesf
  13. * @data 2019/11/4
  14. */
  15. @Component
  16. public class Sender {
  17. private static final Logger logger = LoggerFactory.getLogger(Sender.class);
  18. @Autowired
  19. private RabbitTemplate rabbitTemplate;
  20. public void storeInfoWindQSend(User user) {
  21. String message = JSON.toJSONString(user);
  22. logger.info("RabbitMQ: 发送消息={}", message);
  23. //指定交换机和路由的routingkey
  24. rabbitTemplate.convertAndSend("demo.direct.exchange", "demo.direct", message);
  25. logger.info("发送消息完成 message={}", message);
  26. }
  27. }

2、消息接收方(订阅者)

1)添加maven依赖

  1. <!-- springboot rabbitmq 使用-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

2)配置RabbitMQ配置(application.properties)

  1. #RabbitMQ 服务配置,不写默认走本地ip
  2. spring.rabbitmq.host=192.168.0.3
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=guest
  5. spring.rabbitmq.password=guest

3)声明RabbitMQ

示例中使用直连交换机(Direct Exchange):声明一个交换机、一个队列,以及交换机与队列之间的绑定关系。

  1. package com.example.consume.listener;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.beans.factory.annotation.Qualifier;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. /*
  10. * Rabbitmq的配置示例
  11. * @auth yuesf
  12. * @data 2019/11/4
  13. */
  14. @Configuration
  15. public class RabbitConfigDemo {
  16. /**
  17. * 示例交换机
  18. *
  19. * @return
  20. */
  21. @Bean
  22. public DirectExchange demoExchange() {
  23. return new DirectExchange("demo.direct.exchange", true, false);
  24. }
  25. /**
  26. * 示例队列
  27. *
  28. * @return
  29. */
  30. @Bean
  31. public Queue demoQueue() {
  32. return new Queue("demo.queue", true, false, false);
  33. }
  34. /**
  35. * 交换机与队列的绑定关系
  36. *
  37. * @param demoQueue
  38. * @param demoExchange
  39. * @return
  40. */
  41. @Bean
  42. public Binding bindingDemoQueue(@Qualifier("demoQueue") Queue demoQueue,
  43. @Qualifier("demoExchange") DirectExchange demoExchange) {
  44. return BindingBuilder.bind(demoQueue).to(demoExchange).with("demo.direct");
  45. }
  46. }

4)监听方法

  1. package com.example.consume.listener;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. /*
  7. * @auth yuesf
  8. * @data 2019/11/4
  9. */
  10. @Component
  11. public class RabbitDemoListener {
  12. private static final Logger logger = LoggerFactory.getLogger(RabbitDemoListener.class);
  13. @RabbitListener(queues = "demo.queue")
  14. public void goodsListenerProcess(Object message) {
  15. logger.info("接收消息 message={}", message);
  16. }
  17. }

3、手动ACK指定搭配其他注解使用方式(订阅者)

配置类:

  1. //配置工厂类
  2. @Slf4j
  3. @Configuration
  4. public class RabbitConfig {
  5. @Bean(name = "oneConnectionFactory")
  6. public ConnectionFactory oneConnectionFactory(
  7. @Value("${spring.rabbitmq.host}") String host,
  8. @Value("${spring.rabbitmq.port}") int port,
  9. @Value("${spring.rabbitmq.username}") String username,
  10. @Value("${spring.rabbitmq.password}") String password) {
  11. log.info("mq队列连接信息 host={}, port={}, username={}", host, port, username);
  12. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  13. connectionFactory.setHost(host);
  14. connectionFactory.setPort(port);
  15. connectionFactory.setUsername(username);
  16. connectionFactory.setPassword(password);
  17. connectionFactory.setPublisherConfirms(true);
  18. connectionFactory.setPublisherReturns(true);
  19. return connectionFactory;
  20. }
  21. @Bean(name = "oneFactory")
  22. public SimpleRabbitListenerContainerFactory oneFactory(
  23. SimpleRabbitListenerContainerFactoryConfigurer configurer,
  24. @Qualifier("oneConnectionFactory") ConnectionFactory oneConnectionFactory) {
  25. log.info("初始化比分 scoreFactory 实例");
  26. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  27. //手动
  28. factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  29. configurer.configure(factory, oneConnectionFactory);
  30. return factory;
  31. }
  32. }

监听类:

  1. //监听类
  2. @Component
  3. public class Listener {
  4. //指定交换机、队列、路由routingKey
  5. //ignoreDeclarationExceptions已有交换机可能会有异常忽略掉,从源码上看不存在交换机和队列会自动创建
  6. @RabbitHandler
  7. @RabbitListener(bindings = @QueueBinding(
  8. value = @Queue(value = "demo.queue", durable = "true"),
  9. exchange = @Exchange(
  10. value = "demoExchange",
  11. ignoreDeclarationExceptions = "true",
  12. type = ExchangeTypes.Direct
  13. ),
  14. key = {"demo.direct"}),
  15. containerFactory = "oneFactory")
  16. public void listen(String msg, Channel channel, Message message) throws IOException {
  17. try {
  18. User user = JSON.parseObject(msg, User.class);
  19. // 确认收到消息,只确认当前消费者的一个消息收到
  20. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  21. } catch (Exception e) {
  22. if (message.getMessageProperties().getRedelivered()) {
  23. log.info("消息已经回滚过,拒绝接收消息 : {}", msg);
  24. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  25. } else {
  26. log.info("消息即将返回队列重新处理 :{}", msg);
  27. //设置消息重新回到队列处理,requeue表示是否重新回到队列,true重新入队
  28. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  29. }
  30. log.error("消息消费异常, msg={},e={}", msg,e.getStackTrace());
  31. }
  32. }
  33. }

配置文件

  1. spring:
  2.   rabbitmq:
  3.     host: mq.dev.qiuhui.com
  4.     port: 5672
  5.     username: admin
  6.     password: TY111111
  7.     listener:
  8.       simple:
  9.         #指定消息确认模式为手动确认
  10.         acknowledge-mode: manual