一、在springboot中引入rabbitmq的场景之后,并启动springboot主启动类,此时rabbitmq通过RabbitAutoConfiguration类自动装配了那些配置?
1、创建连接并获取rabbitmq的服务,将springboot配置文件有关rabbitmq的配置封装到RabbitProperties中
2、给spring容器中加入RabbitTemplate工具类
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(RabbitOperations.class)
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
configurer.configure(template, connectionFactory);
return template;
}
3、自动装配时给RabbitTemplate在springboot配置文件的配置
@Bean
@ConditionalOnMissingBean
public RabbitTemplateConfigurer rabbitTemplateConfigurer(RabbitProperties properties,
ObjectProvider<MessageConverter> messageConverter,
ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) {
RabbitTemplateConfigurer configurer = new RabbitTemplateConfigurer(properties);
configurer.setMessageConverter(messageConverter.getIfUnique());
configurer
.setRetryTemplateCustomizers(retryTemplateCustomizers.orderedStream().collect(Collectors.toList()));
return configurer;
}
二、e6yun3.0 自己封装的rabbitmq
1、配置类自动装配了连接rabbitmq服务时所需要的几大件
@Configuration
public class AmqpConfig {
@Bean
@Primary
public ConnectionFactory defaultConnectionFactory(
@Value("${spring.rabbitmq.default.host}") String host,
@Value("${spring.rabbitmq.default.port}") int port,
@Value("${spring.rabbitmq.default.username}") String username,
@Value("${spring.rabbitmq.default.password}") String password,
@Value("${spring.rabbitmq.default.virtual-host}") String virtualHost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
2、专门对与消费者进行的配置
@Bean
public SimpleRabbitListenerContainerFactory defaultFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
3、消费者
public class TmsEventMessageReceiver {
private static Logger logger = LoggerFactory.getLogger(TmsEventMessageReceiver.class);
/**
消费者基于注解形式的创建和绑定
*/
@RabbitListener(bindings = {@QueueBinding(
value = @Queue(value = "${e6yun3.event.receiver.queue.name}", // 给存放消息的队列起一个名字
durable = "${e6yun3.event.receiver.queue.durable:true}", // 是否持久化
autoDelete = "${e6yun3.event.receiver.queue.autoDelete:true}",// 消息消费了之后是否进行删除
exclusive = "${e6yun3.event.receiver.queue.exclusive:false}"),// 是否具有排他性
exchange = @Exchange(value = "${e6yun3.event.receiver.exchange}", // 给交换机起一个名字
type = "${e6yun3.event.receiver.exchange.type:topic}"), // 指定交换机类型
key = "${e6yun3.event.receiver.exchange.routing.key}")}, // 用来指定消费者和交换机中间的路由匹配规则
containerFactory = "defaultFactory") // 一般使用默认的连接工厂
public void processor(Message message, Channel channel, @Header(name = "amqp_deliveryTag") long deliveryTag,
@Header("amqp_redelivered") boolean redelivered) {
/**
消费者拿到消息之后,对该消息进行处理的业务
*/
byte[] body = message.getBody();
String messageStr = "";
try {
messageStr = new String(body, "utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
//如果发送方有压缩,这里要解压
String msgInfo = GZipUtils.gunzip(messageStr, "GB2312");
//业务处理在这里
logger.info(msgInfo);
MqHandleUtils.handleReceiver(channel, deliveryTag);
}
}
4、生产者
/**
该消息发送的类中,两个方法进行了重载,并重载的两个方法的区别就是多了一个文本类型
并且两个方法的发送机制都是调用rabbitTemplate.convertAndSend(exchangeName, routingKey, messageBean);这个方法,需要三个参数
1、交换机的名称
2、消息队列和交换机的匹配规则
3、所要发送的消息 (messageBean)
*/
public void sendMessage(RabbitTemplateEnum rabbitTemplateEnum, String exchangeName, String routingKey, String message, boolean zip) {
//判断是否加密
if(zip) {
message = GZipUtils.gzip(message, GZipUtils.DEFAULT_CHARSET);
}
//构造mq template
RabbitTemplate rabbitTemplate = beanFactory.getBean(rabbitTemplateEnum.getBeanName(), RabbitTemplate.class);
Message messageBean = MessageBuilder.withBody(message.getBytes())
.setHeader("routingKey",routingKey)
.build();
//推送mq
rabbitTemplate.convertAndSend(exchangeName, routingKey, messageBean);
logger.info("推送mq消息routingKey:{},内容:{}",routingKey, message);
}