RabbitMQ整合Spring AMQP
RabbitAdmin操作以及SpringAMQP声明方式
依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQConfig示例
/**
 * Spring configuration that declares the RabbitMQ connection factory, a
 * {@link RabbitAdmin}, two topic exchanges, two durable queues and the bindings
 * between them.
 *
 * <p>NOTE(review): the broker address, username, password and virtual host are
 * hard-coded below; consider moving them to external configuration.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Creates the caching connection factory pointing at the broker.
     *
     * @return the configured connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setAddresses("139.9.62.232:30004");
        factory.setVirtualHost("xinzhang");
        factory.setUsername("xinzhang");
        factory.setPassword("Xinzhang123");
        return factory;
    }

    /**
     * Creates the {@link RabbitAdmin} on top of the given connection factory.
     *
     * @param connectionFactory the connection factory the admin uses
     * @return the admin with auto-startup explicitly enabled
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        // Explicitly enable auto-startup for the admin.
        admin.setAutoStartup(true);
        return admin;
    }

    /** Topic exchange named {@code xztest01}. */
    @Bean
    public Exchange xztest01Exchange() {
        Exchange exchange = ExchangeBuilder.topicExchange("xztest01").build();
        return exchange;
    }

    /** Topic exchange named {@code xztest02}. */
    @Bean
    public Exchange xztest02Exchange() {
        Exchange exchange = ExchangeBuilder.topicExchange("xztest02").build();
        return exchange;
    }

    /** Durable queue named {@code xztest001}. */
    @Bean
    public Queue xztest001Queue() {
        Queue queue = QueueBuilder.durable("xztest001").build();
        return queue;
    }

    /** Durable queue named {@code xztest002}. */
    @Bean
    public Queue xztest002Queue() {
        Queue queue = QueueBuilder.durable("xztest002").build();
        return queue;
    }

    /**
     * Binds queue {@code xztest001} to exchange {@code xztest01} with routing
     * pattern {@code test.01.#}.
     */
    @Bean
    public Binding xztest001QueueBindingWithTest(
            @Qualifier("xztest001Queue") Queue xztest001Queue,
            @Qualifier("xztest01Exchange") Exchange xztest01Exchange) {
        return BindingBuilder
                .bind(xztest001Queue)
                .to(xztest01Exchange)
                .with("test.01.#")
                .noargs();
    }

    /**
     * Binds queue {@code xztest002} to exchange {@code xztest02} with routing
     * pattern {@code test.02.#}.
     */
    @Bean
    public Binding xztest002QueueBindingWithTest(
            @Qualifier("xztest002Queue") Queue xztest002Queue,
            @Qualifier("xztest02Exchange") Exchange xztest02Exchange) {
        return BindingBuilder
                .bind(xztest002Queue)
                .to(xztest02Exchange)
                .with("test.02.#")
                .noargs();
    }
}
RabbitAdmin相关操作
/**
 * Demonstrates declaring exchanges, queues and bindings programmatically
 * through {@link RabbitAdmin}.
 */
@SpringBootTest(classes = ConsumerApplication.class)
@RunWith(SpringRunner.class)
public class ConsumerTest {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    /**
     * Declares topic exchange {@code xztest}, then two durable queues, each bound
     * to that exchange: {@code xztest01} via the {@link Binding} constructor and
     * {@code xztest02} via {@link BindingBuilder}.
     */
    @Test
    public void testRabbitAdmin() {
        TopicExchange xztestExchange = new TopicExchange("xztest", true, false);
        rabbitAdmin.declareExchange(xztestExchange);

        // Style 1: explicit constructor arguments.
        rabbitAdmin.declareQueue(new Queue("xztest01", true, false, false));
        rabbitAdmin.declareBinding(new Binding("xztest01", DestinationType.QUEUE, "xztest", "test.#", null));

        // Style 2: fluent builder. The queue must be declared before it is bound;
        // the broker rejects a bind that targets a queue which does not exist.
        Queue xztest02Queue = new Queue("xztest02", true);
        rabbitAdmin.declareQueue(xztest02Queue);
        rabbitAdmin.declareBinding(BindingBuilder.bind(xztest02Queue).to(xztestExchange).with("test02.#"));
    }
}
RabbitTemplate操作
配置
/**
 * Exposes a {@link RabbitTemplate} built on the given connection factory.
 *
 * @param connectionFactory the connection factory the template uses
 * @return a new template instance
 */
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    return template;
}
使用示例
@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * Sends the same message twice to exchange {@code xztest01}: once through
 * {@code convertAndSend} with a post-processor that overwrites the
 * {@code desc} header, and once through plain {@code send}.
 */
@Test
public void testRabbitTemplate() {
    MessageProperties props = new MessageProperties();
    props.getHeaders().put("desc", "test信息");
    Message message = new Message("hello! rabbitmq!".getBytes(), props);

    // Post-processor: logs and replaces the "desc" header before the message is sent.
    MessagePostProcessor addExtraSettings = processed -> {
        System.out.println("添加额外的设置");
        processed.getMessageProperties().getHeaders().put("desc", "信息2");
        return processed;
    };

    rabbitTemplate.convertAndSend("xztest01", "test.01.save", message, addExtraSettings);
    rabbitTemplate.send("xztest01", "test.01.save2", message);
}
SimpleMessageListenerContainer简单消息监听容器
- 可以监听多个队列, 自动启用, 自动声明
- 设置事务特性, 事务管理器, 事务属性, 事务容量(并发), 是否开启事务, 回滚消息等
- 设置消费者数量, 最小最大数量, 批量消费
- 设置消息确认和自动确认模式, 是否重回队列, 异常捕获handler函数
- 设置消费者标签生成策略, 是否独占模式, 消费者属性等
- 设置具体的监听器, 消息转换器等
配置示例
/**
 * Builds a listener container for queue {@code xztest001} that prints each
 * received message body.
 *
 * @param connectionFactory the connection factory the container consumes through
 * @return the configured listener container
 */
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer listenerContainer =
            new SimpleMessageListenerContainer(connectionFactory);

    // Queue(s) to listen on.
    listenerContainer.setQueues(xztest001Queue());
    // Concurrent consumer count, then its upper bound.
    listenerContainer.setConcurrentConsumers(3);
    listenerContainer.setMaxConcurrentConsumers(5);
    // Default requeue-on-reject flag is set to false.
    listenerContainer.setDefaultRequeueRejected(false);
    // Acknowledge mode AUTO.
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);

    // Custom consumer tag: "<queueName>_<random UUID>".
    ConsumerTagStrategy tagStrategy = queueName -> queueName + "_" + UUID.randomUUID().toString();
    listenerContainer.setConsumerTagStrategy(tagStrategy);

    // Listener: print the body of every received message.
    ChannelAwareMessageListener printingListener = (message, channel) ->
            System.out.println("接收到消息: " + new String(message.getBody()));
    listenerContainer.setMessageListener(printingListener);

    return listenerContainer;
}
MessageListenerAdapter消息监听适配器
- 使用自定义的MessageDelegate消息监听对象, 可以自定义消息转换器
- 可以使队列名称与方法进行一一对应
messageContainer配置示例
/**
 * Variant of the listener-container bean that delegates message handling to a
 * {@link MessageListenerAdapter}. The remainder of the container setup is
 * elided in this snippet.
 *
 * @param connectionFactory the connection factory the container consumes through
 */
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(
connectionFactory);
// Message listener adapter: wraps a MessageDelegate, uses "consumeMessage" as the
// default listener method and a TextMessageConverter as the message converter
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(new TextMessageConverter());
// Optional (disabled): map queue names to delegate method names one-to-one
// HashMap<String, String> queueOrTagToMethodName = new HashMap<>(2);
// queueOrTagToMethodName.put("xztest001","method1");
// adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(adapter);
...
}
MessageDelegate
/**
 * Plain delegate object whose methods receive message payloads.
 */
public class MessageDelegate {

    // Default method name used by MessageListenerAdapter (kept disabled for reference):
    // public void handleMessage(byte[] messageBody) {
    // System.out.println("接收到消息: " + new String(messageBody));
    // }

    /**
     * Custom handler method; with a message converter in place the payload
     * arrives as a {@code String}.
     *
     * @param message the converted message payload
     */
    public void consumeMessage(String message) {
        final String line = "接收到消息: " + message;
        System.out.println(line);
    }
}
TextMessageConverter
package top.xinzhang0618.consumer.spring;
import java.nio.charset.StandardCharsets;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
/**
 * Message converter that maps between Java objects and plain-text message bodies.
 *
 * <p>Text is always encoded and decoded as UTF-8 so that producer and consumer
 * agree on the byte representation regardless of each JVM's default charset.
 *
 * @author xinzhang
 * @author Shenzhen Greatonce Co Ltd
 * @version 2020/3/2
 */
public class TextMessageConverter implements MessageConverter {

    /**
     * Converts a Java object into a message whose body is the UTF-8 bytes of
     * {@code o.toString()}.
     *
     * @param o the object to send
     * @param messageProperties the properties attached to the created message
     * @return the created message
     */
    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        byte[] body = o.toString().getBytes(StandardCharsets.UTF_8);
        return new Message(body, messageProperties);
    }

    /**
     * Converts a message into a {@code String} by decoding its body as UTF-8.
     *
     * @param message the received message
     * @return the body as text
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        // The message properties could be inspected here to vary the conversion:
        // MessageProperties messageProperties = message.getMessageProperties();
        return new String(message.getBody(), StandardCharsets.UTF_8);
    }
}
MessageConverter消息转换器
可以设置多种类型的消息转换器, 代码示例如下:
// 1. Custom text converter; matching listener parameter type: String message
TextMessageConverter textMessageConverter = new TextMessageConverter();

// 2. JSON converter; matching listener parameter type: Map messageBody
//    Required on the sender: messageProperties.setContentType("application/json");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();

// 3. Jackson2JsonMessageConverter + DefaultJackson2JavaTypeMapper converts to Java objects;
//    matching listener parameter type: the Java class itself.
//    Besides the JSON content type, also required on the sender:
//    messageProperties.getHeaders().put("__TypeId__", "<fully-qualified class name>");
DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper();
jackson2JsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper);

// 4. Multiple-mapping conversion of Java objects: omitted

// 5. Using several converters together, selected by the message's contentType
ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();
converter.addDelegate("text", textMessageConverter);
converter.addDelegate("text/plain", textMessageConverter);
// Standard media types for XML and HTML text.
converter.addDelegate("text/xml", textMessageConverter);
converter.addDelegate("text/html", textMessageConverter);
// Legacy reversed spellings, kept so senders that already use them still match.
converter.addDelegate("xml/text", textMessageConverter);
converter.addDelegate("html/text", textMessageConverter);
converter.addDelegate("json", jackson2JsonMessageConverter);
converter.addDelegate("application/json", jackson2JsonMessageConverter);
adapter.setMessageConverter(converter);