RabbitMQ整合Spring AMQP
RabbitAdmin操作以及SpringAMQP声明方式
依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
RabbitMQConfig示例
@Configurationpublic class RabbitMQConfig {@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses("139.9.62.232:30004");connectionFactory.setUsername("xinzhang");connectionFactory.setPassword("Xinzhang123");connectionFactory.setVirtualHost("xinzhang");return connectionFactory;}@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);// 必须为true, spring容器启动时将rabbitAdmin注入rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}@Beanpublic Exchange xztest01Exchange() {return ExchangeBuilder.topicExchange("xztest01").build();}@Beanpublic Exchange xztest02Exchange() {return ExchangeBuilder.topicExchange("xztest02").build();}@Beanpublic Queue xztest001Queue() {return QueueBuilder.durable("xztest001").build();}@Beanpublic Queue xztest002Queue() {return QueueBuilder.durable("xztest002").build();}@Beanpublic Binding xztest001QueueBindingWithTest(@Qualifier("xztest001Queue") Queue xztest001Queue,@Qualifier("xztest01Exchange") Exchange xztest01Exchange) {return BindingBuilder.bind(xztest001Queue).to(xztest01Exchange).with("test.01.#").noargs();}@Beanpublic Binding xztest002QueueBindingWithTest(@Qualifier("xztest002Queue") Queue xztest002Queue,@Qualifier("xztest02Exchange") Exchange xztest02Exchange) {return BindingBuilder.bind(xztest002Queue).to(xztest02Exchange).with("test.02.#").noargs();}}
RabbitAdmin相关操作
@SpringBootTest(classes = ConsumerApplication.class)@RunWith(SpringRunner.class)public class ConsumerTest {@Autowiredprivate RabbitAdmin rabbitAdmin;@Testpublic void testRabbitAdmin() {TopicExchange xztestExchange = new TopicExchange("xztest", true, false);rabbitAdmin.declareExchange(xztestExchange);rabbitAdmin.declareQueue(new Queue("xztest01", true, false, false));rabbitAdmin.declareBinding(new Binding("xztest01", DestinationType.QUEUE, "xztest", "test.#", null));rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("xztest02", true)).to(xztestExchange).with("test02.#"));}}
RabbitTemplate操作
配置
@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}
使用示例
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testRabbitTemplate() {MessageProperties messageProperties = new MessageProperties();Map<String, Object> headers = messageProperties.getHeaders();headers.put("desc", "test信息");Message message = new Message("hello! rabbitmq!".getBytes(), messageProperties);rabbitTemplate.convertAndSend("xztest01", "test.01.save", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {System.out.println("添加额外的设置");message.getMessageProperties().getHeaders().put("desc", "信息2");return message;}});rabbitTemplate.send("xztest01", "test.01.save2", message);}
SimpleMessageListenerContainer简单消息监听容器
- 可以监听多个队列, 自动启用, 自动声明
- 设置事务特性, 事务管理器, 事务属性, 事务容量(并发), 是否开启事务, 回滚消息等
- 设置消费者数量, 最小最大数量, 批量消费
- 设置消息确认和自动确认模式, 是否重回队列, 异常捕获handler函数
- 设置消费者标签生成策略, 是否独占模式, 消费者属性等
- 设置具体的监听器, 消息转换器等
配置示例
@Beanpublic SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);// 监听的队列container.setQueues(xztest001Queue());// 设置并发消费者数量container.setConcurrentConsumers(3);// 设置最多的并发消费者数量container.setMaxConcurrentConsumers(5);// 重回队列container.setDefaultRequeueRejected(false);// 设置自动ackcontainer.setAcknowledgeMode(AcknowledgeMode.AUTO);// 自定义consumerTag的生成方式container.setConsumerTagStrategy(new ConsumerTagStrategy() {@Overridepublic String createConsumerTag(String queueName) {return queueName + "_" + UUID.randomUUID().toString();}});// 消息监听方法container.setMessageListener(new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {System.out.println("接收到消息: " + new String(message.getBody()));}});return container;}
MessageListenerAdapter消息监听适配器
- 使用自定义的MessageDelegate消息监听对象, 可以自定义消息转化器
- 可以使队列名称与方法进行一一对应
messageContainer配置示例
@Beanpublic SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);// 设置消息监听适配器MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());adapter.setDefaultListenerMethod("consumeMessage");adapter.setMessageConverter(new TextMessageConverter());// 使队列与方法名一一对应// HashMap<String, String> queueOrTagToMethodName = new HashMap<>(2);// queueOrTagToMethodName.put("xztest001","method1");// adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);container.setMessageListener(adapter);...}
MessageDelegate
public class MessageDelegate {/*** MessageListenerAdapter默认的方法*/// public void handleMessage(byte[] messageBody) {// System.out.println("接收到消息: " + new String(messageBody));// }/*** 自定义的方法, 有了转换器, 参数转成了String*/public void consumeMessage(String message) {System.out.println("接收到消息: " + message);}}
TextMessageConverter
package top.xinzhang0618.consumer.spring;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.amqp.support.converter.MessageConversionException;import org.springframework.amqp.support.converter.MessageConverter;/*** TextMessageConverter** @author xinzhang* @author Shenzhen Greatonce Co Ltd* @version 2020/3/2*/public class TextMessageConverter implements MessageConverter {/*** java对象转成message对象*/@Overridepublic Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {return new Message(o.toString().getBytes(), messageProperties);}/*** message对象转成java对象*/@Overridepublic Object fromMessage(Message message) throws MessageConversionException {// 此处可利用messageProperties做相关的判断// MessageProperties messageProperties = message.getMessageProperties();return new String(message.getBody());}}
MessageConverter消息转换器
可以设置多种类型的消息转换器, 代码示例如下:
// 1.自定义文本转换器, 对应java参数类型: String messageTextMessageConverter textMessageConverter = new TextMessageConverter();// 2.支持json格式的转换器, 对应java参数类型: Map messageBody// 必需: messageProperties.setContentType("application/json");Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 3.Jackson2JsonMessageConverter & DefaultJackson2JavaTypeMapper 支持java对象转换, 对应java参数类型: java类// 除contentType声明为json外, 必需: messageProperties.getHeaders().put("__TypeId__","类的全路径");DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper();jackson2JsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper);//4.支持java对象多映射转换, 略//5.多种converter一起使用, 需要设置对应的contentTypeContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();converter.addDelegate("text",textMessageConverter);converter.addDelegate("xml/text",textMessageConverter);converter.addDelegate("html/text",textMessageConverter);converter.addDelegate("text/plain",textMessageConverter);converter.addDelegate("json",jackson2JsonMessageConverter);converter.addDelegate("application/json",jackson2JsonMessageConverter);adapter.setMessageConverter(converter);
