整合 spring

pom.xml

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.7.3</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-amqp</artifactId>
  9. </dependency>

bean 注入(spring 方式)

  • 设置一个 配置类

    1. @Configuration
    2. public class RabbitmqConfig {
    3. }
  • 注入 ConnectionFactory

    1. /**
    2. * Rabbitmq connectionFactory
    3. *
    4. * @return
    5. */
    6. @Bean
    7. public ConnectionFactory connectionFactory() {
    8. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    9. connectionFactory.setAddresses("127.0.0.1:5672");
    10. connectionFactory.setUsername("guest");
    11. connectionFactory.setPassword("guest");
    12. connectionFactory.setVirtualHost("/");
    13. return connectionFactory;
    14. }
  • 注入 rabbitadmin

    • 底层实现是从 spring 容器中获取 Exchange 、Binding、 RoutingKey 和 Queue 的 @Bean 声明
    • setAutoStartup 必须为 true,否则 Spring 不会加载 RabbitAdmin 类
      1. /**
      2. * 底层实现是从 spring 容器中获取 Exchange Binding RoutingKey 和 Queue 的 @Bean 声明
      3. *
      4. * @param connectionFactory
      5. * @return
      6. */
      7. @Bean
      8. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
      9. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
      10. rabbitAdmin.setAutoStartup(true);
      11. return rabbitAdmin;
      12. }
  • 注入 rabbitmqTemplate

    1. /**
    2. * rabbitmq 模板
    3. *
    4. * @param connectionFactory
    5. * @return
    6. */
    7. @Bean
    8. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    9. return new RabbitTemplate(connectionFactory);
    10. }
  • 注入 exchange、 queue、 binding ```java // 下面的这些 bean 都会被 rabbitAdmin 获取 @Bean public TopicExchange exchange1() { // String name, boolean durable, boolean autoDelete, Map arguments return new TopicExchange(“topic01”, true, false); }

@Bean public Queue queue01() { // String name, boolean durable, boolean exclusive, boolean autoDelete, Map arguments return new Queue(“queue01”, true); }

@Bean public Queue queue02() { // String name, boolean durable, boolean exclusive, boolean autoDelete, Map arguments return new Queue(“queue02”, true); }

@Bean public Binding binding01() { return BindingBuilder .bind(queue01()) // 队列 .to(exchange1()) // 交换机 .with(“routingKey01”); // 指定 routing key }

@Bean public Binding binding02() { return BindingBuilder .bind(queue01()) .to(exchange1()) .with(“routingKey02”); }

  1. <a name="E7bwd"></a>
  2. ### 简单消息监听容器
  3. ```java
  4. /**
  5. * 简单消息监听容器,为多个 queue 设置统一的 consumer 配置等等
  6. *
  7. * @param connectionFactory
  8. * @return
  9. */
  10. @Bean
  11. public SimpleMessageListenerContainer messageListenerContainer(
  12. ConnectionFactory connectionFactory) {
  13. SimpleMessageListenerContainer container =
  14. new SimpleMessageListenerContainer(connectionFactory);
  15. container.setQueues(queue01(), queue02());
  16. // 设置并行消费者属性
  17. container.setConcurrentConsumers(1);
  18. // 设置最大并行消费者数量
  19. container.setMaxConcurrentConsumers(5);
  20. // 设置重回队列
  21. container.setDefaultRequeueRejected(false);
  22. /*
  23. 设置响应 ack格式
  24. AcknowledgeMode.NONE:自动确认
  25. AcknowledgeMode.AUTO:根据情况确认
  26. AcknowledgeMode.MANUAL:手动确认
  27. */
  28. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  29. // qos 中的 prefetchCount
  30. container.setPrefetchCount(1);
  31. // 设置消费者tag名称
  32. container.setConsumerTagStrategy(new ConsumerTagStrategy() {
  33. @Override
  34. public String createConsumerTag(String s) {
  35. return s + "_" + UUID.randomUUID().toString().replace("-", "");
  36. }
  37. });
  38. // 简单的设置 消费逻辑
  39. container.setMessageListener(new ChannelAwareMessageListener() {
  40. @Override
  41. public void onMessage(Message message, Channel channel) throws Exception {
  42. String msg = new String(message.getBody());
  43. System.err.println("消费者进行了消费" + msg);
  44. // 手动 ack
  45. channel.basicAck(
  46. message.getMessageProperties().getDeliveryTag(),
  47. false);
  48. }
  49. });
  50. return container;
  51. }

使用 MessageListenerAdapter

  1. @Bean
  2. public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
  3. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
  4. // 略去其他配置
  5. // 不使用默认的消费逻辑
  6. /*
  7. container.setMessageListener(new ChannelAwareMessageListener() {
  8. @Override
  9. public void onMessage(Message message, Channel channel) throws Exception {
  10. String msg = new String(message.getBody());
  11. System.err.println("消费者进行了消费" + msg);
  12. // 手动 ack
  13. channel.basicAck(
  14. message.getMessageProperties().getDeliveryTag(),
  15. false);
  16. }
  17. });
  18. */
  19. // 消息监听适配器,可以更加灵活的给消费者添加自定义的逻辑
  20. // 设置 Delegate(委托) 对象
  21. MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
  22. /*
  23. public class MessageListenerAdapter extends AbstractAdaptableMessageListener {
  24. private final Map<String, String> queueOrTagToMethodName
  25. = new HashMap<String, String>();
  26. // 默认是处理传入对象的 defaultListenerMethod, 即 handleMessage
  27. //Out-of-the-box value for the default listener method: "handleMessage".
  28. public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";
  29. // ...
  30. }
  31. // 则这里会处理 MessageDelegate#handleMessage()
  32. // 即要求 MessageDelegate 实现一个 handleMessage() 方法
  33. */
  34. // 1. 可以修改 defaultListenerMethod 的值
  35. // 比如这里设置后会处理 MessageDelegate#newMethod()
  36. // adapter.setDefaultListenerMethod("newMethod");
  37. // 2. 指定 MessageConverter, 即 defaultListenerMethod 方法的参数形式
  38. // 默认是 byte[] message,这里后续重写后参数为 String
  39. adapter.setMessageConverter(new TextMessageConverter());
  40. // 3. 指定队列和方法匹配器, 让进入 queue 的消息进入对应的 method 进行处理
  41. // 注意参数会被上面自己指定的 MessageConverter 影响,默认是 byte[]
  42. Map<String, String> queueOrTagToMethodName = new HashMap<>();
  43. // queue01 的消息会进入 MessageDelegate#method01(),
  44. queueOrTagToMethodName.put("queue01", "method01");
  45. // queue02 的消息会进入 MessageDelegate#method02()
  46. queueOrTagToMethodName.put("queue02", "method02");
  47. adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
  48. container.setMessageListener(adapter);
  49. return container;
  50. }
  • MessageListenerAdapter 设置的 Delegate

    1. private static class MessageDelegate {
    2. // 默认会进入的方法
    3. public void handleMessage(byte[] messageBody) {
    4. System.err.println(new String(messageBody));
    5. }
    6. // 进行 setDefaultListenerMethod 后默认会进入的方法
    7. public void newMethod(byte[] messageBody) {
    8. System.err.println(new String(messageBody));
    9. }
    10. // queue 绑定,参数会被 MessageConverter 影响
    11. public void method01(String convertMessage){
    12. System.err.println("method01: " + convertMessage);
    13. }
    14. public void method02(String convertMessage){
    15. System.err.println("method02: " +convertMessage);
    16. }
    17. }
  • MessageListenerAdapter 设置的 MessageConverter

    1. private static class TextMessageConverter implements MessageConverter {
    2. /**
    3. * java 对象转 Message
    4. */
    5. @Override
    6. public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
    7. return new Message(o.toString().getBytes(), messageProperties);
    8. }
    9. /**
    10. * 返回值是 DefaultListenerMethod 对应 方法的参数类型
    11. *
    12. * @param message
    13. * @return
    14. * @throws MessageConversionException
    15. */
    16. @Override
    17. public Object fromMessage(Message message) throws MessageConversionException {
    18. // 可以返回不同的类型,让该 Converter 复用
    19. return new String(message.getBody());
    20. }
    21. }

其他MessageConverter 消息转换器

  • json转换器 Jackson2JsonMessageConverter:可以进行 java 对象的转换功能
    • producer 发送的内容是对象的 json 字符串的字节数组
    • producer 发送的时候需要设置 properties 中的 ContentTypeapplication/json ```java // 支持 json 格式的转换器

// 注意,由于后面使用了 jackson2JsonMessageConverter // 所以要求 Delegate 类的 defaultListenerMethod() 的形参是 Map 类型 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod(“json2Method”);

// 给 adapter 添加 jackson2JsonMessageConverter Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); adapter.setMessageConverter(jackson2JsonMessageConverter);

container.setMessageListener(adapter);

  1. - Jackson2JsonMessageConverter + DefaultJackson2JavaTypeMapper 映射器:可以进行 java 对象的映射关系
  2. - **producer 发送的内容是对象的 json 字符串的字节数组**
  3. - **producer 发送的时候需要设置 `properties` 中的 `ContentType` `application/json`**
  4. - **producer 发送的时候需要设置一个 **`**headers**`**,键值为 **`**"__TypedId__": "pojo全类名"**`
  5. ```java
  6. //支持 java 对象转换
  7. // 注意,由于后面使用了 jackson2JsonMessageConverter 和 javaTypeMapper
  8. // 要求 Delegate 类的 defaultListenerMethod() 的形参是 对应的 pojo 类型
  9. MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
  10. adapter.setDefaultListenerMethod("json2PojoMethod");
  11. // 给 jackson2JsonMessageConverter 添加一个 javaTypeMapper
  12. Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
  13. DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
  14. jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
  15. // 给 adapter 添加 jackson2JsonMessageConverter
  16. adapter.setMessageConverter(jackson2JsonMessageConverter);
  17. container.setMessageListener(adapter);
  • Jackson2JsonMessageConverter + DefaultJackson2JavaTypeMapper+ idClassMapping
    • producer 发送的内容是对象的 json 字符串的字节数组
    • producer 发送的时候需要设置 properties 中的 ContentTypeapplication/json
    • producer 发送的时候需要设置一个 **headers**,键值为 **"__TypedId__": "配置的全类名"**
      • 注意这里只要下面 idClassMapping 配置好的类名即可 ```java // java 对象多映射转换

// 和上面类似, 由于使用了 IdClassMapping // 要求 Delegate 类的 defaultListenerMethod() 的形参是 对应的pojo 类型 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod(“json2MorePojoMethod”);

// 给 jackson2JsonMessageConverter 添加一个 javaTypeMapper Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);

// 给 javaTypeMapper 添加 实体类和类名 的映射关系, 可以添加多个 Map> idClassMapping = new HashMap<>(); idClassMapping.put(“order”, com.example.rabbitmqapi.model.Order.class);

javaTypeMapper.setIdClassMapping(idClassMapping);

adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter);

  1. - 全局转换器 可以配置多个转换器,根据 `Content-Type` 进行对应转换器投递
  2. ```java
  3. // 全局的转换器
  4. MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
  5. // 消息会转发 Delegate 类的 defaultListenerMethod()
  6. adapter.setDefaultListenerMethod("moreMethod");
  7. ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();
  8. // 根据 ContentType 来将消息投放到对应的转换器
  9. TextMessageConverter textMessageConverter = new TextMessageConverter();
  10. converter.addDelegate("text", textMessageConverter);
  11. converter.addDelegate("text/plain", textMessageConverter);
  12. Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
  13. converter.addDelegate("json", jackson2JsonMessageConverter);
  14. converter.addDelegate("application/json", jackson2JsonMessageConverter);
  15. // 还可以设置其他处理图片、pdf等的转换器
  16. // 将全局 convert 设置进 adapter
  17. adapter.setMessageConverter(converter);
  18. container.setMessageListener(adapter);
  19. return container;
  • 自定义二进制转换器
    • 需要继承 MessageConverter,注意实现的 toMessage 会对应 adapter 的 setDefaultListenerMethod 方法的参数值

整合 springboot

application.properties

  1. spring.rabbitmq.addresses=127.0.0.1
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=guest
  4. spring.rabbitmq.password=guest
  5. spring.rabbitmq.virtual-host=/
  6. spring.rabbitmq.connection-timeout=15000
  7. spring.http.encoding.charset=UTF-8
  8. spring.jackson.date-format=yyy-MM-dd HH:mm:ss
  9. spring.jackson.time-zone=UTC
  10. spring.jackson.default-property-inclusion=NON_NULL
  11. # producer 端配置
  12. spring.rabbitmq.publisher-confirm-type=simple
  13. spring.rabbitmq.publisher-returns=true
  14. # 保证监听有效
  15. spring.rabbitmq.template.mandatory=true
  16. # consumer 端配置
  17. spring.rabbitmq.listener.simple.concurrency=5
  18. spring.rabbitmq.listener.simple.max-concurrency=10
  19. # 签收模式 auto 自动 manual 手动
  20. spring.rabbitmq.listener.simple.acknowledge-mode=manual
  21. # 单个线程同时消费消息个数 (qos)
  22. spring.rabbitmq.listener.simple.prefetch=1

producer

  • 可以给 RabbitTemplate 绑定 publisher-confirms 和 publiser-returns

    • 前提是开启 spring.rabbitmq.template.mandatory=true 保证监听有效
    • 可以配置其他属性,比如发送重试,超时时间,次数,间隔
    • CorrelationData 用于可靠性投递 ```java @Component public class SpringbootProducer { @Autowired private RabbitTemplate rabbitTemplate; /**

      • avoid
      • java.lang.IllegalStateException: Only one ConfirmCallback is supported by each RabbitTemplate */ @PostConstruct public void initCallback(){ // 注册 confirm callback // 用于监听 broker 端返回的确认请求 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

        1. @Override
        2. public void confirm(CorrelationData correlationData, boolean b, String s) {
        3. }

        });

        // 注册 return callback // 保证消息对broker端是可达的,如果出现路由键不可达,则需要对不可达的消息进行后续的处理 // 保证消息的路由成功 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

        1. @Override
        2. public void returnedMessage(org.springframework.amqp.core.Message message, int i, String s, String s1, String s2) {
        3. }

        }); } public void send(Object payload, HashMap properties, String exchangeName, String routingKey) throws Exception{ MessageHeaders messageHeaders = new MessageHeaders(properties); Message msg = MessageBuilder.createMessage(payload, messageHeaders);

        CorrelationData correlationData = new CorrelationData(“100”); System.out.println(correlationData); rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationData); } }

  1. <a name="6ZLrz"></a>
  2. ## consumer
  3. - 使用 `@RabbitMQListener` 注解
  4. - 可以配置 `@QueueBinding` `@Queue` `@Exchange` 等直接通过该注解直接完成消费端的交换机、队列、绑定路由、配置监听功能等
  5. ```java
  6. import com.example.rabbitmqapi.model.Order;
  7. import com.rabbitmq.client.Channel;
  8. import org.springframework.amqp.rabbit.annotation.*;
  9. import org.springframework.amqp.support.AmqpHeaders;
  10. import org.springframework.messaging.Message;
  11. import org.springframework.messaging.MessageHeaders;
  12. import org.springframework.messaging.handler.annotation.Headers;
  13. import org.springframework.messaging.handler.annotation.Payload;
  14. import org.springframework.stereotype.Component;
  15. import java.util.Map;
  16. @Component
  17. public class SpringbootConsumer {
  18. @RabbitListener(bindings = {
  19. @QueueBinding(
  20. value = @Queue(value = "queue1", durable = "true", autoDelete = "false"),
  21. exchange = @Exchange(value = "exchange1", durable = "true", autoDelete = "false", type = "topic"),
  22. key = {"springboot.normal"},
  23. ignoreDeclarationExceptions = "false",
  24. declare = "true"
  25. )
  26. })
  27. @RabbitHandler
  28. public void onNormalMessage(Message message, Channel channel) throws Exception {
  29. // 消息体
  30. Object payLoadBody = message.getPayload();
  31. MessageHeaders messageHeaders = message.getHeaders();
  32. long deliveryTag = (long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
  33. // 手动 ack
  34. // 需要 spring.rabbitmq.listener.simple.acknowledge-mode=manual
  35. channel.basicAck(deliveryTag, false);
  36. }
  37. @RabbitListener(bindings = {
  38. @QueueBinding(
  39. value = @Queue(value = "queue2", durable = "true", autoDelete = "false"),
  40. exchange = @Exchange(value = "exchange1", durable = "true", autoDelete = "false", type = "topic"),
  41. key = {"springboot.pojo"},
  42. ignoreDeclarationExceptions = "false",
  43. declare = "true"
  44. )
  45. })
  46. @RabbitHandler
  47. public void onPojoMessage(@Payload Order order, // 注解标明这是 消息体
  48. @Headers Map<String, Object> headers, // 注解标明这是 headers
  49. Channel channel) throws Exception{
  50. System.out.println(order);
  51. System.out.println(headers);
  52. Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
  53. channel.basicAck(deliveryTag, false);
  54. }
  55. }