纯手敲,单词错误见谅。文章借鉴github某为小哥哥得教材!
1、pom文件引入相关资源
<!--注解开发,便携式 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- 引入 amqp包,可以使用org.springframework.amqp的相关配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
```
在系统配置文件中加入连接属性
```yaml
spring:
application:
name: RabbitMQ-Demo
rabbitmq:
host: k.wuwii.com
port: 5672
username: kronchan
password: 123456
#virtual-host: test
publisher-confirms: true # 开启确认消息是否到达交换器,需要设置 true
publisher-returns: true # 开启确认消息是否到达队列,需要设置 true
2、创建消费者接收器,实现ChannelAwareMessageListener(可配置ack机制)
参考文章了解配置ChannelAwareMessageListener:https://www.jianshu.com/p/e8a5517ec688
/**
* 消费者接收器
*/
@Slf4j
public class MassageReceiver implements ChannelAwareMessageListener{
@Override
public void onMessage(Message message,Channel channel)throws Execption{
try{
byte[] body = message.getBody();
log,info("消费者receiver {}",new String(body);
}finally{
//确认成功消费,否则消息会转发到其他消费者,或者进行重试
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
2、配置消费者属性
/**
* RabbitMQ 消费者的配置属性
*
* @author moyu
* @version 1.0
* @since <pre>2018/3/19 10:04</pre>
*/
@comfiguration
public class RabbitMQConfig{
public final static String QUEUE_NAME="queue.name1";
public final static String ROUTING_KEY="rout-key";
public final static String EXCHANGE_NAME="exchange-name1";
//注入Bean Queue
@Bean
public Queue queue(){
//@1 是否持久化(持久)
boolean durable =true;
//@2仅创建者可以使用的私有队列,断开后自动删除(专属)
boolean exclusive = false;
//@3当所有消费者断开连接后,是否自动删除队列
boolean autoDelet = false;
return new Queue(QUEUE_NAME,durable,exclusive,autoDelet );
}
//注入交换机
@Bean
public topicExchange exchange(){
//是否持久化
boolean durable = true;
//当所有消费者断开连接后,是否自动删除队列
boolean autoDelet = false;
return new TopicExchange(EXCHANGE_NAME,durable,autoDelet);
}
//将队列、交换机 绑定到路由键
@Bean
public Binding bind(Queue queue,TopicExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
//注入消费者接收器
@Bean
public MessageReceiver receiver(){
return new MessageReceiver();
}
//关于SimpleMessageListenerContainer 设置消费队列监听 可以学习 https://www.jianshu.com/p/213827ebc08c
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,MessageReceiver messageReceiver){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
//设置amqp连接工厂
container.setConnectionFactory(conectionFactory);
//设置消费队列监听 而且可以设置多个 使用逗号分隔,并且可以动态加入队列或者删除队列
container.setQueueNames(QUEUE_NAME);
//设置消费者接收器
container.setMessageListener(messageReceiver);
//container.setMaxConcurrentConsumers(1);
//container.setConcurrentConsumers(1); 默认为1
//container.setExposeListenerChannel(true);
//设置为手动签收,默认AUTO , 如果要设置手动应答 basicAck ,就设置为manual
cantainer.setAckNuwledgeMode(AcknuwledgeMode.MANUAL);
return container;
}
}
二、生产者
@Component
public class MessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final Logger log = LoggerFactory.getLogger(MessageSender.class);
public void send(){
//消息唯一ID
CorrelationDate correlationId = new CorrelationDate(DDID.randomUUID().toString());
// ConfirmListener是当消息无法发送到Exchange被触发,此时Ack为False,这时cause包含发送失败的原因,例如exchange不存在时
// 需要在系统配置文件中设置 publisher-confirms: true
if(!rabbitTemplate.isConfirmListener()){
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) ->{
if (ack) {
log.info(">>>>>>> 消息id:{} 发送成功", correlationData.getId());
} else {
log.info(">>>>>>> 消息id:{} 发送失败", correlationData.getId());
}
})
}
// ReturnCallback 是在交换器无法将路由键路由到任何一个队列中,会触发这个方法。
// 需要在系统配置文件中设置 publisher-returns: true
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息id:{} 发送失败", message.getMessageProperties().getCorrelationId());
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGES_NAME, RabbitMQConfig.ROUTING_KEY, ">>>>> Hello World", correlationId);
log.info("Already sent message.");
}
}
##### 测试发送消息
先启动系统启动类,消费者开始订阅,启动测试类发送消息。
```java
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqApplicationTests {
@Autowired
private MessageSender sender;
@Test
public void testReceiver() {
sender.send();
}
}
可以在消费者接收到信息,并且发送端将打出日志 成功发送消息的记录,也可以测试下 `Publisher Confirms and Returns机制` 主要是测试 `ConfirmCallback` 和 `ReturnCallback` 这两个方法。
* `ConfirmCallback` ,确认消息是否到达交换器,例如我们发送一个消息到一个你没有创建过的 交换器上面去,看看情况,
* `ReturnCallback`,确认消息是否到达队列,我们可以这样测试,定义一个路由键,不会被任何队列订阅到,最后查看结果就可以了。
##### 公平转发(Fair dispatch)
设置 RabbitMQ 往**空闲的工作线程**中发送任务,避免某些工作线程的任务过高,而部分工作线程空闲的问题。
在生产者的管道设置参数:
```java
int prefetchCount = 1;
channel.basicQos(prefetchCount) ;