1、消息发送方(发布者)
1)添加maven依赖
<!-- springboot rabbitmq 使用-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2)配置RabbitMQ配置(application.properties)
#RabbitMQ 服务配置,host 不配置时默认连接本机(localhost)
spring.rabbitmq.host=192.168.0.3
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
3)创建发送方法
package com.example.provide.rabbitmq;
import com.alibaba.fastjson.JSON;
import com.example.provide.dto.UserDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import static org.springframework.integration.jmx.JmxHeaders.OPERATION_NAME;
/**
 * Publishes user payloads to RabbitMQ as JSON text.
 *
 * <p>The payload is serialized with fastjson and sent through the injected
 * {@link RabbitTemplate} to a fixed exchange / routing key pair.
 *
 * <p>NOTE(review): the import list above brings in {@code UserDTO}, while the
 * send method takes {@code User} — confirm which type is intended.
 *
 * <p>Author: yuesf, created 2019/11/4.
 */
@Component
public class Sender {

    private static final Logger log = LoggerFactory.getLogger(Sender.class);

    /** Name of the exchange messages are published to. */
    private static final String EXCHANGE = "demo.direct.exchange";

    /** Routing key attached to every published message. */
    private static final String ROUTING_KEY = "demo.direct";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Serializes {@code user} to JSON and publishes it, logging the payload
     * before and after the send call.
     *
     * @param user the object to serialize and publish
     */
    public void storeInfoWindQSend(User user) {
        final String payload = JSON.toJSONString(user);
        log.info("RabbitMQ: 发送消息={}", payload);

        // Exchange and routing key are fixed for this sender.
        rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, payload);

        log.info("发送消息完成 message={}", payload);
    }
}
2、消息接收方(订阅者)
1)添加maven依赖
<!-- springboot rabbitmq 使用-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2)配置RabbitMQ配置(application.properties)
#RabbitMQ 服务配置,host 不配置时默认连接本机(localhost)
spring.rabbitmq.host=192.168.0.3
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
3)声明RabbitMQ
示例中使用直连交换机(DirectExchange):声明一个交换机、一个队列,以及交换机与队列之间的绑定关系。
package com.example.consume.listener;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Example RabbitMQ declarations: one direct exchange, one queue, and the
 * binding that connects them.
 *
 * <p>Author: yuesf, created 2019/11/4.
 */
@Configuration
public class RabbitConfigDemo {

    private static final String EXCHANGE_NAME = "demo.direct.exchange";
    private static final String QUEUE_NAME = "demo.queue";
    private static final String ROUTING_KEY = "demo.direct";

    /**
     * Declares the example direct exchange.
     *
     * @return a durable, non-auto-delete direct exchange
     */
    @Bean
    public DirectExchange demoExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(EXCHANGE_NAME, durable, autoDelete);
    }

    /**
     * Declares the example queue.
     *
     * @return a durable, non-exclusive, non-auto-delete queue
     */
    @Bean
    public Queue demoQueue() {
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(QUEUE_NAME, durable, exclusive, autoDelete);
    }

    /**
     * Binds the example queue to the example exchange.
     *
     * @param demoQueue    the queue bean named {@code demoQueue}
     * @param demoExchange the exchange bean named {@code demoExchange}
     * @return the binding between the two, keyed by the demo routing key
     */
    @Bean
    public Binding bindingDemoQueue(@Qualifier("demoQueue") Queue demoQueue,
                                    @Qualifier("demoExchange") DirectExchange demoExchange) {
        Binding queueToExchange = BindingBuilder
                .bind(demoQueue)
                .to(demoExchange)
                .with(ROUTING_KEY);
        return queueToExchange;
    }
}
4)监听方法
package com.example.consume.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Minimal consumer that logs whatever arrives on {@code demo.queue}.
 *
 * <p>Author: yuesf, created 2019/11/4.
 */
@Component
public class RabbitDemoListener {

    private static final Logger log = LoggerFactory.getLogger(RabbitDemoListener.class);

    /**
     * Receives a message from {@code demo.queue} and writes it to the log.
     *
     * @param message the delivered message as handed over by the listener container
     */
    @RabbitListener(queues = "demo.queue")
    public void goodsListenerProcess(Object message) {
        log.info("接收消息 message={}", message);
    }
}
3、手动ACK搭配其他注解的使用方式(订阅者)
配置类:
/**
 * Connection factory and listener container factory for consumers that
 * acknowledge messages manually.
 */
@Slf4j
@Configuration
public class RabbitConfig {

    /**
     * Builds a caching connection factory from the {@code spring.rabbitmq.*}
     * properties, with publisher confirms and returns switched on.
     *
     * @param host     broker host
     * @param port     broker port
     * @param username broker user
     * @param password broker password
     * @return the configured connection factory
     */
    @Bean(name = "oneConnectionFactory")
    public ConnectionFactory oneConnectionFactory(
            @Value("${spring.rabbitmq.host}") String host,
            @Value("${spring.rabbitmq.port}") int port,
            @Value("${spring.rabbitmq.username}") String username,
            @Value("${spring.rabbitmq.password}") String password) {
        // The password is intentionally left out of the log line.
        log.info("mq队列连接信息 host={}, port={}, username={}", host, port, username);
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    /**
     * Builds the listener container factory referenced by listeners through
     * {@code containerFactory = "oneFactory"}.
     *
     * @param configurer           Spring Boot configurer that applies the listener properties
     * @param oneConnectionFactory the connection factory bean declared in this class
     * @return a container factory that always uses manual acknowledgement
     */
    @Bean(name = "oneFactory")
    public SimpleRabbitListenerContainerFactory oneFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("oneConnectionFactory") ConnectionFactory oneConnectionFactory) {
        // NOTE(review): the message mentions "scoreFactory" although the bean is "oneFactory" — confirm wording.
        log.info("初始化比分 scoreFactory 实例");
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // Apply the Boot-managed listener settings first ...
        configurer.configure(factory, oneConnectionFactory);
        // ... then force manual ack, so a configured acknowledge-mode property
        // cannot overwrite it (the configurer sets the mode when the property is present).
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}
监听类:
//监听类
@Component
public class Listener {
//指定交换机、队列、路由routingKey
//ignoreDeclarationExceptions已有交换机可能会有异常忽略掉,从源码上看不存在交换机和队列会自动创建
@RabbitHandler
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "demo.queue", durable = "true"),
exchange = @Exchange(
value = "demoExchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.Direct
),
key = {"demo.direct"}),
containerFactory = "oneFactory")
public void listen(String msg, Channel channel, Message message) throws IOException {
try {
User user = JSON.parseObject(msg, User.class);
// 确认收到消息,只确认当前消费者的一个消息收到
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.info("消息已经回滚过,拒绝接收消息 : {}", msg);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
log.info("消息即将返回队列重新处理 :{}", msg);
//设置消息重新回到队列处理,requeue表示是否重新回到队列,true重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
log.error("消息消费异常, msg={},e={}", msg,e.getStackTrace());
}
}
}
配置文件
spring:
  rabbitmq:
    host: mq.dev.qiuhui.com
    port: 5672
    username: admin
    password: TY111111
    listener:
      simple:
        #指定消息确认模式为手动确认
        acknowledge-mode: manual