整合 spring
pom.xml
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
bean 注入(spring 方式)
设置一个 配置类
@Configurationpublic class RabbitmqConfig {}
注入 ConnectionFactory
/*** Rabbitmq connectionFactory** @return*/@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses("127.0.0.1:5672");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setVirtualHost("/");return connectionFactory;}
注入 rabbitadmin
- 底层实现是从 spring 容器中获取 Exchange 、Binding、 RoutingKey 和 Queue 的 @Bean 声明
setAutoStartup必须为true,否则 Spring 不会加载 RabbitAdmin 类/*** 底层实现是从 spring 容器中获取 Exchange Binding RoutingKey 和 Queue 的 @Bean 声明** @param connectionFactory* @return*/@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}
注入 rabbitmqTemplate
/*** rabbitmq 模板** @param connectionFactory* @return*/@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}
注入 exchange、 queue、 binding ```java // 下面的这些 bean 都会被 rabbitAdmin 获取 @Bean public TopicExchange exchange1() { // String name, boolean durable, boolean autoDelete, Map
arguments return new TopicExchange(“topic01”, true, false); }
@Bean
public Queue queue01() {
// String name, boolean durable, boolean exclusive, boolean autoDelete, Map
@Bean
public Queue queue02() {
// String name, boolean durable, boolean exclusive, boolean autoDelete, Map
@Bean public Binding binding01() { return BindingBuilder .bind(queue01()) // 队列 .to(exchange1()) // 交换机 .with(“routingKey01”); // 指定 routing key }
@Bean public Binding binding02() { return BindingBuilder .bind(queue01()) .to(exchange1()) .with(“routingKey02”); }
<a name="E7bwd"></a>### 简单消息监听容器```java/*** 简单消息监听容器,为多个 queue 设置统一的 consumer 配置等等** @param connectionFactory* @return*/@Beanpublic SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container =new SimpleMessageListenerContainer(connectionFactory);container.setQueues(queue01(), queue02());// 设置并行消费者属性container.setConcurrentConsumers(1);// 设置最大并行消费者数量container.setMaxConcurrentConsumers(5);// 设置重回队列container.setDefaultRequeueRejected(false);/*设置响应 ack格式AcknowledgeMode.NONE:自动确认AcknowledgeMode.AUTO:根据情况确认AcknowledgeMode.MANUAL:手动确认*/container.setAcknowledgeMode(AcknowledgeMode.MANUAL);// qos 中的 prefetchCountcontainer.setPrefetchCount(1);// 设置消费者tag名称container.setConsumerTagStrategy(new ConsumerTagStrategy() {@Overridepublic String createConsumerTag(String s) {return s + "_" + UUID.randomUUID().toString().replace("-", "");}});// 简单的设置 消费逻辑container.setMessageListener(new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {String msg = new String(message.getBody());System.err.println("消费者进行了消费" + msg);// 手动 ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}});return container;}
使用 MessageListenerAdapter
@Beanpublic SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);// 略去其他配置// 不使用默认的消费逻辑/*container.setMessageListener(new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {String msg = new String(message.getBody());System.err.println("消费者进行了消费" + msg);// 手动 ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}});*/// 消息监听适配器,可以更加灵活的给消费者添加自定义的逻辑// 设置 Delegate(委托) 对象MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());/*public class MessageListenerAdapter extends AbstractAdaptableMessageListener {private final Map<String, String> queueOrTagToMethodName= new HashMap<String, String>();// 默认是处理传入对象的 defaultListenerMethod, 即 handleMessage//Out-of-the-box value for the default listener method: "handleMessage".public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";// ...}// 则这里会处理 MessageDelegate#handleMessage()// 即要求 MessageDelegate 实现一个 handleMessage() 方法*/// 1. 可以修改 defaultListenerMethod 的值// 比如这里设置后会处理 MessageDelegate#newMethod()// adapter.setDefaultListenerMethod("newMethod");// 2. 指定 MessageConverter, 即 defaultListenerMethod 方法的参数形式// 默认是 byte[] message,这里后续重写后参数为 Stringadapter.setMessageConverter(new TextMessageConverter());// 3. 指定队列和方法匹配器, 让进入 queue 的消息进入对应的 method 进行处理// 注意参数会被上面自己指定的 MessageConverter 影响,默认是 byte[]Map<String, String> queueOrTagToMethodName = new HashMap<>();// queue01 的消息会进入 MessageDelegate#method01(),queueOrTagToMethodName.put("queue01", "method01");// queue02 的消息会进入 MessageDelegate#method02()queueOrTagToMethodName.put("queue02", "method02");adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);container.setMessageListener(adapter);return container;}
MessageListenerAdapter 设置的 Delegate
private static class MessageDelegate {// 默认会进入的方法public void handleMessage(byte[] messageBody) {System.err.println(new String(messageBody));}// 进行 setDefaultListenerMethod 后默认会进入的方法public void newMethod(byte[] messageBody) {System.err.println(new String(messageBody));}// queue 绑定,参数会被 MessageConverter 影响public void method01(String convertMessage){System.err.println("method01: " + convertMessage);}public void method02(String convertMessage){System.err.println("method02: " +convertMessage);}}
MessageListenerAdapter 设置的 MessageConverter
private static class TextMessageConverter implements MessageConverter {/*** java 对象转 Message*/@Overridepublic Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {return new Message(o.toString().getBytes(), messageProperties);}/*** 返回值是 DefaultListenerMethod 对应 方法的参数类型** @param message* @return* @throws MessageConversionException*/@Overridepublic Object fromMessage(Message message) throws MessageConversionException {// 可以返回不同的类型,让该 Converter 复用return new String(message.getBody());}}
其他MessageConverter 消息转换器
- json转换器 Jackson2JsonMessageConverter:可以进行 java 对象的转换功能
- producer 发送的内容是对象的 json 字符串的字节数组
- producer 发送的时候需要设置
properties中的ContentType为application/json```java // 支持 json 格式的转换器
// 注意,由于后面使用了 jackson2JsonMessageConverter // 所以要求 Delegate 类的 defaultListenerMethod() 的形参是 Map 类型 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod(“json2Method”);
// 给 adapter 添加 jackson2JsonMessageConverter Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
- Jackson2JsonMessageConverter + DefaultJackson2JavaTypeMapper 映射器:可以进行 java 对象的映射关系- **producer 发送的内容是对象的 json 字符串的字节数组**- **producer 发送的时候需要设置 `properties` 中的 `ContentType` 为 `application/json`**- **producer 发送的时候需要设置一个 **`**headers**`**,键值为 **`**"__TypedId__": "pojo全类名"**````java//支持 java 对象转换// 注意,由于后面使用了 jackson2JsonMessageConverter 和 javaTypeMapper// 要求 Delegate 类的 defaultListenerMethod() 的形参是 对应的 pojo 类型MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());adapter.setDefaultListenerMethod("json2PojoMethod");// 给 jackson2JsonMessageConverter 添加一个 javaTypeMapperJackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);// 给 adapter 添加 jackson2JsonMessageConverteradapter.setMessageConverter(jackson2JsonMessageConverter);container.setMessageListener(adapter);
- Jackson2JsonMessageConverter + DefaultJackson2JavaTypeMapper+ idClassMapping
- producer 发送的内容是对象的 json 字符串的字节数组
- producer 发送的时候需要设置
properties中的ContentType为application/json - producer 发送的时候需要设置一个
**headers**,键值为**"__TypedId__": "配置的全类名"**- 注意这里只要下面
idClassMapping配置好的类名即可 ```java // java 对象多映射转换
- 注意这里只要下面
// 和上面类似, 由于使用了 IdClassMapping // 要求 Delegate 类的 defaultListenerMethod() 的形参是 对应的pojo 类型 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod(“json2MorePojoMethod”);
// 给 jackson2JsonMessageConverter 添加一个 javaTypeMapper Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
// 给 javaTypeMapper 添加 实体类和类名 的映射关系, 可以添加多个
Map
javaTypeMapper.setIdClassMapping(idClassMapping);
adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter);
- 全局转换器 可以配置多个转换器,根据 `Content-Type` 进行对应转换器投递```java// 全局的转换器MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());// 消息会转发 Delegate 类的 defaultListenerMethod()adapter.setDefaultListenerMethod("moreMethod");ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();// 根据 ContentType 来将消息投放到对应的转换器TextMessageConverter textMessageConverter = new TextMessageConverter();converter.addDelegate("text", textMessageConverter);converter.addDelegate("text/plain", textMessageConverter);Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();converter.addDelegate("json", jackson2JsonMessageConverter);converter.addDelegate("application/json", jackson2JsonMessageConverter);// 还可以设置其他处理图片、pdf等的转换器// 将全局 convert 设置进 adapteradapter.setMessageConverter(converter);container.setMessageListener(adapter);return container;
- 自定义二进制转换器
- 需要继承 MessageConverter,注意实现的 toMessage 会对应 adapter 的 setDefaultListenerMethod 方法的参数值
整合 springboot
application.properties
spring.rabbitmq.addresses=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/spring.rabbitmq.connection-timeout=15000spring.http.encoding.charset=UTF-8spring.jackson.date-format=yyy-MM-dd HH:mm:ssspring.jackson.time-zone=UTCspring.jackson.default-property-inclusion=NON_NULL# producer 端配置spring.rabbitmq.publisher-confirm-type=simplespring.rabbitmq.publisher-returns=true# 保证监听有效spring.rabbitmq.template.mandatory=true# consumer 端配置spring.rabbitmq.listener.simple.concurrency=5spring.rabbitmq.listener.simple.max-concurrency=10# 签收模式 auto 自动 manual 手动spring.rabbitmq.listener.simple.acknowledge-mode=manual# 单个线程同时消费消息个数 (qos)spring.rabbitmq.listener.simple.prefetch=1
producer
可以给
RabbitTemplate绑定 publisher-confirms 和 publiser-returns- 前提是开启
spring.rabbitmq.template.mandatory=true保证监听有效 - 可以配置其他属性,比如发送重试,超时时间,次数,间隔
CorrelationData用于可靠性投递 ```java @Component public class SpringbootProducer { @Autowired private RabbitTemplate rabbitTemplate; /**- avoid
java.lang.IllegalStateException: Only one ConfirmCallback is supported by each RabbitTemplate */ @PostConstruct public void initCallback(){ // 注册 confirm callback // 用于监听 broker 端返回的确认请求 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {}
});
// 注册 return callback // 保证消息对broker端是可达的,如果出现路由键不可达,则需要对不可达的消息进行后续的处理 // 保证消息的路由成功 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Overridepublic void returnedMessage(org.springframework.amqp.core.Message message, int i, String s, String s1, String s2) {}
}); } public void send(Object payload, HashMap
properties, String exchangeName, String routingKey) throws Exception{ MessageHeaders messageHeaders = new MessageHeaders(properties); Message msg = MessageBuilder.createMessage(payload, messageHeaders); CorrelationData correlationData = new CorrelationData(“100”); System.out.println(correlationData); rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationData); } }
- 前提是开启
<a name="6ZLrz"></a>## consumer- 使用 `@RabbitMQListener` 注解- 可以配置 `@QueueBinding` `@Queue` `@Exchange` 等直接通过该注解直接完成消费端的交换机、队列、绑定路由、配置监听功能等```javaimport com.example.rabbitmqapi.model.Order;import com.rabbitmq.client.Channel;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHeaders;import org.springframework.messaging.handler.annotation.Headers;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;import java.util.Map;@Componentpublic class SpringbootConsumer {@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "queue1", durable = "true", autoDelete = "false"),exchange = @Exchange(value = "exchange1", durable = "true", autoDelete = "false", type = "topic"),key = {"springboot.normal"},ignoreDeclarationExceptions = "false",declare = "true")})@RabbitHandlerpublic void onNormalMessage(Message message, Channel channel) throws Exception {// 消息体Object payLoadBody = message.getPayload();MessageHeaders messageHeaders = message.getHeaders();long deliveryTag = (long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);// 手动 ack// 需要 spring.rabbitmq.listener.simple.acknowledge-mode=manualchannel.basicAck(deliveryTag, false);}@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "queue2", durable = "true", autoDelete = "false"),exchange = @Exchange(value = "exchange1", durable = "true", autoDelete = "false", type = "topic"),key = {"springboot.pojo"},ignoreDeclarationExceptions = "false",declare = "true")})@RabbitHandlerpublic void onPojoMessage(@Payload Order order, // 注解标明这是 消息体@Headers Map<String, Object> headers, // 注解标明这是 headersChannel channel) throws Exception{System.out.println(order);System.out.println(headers);Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);channel.basicAck(deliveryTag, false);}}
