1. RabbitMQ 整合 Spring AMQP
AMQP 核心组件:
- RabbitAdmin
- SpringAMQP 声明
- RabbitTemplate
- SimpleMessageListenerContainer
- MessageListenerAdapter
-
1.1 SpringAMQP 用户管理组件:RabbitAdmin
1.1.1 描述
RabbitAdmin 类可以很好的操作 RabbitMQ,在 Spring 中直接进行注入即可:
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
RabbitMQ 类实现了一些接口,其中的
InitializingBean
接口包含afterPropertiesSet()
抽象方法,作用是在所有 Bean 加载完成后初始化设置;public interface InitializingBean {
void afterPropertiesSet() throws Exception;
}
@ManagedResource(description = "Admin Tasks")
public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,
BeanNameAware, InitializingBean {
@Override
public void afterPropertiesSet() {
synchronized (this.lifecycleMonitor) {
// autoStartup == false 时直接返回
if (this.running || !this.autoStartup) {
return;
}
}
...
initialize();
...
}
...
//Declares all the exchanges, queues and bindings in the enclosing application context,
//if any. It should be safe (but unnecessary) to call this method more than once.
public void initialize() {
...
}
}
:::info
autoStartup 必须要设置为 true,否则 Spring 容器不会加载 RabbitAdmin 类;
- RabbitAdmin 底层实现就是从 Spring 容器中获取 Exchange、Binding、RoutingKey 以及 Queue 的 @Bean 声明,然后使用 RabbitTemplate 的 execute 方法执行对应的声明、修改、删除等一系列 RabbitMQ 基础功能操作;
- RabbitAdmin 经常用来,添加一个交换机、删除一个绑定、清空一个队列里的消息等等; :::
1.1.2 代码演示:
虽然新建的是一个 SpringBoot 项目,但是在整合时主要使用 RabbitMQ 和 Spring。
POM 添加依赖
amqp-client
:RabbitMQ 的核心包spring-boot-starter-amqp
:RabbitMQ 与 Spring 整合的包<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
将 RabbitMQ 注入到 Spring 容器
- 创建一个 RabbitMQ 配置类:RabbitMQConfig
可以从 Spring 容器获得 ConnectionFactory、RabbitAdmin
@Configuration
@ComponentScan({"com.example.rabbitmqspring.*"})
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("192.168.12.131:5672");
connectionFactory.setUsername("shawn");
connectionFactory.setPassword("123");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
如何使用 RabbitMQAdmin
我们可以在 JunitTest 中使用 RabbitAdmin 来测试使用效果 ```java @SpringBootTest class ApplicationTests {
@Test void contextLoads() { } @Autowired private RabbitAdmin rabbitAdmin;
@Test public void testAdmin() { rabbitAdmin.declareExchange(new DirectExchange(“test.direct”, false, false));
rabbitAdmin.declareExchange(new TopicExchange(“test.topic”, false, false));
rabbitAdmin.declareExchange(new FanoutExchange(“test.fanout”, false, false));
rabbitAdmin.declareQueue(new Queue(“test.direct.queue”, false));
rabbitAdmin.declareQueue(new Queue(“test.topic.queue”, false));
rabbitAdmin.declareQueue(new Queue(“test.fanout.queue”, false));
rabbitAdmin.declareBinding(new Binding(“test.direct.queue”,
Binding.DestinationType.QUEUE,
"test.direct", "direct", new HashMap<>()));
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("test.topic.queue", false)) //直接创建队列
.to(new TopicExchange("test.topic", false, false)) //直接创建交换机 建立关联关系
.with("user.#")); //指定路由Key
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("test.fanout.queue", false))
.to(new FanoutExchange("test.fanout", false, false)));
//清空队列数据
rabbitAdmin.purgeQueue("test.topic.queue", false);
}
}
:::info
- 可以分别声明 Exchange、Queue、Binding
- `rabbitAdmin.declareExchange(new FanoutExchange(...));`
- `rabbitAdmin.declareQueue(new Queue(...));`
- `rabbitAdmin.declareBinding(new Binding(...)):`
- 也可以在声明 Binding 时直接创建 Exchange、Queue、RouteKey
- rabbitAdmin.declareBinding(
BindingBuilder<br /> .bind(new Queue("test.topic.queue", false)) //直接创建队列<br /> .to(new TopicExchange("test.topic", false, false)) //直接创建交换机 建立关联关系<br /> .with("user.#"));
:::
<a name="pDyQs"></a>
## 1.2 SpringAMQP 声明
- 在以前我们使用 RabbitMQ,需要在 RabbitMQ 核心 API 里面声明一个 Exchange、一个 Queue、一个 Binding
```java
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
在我们整合了 SpringAMQP 之后,可以使用 SpringAMQP 去声明,即声明 @Bean 方式
在配置类 RabbitMQConfig 中使用 @Bean 声明交换机、队列、绑定
/**
* 针对消费者配置
* 1. 设置交换机类型
* 2. 将队列绑定到交换机
FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange :通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
*/
@Bean
public TopicExchange exchange01() {
return new TopicExchange("topic_exchange01", true, false);
}
@Bean
public Queue queue01() {
return new Queue("queue01", true);
}
@Bean
public Binding binding01() {
return BindingBuilder.bind(queue01()).to(exchange01()).with("topic.*");
}
1.3 SpringAMQP 消息模板组件:RabbitTemplate
1.3.1 描述
- RabbitTemplate 是我们在与 SpringAQMP 整合的时候进行发送消息的关键类;
该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口
ConfirmCallback
、返回值确认接口ReturnCallback
等等。同样我们需要进行注入到 Spring 容器中,然后直接使用。在与 Spring 整合时需要实例化,但是在与 SpringBoot 整合时,在配置文件里添加配置即可。
1.3.2 代码演示
在配置类 RabbitMQConfig 中声明 RabbitTemplate:
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
在测试类 ApplicationTests 中使用 RabbitTemplate 发送消息: ```java @Autowired private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "信息描述..");
messageProperties.getHeaders().put("type", "自定义消息类型..");
Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.err.println("------添加额外的设置---------");
message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
return message;
}
});
}
@Test
public void testSendMessage2() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq 消息1234".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc", message);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
}
:::info
- `rabbitTemplate.convertAndSend(exchange, routeKey, messageObject, [MessagePostProcessor]) `
- messageObject 可以是 Message 类,也可以是简单的 String;
- MessagePostProcessor 是可选的,可以重写其中的方法给 Message 添加额外的配置;
:::
<a name="MffXu"></a>
## 1.4 SpringAMQP 消息容器:SimpleMessageListenerContainer
<a name="Ox4A9"></a>
### 1.4.1 描述
- SimpleMessageListenerContainer,简单消息监听容器;
- 这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足;
- 监听队列(多个队列),自动启动、自动声明功能;
- 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等;
- 设置消费者数量、最小最大数量、批量消费;
- 设置消息确认和自动确认模式、是否重回队列、异常捕获 handler 函数;
- 设置消费者标签生成策略、是否独占模式、消费者属性等;
- 设置具体的监听器、消息转换器等等;
:::info
- SimpleMessageListenerContainer 可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接受消息的模式等;
- 很多基于 RabbitMQ 的自定义化后端滚控台在进行动态设置的时候,也是根据这一特性去实现的。所以可以看出 SpringAMQP 非常的强大;
:::
**SimpleMessageListenerContainer 为什么可以动态感知配置变更?**
<a name="ncZmz"></a>
### 1.4.2 代码演示
- 在配置类 **RabbitMQConfig **中声明消息容器 **SimpleMessageListenerContainer**
```java
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// 监听多个队列
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
// 设置消费者最小最大数量
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
// 不开启重回队列
container.setDefaultRequeueRejected(false);
// 设置消息自动确认模式
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
// 暴露监听通道
container.setExposeListenerChannel(true);
// 设置消费者标签生成策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
// 设置具体的监听器、消息转换器
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.err.println("----------消费者: " + msg);
}
});
return container;
}
1.5 SpringAMQP 消息适配器:MessageListenerAdapter
1.5.1 描述
MessageListenerAdapter
,消息监听适配器- MessageListenerAdapter 类中的变量
defaultListenerMethod
表示监听方法名称,默认方法名称是"handleMessage"
。 - 可用
adapter.setDefaultListenerMethod("consumeMessage");
设置自定义的监听方法名称。 MessageDelegate
委托对象:实际真实的委托对象,自定义的类用于处理消息。queueOrTagToMethodName
队列标识与方法名称组成的集合,可以将队列和方法名绑定,指定队列中的消息会被所绑定的方法处理
1.5.2 代码演示
在配置类 RabbitMQConfig 中的消息容器 Bean 中通过消息适配器 adapter 使用自定义的消息监听器,不再是使用上文中
new ChannelAwareMessageListener()
并重写其 onMessage() 方法的做法。MessageListenerAdapter 适配器有两种使用方式:
方式一:
**adapter.setDefaultListenerMethod("xxxxx");**
- “xxxxx” 方法名称要与 MessageDelegate 类中自定义的方法对应;
MessageDelegate 类中自定义方法的传入参数默认是 byte[] 字节数组类型,如果想改变方法的入参,可以设置一个自定义的消息转换器,通过自定义一个
TextMessageConverter
实现 MessageConverter 接口;@Configuration
@ComponentScan({"com.example.rabbitmqspring"})
public class RabbitMQConfig {
...
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
...
// 适配器使用方式1: 默认是有自己的方法名字的:handleMessage
// 可以自己指定一个方法的名字: consumeMessage
// 也可以添加一个转换器: 从字节数组转换为String
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);
return container;
}
}
```java public class MessageDelegate { // 使用默认方法名称 public void handleMessage(byte[] messageBody) { System.err.println(“默认方法, 消息内容:” + new String(messageBody)); }
// 使用自定义的方法名称 public void consumeMessage(byte[] messageBody) { System.err.println(“字节数组方法, 消息内容:” + new String(messageBody)); }
// 改变自定义方法的入参类型
public void consumeMessage(String messageBody) { System.err.println(“字符串方法, 消息内容:” + messageBody); } }// Convert a Java object to a Message. @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { System.out.println(“TextMessageConverter.toMessage…”); return new Message(object.toString().getBytes(), messageProperties); }
// Convert from a Message to a Java object. // 对接收到的message进行过滤:ContentType: text/plain的message Body类型改变为String类型 @Override public Object fromMessage(Message message) throws MessageConversionException { System.out.println(“TextMessageConverter.fromMessage…”); String contentType = message.getMessageProperties().getContentType(); if(null != contentType && contentType.contains(“text”)) {
return new String(message.getBody());
} return message.getBody(); }
}
```java
// 测试类
@SpringBootTest
class ApplicationTests {
...
@Test
public void testSendMessageForText() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq 消息1234".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc", message);
}
}
// 在测试方法中设置 Message 的ContentType:text/plain,并使用 rabbitTemplate 发送消息
// 运行结果:
字符串方法, 消息内容:mq 消息1234
TextMessageConverter.fromMessage...
方式二:
**adapter.setQueueOrTagToMethodName(Map<String, String> xxxx);**
- 我们的 Queue 名称 和 Method 名称,也可以进行一一的匹配
- map.put(“queue01”, “method01”); 可以指定队列中的消息会被所绑定的方法处理
“method01” 方法名称要与 MessageDelegate 类中自定义的方法对应;
@Configuration
@ComponentScan({"com.example.rabbitmqspring"})
public class RabbitMQConfig {
...
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
...
// 2 适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setMessageConverter(new TextMessageConverter());
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put("queue001", "method1");
queueOrTagToMethodName.put("queue002", "method2");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(adapter);
return container;
}
}
```java public class MessageDelegate { public void method1(String messageBody) { System.err.println(“method1 收到消息内容:” + new String(messageBody)); }
public void method2(String messageBody) { System.err.println(“method2 收到消息内容:” + new String(messageBody)); } }
rabbitTemplate.send(“topic001”, “spring.abc”, message); rabbitTemplate.send(“topic002”, “rabbit.abc”, message); } }
// 在测试方法中设置 Message 的ContentType:text/plain,并使用 rabbitTemplate 发送消息 // 运行结果: TextMessageConverter.fromMessage… method1 收到消息内容:mq 消息1234 TextMessageConverter.fromMessage… method2 收到消息内容:mq 消息1234
<a name="kcs1A"></a>
## 1.6 SpringAMQP 消息转换器:MessageConverter
<a name="SVKw8"></a>
### 1.6.1 描述
- MessageConverter:消息转换器;
- 我们在发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到 MessageConverter;
- 如何自定义常用转换器:
- 需要实现 MessageConverter 这个接口;
- 需要重写两个方法:
- toMessage:Convert a Java object to a Message;
- fromMessage:Convert from a Message to a Java object;
- 常用几种转换器:
- 上节中使用到的 String 转换器;
- Json 转换器:Jackson2JsonMessageConverter 进行 Java 对象的转换功能;
- DefaultJackson2JavaTypeMapper 映射器:Java 对象的映射关系;
- 自定义二进制转换器:如图片类型、PDF、PPT、流媒体;
<a name="FwqjS"></a>
### 1.6.2 代码演示
- 为了便于演示,创建了两个实体类:**entity.Order**、**entity.Packaged**;
- **Json转换器(Map作为入参):Jackson2JsonMessageConverter**
- 测试类中将 Java 对象 Order 转换成 Json String,然后封装成 Message 发送;
- **RabbitMQConfig **中在适配器 adapter 中添加 Json 转换器;
- 自定义监听方法,将 Message 消息体作为 Map 类型的入参;
- 最终实现:`**Order Object => Json String => Message => Order 属性/值所对应的 Map**`;
```java
@Configuration
@ComponentScan({"com.example.rabbitmqspring"})
public class RabbitMQConfig {
...
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
...
// 1.1 支持json格式的转换器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
return container;
}
}
public class MessageDelegate {
public void consumeMessage(Map messageBody) {
System.err.println("map方法, 消息内容:" + messageBody);
System.err.println("map方法, 消息content:"+ messageBody.get("content"));
}
}
// 测试类
@SpringBootTest
class ApplicationTests {
...
@Test
public void testSendJsonMessage() throws Exception {
Order order = new Order();
order.setId("001");
order.setName("消息订单");
order.setContent("描述信息");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json);
MessageProperties messageProperties = new MessageProperties();
//这里注意一定要修改contentType为 application/json
messageProperties.setContentType("application/json");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.order", message);
}
}
// 在测试方法中设置 Message 的ContentType:application/json,并使用 rabbitTemplate 发送消息
// 运行结果:
order 4 json: {"id":"001","name":"消息订单","content":"描述信息"}
map方法, 消息内容:{id=001, name=消息订单, content=描述信息}
map方法, 消息content:描述信息
支持 Java 对象的转换(Java Object 作为入参):DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
- 测试类中将 Java 对象 Order 转换成 Json String,然后封装成 Message 配置 Header 并发送;
- RabbitMQConfig 中创建 javaTypeMapper,将 javaTypeMapper 配置到 Json 转换器,将 Json 转换器添加到适配器;
- 自定义监听方法,将 Message 消息体作为 Java Object 类型的入参;
最终实现:
**Order Object => Json String => Message => Order Object**
;@Configuration
@ComponentScan({"com.example.rabbitmqspring"})
public class RabbitMQConfig {
...
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
...
// 1.2 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象转换
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
//否则会抛出异常The class '...' is not in the trusted packages: [java.util, java.lang].
// If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
//默认只支持java.util和java.lang包下的类
javaTypeMapper.setTrustedPackages("*");
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
return container;
}
}
public class MessageDelegate {
public void consumeMessage(Order order) {
System.err.println("order对象, 消息内容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
}
}
```java // 测试类 @SpringBootTest class ApplicationTests { … @Test public void testSendJsonMessage() throws Exception {
Order order = new Order(); order.setId(“001”); order.setName(“消息订单”); order.setContent(“描述信息”); ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(order);
System.err.println(“order 4 json: “ + json);
MessageProperties messageProperties = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties.setContentType(“application/json”); Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send(“topic001”, “spring.order”, message); } }
// 在测试方法中设置 Message 的ContentType:application/json,并使用 rabbitTemplate 发送消息 // 运行结果: order 4 json: {“id”:”001”,”name”:”消息订单”,”content”:”描述信息”} order对象, 消息内容, id: 001, name: 订单消息, content: 订单描述信息
- **支持 Java 对象多映射转换:DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter**
- 测试类中将 Java 对象 Order/Packaged 转换成 Json String,然后封装成 Message 配置 Header 并发送;
- **RabbitMQConfig **中创建 javaTypeMapper 并配置标签与对应的类路径,将 javaTypeMapper 配置到 Json 转换器,将 Json 转换器添加到适配器;
- 自定义监听方法,将 Message 消息体作为 Java Object 类型的入参;
- 最终实现:`**Order/Packaged Object => Json String => Message => Order/Packaged Object**`;
```java
@Configuration
@ComponentScan({"com.example.rabbitmqspring"})
public class RabbitMQConfig {
...
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
...
//1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象多映射转换
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
idClassMapping.put("order", Order.class); //转换时根据Message Header中的标签进行对应转换
idClassMapping.put("packaged", Packaged.class);
javaTypeMapper.setIdClassMapping(idClassMapping);
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
return container;
}
}
public class MessageDelegate {
public void consumeMessage(Order order) {
System.err.println("order对象, 消息内容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
}
public void consumeMessage(Packaged pack) {
System.err.println("package对象, 消息内容, id: " + pack.getId() +
", name: " + pack.getName() +
", content: "+ pack.getDescription());
}
}
// 测试类
@SpringBootTest
class ApplicationTests {
...
@Test
public void testSendMappingMessage() throws Exception {
ObjectMapper mapper = new ObjectMapper();
Order order = new Order();
order.setId("001");
order.setName("订单消息");
order.setContent("订单描述信息");
String json1 = mapper.writeValueAsString(order);
System.err.println("order 4 json: " + json1);
MessageProperties messageProperties1 = new MessageProperties();
//这里注意一定要修改contentType为 application/json
messageProperties1.setContentType("application/json");
messageProperties1.getHeaders().put("__TypeId__", "order"); //对应标签,不再是类路径
Message message1 = new Message(json1.getBytes(), messageProperties1);
rabbitTemplate.send("topic001", "spring.order", message1);
Packaged pack = new Packaged();
pack.setId("002");
pack.setName("包裹消息");
pack.setDescription("包裹描述信息");
String json2 = mapper.writeValueAsString(pack);
System.err.println("pack 4 json: " + json2);
MessageProperties messageProperties2 = new MessageProperties();
//这里注意一定要修改contentType为 application/json
messageProperties2.setContentType("application/json");
messageProperties2.getHeaders().put("__TypeId__", "packaged"); //对应标签,不再是类路径
Message message2 = new Message(json2.getBytes(), messageProperties2);
rabbitTemplate.send("topic001", "spring.pack", message2);
}
}
// 在测试方法中设置 Message 的ContentType:application/json,并使用 rabbitTemplate 发送消息
// 运行结果:
order 4 json: {"id":"001","name":"订单消息","content":"订单描述信息"}
pack 4 json: {"id":"002","name":"包裹消息","description":"包裹描述信息"}
order对象, 消息内容, id: 001, name: 订单消息, content: 订单描述信息
package对象, 消息内容, id: 002, name: 包裹消息, content: 包裹描述信息
全局的转换器:ContentTypeDelegatingMessageConverter
RabbitMQConfig 中创建全局的转换器,可以创建很多小的转换器添加到全局转换器,然后将全局转换器添加到适配器 adapter; ```java @Configuration @ComponentScan({“com.example.rabbitmqspring”}) public class RabbitMQConfig { … @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { … //1.4 ext convert
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod(“consumeMessage”);
//全局的转换器: ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
TextMessageConverter textConvert = new TextMessageConverter(); convert.addDelegate(“text”, textConvert); convert.addDelegate(“html/text”, textConvert); convert.addDelegate(“xml/text”, textConvert); convert.addDelegate(“text/plain”, textConvert);
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter(); convert.addDelegate(“json”, jsonConvert); convert.addDelegate(“application/json”, jsonConvert);
ImageMessageConverter imageConverter = new ImageMessageConverter(); convert.addDelegate(“image/png”, imageConverter); convert.addDelegate(“image”, imageConverter);
PDFMessageConverter pdfConverter = new PDFMessageConverter(); convert.addDelegate(“application/pdf”, pdfConverter);
adapter.setMessageConverter(convert);
container.setMessageListener(adapter);
return container;
}
}
- **自定义二进制转换器,图片转换器:**
- **RabbitMQConfig **中将自定义图片转换器添加到全局转换器,并将标签与子转换器相对应,标签值要与 Message 的 ContentType 类型保持一致;
- 测试类中,从本地读取照片存储为 byte[] 类型,设置 Message 的 ContentType 和 Header,发送 Message;
- 图片转换器需要自定义,与文本转换器一样,实现 MessageConverter 接口,重写两个方法,fromMessage 方法中将接收的 Message 从 byte[] 转换成 File 存在本地作为图片文件;
- 接收到的 Message 转换成 FIle 后,委托类 MessageDelegate 自定义消息监听方法会被调用,File 类型作为入参;
- 最终实现:图片 File => Byte[] => Message =>Byte[] => File =》图片
```java
public class ImageMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.err.println("-----------Image MessageConverter----------");
Object _extName = message.getMessageProperties().getHeaders().get("extName");
String extName = _extName == null ? "png" : _extName.toString();
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "d:/010_test/" + fileName + "." + extName;
File f = new File(path);
try {
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return f;
}
}
public class MessageDelegate {
public void consumeMessage(File file) {
System.err.println("文件对象 方法, 消息内容:" + file.getName());
}
}
// 测试类
@SpringBootTest
class ApplicationTests {
...
@Test
public void testSendExtConverterMessage() throws Exception {
byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png"));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("image/png");
messageProperties.getHeaders().put("extName", "png");
Message message = new Message(body, messageProperties);
rabbitTemplate.send("", "image_queue", message);
// byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf"));
// MessageProperties messageProperties = new MessageProperties();
// messageProperties.setContentType("application/pdf");
// Message message = new Message(body, messageProperties);
// rabbitTemplate.send("", "pdf_queue", message);
}
}
// 在测试方法中设置 Message 的ContentType:application/json,并使用 rabbitTemplate 发送消息
// 运行结果:
order 4 json: {"id":"001","name":"订单消息","content":"订单描述信息"}
pack 4 json: {"id":"002","name":"包裹消息","description":"包裹描述信息"}
order对象, 消息内容, id: 001, name: 订单消息, content: 订单描述信息
package对象, 消息内容, id: 002, name: 包裹消息, content: 包裹描述信息
自定义二进制转换器,PDF 转换器: ```java public class PDFMessageConverter implements MessageConverter {
@Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override public Object fromMessage(Message message) throws MessageConversionException {
System.err.println("-----------PDF MessageConverter----------");
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "d:/010_test/" + fileName + ".pdf";
File f = new File(path);
try {
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return f;
}
}
<a name="BUTM3"></a>
# 2. RabbitMQ 与 SpringBoot2.x 整合
<a name="eaSSg"></a>
## 2.1 基本配置
- Pom 添加依赖
```xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 生产端整合
创建生产端项目 rabbitmq-springboot-producer
2.2.1 配置文件
application.properties 文件
publisher-confirms
,需要实现一个监听器用于监听 Broker 端给我们返回的确认消息:RabbitTemplate.ConfirmCallback;publisher-returns
,保证消息对 Broker 端是可达的,如果出现路由键不可达的情况,则需要实现监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback;- 注意,在发送消息的时候对 template 进行配置
**mandatory=true**
保证监听有效。默认 false,当 Broker 收到路由不可达的消息,会自动删除该消息,此时上面的监听器就无法生效; - 生产端还可以配置其他属性,比如发送重试、超时时间、次数、间隔等;
spring:
rabbitmq:
addresses: 192.168.12.132:5672
username: shawn
password: 123
virtual-host: /
connection-timeout: 15000
#开启Publisher Confirm机制
#publisher-confirms: true 新版本此配置已过时,使用下面配置替换
publisher-confirm-type: correlated
#开启Publisher Return机制
publisher-returns: true
#启用强制消息,设置为false收不到Publisher Return机制返回的消息
template:
mandatory: true
2.2.2 RabbitMQConfig(可选)
可以参考与 Spring 整合中的配置类,就是以 @Bean 的方式申明交换机、队列与绑定关系。@Configuration
@ComponentScan({"com.example.rabbitmq"})
public class RabbitMQConfig {
...
}
2.2.3 发送消息
为了便于测试,将发送消息的逻辑封装成一个 Service 类,这样可以在测试类中调用:
由于没有在 RabbitMQConfig 配置交换机、队列、绑定,我们可以直接在控制台创建交换机、队列、绑定:
- 交换机:exchange-1、exchange-2
- 队列:queue-1、queue-2
- RouteKey:springboot.#
- 实现了 ConfirmCallback,来完善 Broker 收到消息反馈给生产端的消息确认机制;
- 实现了 ReturnCallback,完善了 Broker 收到路由不到队列的消息时的后续处理,Return 机制确保消息可被路由;
- 如果需要自定义消息体属性 Properties,可以在 Service 类中自己定义一个 send 方法,接收 Object 类型消息内容和 Map 类型的 message properties 作为入参,封装成 springframework.messaging 的 Message(非 SpringAMQP 的 Message) 并进行发送消息;
如果不需要自定义消息属性,直接使用默认属性就满足,那么可以在自己定义一个 sendOrder 方法,接收 Java 对象 Order,然后使用 rabbitTemplate.convertAndSend() 方法直接发送对象作为消息;
注意,convertAndSend() 只能直接发送 String 对象,想发送自定义的 Order 对象需要对象实现序列化接口; ```java @Component public class RabbitSender {
//自动注入RabbitTemplate模板类 @Autowired private RabbitTemplate rabbitTemplate;
//回调函数: confirm确认 final ConfirmCallback confirmCallback = new ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: "+ correlationData);
System.err.println("ack: "+ ack);
if(!ack){
System.err.println("异常处理...");
}
} };
//回调函数: return返回 final ReturnCallback returnCallback = new ReturnCallback() { @Override public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.err.println("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
} };
//发送消息方法调用: 构建Message消息 public void send(Object message, Map
properties) throws Exception { MessageHeaders mhs = new MessageHeaders(properties); Message msg = MessageBuilder.createMessage(message, mhs); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 时间戳 全局唯一,作为 deliveryTag 唯一标识消息,这里简单的使用一个long数字 CorrelationData correlationData = new CorrelationData(“1234567890”); rabbitTemplate.convertAndSend(“exchange-1”, “springboot.abc”, msg, correlationData); } //发送消息方法调用: 构建自定义对象消息 public void sendOrder(Order order) throws Exception { rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 时间戳 全局唯一 CorrelationData correlationData = new CorrelationData(“0987654321”); rabbitTemplate.convertAndSend(“exchange-2”, “springboot.def”, order, correlationData); }
}
```java
//测试类
@SpringBootTest
class ApplicationTests {
@Test
void contextLoads() {
}
@Autowired
private RabbitSender rabbitSender;
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Test
public void testSender1() throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("number", "12345");
properties.put("send_time", simpleDateFormat.format(new Date()));
rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
}
@Test
public void testSender2() throws Exception {
Order order = new Order("001", "第一个订单");
rabbitSender.sendOrder(order);
}
}
# 测试结果:
############### testSender1() ###################
# 消息可到达队列
correlationData: CorrelationData [id=1234567890]
ack: true
#将RabbitSender.send()中RouteKey改为spring.abc,消息不可到达队列
return exchange: exchange-1, routingKey: spring.abc, replyCode: 312, replyText: NO_ROUTE
correlationData: CorrelationData [id=1234567890]
ack: true
############### testSender2() ###################
# 消息可到达队列
correlationData: CorrelationData [id=0987654321]
ack: true
public class Order implements Serializable {
private String id;
private String name;
...
}
2.3 消费端整合
创建生产端项目 rabbitmq-springboot-consumer
2.3.1 配置文件
application.yaml 文件
- 首先配置手工确认模式,用于 ACK 的手工确认,这样可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列、根据业务记录日志等处理;
- 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况;
- 消费端监听使用 @RabbitListener 注解,@RabbitListener 是一个组合注解,里面可以注解配置;@Queue、@QueueBinding、@Exchange 直接通过这个组合注解一次性解决消费端交换机、队列、绑定、路由并且配置监听功能等;
- 建议:由于类配置写在代码里非常不友好,所以强烈建议大家使用配置文件配置。
spring:
rabbitmq:
host: 192.168.12.132
port: 5672
username: shawn
password: 123
virtual-host: /
connection-timeout: 15000
#上面跟生产端基本保持一致
listener:
simple:
acknowledge-mode: manual # 手动ack
concurrency: 1 # 监听消息的个数
max-concurrency: 5
# 自定义mq配置 用于声明交换机、队列、绑定路由的参数
order:
queue:
name: queue-2
durable: true
exchange:
name: exchange-2
durable: true
type: topic
ignoreDeclarationExceptions: true
key: springboot.*
2.3.2 监听消息
- 建议:由于类配置写在代码里非常不友好,所以强烈建议大家使用配置文件配置。
在测试前,先把之前在控制台创建的交换机、队列删除,因为这里先启动消费端服务,由消费端创建交换机、队列、绑定;
- RabbitReceiver Service 中 onMessage、onOrderMessage 方法是自己定义的方法,方法名随便取,关键是 @RabbitListener 声明交换机、队列、绑定等配置,@RabbitHandler 声明自定义的方法监听消息并处理消息;
- onMessage 接收 springframework.messaging 的 Message(非 SpringAMQP 的 Message);
onOrderMessage 接收 Java 对象 Order,与生产者的 Order 对象一样;
@Component
public class RabbitReceiver {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-1",
durable="true"),
exchange = @Exchange(value = "exchange-1",
durable="true",
type= "topic",
ignoreDeclarationExceptions = "true"),
key = "springboot.*"
)
)
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端Payload: " + message.getPayload());
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
* @param order
* @param channel
* @param headers
* @throws Exception
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
durable="${spring.rabbitmq.listener.order.queue.durable}"),
exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
durable="${spring.rabbitmq.listener.order.exchange.durable}",
type= "${spring.rabbitmq.listener.order.exchange.type}",
ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
key = "${spring.rabbitmq.listener.order.key}"
)
)
@RabbitHandler
public void onOrderMessage(@Payload Order order,
Channel channel,
@Headers Map<String, Object> headers) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端order: " + order.getId());
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}
```
测试结果:
######### testSender1()
消费端Payload: Hello RabbitMQ For Spring Boot!
######### testSender2()
消费端order: 001
<a name="mArPm"></a>
# 3. RabbitMQ 与 SpringCloudStream 整合
SpringCloudStream 不能保证消息的100%可靠性,用于和 Kafka 兼顾,目的是高性能的消息通信。
<a name="T9iH2"></a>
## 3.1 SpringCloudStream 整体架构
![](https://cdn.nlark.com/yuque/0/2021/png/1471554/1621330479030-23b7325f-b91f-4a1f-9cef-a1836db0b174.png#clientId=uc6cd2f8e-917d-4&from=paste&height=334&id=uad87721d&margin=%5Bobject%20Object%5D&originHeight=668&originWidth=1450&originalType=url&status=done&style=none&taskId=udd750071-b44c-4228-88d9-3f83168086b&width=725)<br />使用 SpringCloudStream,消息的生产端和消费端可以是两种不同的消息队列。<br />![](https://cdn.nlark.com/yuque/0/2021/png/1471554/1621330552614-3c72918b-27ec-4559-9a2b-07b1fcd61163.png#clientId=uc6cd2f8e-917d-4&from=paste&id=ua9b55521&margin=%5Bobject%20Object%5D&originHeight=389&originWidth=460&originalType=url&status=done&style=none&taskId=ud0e8b0f6-8ee7-42da-bb25-f73eb80fee0)<br />Spring Cloud Stream 通过定义绑定器 Binder 作为中间层,完美地实现了应用程序与消息中间件之间的隔离。通过向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或是更换其他消息中间件产品时,只需要更换对应的 Binder 绑定器而不需要修改任何的应用逻辑。
![](https://cdn.nlark.com/yuque/0/2021/png/1471554/1621330645210-97a264bc-fe3c-407f-b222-c7430c9b5b32.png#clientId=uc6cd2f8e-917d-4&from=paste&height=302&id=u91b599cd&margin=%5Bobject%20Object%5D&originHeight=403&originWidth=744&originalType=url&status=done&style=none&taskId=u09d6e37c-0103-4e7a-99ca-fbbdbe8cd3a&width=558)<br />上图中黄色的为 RabbitMQ 的部分,绿色的部分为 Spring Cloud Stream 在生产者和消费者添加了一层中间件。
<a name="g1KAw"></a>
## 3.2 Barista 接口
Barista 接口:定义作为后面类的参数,定义通道类型(决定通道是用于发送还是接收消息)和通道名称(作为配置用)。
- @EnableBinding:value 参数指定用于定义绑定消息通道的接口,在应用启动时实现对定义消息通道的绑定;
- @Output:输出注解,用于定义发送消息接口;
- @Input:输入注解,用于定义消息的消费者接口;
- @StreamListener:用于定义监听方法的注解;
<a name="bfz9y"></a>
## 3.3 整合
<a name="b7tJH"></a>
### 3.3.1 添加依赖
```xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
3.3.2 生产端
Barista 接口:定义输出通道,添加绑定关系
public interface Barista {
String OUTPUT_CHANNEL = "output_channel";
// 注解@Output声明了它是一个输出类型的通道,名字是output_channel。
// 这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道
// 类型是output,发布的主题名为mydest。
@Output(Barista.OUTPUT_CHANNEL)
MessageChannel logoutput();
}
配置:
spring:
cloud:
stream:
bindings:
output_channel:
destination: exchange-3
group: queue-3
binder: rabbit_cluster
binders:
rabbit_cluster:
type: rabbit
enviroment:
spring:
rabbitmq:
address: 192.168.58.129:5672
username: orcas
password: 1224
virtual-host: /
@EnableBinding(Barista.class)
@Service
public class RabbitmqSender {
@Autowired
private Barista barista;
// 发送消息
public String sendMessage(Object message, Map<String, Object> properties) throws Exception {
try{
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
boolean sendStatus = barista.logoutput().send(msg);
System.err.println("--------------sending -------------------");
System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);
}catch (Exception e){
System.err.println("-------------error-------------");
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
return null;
}
}
3.3.3 消费端
Barista
public interface Barista {
String INPUT_CHANNEL = "input_channel";
// 注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。
// 这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,
// 它的类型是input,订阅的主题是position2处声明的mydest这个主题
@Input(Barista.INPUT_CHANNEL)
SubscribableChannel loginput();
}
配置:
spring:
cloud:
stream:
bindings:
input_channel:
destination: exchange-3
group: queue-3
binder: rabbit_cluster
consumer:
concurrency: 1
rabbit:
bindings:
input_channle:
consumer:
requeue-rejected: false
acknowledge-mode: MANUAL
recovery-interval: 3000
durable-subscription: true
max-concurrency: 5
binders:
rabbit_cluster:
type: rabbit
enviroment:
spring:
rabbitmq:
address: 192.168.58.129:5672
username: orcas
password: 1224
virtual-host: /
接收:
@EnableBinding(Barista.class)
@Service
public class RabbitmqReceiver {
@StreamListener(Barista.INPUT_CHANNEL)
public void receiver(Message message) throws Exception {
Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
System.out.println("Input Stream 1 接受数据:" + message);
System.out.println("消费完毕------------");
channel.basicAck(deliveryTag, false);
}
}