整合 spring
pom.xml
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
bean 注入(spring 方式)
设置一个 配置类
@Configuration
public class RabbitmqConfig {
}
注入 ConnectionFactory
/**
* Rabbitmq connectionFactory
*
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
注入 rabbitadmin
- 底层实现是从 spring 容器中获取 Exchange 、Binding、 RoutingKey 和 Queue 的 @Bean 声明
setAutoStartup
必须为true
,否则 Spring 不会加载 RabbitAdmin 类/**
* 底层实现是从 spring 容器中获取 Exchange Binding RoutingKey 和 Queue 的 @Bean 声明
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
注入 rabbitmqTemplate
/**
* rabbitmq 模板
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
注入 exchange、 queue、 binding ```java // 下面的这些 bean 都会被 rabbitAdmin 获取 @Bean public TopicExchange exchange1() { // String name, boolean durable, boolean autoDelete, Map
arguments return new TopicExchange(“topic01”, true, false); }
@Bean
public Queue queue01() {
// String name, boolean durable, boolean exclusive, boolean autoDelete, Map
@Bean
public Queue queue02() {
// String name, boolean durable, boolean exclusive, boolean autoDelete, Map
@Bean public Binding binding01() { return BindingBuilder .bind(queue01()) // 队列 .to(exchange1()) // 交换机 .with(“routingKey01”); // 指定 routing key }
@Bean public Binding binding02() { return BindingBuilder .bind(queue01()) .to(exchange1()) .with(“routingKey02”); }
<a name="E7bwd"></a>
### 简单消息监听容器
```java
/**
* 简单消息监听容器,为多个 queue 设置统一的 consumer 配置等等
*
* @param connectionFactory
* @return
*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer(
ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue01(), queue02());
// 设置并行消费者属性
container.setConcurrentConsumers(1);
// 设置最大并行消费者数量
container.setMaxConcurrentConsumers(5);
// 设置重回队列
container.setDefaultRequeueRejected(false);
/*
设置响应 ack格式
AcknowledgeMode.NONE:自动确认
AcknowledgeMode.AUTO:根据情况确认
AcknowledgeMode.MANUAL:手动确认
*/
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// qos 中的 prefetchCount
container.setPrefetchCount(1);
// 设置消费者tag名称
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String s) {
return s + "_" + UUID.randomUUID().toString().replace("-", "");
}
});
// 简单的设置 消费逻辑
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.err.println("消费者进行了消费" + msg);
// 手动 ack
channel.basicAck(
message.getMessageProperties().getDeliveryTag(),
false);
}
});
return container;
}
使用 MessageListenerAdapter
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// 略去其他配置
// 不使用默认的消费逻辑
/*
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.err.println("消费者进行了消费" + msg);
// 手动 ack
channel.basicAck(
message.getMessageProperties().getDeliveryTag(),
false);
}
});
*/
// 消息监听适配器,可以更加灵活的给消费者添加自定义的逻辑
// 设置 Delegate(委托) 对象
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
/*
public class MessageListenerAdapter extends AbstractAdaptableMessageListener {
private final Map<String, String> queueOrTagToMethodName
= new HashMap<String, String>();
// 默认是处理传入对象的 defaultListenerMethod, 即 handleMessage
//Out-of-the-box value for the default listener method: "handleMessage".
public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";
// ...
}
// 则这里会处理 MessageDelegate#handleMessage()
// 即要求 MessageDelegate 实现一个 handleMessage() 方法
*/
// 1. 可以修改 defaultListenerMethod 的值
// 比如这里设置后会处理 MessageDelegate#newMethod()
// adapter.setDefaultListenerMethod("newMethod");
// 2. 指定 MessageConverter, 即 defaultListenerMethod 方法的参数形式
// 默认是 byte[] message,这里后续重写后参数为 String
adapter.setMessageConverter(new TextMessageConverter());
// 3. 指定队列和方法匹配器, 让进入 queue 的消息进入对应的 method 进行处理
// 注意参数会被上面自己指定的 MessageConverter 影响,默认是 byte[]
Map<String, String> queueOrTagToMethodName = new HashMap<>();
// queue01 的消息会进入 MessageDelegate#method01(),
queueOrTagToMethodName.put("queue01", "method01");
// queue02 的消息会进入 MessageDelegate#method02()
queueOrTagToMethodName.put("queue02", "method02");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(adapter);
return container;
}
MessageListenerAdapter 设置的 Delegate
private static class MessageDelegate {
// 默认会进入的方法
public void handleMessage(byte[] messageBody) {
System.err.println(new String(messageBody));
}
// 进行 setDefaultListenerMethod 后默认会进入的方法
public void newMethod(byte[] messageBody) {
System.err.println(new String(messageBody));
}
// queue 绑定,参数会被 MessageConverter 影响
public void method01(String convertMessage){
System.err.println("method01: " + convertMessage);
}
public void method02(String convertMessage){
System.err.println("method02: " +convertMessage);
}
}
MessageListenerAdapter 设置的 MessageConverter
private static class TextMessageConverter implements MessageConverter {
/**
* java 对象转 Message
*/
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
return new Message(o.toString().getBytes(), messageProperties);
}
/**
* 返回值是 DefaultListenerMethod 对应 方法的参数类型
*
* @param message
* @return
* @throws MessageConversionException
*/
@Override
public Object fromMessage(Message message) throws MessageConversionException {
// 可以返回不同的类型,让该 Converter 复用
return new String(message.getBody());
}
}
其他MessageConverter 消息转换器
- json转换器 Jackson2JsonMessageConverter:可以进行 java 对象的转换功能
- producer 发送的内容是对象的 json 字符串的字节数组
- producer 发送的时候需要设置
properties
中的ContentType
为application/json
```java // 支持 json 格式的转换器
// 注意,由于后面使用了 jackson2JsonMessageConverter // 所以要求 Delegate 类的 defaultListenerMethod() 的形参是 Map 类型 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod(“json2Method”);
// 给 adapter 添加 jackson2JsonMessageConverter Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
- Jackson2JsonMessageConverter + DefaultJackson2JavaTypeMapper 映射器:可以进行 java 对象的映射关系
- **producer 发送的内容是对象的 json 字符串的字节数组**
- **producer 发送的时候需要设置 `properties` 中的 `ContentType` 为 `application/json`**
- **producer 发送的时候需要设置一个 **`**headers**`**,键值为 **`**"__TypedId__": "pojo全类名"**`
```java
//支持 java 对象转换
// 注意,由于后面使用了 jackson2JsonMessageConverter 和 javaTypeMapper
// 要求 Delegate 类的 defaultListenerMethod() 的形参是 对应的 pojo 类型
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("json2PojoMethod");
// 给 jackson2JsonMessageConverter 添加一个 javaTypeMapper
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
// 给 adapter 添加 jackson2JsonMessageConverter
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
- Jackson2JsonMessageConverter + DefaultJackson2JavaTypeMapper+ idClassMapping
- producer 发送的内容是对象的 json 字符串的字节数组
- producer 发送的时候需要设置
properties
中的ContentType
为application/json
- producer 发送的时候需要设置一个
**headers**
,键值为**"__TypedId__": "配置的全类名"**
- 注意这里只要下面
idClassMapping
配置好的类名即可 ```java // java 对象多映射转换
- 注意这里只要下面
// 和上面类似, 由于使用了 IdClassMapping // 要求 Delegate 类的 defaultListenerMethod() 的形参是 对应的pojo 类型 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod(“json2MorePojoMethod”);
// 给 jackson2JsonMessageConverter 添加一个 javaTypeMapper Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
// 给 javaTypeMapper 添加 实体类和类名 的映射关系, 可以添加多个
Map
javaTypeMapper.setIdClassMapping(idClassMapping);
adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter);
- 全局转换器 可以配置多个转换器,根据 `Content-Type` 进行对应转换器投递
```java
// 全局的转换器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// 消息会转发 Delegate 类的 defaultListenerMethod()
adapter.setDefaultListenerMethod("moreMethod");
ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();
// 根据 ContentType 来将消息投放到对应的转换器
TextMessageConverter textMessageConverter = new TextMessageConverter();
converter.addDelegate("text", textMessageConverter);
converter.addDelegate("text/plain", textMessageConverter);
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
converter.addDelegate("json", jackson2JsonMessageConverter);
converter.addDelegate("application/json", jackson2JsonMessageConverter);
// 还可以设置其他处理图片、pdf等的转换器
// 将全局 convert 设置进 adapter
adapter.setMessageConverter(converter);
container.setMessageListener(adapter);
return container;
- 自定义二进制转换器
- 需要继承 MessageConverter,注意实现的 toMessage 会对应 adapter 的 setDefaultListenerMethod 方法的参数值
整合 springboot
application.properties
spring.rabbitmq.addresses=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyy-MM-dd HH:mm:ss
spring.jackson.time-zone=UTC
spring.jackson.default-property-inclusion=NON_NULL
# producer 端配置
spring.rabbitmq.publisher-confirm-type=simple
spring.rabbitmq.publisher-returns=true
# 保证监听有效
spring.rabbitmq.template.mandatory=true
# consumer 端配置
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
# 签收模式 auto 自动 manual 手动
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 单个线程同时消费消息个数 (qos)
spring.rabbitmq.listener.simple.prefetch=1
producer
可以给
RabbitTemplate
绑定 publisher-confirms 和 publiser-returns- 前提是开启
spring.rabbitmq.template.mandatory=true
保证监听有效 - 可以配置其他属性,比如发送重试,超时时间,次数,间隔
CorrelationData
用于可靠性投递 ```java @Component public class SpringbootProducer { @Autowired private RabbitTemplate rabbitTemplate; /**- avoid
java.lang.IllegalStateException: Only one ConfirmCallback is supported by each RabbitTemplate */ @PostConstruct public void initCallback(){ // 注册 confirm callback // 用于监听 broker 端返回的确认请求 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
}
});
// 注册 return callback // 保证消息对broker端是可达的,如果出现路由键不可达,则需要对不可达的消息进行后续的处理 // 保证消息的路由成功 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int i, String s, String s1, String s2) {
}
}); } public void send(Object payload, HashMap
properties, String exchangeName, String routingKey) throws Exception{ MessageHeaders messageHeaders = new MessageHeaders(properties); Message msg = MessageBuilder.createMessage(payload, messageHeaders); CorrelationData correlationData = new CorrelationData(“100”); System.out.println(correlationData); rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationData); } }
- 前提是开启
<a name="6ZLrz"></a>
## consumer
- 使用 `@RabbitMQListener` 注解
- 可以配置 `@QueueBinding` `@Queue` `@Exchange` 等直接通过该注解直接完成消费端的交换机、队列、绑定路由、配置监听功能等
```java
import com.example.rabbitmqapi.model.Order;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class SpringbootConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "queue1", durable = "true", autoDelete = "false"),
exchange = @Exchange(value = "exchange1", durable = "true", autoDelete = "false", type = "topic"),
key = {"springboot.normal"},
ignoreDeclarationExceptions = "false",
declare = "true"
)
})
@RabbitHandler
public void onNormalMessage(Message message, Channel channel) throws Exception {
// 消息体
Object payLoadBody = message.getPayload();
MessageHeaders messageHeaders = message.getHeaders();
long deliveryTag = (long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
// 手动 ack
// 需要 spring.rabbitmq.listener.simple.acknowledge-mode=manual
channel.basicAck(deliveryTag, false);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "queue2", durable = "true", autoDelete = "false"),
exchange = @Exchange(value = "exchange1", durable = "true", autoDelete = "false", type = "topic"),
key = {"springboot.pojo"},
ignoreDeclarationExceptions = "false",
declare = "true"
)
})
@RabbitHandler
public void onPojoMessage(@Payload Order order, // 注解标明这是 消息体
@Headers Map<String, Object> headers, // 注解标明这是 headers
Channel channel) throws Exception{
System.out.println(order);
System.out.println(headers);
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
}