bean 静态绑定
/**
 * Configuration properties bound from the {@code application.rabbit} prefix.
 *
 * <p>Holds the default (dead-letter) exchange, queue and binding settings. Every
 * value has an in-code default, so the application can start without any
 * {@code application.rabbit.default-message.*} entries.
 *
 * <p>NOTE(review): the nested classes are annotated with {@code @Component} and are
 * also instantiated with {@code new} below. The component-scanned instances are
 * separate objects from the ones reachable through {@link #getDefaultMessage()};
 * confirm whether the {@code @Component} annotations are actually needed.
 */
@Getter
@Configuration
@ConfigurationProperties(prefix = "application.rabbit")
public class RabbitMQProperties {

    @ApiModelProperty("死信交换机默认配置")
    private final DefaultMessage defaultMessage = new DefaultMessage();

    /** Groups the exchange, queue and binding settings of the default message route. */
    @Data
    @Component
    public static class DefaultMessage {
        private final CustomerExchange exchange = new CustomerExchange();
        private final CustomerQueue queue = new CustomerQueue();
        private final CustomerBinding binding = new CustomerBinding();
    }

    /** Exchange settings. */
    @Data
    @Component
    @ApiModel("消费者交换机")
    public static class CustomerExchange {

        @ApiModelProperty("交换机名称")
        private String name = "defaultExchange";

        @ApiModelProperty("交换机是否持久化")
        private Boolean durable = true;

        @ApiModelProperty("是否自动删除空闲的交换机")
        private Boolean autoDelete = false;

        @ApiModelProperty("对交换机属性进行配置")
        private Map<String, Object> arguments = new HashMap<>(10);
    }

    /** Queue settings. */
    @Data
    @Component
    @ApiModel("消费者队列")
    public static class CustomerQueue {

        @ApiModelProperty("队列名称")
        private String name = "defaultQueue";

        @ApiModelProperty("队列是否持久化")
        private Boolean durable = true;

        @ApiModelProperty("是否连接独占")
        private Boolean exclusive = false;

        // Fixed: the description previously said "exchange" (copy-paste from CustomerExchange).
        @ApiModelProperty("是否自动删除空闲的队列")
        private Boolean autoDelete = false;

        @ApiModelProperty("对消息属性进行配置")
        private Map<String, Object> arguments = new HashMap<>(10);
    }

    /** Binding settings. */
    @Data
    @Component
    public static class CustomerBinding {

        /**
         * Default routing key.
         *
         * <p>On a topic exchange {@code #} matches zero or more dot-separated words and
         * {@code *} matches exactly one word; any other value (for example
         * {@code user.ABC}) is a custom routing key.
         */
        private String routingKey = "#";
    }
}
yml
application:
  rabbit:
    enabled: false
    default-message:
      exchange:
        name: defaultExchange
      queue:
        name: defaultQueue
      binding:
        routing-key: "#"
@Configuration@EnableRabbit@ConditionalOnExpression("'${application.rabbit.enabled}'.equals('true')")public class RabbitMqConfiguration {private static RabbitMqConfiguration rabbitMqConfiguration;@Autowiredprivate CachingConnectionFactory connectionFactory;@Autowiredprivate ObjectMapper objectMapper;@Autowiredprivate RabbitMQProperties rabbitMQProperties;@Autowiredprivate Map<String, MessageQueueConfigInterface> messageQueueConfigInterfaceMap;@PostConstructpublic void initTestCreateQueue() {messageQueueConfigInterfaceMap.values().forEach(MessageQueueConfigInterface::init);}@Beanpublic DefaultClassMapper classMapper() {DefaultClassMapper classMapper = new DefaultClassMapper();classMapper.setTrustedPackages("*");return classMapper;}/*** 开启多例模式 用于实现多个不同的生产者回调不同** @return RabbitTemplate*/@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(jsonMessageConverter());rabbitTemplate.setMandatory(true);return rabbitTemplate;}/*** producer 消息转换器*/@Beanpublic MessageConverter jsonMessageConverter() {Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(objectMapper);messageConverter.setClassMapper(classMapper());return messageConverter;}/*** consumer 消息转换器*/@Beanpublic RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}/*** 1. 创建死信交换机* 2. 创建死信队列* 3. 
绑定关系* <p>* 死信队列 DLQ Queue (默认消息队列)* 消息来源:* 消息被拒绝(basic.reject或basic.nack)并且requeue=false.* 消息TTL过期* 队列达到最大长度(队列满了,无法再添加数据到mq中)* 处理方式:* 丢弃,如果不是很重要,可以选择丢弃* 记录死信入库,然后做后续的业务分析或处理* 通过死信队列,由负责监听死信的应用程序进行处理** @return Queue*/@Bean@Order(1)public TopicExchange deadExchange() {RabbitMQProperties.CustomerExchange exchange = rabbitMQProperties.getDefaultMessage().getExchange();return new TopicExchange(exchange.getName(), exchange.getDurable(), exchange.getAutoDelete(), exchange.getArguments());}@Bean@Order(2)public Queue deadQueue() {RabbitMQProperties.CustomerQueue queue = rabbitMQProperties.getDefaultMessage().getQueue();return new Queue(queue.getName(), queue.getDurable(), queue.getExclusive(), queue.getAutoDelete(), queue.getArguments());}@Bean@Order(3)public Binding deadBinding() {RabbitMQProperties.CustomerBinding binding = rabbitMQProperties.getDefaultMessage().getBinding();return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(binding.getRoutingKey());}/*** 通用交换机配置*/public final static String EXCHANGE_NAME = "CommonMessageExchange";public final static String QUEUE_NAME = "CommonMessageQueue";public final static String ROUTING_KEY_NAME = "message.ABC";@Beanpublic TopicExchange comExchange() {return new TopicExchange(EXCHANGE_NAME, true, false);}@Beanpublic Queue comQueue() {Map<String, Object> arguments = new HashMap<>(10);// 绑定死信交换机 & 设置超时时间arguments.put("x-dead-letter-exchange", rabbitMQProperties.getDefaultMessage().getExchange().getName());arguments.put("x-dead-letter-routing-key", rabbitMQProperties.getDefaultMessage().getBinding().getRoutingKey());arguments.put("x-message-ttl", 10000);arguments.put("x-queue-mode", "lazy");return new Queue(QUEUE_NAME, true, false, false, arguments);}@Beanpublic Binding comBinding() {return BindingBuilder.bind(comQueue()).to(comExchange()).with(ROUTING_KEY_NAME);}}
interface 接口动态绑定
/**
 * Contract for components that set up their own messaging resources.
 *
 * <p>Implementations put their declaration logic in {@link #init()}.
 */
public interface MessageQueueConfigInterface {

    /** Performs this component's set-up work. */
    void init();
}
impl
@Slf4j@Componentpublic class EmailQueueImpl implements MessageQueueConfigInterface {@Autowiredprivate AmqpAdmin amqpAdmin;@Autowiredprivate RabbitMQProperties rabbitMQProperties;// Exchangepublic final static String EMAIL_EXCHANGE_NAME = "EmailMessageExchange";// Queuepublic final static String EMAIL_QUEUE_NAME = "EmailMessageQueue";// RoutingKeypublic final static String EMAIL_ROUTING_KEY_NAME = "message.email";@Overridepublic void init() {Exchange exchange = new DirectExchange(EMAIL_EXCHANGE_NAME);amqpAdmin.declareExchange(exchange);amqpAdmin.declareQueue(new Queue(EMAIL_QUEUE_NAME, true));//创建绑定规则amqpAdmin.declareBinding(new Binding(EMAIL_QUEUE_NAME, Binding.DestinationType.QUEUE, EMAIL_EXCHANGE_NAME, EMAIL_ROUTING_KEY_NAME, getArguments()));}public Map<String, Object> getArguments() {Map<String, Object> arguments = new HashMap<>(10);// 绑定死信交换机 & 设置超时时间arguments.put("x-dead-letter-exchange", rabbitMQProperties.getDefaultMessage().getExchange().getName());arguments.put("x-dead-letter-routing-key", rabbitMQProperties.getDefaultMessage().getBinding().getRoutingKey());arguments.put("x-message-ttl", 10000);arguments.put("x-queue-mode", "lazy");return arguments;}}
@ApiModel("短信交换机队列")@Configuration@ConditionalOnExpression("'${application.rabbit.enabled}'.equals('true')")public class SmsQueueImpl implements MessageQueueConfigInterface{@Autowiredprivate AmqpAdmin amqpAdmin;@Autowiredprivate RabbitMQProperties rabbitMQProperties;// Exchangepublic final static String SMS_EXCHANGE_NAME = "SmsMessageExchange";// Queuepublic final static String SMS_QUEUE_NAME = "SmsMessageQueue";// RoutingKeypublic final static String SMS_ROUTING_KEY_NAME = "message.sms";@Autowiredpublic void init() {Exchange exchange = new DirectExchange(SMS_EXCHANGE_NAME);amqpAdmin.declareExchange(exchange);amqpAdmin.declareQueue(new Queue(SMS_QUEUE_NAME, true));//创建绑定规则amqpAdmin.declareBinding(new Binding(SMS_QUEUE_NAME, Binding.DestinationType.QUEUE, SMS_EXCHANGE_NAME, SMS_ROUTING_KEY_NAME, getArguments()));}public Map<String, Object> getArguments() {Map<String, Object> arguments = new HashMap<>(10);// 绑定死信交换机 & 设置超时时间arguments.put("x-dead-letter-exchange", rabbitMQProperties.getDefaultMessage().getExchange().getName());arguments.put("x-dead-letter-routing-key", rabbitMQProperties.getDefaultMessage().getBinding().getRoutingKey());arguments.put("x-message-ttl", 10000);arguments.put("x-queue-mode", "lazy");return arguments;}}
