纯手敲,单词错误见谅。文章借鉴github某为小哥哥得教材!
1、pom文件引入相关资源
<!--注解开发,便携式 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
<!-- 引入 amqp包,可以使用org.springframework.amqp的相关配置 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
```在系统配置文件中加入连接属性```yaml
spring:application:name: RabbitMQ-Demorabbitmq:host: k.wuwii.comport: 5672username: kronchanpassword: 123456#virtual-host: testpublisher-confirms: true # 开启确认消息是否到达交换器,需要设置 truepublisher-returns: true # 开启确认消息是否到达队列,需要设置 true
2、创建消费者接收器,实现ChannelAwareMessageListener(可配置ack机制)
参考文章了解配置ChannelAwareMessageListener:https://www.jianshu.com/p/e8a5517ec688
/*** 消费者接收器*/@Slf4jpublic class MassageReceiver implements ChannelAwareMessageListener{@Overridepublic void onMessage(Message message,Channel channel)throws Execption{try{byte[] body = message.getBody();log,info("消费者receiver {}",new String(body);}finally{//确认成功消费,否则消息会转发到其他消费者,或者进行重试channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}}
2、配置消费者属性
/*** RabbitMQ 消费者的配置属性** @author moyu* @version 1.0* @since <pre>2018/3/19 10:04</pre>*/@comfigurationpublic class RabbitMQConfig{public final static String QUEUE_NAME="queue.name1";public final static String ROUTING_KEY="rout-key";public final static String EXCHANGE_NAME="exchange-name1";//注入Bean Queue@Beanpublic Queue queue(){//@1 是否持久化(持久)boolean durable =true;//@2仅创建者可以使用的私有队列,断开后自动删除(专属)boolean exclusive = false;//@3当所有消费者断开连接后,是否自动删除队列boolean autoDelet = false;return new Queue(QUEUE_NAME,durable,exclusive,autoDelet );}//注入交换机@Beanpublic topicExchange exchange(){//是否持久化boolean durable = true;//当所有消费者断开连接后,是否自动删除队列boolean autoDelet = false;return new TopicExchange(EXCHANGE_NAME,durable,autoDelet);}//将队列、交换机 绑定到路由键@Beanpublic Binding bind(Queue queue,TopicExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);}//注入消费者接收器@Beanpublic MessageReceiver receiver(){return new MessageReceiver();}//关于SimpleMessageListenerContainer 设置消费队列监听 可以学习 https://www.jianshu.com/p/213827ebc08c@Beanpublic SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,MessageReceiver messageReceiver){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();//设置amqp连接工厂container.setConnectionFactory(conectionFactory);//设置消费队列监听 而且可以设置多个 使用逗号分隔,并且可以动态加入队列或者删除队列container.setQueueNames(QUEUE_NAME);//设置消费者接收器container.setMessageListener(messageReceiver);//container.setMaxConcurrentConsumers(1);//container.setConcurrentConsumers(1); 默认为1//container.setExposeListenerChannel(true);//设置为手动签收,默认AUTO , 如果要设置手动应答 basicAck ,就设置为manualcantainer.setAckNuwledgeMode(AcknuwledgeMode.MANUAL);return container;}}
二、生产者
@Componentpublic class MessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;private static final Logger log = LoggerFactory.getLogger(MessageSender.class);public void send(){//消息唯一IDCorrelationDate correlationId = new CorrelationDate(DDID.randomUUID().toString());// ConfirmListener是当消息无法发送到Exchange被触发,此时Ack为False,这时cause包含发送失败的原因,例如exchange不存在时// 需要在系统配置文件中设置 publisher-confirms: trueif(!rabbitTemplate.isConfirmListener()){rabbitTemplate.setConfirmCallback((correlationData,ack,cause) ->{if (ack) {log.info(">>>>>>> 消息id:{} 发送成功", correlationData.getId());} else {log.info(">>>>>>> 消息id:{} 发送失败", correlationData.getId());}})}// ReturnCallback 是在交换器无法将路由键路由到任何一个队列中,会触发这个方法。// 需要在系统配置文件中设置 publisher-returns: truerabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("消息id:{} 发送失败", message.getMessageProperties().getCorrelationId());});rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGES_NAME, RabbitMQConfig.ROUTING_KEY, ">>>>> Hello World", correlationId);log.info("Already sent message.");}}
##### 测试发送消息先启动系统启动类,消费者开始订阅,启动测试类发送消息。```java@RunWith(SpringRunner.class)@SpringBootTestpublic class SpringbootRabbitmqApplicationTests {@Autowiredprivate MessageSender sender;@Testpublic void testReceiver() {sender.send();}}
可以在消费者接收到信息,并且发送端将打出日志 成功发送消息的记录,也可以测试下 `Publisher Confirms and Returns机制` 主要是测试 `ConfirmCallback` 和 `ReturnCallback` 这两个方法。* `ConfirmCallback` ,确认消息是否到达交换器,例如我们发送一个消息到一个你没有创建过的 交换器上面去,看看情况,* `ReturnCallback`,确认消息是否到达队列,我们可以这样测试,定义一个路由键,不会被任何队列订阅到,最后查看结果就可以了。
##### 公平转发(Fair dispatch)设置 RabbitMQ 往**空闲的工作线程**中发送任务,避免某些工作线程的任务过高,而部分工作线程空闲的问题。在生产者的管道设置参数:```javaint prefetchCount = 1;channel.basicQos(prefetchCount) ;
