RabbitMQ整合Spring AMQP

RabbitAdmin操作以及SpringAMQP声明方式

依赖

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.8.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-amqp</artifactId>
  9. </dependency>

RabbitMQConfig示例

  1. @Configuration
  2. public class RabbitMQConfig {
  3. @Bean
  4. public ConnectionFactory connectionFactory() {
  5. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  6. connectionFactory.setAddresses("139.9.62.232:30004");
  7. connectionFactory.setUsername("xinzhang");
  8. connectionFactory.setPassword("Xinzhang123");
  9. connectionFactory.setVirtualHost("xinzhang");
  10. return connectionFactory;
  11. }
  12. @Bean
  13. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
  14. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  15. // 必须为true, spring容器启动时将rabbitAdmin注入
  16. rabbitAdmin.setAutoStartup(true);
  17. return rabbitAdmin;
  18. }
  19. @Bean
  20. public Exchange xztest01Exchange() {
  21. return ExchangeBuilder.topicExchange("xztest01").build();
  22. }
  23. @Bean
  24. public Exchange xztest02Exchange() {
  25. return ExchangeBuilder.topicExchange("xztest02").build();
  26. }
  27. @Bean
  28. public Queue xztest001Queue() {
  29. return QueueBuilder.durable("xztest001").build();
  30. }
  31. @Bean
  32. public Queue xztest002Queue() {
  33. return QueueBuilder.durable("xztest002").build();
  34. }
  35. @Bean
  36. public Binding xztest001QueueBindingWithTest(@Qualifier("xztest001Queue") Queue xztest001Queue,
  37. @Qualifier("xztest01Exchange") Exchange xztest01Exchange) {
  38. return BindingBuilder.bind(xztest001Queue).to(xztest01Exchange).with("test.01.#").noargs();
  39. }
  40. @Bean
  41. public Binding xztest002QueueBindingWithTest(@Qualifier("xztest002Queue") Queue xztest002Queue,
  42. @Qualifier("xztest02Exchange") Exchange xztest02Exchange) {
  43. return BindingBuilder.bind(xztest002Queue).to(xztest02Exchange).with("test.02.#").noargs();
  44. }
  45. }

RabbitAdmin相关操作

  1. @SpringBootTest(classes = ConsumerApplication.class)
  2. @RunWith(SpringRunner.class)
  3. public class ConsumerTest {
  4. @Autowired
  5. private RabbitAdmin rabbitAdmin;
  6. @Test
  7. public void testRabbitAdmin() {
  8. TopicExchange xztestExchange = new TopicExchange("xztest", true, false);
  9. rabbitAdmin.declareExchange(xztestExchange);
  10. rabbitAdmin.declareQueue(new Queue("xztest01", true, false, false));
  11. rabbitAdmin.declareBinding(new Binding("xztest01", DestinationType.QUEUE, "xztest", "test.#", null));
  12. rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("xztest02", true)).to(xztestExchange).with("test02.#"));
  13. }
  14. }

RabbitTemplate操作

配置

  1. @Bean
  2. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  3. return new RabbitTemplate(connectionFactory);
  4. }

使用示例

  1. @Autowired
  2. private RabbitTemplate rabbitTemplate;
  3. @Test
  4. public void testRabbitTemplate() {
  5. MessageProperties messageProperties = new MessageProperties();
  6. Map<String, Object> headers = messageProperties.getHeaders();
  7. headers.put("desc", "test信息");
  8. Message message = new Message("hello! rabbitmq!".getBytes(), messageProperties);
  9. rabbitTemplate.convertAndSend("xztest01", "test.01.save", message, new MessagePostProcessor() {
  10. @Override
  11. public Message postProcessMessage(Message message) throws AmqpException {
  12. System.out.println("添加额外的设置");
  13. message.getMessageProperties().getHeaders().put("desc", "信息2");
  14. return message;
  15. }
  16. });
  17. rabbitTemplate.send("xztest01", "test.01.save2", message);
  18. }

SimpleMessageListenerContainer简单消息监听容器

  • 可以监听多个队列, 自动启用, 自动声明
  • 设置事务特性, 事务管理器, 事务属性, 事务容量(并发), 是否开启事务, 回滚消息等
  • 设置消费者数量, 最小最大数量, 批量消费
  • 设置消息确认和自动确认模式, 是否重回队列, 异常捕获handler函数
  • 设置消费者标签生成策略, 是否独占模式, 消费者属性等
  • 设置具体的监听器, 消息转换器等

配置示例

  1. @Bean
  2. public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
  3. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(
  4. connectionFactory);
  5. // 监听的队列
  6. container.setQueues(xztest001Queue());
  7. // 设置并发消费者数量
  8. container.setConcurrentConsumers(3);
  9. // 设置最多的并发消费者数量
  10. container.setMaxConcurrentConsumers(5);
  11. // 重回队列
  12. container.setDefaultRequeueRejected(false);
  13. // 设置自动ack
  14. container.setAcknowledgeMode(AcknowledgeMode.AUTO);
  15. // 自定义consumerTag的生成方式
  16. container.setConsumerTagStrategy(new ConsumerTagStrategy() {
  17. @Override
  18. public String createConsumerTag(String queueName) {
  19. return queueName + "_" + UUID.randomUUID().toString();
  20. }
  21. });
  22. // 消息监听方法
  23. container.setMessageListener(new ChannelAwareMessageListener() {
  24. @Override
  25. public void onMessage(Message message, Channel channel) throws Exception {
  26. System.out.println("接收到消息: " + new String(message.getBody()));
  27. }
  28. });
  29. return container;
  30. }

MessageListenerAdapter消息监听适配器

  • 使用自定义的MessageDelegate消息监听对象, 可以自定义消息转换器
  • 可以将队列名称与处理方法名一一对应

messageContainer配置示例

  1. @Bean
  2. public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
  3. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(
  4. connectionFactory);
  5. // 设置消息监听适配器
  6. MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
  7. adapter.setDefaultListenerMethod("consumeMessage");
  8. adapter.setMessageConverter(new TextMessageConverter());
  9. // 使队列与方法名一一对应
  10. // HashMap<String, String> queueOrTagToMethodName = new HashMap<>(2);
  11. // queueOrTagToMethodName.put("xztest001","method1");
  12. // adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
  13. container.setMessageListener(adapter);
  14. ...
  15. }

MessageDelegate

  /**
   * Plain delegate object whose methods are invoked by a MessageListenerAdapter.
   */
  public class MessageDelegate {

      // Method name the adapter looks for by default; it receives the raw body:
      // public void handleMessage(byte[] messageBody) {
      // System.out.println("接收到消息: " + new String(messageBody));
      // }

      /**
       * Custom handler method. With a message converter configured, the payload arrives
       * already converted to a {@code String}.
       *
       * @param message the converted message payload
       */
      public void consumeMessage(String message) {
          final String line = "接收到消息: " + message;
          System.out.println(line);
      }
  }

TextMessageConverter

  1. package top.xinzhang0618.consumer.spring;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.core.MessageProperties;
  4. import org.springframework.amqp.support.converter.MessageConversionException;
  5. import org.springframework.amqp.support.converter.MessageConverter;
  6. /**
  7. * TextMessageConverter
  8. *
  9. * @author xinzhang
  10. * @author Shenzhen Greatonce Co Ltd
  11. * @version 2020/3/2
  12. */
  13. public class TextMessageConverter implements MessageConverter {
  14. /**
  15. * java对象转成message对象
  16. */
  17. @Override
  18. public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
  19. return new Message(o.toString().getBytes(), messageProperties);
  20. }
  21. /**
  22. * message对象转成java对象
  23. */
  24. @Override
  25. public Object fromMessage(Message message) throws MessageConversionException {
  26. // 此处可利用messageProperties做相关的判断
  27. // MessageProperties messageProperties = message.getMessageProperties();
  28. return new String(message.getBody());
  29. }
  30. }

MessageConverter消息转换器

可以设置多种类型的消息转换器, 代码示例如下:

  1. // 1.自定义文本转换器, 对应java参数类型: String message
  2. TextMessageConverter textMessageConverter = new TextMessageConverter();
  3. // 2.支持json格式的转换器, 对应java参数类型: Map messageBody
  4. // 必需: messageProperties.setContentType("application/json");
  5. Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
  6. // 3.Jackson2JsonMessageConverter & DefaultJackson2JavaTypeMapper 支持java对象转换, 对应java参数类型: java类
  7. // 除contentType声明为json外, 必需: messageProperties.getHeaders().put("__TypeId__","类的全路径");
  8. DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper();
  9. jackson2JsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper);
  10. //4.支持java对象多映射转换, 略
  11. //5.多种converter一起使用, 需要设置对应的contentType
  12. ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();
  13. converter.addDelegate("text",textMessageConverter);
  14. converter.addDelegate("xml/text",textMessageConverter);
  15. converter.addDelegate("html/text",textMessageConverter);
  16. converter.addDelegate("text/plain",textMessageConverter);
  17. converter.addDelegate("json",jackson2JsonMessageConverter);
  18. converter.addDelegate("application/json",jackson2JsonMessageConverter);
  19. adapter.setMessageConverter(converter);