bean 静态绑定

    1. @Getter
    2. @Configuration
    3. @ConfigurationProperties(prefix = "application.rabbit")
    4. public class RabbitMQProperties {
    5. @ApiModelProperty("死信交换机默认配置")
    6. private final DefaultMessage defaultMessage = new DefaultMessage();
    7. @Data
    8. @Component
    9. public static class DefaultMessage {
    10. private final CustomerExchange exchange = new CustomerExchange();
    11. private final CustomerQueue queue = new CustomerQueue();
    12. private final CustomerBinding binding = new CustomerBinding();
    13. }
    14. @Data
    15. @Component
    16. @ApiModel("消费者交换机")
    17. public static class CustomerExchange {
    18. @ApiModelProperty("交换机名称")
    19. private String name = "defaultExchange";
    20. @ApiModelProperty("交换机是否持久化")
    21. private Boolean durable = true;
    22. @ApiModelProperty("是否自动删除空闲的交换机")
    23. private Boolean autoDelete = false;
    24. @ApiModelProperty("对交换机属性进行配置")
    25. private Map<String, Object> arguments = new HashMap<>(10);
    26. }
    27. @Data
    28. @Component
    29. @ApiModel("消费者队列")
    30. public static class CustomerQueue {
    31. @ApiModelProperty("队列名称")
    32. private String name = "defaultQueue";
    33. @ApiModelProperty("队列是否持久化")
    34. private Boolean durable = true;
    35. @ApiModelProperty("是否连接独占")
    36. private Boolean exclusive = false;
    37. @ApiModelProperty("是否自动删除空闲的交换机")
    38. private Boolean autoDelete = false;
    39. @ApiModelProperty("对消息属性进行配置")
    40. private Map<String, Object> arguments = new HashMap<>(10);
    41. }
    42. @Data
    43. @Component
    44. public static class CustomerBinding {
    45. /**
    46. * 默认路由(#:表示拦截任意字符的路由,*:表示拦截任意的单个字符,其他拦截比如:user.ABC 等自定义路由)
    47. */
    48. private String routingKey = "#";
    49. }
    50. }

    yml

    1. rabbit:
    2. enabled: false
    3. default-message:
    4. exchange:
    5. name: defaultExchange
    6. queue:
    7. name: defaultQueue
    8. binding:
    9. routing-key: "#"
    1. @Configuration
    2. @EnableRabbit
    3. @ConditionalOnExpression("'${application.rabbit.enabled}'.equals('true')")
    4. public class RabbitMqConfiguration {
    5. private static RabbitMqConfiguration rabbitMqConfiguration;
    6. @Autowired
    7. private CachingConnectionFactory connectionFactory;
    8. @Autowired
    9. private ObjectMapper objectMapper;
    10. @Autowired
    11. private RabbitMQProperties rabbitMQProperties;
    12. @Autowired
    13. private Map<String, MessageQueueConfigInterface> messageQueueConfigInterfaceMap;
    14. @PostConstruct
    15. public void initTestCreateQueue() {
    16. messageQueueConfigInterfaceMap.values().forEach(MessageQueueConfigInterface::init);
    17. }
    18. @Bean
    19. public DefaultClassMapper classMapper() {
    20. DefaultClassMapper classMapper = new DefaultClassMapper();
    21. classMapper.setTrustedPackages("*");
    22. return classMapper;
    23. }
    24. /**
    25. * 开启多例模式 用于实现多个不同的生产者回调不同
    26. *
    27. * @return RabbitTemplate
    28. */
    29. @Bean
    30. @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    31. public RabbitTemplate rabbitTemplate() {
    32. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    33. rabbitTemplate.setMessageConverter(jsonMessageConverter());
    34. rabbitTemplate.setMandatory(true);
    35. return rabbitTemplate;
    36. }
    37. /**
    38. * producer 消息转换器
    39. */
    40. @Bean
    41. public MessageConverter jsonMessageConverter() {
    42. Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(objectMapper);
    43. messageConverter.setClassMapper(classMapper());
    44. return messageConverter;
    45. }
    46. /**
    47. * consumer 消息转换器
    48. */
    49. @Bean
    50. public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    51. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    52. factory.setConnectionFactory(connectionFactory);
    53. factory.setMessageConverter(new Jackson2JsonMessageConverter());
    54. return factory;
    55. }
    56. /**
    57. * 1. 创建死信交换机
    58. * 2. 创建死信队列
    59. * 3. 绑定关系
    60. * <p>
    61. * 死信队列 DLQ Queue (默认消息队列)
    62. * 消息来源:
    63. * 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
    64. * 消息TTL过期
    65. * 队列达到最大长度(队列满了,无法再添加数据到mq中)
    66. * 处理方式:
    67. * 丢弃,如果不是很重要,可以选择丢弃
    68. * 记录死信入库,然后做后续的业务分析或处理
    69. * 通过死信队列,由负责监听死信的应用程序进行处理
    70. *
    71. * @return Queue
    72. */
    73. @Bean
    74. @Order(1)
    75. public TopicExchange deadExchange() {
    76. RabbitMQProperties.CustomerExchange exchange = rabbitMQProperties.getDefaultMessage().getExchange();
    77. return new TopicExchange(exchange.getName(), exchange.getDurable(), exchange.getAutoDelete(), exchange.getArguments());
    78. }
    79. @Bean
    80. @Order(2)
    81. public Queue deadQueue() {
    82. RabbitMQProperties.CustomerQueue queue = rabbitMQProperties.getDefaultMessage().getQueue();
    83. return new Queue(queue.getName(), queue.getDurable(), queue.getExclusive(), queue.getAutoDelete(), queue.getArguments());
    84. }
    85. @Bean
    86. @Order(3)
    87. public Binding deadBinding() {
    88. RabbitMQProperties.CustomerBinding binding = rabbitMQProperties.getDefaultMessage().getBinding();
    89. return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(binding.getRoutingKey());
    90. }
    91. /**
    92. * 通用交换机配置
    93. */
    94. public final static String EXCHANGE_NAME = "CommonMessageExchange";
    95. public final static String QUEUE_NAME = "CommonMessageQueue";
    96. public final static String ROUTING_KEY_NAME = "message.ABC";
    97. @Bean
    98. public TopicExchange comExchange() {
    99. return new TopicExchange(EXCHANGE_NAME, true, false);
    100. }
    101. @Bean
    102. public Queue comQueue() {
    103. Map<String, Object> arguments = new HashMap<>(10);
    104. // 绑定死信交换机 & 设置超时时间
    105. arguments.put("x-dead-letter-exchange", rabbitMQProperties.getDefaultMessage().getExchange().getName());
    106. arguments.put("x-dead-letter-routing-key", rabbitMQProperties.getDefaultMessage().getBinding().getRoutingKey());
    107. arguments.put("x-message-ttl", 10000);
    108. arguments.put("x-queue-mode", "lazy");
    109. return new Queue(QUEUE_NAME, true, false, false, arguments);
    110. }
    111. @Bean
    112. public Binding comBinding() {
    113. return BindingBuilder.bind(comQueue()).to(comExchange()).with(ROUTING_KEY_NAME);
    114. }
    115. }

    interface 接口动态绑定

    1. public interface MessageQueueConfigInterface {
    2. void init();
    3. }

    impl

    1. @Slf4j
    2. @Component
    3. public class EmailQueueImpl implements MessageQueueConfigInterface {
    4. @Autowired
    5. private AmqpAdmin amqpAdmin;
    6. @Autowired
    7. private RabbitMQProperties rabbitMQProperties;
    8. // Exchange
    9. public final static String EMAIL_EXCHANGE_NAME = "EmailMessageExchange";
    10. // Queue
    11. public final static String EMAIL_QUEUE_NAME = "EmailMessageQueue";
    12. // RoutingKey
    13. public final static String EMAIL_ROUTING_KEY_NAME = "message.email";
    14. @Override
    15. public void init() {
    16. Exchange exchange = new DirectExchange(EMAIL_EXCHANGE_NAME);
    17. amqpAdmin.declareExchange(exchange);
    18. amqpAdmin.declareQueue(new Queue(EMAIL_QUEUE_NAME, true));
    19. //创建绑定规则
    20. amqpAdmin.declareBinding(new Binding(EMAIL_QUEUE_NAME, Binding.DestinationType.QUEUE, EMAIL_EXCHANGE_NAME, EMAIL_ROUTING_KEY_NAME, getArguments()));
    21. }
    22. public Map<String, Object> getArguments() {
    23. Map<String, Object> arguments = new HashMap<>(10);
    24. // 绑定死信交换机 & 设置超时时间
    25. arguments.put("x-dead-letter-exchange", rabbitMQProperties.getDefaultMessage().getExchange().getName());
    26. arguments.put("x-dead-letter-routing-key", rabbitMQProperties.getDefaultMessage().getBinding().getRoutingKey());
    27. arguments.put("x-message-ttl", 10000);
    28. arguments.put("x-queue-mode", "lazy");
    29. return arguments;
    30. }
    31. }
    1. @ApiModel("短信交换机队列")
    2. @Configuration
    3. @ConditionalOnExpression("'${application.rabbit.enabled}'.equals('true')")
    4. public class SmsQueueImpl implements MessageQueueConfigInterface{
    5. @Autowired
    6. private AmqpAdmin amqpAdmin;
    7. @Autowired
    8. private RabbitMQProperties rabbitMQProperties;
    9. // Exchange
    10. public final static String SMS_EXCHANGE_NAME = "SmsMessageExchange";
    11. // Queue
    12. public final static String SMS_QUEUE_NAME = "SmsMessageQueue";
    13. // RoutingKey
    14. public final static String SMS_ROUTING_KEY_NAME = "message.sms";
    15. @Autowired
    16. public void init() {
    17. Exchange exchange = new DirectExchange(SMS_EXCHANGE_NAME);
    18. amqpAdmin.declareExchange(exchange);
    19. amqpAdmin.declareQueue(new Queue(SMS_QUEUE_NAME, true));
    20. //创建绑定规则
    21. amqpAdmin.declareBinding(new Binding(SMS_QUEUE_NAME, Binding.DestinationType.QUEUE, SMS_EXCHANGE_NAME, SMS_ROUTING_KEY_NAME, getArguments()));
    22. }
    23. public Map<String, Object> getArguments() {
    24. Map<String, Object> arguments = new HashMap<>(10);
    25. // 绑定死信交换机 & 设置超时时间
    26. arguments.put("x-dead-letter-exchange", rabbitMQProperties.getDefaultMessage().getExchange().getName());
    27. arguments.put("x-dead-letter-routing-key", rabbitMQProperties.getDefaultMessage().getBinding().getRoutingKey());
    28. arguments.put("x-message-ttl", 10000);
    29. arguments.put("x-queue-mode", "lazy");
    30. return arguments;
    31. }
    32. }