1. RabbitMQ 整合 Spring AMQP
AMQP 核心组件:
- RabbitAdmin
 - SpringAMQP 声明
 - RabbitTemplate
 - SimpleMessageListenerContainer
 - MessageListenerAdapter
 - 
1.1 SpringAMQP 用户管理组件:RabbitAdmin
1.1.1 描述
 RabbitAdmin 类可以很好的操作 RabbitMQ,在 Spring 中直接进行注入即可:
@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}
RabbitMQ 类实现了一些接口,其中的
InitializingBean接口包含afterPropertiesSet()抽象方法,作用是在所有 Bean 加载完成后初始化设置;public interface InitializingBean {void afterPropertiesSet() throws Exception;}
@ManagedResource(description = "Admin Tasks")public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,BeanNameAware, InitializingBean {@Overridepublic void afterPropertiesSet() {synchronized (this.lifecycleMonitor) {// autoStartup == false 时直接返回if (this.running || !this.autoStartup) {return;}}...initialize();...}...//Declares all the exchanges, queues and bindings in the enclosing application context,//if any. It should be safe (but unnecessary) to call this method more than once.public void initialize() {...}}
:::info
autoStartup 必须要设置为 true,否则 Spring 容器不会加载 RabbitAdmin 类;
- RabbitAdmin 底层实现就是从 Spring 容器中获取 Exchange、Binding、RoutingKey 以及 Queue 的 @Bean 声明,然后使用 RabbitTemplate 的 execute 方法执行对应的声明、修改、删除等一系列 RabbitMQ 基础功能操作;
 - RabbitAdmin 经常用来,添加一个交换机、删除一个绑定、清空一个队列里的消息等等; :::
 
1.1.2 代码演示:
虽然新建的是一个 SpringBoot 项目,但是在整合时主要使用 RabbitMQ 和 Spring。
POM 添加依赖
amqp-client:RabbitMQ 的核心包spring-boot-starter-amqp:RabbitMQ 与 Spring 整合的包<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
将 RabbitMQ 注入到 Spring 容器
- 创建一个 RabbitMQ 配置类:RabbitMQConfig
 可以从 Spring 容器获得 ConnectionFactory、RabbitAdmin
@Configuration@ComponentScan({"com.example.rabbitmqspring.*"})public class RabbitMQConfig {@Beanpublic ConnectionFactory connectionFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses("192.168.12.131:5672");connectionFactory.setUsername("shawn");connectionFactory.setPassword("123");connectionFactory.setVirtualHost("/");return connectionFactory;}@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}}
如何使用 RabbitMQAdmin
我们可以在 JunitTest 中使用 RabbitAdmin 来测试使用效果 ```java @SpringBootTest class ApplicationTests {
@Test void contextLoads() { } @Autowired private RabbitAdmin rabbitAdmin;
@Test public void testAdmin() { rabbitAdmin.declareExchange(new DirectExchange(“test.direct”, false, false));
rabbitAdmin.declareExchange(new TopicExchange(“test.topic”, false, false));
rabbitAdmin.declareExchange(new FanoutExchange(“test.fanout”, false, false));
rabbitAdmin.declareQueue(new Queue(“test.direct.queue”, false));
rabbitAdmin.declareQueue(new Queue(“test.topic.queue”, false));
rabbitAdmin.declareQueue(new Queue(“test.fanout.queue”, false));
rabbitAdmin.declareBinding(new Binding(“test.direct.queue”,
Binding.DestinationType.QUEUE,"test.direct", "direct", new HashMap<>()));
rabbitAdmin.declareBinding(
BindingBuilder.bind(new Queue("test.topic.queue", false)) //直接创建队列.to(new TopicExchange("test.topic", false, false)) //直接创建交换机 建立关联关系.with("user.#")); //指定路由Key
rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.fanout.queue", false)).to(new FanoutExchange("test.fanout", false, false)));//清空队列数据rabbitAdmin.purgeQueue("test.topic.queue", false);}
}
:::info- 可以分别声明 Exchange、Queue、Binding- `rabbitAdmin.declareExchange(new FanoutExchange(...));`- `rabbitAdmin.declareQueue(new Queue(...));`- `rabbitAdmin.declareBinding(new Binding(...)):`- 也可以在声明 Binding 时直接创建 Exchange、Queue、RouteKey- rabbitAdmin.declareBinding(BindingBuilder<br /> .bind(new Queue("test.topic.queue", false)) //直接创建队列<br /> .to(new TopicExchange("test.topic", false, false)) //直接创建交换机 建立关联关系<br /> .with("user.#"));:::<a name="pDyQs"></a>## 1.2 SpringAMQP 声明- 在以前我们使用 RabbitMQ,需要在 RabbitMQ 核心 API 里面声明一个 Exchange、一个 Queue、一个 Binding```javachannel.exchangeDeclare("dlx.exchange", "topic", true, false, null);channel.queueDeclare("dlx.queue", true, false, false, null);channel.queueBind("dlx.queue", "dlx.exchange", "#");
在我们整合了 SpringAMQP 之后,可以使用 SpringAMQP 去声明,即声明 @Bean 方式
在配置类 RabbitMQConfig 中使用 @Bean 声明交换机、队列、绑定
/*** 针对消费者配置* 1. 设置交换机类型* 2. 将队列绑定到交换机FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念HeadersExchange :通过添加属性key-value匹配DirectExchange:按照routingkey分发到指定队列TopicExchange:多关键字匹配*/@Beanpublic TopicExchange exchange01() {return new TopicExchange("topic_exchange01", true, false);}@Beanpublic Queue queue01() {return new Queue("queue01", true);}@Beanpublic Binding binding01() {return BindingBuilder.bind(queue01()).to(exchange01()).with("topic.*");}
1.3 SpringAMQP 消息模板组件:RabbitTemplate
1.3.1 描述
- RabbitTemplate 是我们在与 SpringAQMP 整合的时候进行发送消息的关键类;
 该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口
ConfirmCallback、返回值确认接口ReturnCallback等等。同样我们需要进行注入到 Spring 容器中,然后直接使用。在与 Spring 整合时需要实例化,但是在与 SpringBoot 整合时,在配置文件里添加配置即可。
1.3.2 代码演示
在配置类 RabbitMQConfig 中声明 RabbitTemplate:
@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}
在测试类 ApplicationTests 中使用 RabbitTemplate 发送消息: ```java @Autowired private RabbitTemplate rabbitTemplate;
@Testpublic void testSendMessage() throws Exception {//1 创建消息MessageProperties messageProperties = new MessageProperties();messageProperties.getHeaders().put("desc", "信息描述..");messageProperties.getHeaders().put("type", "自定义消息类型..");Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {System.err.println("------添加额外的设置---------");message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");return message;}});}@Testpublic void testSendMessage2() throws Exception {//1 创建消息MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType("text/plain");Message message = new Message("mq 消息1234".getBytes(), messageProperties);rabbitTemplate.send("topic001", "spring.abc", message);rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");}
:::info- `rabbitTemplate.convertAndSend(exchange, routeKey, messageObject, [MessagePostProcessor]) `- messageObject 可以是 Message 类,也可以是简单的 String;- MessagePostProcessor 是可选的,可以重写其中的方法给 Message 添加额外的配置;:::<a name="MffXu"></a>## 1.4 SpringAMQP 消息容器:SimpleMessageListenerContainer<a name="Ox4A9"></a>### 1.4.1 描述- SimpleMessageListenerContainer,简单消息监听容器;- 这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足;- 监听队列(多个队列),自动启动、自动声明功能;- 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等;- 设置消费者数量、最小最大数量、批量消费;- 设置消息确认和自动确认模式、是否重回队列、异常捕获 handler 函数;- 设置消费者标签生成策略、是否独占模式、消费者属性等;- 设置具体的监听器、消息转换器等等;:::info- SimpleMessageListenerContainer 可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接受消息的模式等;- 很多基于 RabbitMQ 的自定义化后端滚控台在进行动态设置的时候,也是根据这一特性去实现的。所以可以看出 SpringAMQP 非常的强大;:::**SimpleMessageListenerContainer 为什么可以动态感知配置变更?**<a name="ncZmz"></a>### 1.4.2 代码演示- 在配置类 **RabbitMQConfig **中声明消息容器 **SimpleMessageListenerContainer**```java@Beanpublic SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);// 监听多个队列container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());// 设置消费者最小最大数量container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(5);// 不开启重回队列container.setDefaultRequeueRejected(false);// 设置消息自动确认模式container.setAcknowledgeMode(AcknowledgeMode.AUTO);// 暴露监听通道container.setExposeListenerChannel(true);// 设置消费者标签生成策略container.setConsumerTagStrategy(new ConsumerTagStrategy() {@Overridepublic String createConsumerTag(String queue) {return queue + "_" + UUID.randomUUID().toString();}});// 设置具体的监听器、消息转换器container.setMessageListener(new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {String msg = new String(message.getBody());System.err.println("----------消费者: " + msg);}});return container;}
1.5 SpringAMQP 消息适配器:MessageListenerAdapter
1.5.1 描述
MessageListenerAdapter,消息监听适配器- MessageListenerAdapter 类中的变量 
defaultListenerMethod表示监听方法名称,默认方法名称是"handleMessage"。 - 可用 
adapter.setDefaultListenerMethod("consumeMessage");设置自定义的监听方法名称。 MessageDelegate委托对象:实际真实的委托对象,自定义的类用于处理消息。queueOrTagToMethodName队列标识与方法名称组成的集合,可以将队列和方法名绑定,指定队列中的消息会被所绑定的方法处理
1.5.2 代码演示
在配置类 RabbitMQConfig 中的消息容器 Bean 中通过消息适配器 adapter 使用自定义的消息监听器,不再是使用上文中
new ChannelAwareMessageListener()并重写其 onMessage() 方法的做法。MessageListenerAdapter 适配器有两种使用方式:
方式一:
**adapter.setDefaultListenerMethod("xxxxx");**- “xxxxx” 方法名称要与 MessageDelegate 类中自定义的方法对应;
 MessageDelegate 类中自定义方法的传入参数默认是 byte[] 字节数组类型,如果想改变方法的入参,可以设置一个自定义的消息转换器,通过自定义一个
TextMessageConverter实现 MessageConverter 接口;@Configuration@ComponentScan({"com.example.rabbitmqspring"})public class RabbitMQConfig {...@Beanpublic SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {...// 适配器使用方式1: 默认是有自己的方法名字的:handleMessage// 可以自己指定一个方法的名字: consumeMessage// 也可以添加一个转换器: 从字节数组转换为StringMessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());adapter.setDefaultListenerMethod("consumeMessage");adapter.setMessageConverter(new TextMessageConverter());container.setMessageListener(adapter);return container;}}
```java public class MessageDelegate { // 使用默认方法名称 public void handleMessage(byte[] messageBody) { System.err.println(“默认方法, 消息内容:” + new String(messageBody)); }
// 使用自定义的方法名称 public void consumeMessage(byte[] messageBody) { System.err.println(“字节数组方法, 消息内容:” + new String(messageBody)); }
// 改变自定义方法的入参类型
public void consumeMessage(String messageBody) { System.err.println(“字符串方法, 消息内容:” + messageBody); } }java public class TextMessageConverter implements MessageConverter {// Convert a Java object to a Message. @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { System.out.println(“TextMessageConverter.toMessage…”); return new Message(object.toString().getBytes(), messageProperties); }
// Convert from a Message to a Java object. // 对接收到的message进行过滤:ContentType: text/plain的message Body类型改变为String类型 @Override public Object fromMessage(Message message) throws MessageConversionException { System.out.println(“TextMessageConverter.fromMessage…”); String contentType = message.getMessageProperties().getContentType(); if(null != contentType && contentType.contains(“text”)) {
return new String(message.getBody());
} return message.getBody(); }
}
```java// 测试类@SpringBootTestclass ApplicationTests {...@Testpublic void testSendMessageForText() throws Exception {//1 创建消息MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType("text/plain");Message message = new Message("mq 消息1234".getBytes(), messageProperties);rabbitTemplate.send("topic001", "spring.abc", message);}}// 在测试方法中设置 Message 的ContentType:text/plain,并使用 rabbitTemplate 发送消息// 运行结果:字符串方法, 消息内容:mq 消息1234TextMessageConverter.fromMessage...
方式二:
**adapter.setQueueOrTagToMethodName(Map<String, String> xxxx);**- 我们的 Queue 名称 和 Method 名称,也可以进行一一的匹配
 - map.put(“queue01”, “method01”); 可以指定队列中的消息会被所绑定的方法处理
 “method01” 方法名称要与 MessageDelegate 类中自定义的方法对应;
@Configuration@ComponentScan({"com.example.rabbitmqspring"})public class RabbitMQConfig {...@Beanpublic SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {...// 2 适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());adapter.setMessageConverter(new TextMessageConverter());Map<String, String> queueOrTagToMethodName = new HashMap<>();queueOrTagToMethodName.put("queue001", "method1");queueOrTagToMethodName.put("queue002", "method2");adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);container.setMessageListener(adapter);return container;}}
```java public class MessageDelegate { public void method1(String messageBody) { System.err.println(“method1 收到消息内容:” + new String(messageBody)); }
public void method2(String messageBody) { System.err.println(“method2 收到消息内容:” + new String(messageBody)); } }
java // 测试类 @SpringBootTest class ApplicationTests { … @Test public void testSendMessageForText() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(“text/plain”); Message message = new Message(“mq 消息1234”.getBytes(), messageProperties);rabbitTemplate.send(“topic001”, “spring.abc”, message); rabbitTemplate.send(“topic002”, “rabbit.abc”, message); } }
// 在测试方法中设置 Message 的ContentType:text/plain,并使用 rabbitTemplate 发送消息 // 运行结果: TextMessageConverter.fromMessage… method1 收到消息内容:mq 消息1234 TextMessageConverter.fromMessage… method2 收到消息内容:mq 消息1234
<a name="kcs1A"></a>## 1.6 SpringAMQP 消息转换器:MessageConverter<a name="SVKw8"></a>### 1.6.1 描述- MessageConverter:消息转换器;- 我们在发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到 MessageConverter;- 如何自定义常用转换器:- 需要实现 MessageConverter 这个接口;- 需要重写两个方法:- toMessage:Convert a Java object to a Message;- fromMessage:Convert from a Message to a Java object;- 常用几种转换器:- 上节中使用到的 String 转换器;- Json 转换器:Jackson2JsonMessageConverter 进行 Java 对象的转换功能;- DefaultJackson2JavaTypeMapper 映射器:Java 对象的映射关系;- 自定义二进制转换器:如图片类型、PDF、PPT、流媒体;<a name="FwqjS"></a>### 1.6.2 代码演示- 为了便于演示,创建了两个实体类:**entity.Order**、**entity.Packaged**;- **Json转换器(Map作为入参):Jackson2JsonMessageConverter**- 测试类中将 Java 对象 Order 转换成 Json String,然后封装成 Message 发送;- **RabbitMQConfig **中在适配器 adapter 中添加 Json 转换器;- 自定义监听方法,将 Message 消息体作为 Map 类型的入参;- 最终实现:`**Order Object => Json String => Message => Order 属性/值所对应的 Map**`;```java@Configuration@ComponentScan({"com.example.rabbitmqspring"})public class RabbitMQConfig {...@Beanpublic SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {...// 1.1 支持json格式的转换器MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());adapter.setDefaultListenerMethod("consumeMessage");Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();adapter.setMessageConverter(jackson2JsonMessageConverter);container.setMessageListener(adapter);return container;}}
public class MessageDelegate {public void consumeMessage(Map messageBody) {System.err.println("map方法, 消息内容:" + messageBody);System.err.println("map方法, 消息content:"+ messageBody.get("content"));}}
// 测试类@SpringBootTestclass ApplicationTests {...@Testpublic void testSendJsonMessage() throws Exception {Order order = new Order();order.setId("001");order.setName("消息订单");order.setContent("描述信息");ObjectMapper mapper = new ObjectMapper();String json = mapper.writeValueAsString(order);System.err.println("order 4 json: " + json);MessageProperties messageProperties = new MessageProperties();//这里注意一定要修改contentType为 application/jsonmessageProperties.setContentType("application/json");Message message = new Message(json.getBytes(), messageProperties);rabbitTemplate.send("topic001", "spring.order", message);}}// 在测试方法中设置 Message 的ContentType:application/json,并使用 rabbitTemplate 发送消息// 运行结果:order 4 json: {"id":"001","name":"消息订单","content":"描述信息"}map方法, 消息内容:{id=001, name=消息订单, content=描述信息}map方法, 消息content:描述信息
支持 Java 对象的转换(Java Object 作为入参):DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
- 测试类中将 Java 对象 Order 转换成 Json String,然后封装成 Message 配置 Header 并发送;
 - RabbitMQConfig 中创建 javaTypeMapper,将 javaTypeMapper 配置到 Json 转换器,将 Json 转换器添加到适配器;
 - 自定义监听方法,将 Message 消息体作为 Java Object 类型的入参;
 最终实现:
**Order Object => Json String => Message => Order Object**;@Configuration@ComponentScan({"com.example.rabbitmqspring"})public class RabbitMQConfig {...@Beanpublic SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {...// 1.2 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象转换MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());adapter.setDefaultListenerMethod("consumeMessage");Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();//否则会抛出异常The class '...' is not in the trusted packages: [java.util, java.lang].// If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).//默认只支持java.util和java.lang包下的类javaTypeMapper.setTrustedPackages("*");jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);adapter.setMessageConverter(jackson2JsonMessageConverter);container.setMessageListener(adapter);return container;}}
public class MessageDelegate {public void consumeMessage(Order order) {System.err.println("order对象, 消息内容, id: " + order.getId() +", name: " + order.getName() +", content: "+ order.getContent());}}
```java // 测试类 @SpringBootTest class ApplicationTests { … @Test public void testSendJsonMessage() throws Exception {
Order order = new Order(); order.setId(“001”); order.setName(“消息订单”); order.setContent(“描述信息”); ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(order);
System.err.println(“order 4 json: “ + json);
MessageProperties messageProperties = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties.setContentType(“application/json”); Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send(“topic001”, “spring.order”, message); } }
// 在测试方法中设置 Message 的ContentType:application/json,并使用 rabbitTemplate 发送消息 // 运行结果: order 4 json: {“id”:”001”,”name”:”消息订单”,”content”:”描述信息”} order对象, 消息内容, id: 001, name: 订单消息, content: 订单描述信息
- **支持 Java 对象多映射转换:DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter**- 测试类中将 Java 对象 Order/Packaged 转换成 Json String,然后封装成 Message 配置 Header 并发送;- **RabbitMQConfig **中创建 javaTypeMapper 并配置标签与对应的类路径,将 javaTypeMapper 配置到 Json 转换器,将 Json 转换器添加到适配器;- 自定义监听方法,将 Message 消息体作为 Java Object 类型的入参;- 最终实现:`**Order/Packaged Object => Json String => Message => Order/Packaged Object**`;```java@Configuration@ComponentScan({"com.example.rabbitmqspring"})public class RabbitMQConfig {...@Beanpublic SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {...//1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象多映射转换MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());adapter.setDefaultListenerMethod("consumeMessage");Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();idClassMapping.put("order", Order.class); //转换时根据Message Header中的标签进行对应转换idClassMapping.put("packaged", Packaged.class);javaTypeMapper.setIdClassMapping(idClassMapping);jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);adapter.setMessageConverter(jackson2JsonMessageConverter);container.setMessageListener(adapter);return container;}}
public class MessageDelegate {public void consumeMessage(Order order) {System.err.println("order对象, 消息内容, id: " + order.getId() +", name: " + order.getName() +", content: "+ order.getContent());}public void consumeMessage(Packaged pack) {System.err.println("package对象, 消息内容, id: " + pack.getId() +", name: " + pack.getName() +", content: "+ pack.getDescription());}}
// 测试类@SpringBootTestclass ApplicationTests {...@Testpublic void testSendMappingMessage() throws Exception {ObjectMapper mapper = new ObjectMapper();Order order = new Order();order.setId("001");order.setName("订单消息");order.setContent("订单描述信息");String json1 = mapper.writeValueAsString(order);System.err.println("order 4 json: " + json1);MessageProperties messageProperties1 = new MessageProperties();//这里注意一定要修改contentType为 application/jsonmessageProperties1.setContentType("application/json");messageProperties1.getHeaders().put("__TypeId__", "order"); //对应标签,不再是类路径Message message1 = new Message(json1.getBytes(), messageProperties1);rabbitTemplate.send("topic001", "spring.order", message1);Packaged pack = new Packaged();pack.setId("002");pack.setName("包裹消息");pack.setDescription("包裹描述信息");String json2 = mapper.writeValueAsString(pack);System.err.println("pack 4 json: " + json2);MessageProperties messageProperties2 = new MessageProperties();//这里注意一定要修改contentType为 application/jsonmessageProperties2.setContentType("application/json");messageProperties2.getHeaders().put("__TypeId__", "packaged"); //对应标签,不再是类路径Message message2 = new Message(json2.getBytes(), messageProperties2);rabbitTemplate.send("topic001", "spring.pack", message2);}}// 在测试方法中设置 Message 的ContentType:application/json,并使用 rabbitTemplate 发送消息// 运行结果:order 4 json: {"id":"001","name":"订单消息","content":"订单描述信息"}pack 4 json: {"id":"002","name":"包裹消息","description":"包裹描述信息"}order对象, 消息内容, id: 001, name: 订单消息, content: 订单描述信息package对象, 消息内容, id: 002, name: 包裹消息, content: 包裹描述信息
全局的转换器:ContentTypeDelegatingMessageConverter
RabbitMQConfig 中创建全局的转换器,可以创建很多小的转换器添加到全局转换器,然后将全局转换器添加到适配器 adapter; ```java @Configuration @ComponentScan({“com.example.rabbitmqspring”}) public class RabbitMQConfig { … @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { … //1.4 ext convert
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod(“consumeMessage”);
//全局的转换器: ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
TextMessageConverter textConvert = new TextMessageConverter(); convert.addDelegate(“text”, textConvert); convert.addDelegate(“html/text”, textConvert); convert.addDelegate(“xml/text”, textConvert); convert.addDelegate(“text/plain”, textConvert);
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter(); convert.addDelegate(“json”, jsonConvert); convert.addDelegate(“application/json”, jsonConvert);
ImageMessageConverter imageConverter = new ImageMessageConverter(); convert.addDelegate(“image/png”, imageConverter); convert.addDelegate(“image”, imageConverter);
PDFMessageConverter pdfConverter = new PDFMessageConverter(); convert.addDelegate(“application/pdf”, pdfConverter);
adapter.setMessageConverter(convert);container.setMessageListener(adapter);return container;}
}
- **自定义二进制转换器,图片转换器:**- **RabbitMQConfig **中将自定义图片转换器添加到全局转换器,并将标签与子转换器相对应,标签值要与 Message 的 ContentType 类型保持一致;- 测试类中,从本地读取照片存储为 byte[] 类型,设置 Message 的 ContentType 和 Header,发送 Message;- 图片转换器需要自定义,与文本转换器一样,实现 MessageConverter 接口,重写两个方法,fromMessage 方法中将接收的 Message 从 byte[] 转换成 File 存在本地作为图片文件;- 接收到的 Message 转换成 FIle 后,委托类 MessageDelegate 自定义消息监听方法会被调用,File 类型作为入参;- 最终实现:图片 File => Byte[] => Message =>Byte[] => File =》图片```javapublic class ImageMessageConverter implements MessageConverter {@Overridepublic Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {throw new MessageConversionException(" convert error ! ");}@Overridepublic Object fromMessage(Message message) throws MessageConversionException {System.err.println("-----------Image MessageConverter----------");Object _extName = message.getMessageProperties().getHeaders().get("extName");String extName = _extName == null ? "png" : _extName.toString();byte[] body = message.getBody();String fileName = UUID.randomUUID().toString();String path = "d:/010_test/" + fileName + "." + extName;File f = new File(path);try {Files.copy(new ByteArrayInputStream(body), f.toPath());} catch (IOException e) {e.printStackTrace();}return f;}}
public class MessageDelegate {public void consumeMessage(File file) {System.err.println("文件对象 方法, 消息内容:" + file.getName());}}
// 测试类@SpringBootTestclass ApplicationTests {...@Testpublic void testSendExtConverterMessage() throws Exception {byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png"));MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType("image/png");messageProperties.getHeaders().put("extName", "png");Message message = new Message(body, messageProperties);rabbitTemplate.send("", "image_queue", message);// byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf"));// MessageProperties messageProperties = new MessageProperties();// messageProperties.setContentType("application/pdf");// Message message = new Message(body, messageProperties);// rabbitTemplate.send("", "pdf_queue", message);}}// 在测试方法中设置 Message 的ContentType:application/json,并使用 rabbitTemplate 发送消息// 运行结果:order 4 json: {"id":"001","name":"订单消息","content":"订单描述信息"}pack 4 json: {"id":"002","name":"包裹消息","description":"包裹描述信息"}order对象, 消息内容, id: 001, name: 订单消息, content: 订单描述信息package对象, 消息内容, id: 002, name: 包裹消息, content: 包裹描述信息
自定义二进制转换器,PDF 转换器: ```java public class PDFMessageConverter implements MessageConverter {
@Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override public Object fromMessage(Message message) throws MessageConversionException {
System.err.println("-----------PDF MessageConverter----------");byte[] body = message.getBody();String fileName = UUID.randomUUID().toString();String path = "d:/010_test/" + fileName + ".pdf";File f = new File(path);try {Files.copy(new ByteArrayInputStream(body), f.toPath());} catch (IOException e) {e.printStackTrace();}return f;
}
}
<a name="BUTM3"></a># 2. RabbitMQ 与 SpringBoot2.x 整合<a name="eaSSg"></a>## 2.1 基本配置- Pom 添加依赖```xml<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2.2 生产端整合
创建生产端项目 rabbitmq-springboot-producer
2.2.1 配置文件
application.properties 文件
publisher-confirms,需要实现一个监听器用于监听 Broker 端给我们返回的确认消息:RabbitTemplate.ConfirmCallback;publisher-returns,保证消息对 Broker 端是可达的,如果出现路由键不可达的情况,则需要实现监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback;- 注意,在发送消息的时候对 template 进行配置 
**mandatory=true**保证监听有效。默认 false,当 Broker 收到路由不可达的消息,会自动删除该消息,此时上面的监听器就无法生效; - 生产端还可以配置其他属性,比如发送重试、超时时间、次数、间隔等;
spring:rabbitmq:addresses: 192.168.12.132:5672username: shawnpassword: 123virtual-host: /connection-timeout: 15000#开启Publisher Confirm机制#publisher-confirms: true 新版本此配置已过时,使用下面配置替换publisher-confirm-type: correlated#开启Publisher Return机制publisher-returns: true#启用强制消息,设置为false收不到Publisher Return机制返回的消息template:mandatory: true
2.2.2 RabbitMQConfig(可选)
可以参考与 Spring 整合中的配置类,就是以 @Bean 的方式申明交换机、队列与绑定关系。@Configuration@ComponentScan({"com.example.rabbitmq"})public class RabbitMQConfig {...}
2.2.3 发送消息
为了便于测试,将发送消息的逻辑封装成一个 Service 类,这样可以在测试类中调用: 
由于没有在 RabbitMQConfig 配置交换机、队列、绑定,我们可以直接在控制台创建交换机、队列、绑定:
- 交换机:exchange-1、exchange-2
 - 队列:queue-1、queue-2
 - RouteKey:springboot.#
 
- 实现了 ConfirmCallback,来完善 Broker 收到消息反馈给生产端的消息确认机制;
 - 实现了 ReturnCallback,完善了 Broker 收到路由不到队列的消息时的后续处理,Return 机制确保消息可被路由;
 - 如果需要自定义消息体属性 Properties,可以在 Service 类中自己定义一个 send 方法,接收 Object 类型消息内容和 Map 类型的 message properties 作为入参,封装成 springframework.messaging 的 Message(非 SpringAMQP 的 Message) 并进行发送消息;
 如果不需要自定义消息属性,直接使用默认属性就满足,那么可以在自己定义一个 sendOrder 方法,接收 Java 对象 Order,然后使用 rabbitTemplate.convertAndSend() 方法直接发送对象作为消息;
注意,convertAndSend() 只能直接发送 String 对象,想发送自定义的 Order 对象需要对象实现序列化接口; ```java @Component public class RabbitSender {
//自动注入RabbitTemplate模板类 @Autowired private RabbitTemplate rabbitTemplate;
//回调函数: confirm确认 final ConfirmCallback confirmCallback = new ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: "+ correlationData);System.err.println("ack: "+ ack);if(!ack){System.err.println("异常处理...");}
} };
//回调函数: return返回 final ReturnCallback returnCallback = new ReturnCallback() { @Override public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
String exchange, String routingKey) {System.err.println("return exchange: " + exchange + ", routingKey: "+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
} };
//发送消息方法调用: 构建Message消息 public void send(Object message, Map
properties) throws Exception { MessageHeaders mhs = new MessageHeaders(properties); Message msg = MessageBuilder.createMessage(message, mhs); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 时间戳 全局唯一,作为 deliveryTag 唯一标识消息,这里简单的使用一个long数字 CorrelationData correlationData = new CorrelationData(“1234567890”); rabbitTemplate.convertAndSend(“exchange-1”, “springboot.abc”, msg, correlationData); } //发送消息方法调用: 构建自定义对象消息 public void sendOrder(Order order) throws Exception { rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 时间戳 全局唯一 CorrelationData correlationData = new CorrelationData(“0987654321”); rabbitTemplate.convertAndSend(“exchange-2”, “springboot.def”, order, correlationData); }
}
```java//测试类@SpringBootTestclass ApplicationTests {@Testvoid contextLoads() {}@Autowiredprivate RabbitSender rabbitSender;private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");@Testpublic void testSender1() throws Exception {Map<String, Object> properties = new HashMap<>();properties.put("number", "12345");properties.put("send_time", simpleDateFormat.format(new Date()));rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);}@Testpublic void testSender2() throws Exception {Order order = new Order("001", "第一个订单");rabbitSender.sendOrder(order);}}# 测试结果:############### testSender1() #################### 消息可到达队列correlationData: CorrelationData [id=1234567890]ack: true#将RabbitSender.send()中RouteKey改为spring.abc,消息不可到达队列return exchange: exchange-1, routingKey: spring.abc, replyCode: 312, replyText: NO_ROUTEcorrelationData: CorrelationData [id=1234567890]ack: true############### testSender2() #################### 消息可到达队列correlationData: CorrelationData [id=0987654321]ack: true
public class Order implements Serializable {private String id;private String name;...}
2.3 消费端整合
创建生产端项目 rabbitmq-springboot-consumer
2.3.1 配置文件
application.yaml 文件
- 首先配置手工确认模式,用于 ACK 的手工确认,这样可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列、根据业务记录日志等处理;
 - 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况;
 - 消费端监听使用 @RabbitListener 注解,@RabbitListener 是一个组合注解,里面可以注解配置;@Queue、@QueueBinding、@Exchange 直接通过这个组合注解一次性解决消费端交换机、队列、绑定、路由并且配置监听功能等;
- 建议:由于类配置写在代码里非常不友好,所以强烈建议大家使用配置文件配置。
spring:rabbitmq:host: 192.168.12.132port: 5672username: shawnpassword: 123virtual-host: /connection-timeout: 15000#上面跟生产端基本保持一致listener:simple:acknowledge-mode: manual # 手动ackconcurrency: 1 # 监听消息的个数max-concurrency: 5# 自定义mq配置 用于声明交换机、队列、绑定路由的参数order:queue:name: queue-2durable: trueexchange:name: exchange-2durable: truetype: topicignoreDeclarationExceptions: truekey: springboot.*
2.3.2 监听消息
 
 - 建议:由于类配置写在代码里非常不友好,所以强烈建议大家使用配置文件配置。
 
在测试前,先把之前在控制台创建的交换机、队列删除,因为这里先启动消费端服务,由消费端创建交换机、队列、绑定;
- RabbitReceiver Service 中 onMessage、onOrderMessage 方法是自己定义的方法,方法名随便取,关键是 @RabbitListener 声明交换机、队列、绑定等配置,@RabbitHandler 声明自定义的方法监听消息并处理消息;
 - onMessage 接收 springframework.messaging 的 Message(非 SpringAMQP 的 Message);
 onOrderMessage 接收 Java 对象 Order,与生产者的 Order 对象一样;
@Componentpublic class RabbitReceiver {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue-1",durable="true"),exchange = @Exchange(value = "exchange-1",durable="true",type= "topic",ignoreDeclarationExceptions = "true"),key = "springboot.*"))@RabbitHandlerpublic void onMessage(Message message, Channel channel) throws Exception {System.err.println("--------------------------------------");System.err.println("消费端Payload: " + message.getPayload());Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);//手工ACKchannel.basicAck(deliveryTag, false);}* @param order* @param channel* @param headers* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",durable="${spring.rabbitmq.listener.order.queue.durable}"),exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",durable="${spring.rabbitmq.listener.order.exchange.durable}",type= "${spring.rabbitmq.listener.order.exchange.type}",ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),key = "${spring.rabbitmq.listener.order.key}"))@RabbitHandlerpublic void onOrderMessage(@Payload Order order,Channel channel,@Headers Map<String, Object> headers) throws Exception {System.err.println("--------------------------------------");System.err.println("消费端order: " + order.getId());Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);//手工ACKchannel.basicAck(deliveryTag, false);}}
```
测试结果:
######### testSender1()
消费端Payload: Hello RabbitMQ For Spring Boot!
######### testSender2()
消费端order: 001
<a name="mArPm"></a># 3. RabbitMQ 与 SpringCloudStream 整合SpringCloudStream 不能保证消息的100%可靠性,用于和 Kafka 兼顾,目的是高性能的消息通信。<a name="T9iH2"></a>## 3.1 SpringCloudStream 整体架构<br />使用 SpringCloudStream,消息的生产端和消费端可以是两种不同的消息队列。<br /><br />Spring Cloud Stream 通过定义绑定器 Binder 作为中间层,完美地实现了应用程序与消息中间件之间的隔离。通过向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或是更换其他消息中间件产品时,只需要更换对应的 Binder 绑定器而不需要修改任何的应用逻辑。<br />上图中黄色的为 RabbitMQ 的部分,绿色的部分为 Spring Cloud Stream 在生产者和消费者添加了一层中间件。<a name="g1KAw"></a>## 3.2 Barista 接口Barista 接口:定义作为后面类的参数,定义通道类型(决定通道是用于发送还是接收消息)和通道名称(作为配置用)。- @EnableBinding:value 参数指定用于定义绑定消息通道的接口,在应用启动时实现对定义消息通道的绑定;- @Output:输出注解,用于定义发送消息接口;- @Input:输入注解,用于定义消息的消费者接口;- @StreamListener:用于定义监听方法的注解;<a name="bfz9y"></a>## 3.3 整合<a name="b7tJH"></a>### 3.3.1 添加依赖```xml<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
3.3.2 生产端
Barista 接口:定义输出通道,添加绑定关系
public interface Barista {String OUTPUT_CHANNEL = "output_channel";// 注解@Output声明了它是一个输出类型的通道,名字是output_channel。// 这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道// 类型是output,发布的主题名为mydest。@Output(Barista.OUTPUT_CHANNEL)MessageChannel logoutput();}
配置:
spring:cloud:stream:bindings:output_channel:destination: exchange-3group: queue-3binder: rabbit_clusterbinders:rabbit_cluster:type: rabbitenviroment:spring:rabbitmq:address: 192.168.58.129:5672username: orcaspassword: 1224virtual-host: /
@EnableBinding(Barista.class)@Servicepublic class RabbitmqSender {@Autowiredprivate Barista barista;// 发送消息public String sendMessage(Object message, Map<String, Object> properties) throws Exception {try{MessageHeaders mhs = new MessageHeaders(properties);Message msg = MessageBuilder.createMessage(message, mhs);boolean sendStatus = barista.logoutput().send(msg);System.err.println("--------------sending -------------------");System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);}catch (Exception e){System.err.println("-------------error-------------");e.printStackTrace();throw new RuntimeException(e.getMessage());}return null;}}
3.3.3 消费端
Barista
public interface Barista {String INPUT_CHANNEL = "input_channel";// 注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。// 这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,// 它的类型是input,订阅的主题是position2处声明的mydest这个主题@Input(Barista.INPUT_CHANNEL)SubscribableChannel loginput();}
配置:
spring:cloud:stream:bindings:input_channel:destination: exchange-3group: queue-3binder: rabbit_clusterconsumer:concurrency: 1rabbit:bindings:input_channle:consumer:requeue-rejected: falseacknowledge-mode: MANUALrecovery-interval: 3000durable-subscription: truemax-concurrency: 5binders:rabbit_cluster:type: rabbitenviroment:spring:rabbitmq:address: 192.168.58.129:5672username: orcaspassword: 1224virtual-host: /
接收:
@EnableBinding(Barista.class)@Servicepublic class RabbitmqReceiver {@StreamListener(Barista.INPUT_CHANNEL)public void receiver(Message message) throws Exception {Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);System.out.println("Input Stream 1 接受数据:" + message);System.out.println("消费完毕------------");channel.basicAck(deliveryTag, false);}}
