纯手敲,单词错误见谅。文章借鉴github某为小哥哥得教材!
    1、pom文件引入相关资源

    1. <!--注解开发,便携式 -->
    2. <dependency>
    3. <groupId>org.projectlombok</groupId>
    4. <artifactId>lombok</artifactId>
    5. <optional>true</optional>
    6. </dependency>
    1. <!-- 引入 amqp包,可以使用org.springframework.amqp的相关配置 -->
    2. <dependency>
    3. <groupId>org.springframework.boot</groupId>
    4. <artifactId>spring-boot-starter-amqp</artifactId>
    5. </dependency>
    1. ```
    2. 在系统配置文件中加入连接属性
    3. ```yaml
    1. spring:
    2. application:
    3. name: RabbitMQ-Demo
    4. rabbitmq:
    5. host: k.wuwii.com
    6. port: 5672
    7. username: kronchan
    8. password: 123456
    9. #virtual-host: test
    10. publisher-confirms: true # 开启确认消息是否到达交换器,需要设置 true
    11. publisher-returns: true # 开启确认消息是否到达队列,需要设置 true

    2、创建消费者接收器,实现ChannelAwareMessageListener(可配置ack机制)
    参考文章了解配置ChannelAwareMessageListener:https://www.jianshu.com/p/e8a5517ec688

    1. /**
    2. * 消费者接收器
    3. */
    4. @Slf4j
    5. public class MassageReceiver implements ChannelAwareMessageListener{
    6. @Override
    7. public void onMessage(Message message,Channel channel)throws Execption{
    8. try{
    9. byte[] body = message.getBody();
    10. log,info("消费者receiver {}"new String(body);
    11. }finally{
    12. //确认成功消费,否则消息会转发到其他消费者,或者进行重试
    13. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    14. }
    15. }
    16. }

    2、配置消费者属性

    1. /**
    2. * RabbitMQ 消费者的配置属性
    3. *
    4. * @author moyu
    5. * @version 1.0
    6. * @since <pre>2018/3/19 10:04</pre>
    7. */
    8. @comfiguration
    9. public class RabbitMQConfig{
    10. public final static String QUEUE_NAME="queue.name1";
    11. public final static String ROUTING_KEY="rout-key";
    12. public final static String EXCHANGE_NAME="exchange-name1";
    13. //注入Bean Queue
    14. @Bean
    15. public Queue queue(){
    16. //@1 是否持久化(持久)
    17. boolean durable =true;
    18. //@2仅创建者可以使用的私有队列,断开后自动删除(专属)
    19. boolean exclusive = false;
    20. //@3当所有消费者断开连接后,是否自动删除队列
    21. boolean autoDelet = false;
    22. return new Queue(QUEUE_NAME,durable,exclusive,autoDelet );
    23. }
    24. //注入交换机
    25. @Bean
    26. public topicExchange exchange(){
    27. //是否持久化
    28. boolean durable = true;
    29. //当所有消费者断开连接后,是否自动删除队列
    30. boolean autoDelet = false;
    31. return new TopicExchange(EXCHANGE_NAME,durable,autoDelet);
    32. }
    33. //将队列、交换机 绑定到路由键
    34. @Bean
    35. public Binding bind(Queue queue,TopicExchange exchange){
    36. return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    37. }
    38. //注入消费者接收器
    39. @Bean
    40. public MessageReceiver receiver(){
    41. return new MessageReceiver();
    42. }
    43. //关于SimpleMessageListenerContainer 设置消费队列监听 可以学习 https://www.jianshu.com/p/213827ebc08c
    44. @Bean
    45. public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,MessageReceiver messageReceiver){
    46. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    47. //设置amqp连接工厂
    48. container.setConnectionFactory(conectionFactory);
    49. //设置消费队列监听 而且可以设置多个 使用逗号分隔,并且可以动态加入队列或者删除队列
    50. container.setQueueNames(QUEUE_NAME);
    51. //设置消费者接收器
    52. container.setMessageListener(messageReceiver);
    53. //container.setMaxConcurrentConsumers(1);
    54. //container.setConcurrentConsumers(1); 默认为1
    55. //container.setExposeListenerChannel(true);
    56. //设置为手动签收,默认AUTO , 如果要设置手动应答 basicAck ,就设置为manual
    57. cantainer.setAckNuwledgeMode(AcknuwledgeMode.MANUAL);
    58. return container;
    59. }
    60. }

    二、生产者

    1. @Component
    2. public class MessageSender {
    3. @Autowired
    4. private RabbitTemplate rabbitTemplate;
    5. private static final Logger log = LoggerFactory.getLogger(MessageSender.class);
    6. public void send(){
    7. //消息唯一ID
    8. CorrelationDate correlationId = new CorrelationDate(DDID.randomUUID().toString());
    9. // ConfirmListener是当消息无法发送到Exchange被触发,此时Ack为False,这时cause包含发送失败的原因,例如exchange不存在时
    10. // 需要在系统配置文件中设置 publisher-confirms: true
    11. if(!rabbitTemplate.isConfirmListener()){
    12. rabbitTemplate.setConfirmCallback((correlationData,ack,cause) ->{
    13. if (ack) {
    14. log.info(">>>>>>> 消息id:{} 发送成功", correlationData.getId());
    15. } else {
    16. log.info(">>>>>>> 消息id:{} 发送失败", correlationData.getId());
    17. }
    18. })
    19. }
    20. // ReturnCallback 是在交换器无法将路由键路由到任何一个队列中,会触发这个方法。
    21. // 需要在系统配置文件中设置 publisher-returns: true
    22. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
    23. log.info("消息id:{} 发送失败", message.getMessageProperties().getCorrelationId());
    24. });
    25. rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGES_NAME, RabbitMQConfig.ROUTING_KEY, ">>>>> Hello World", correlationId);
    26. log.info("Already sent message.");
    27. }
    28. }
    1. ##### 测试发送消息
    2. 先启动系统启动类,消费者开始订阅,启动测试类发送消息。
    3. ```java
    4. @RunWith(SpringRunner.class)
    5. @SpringBootTest
    6. public class SpringbootRabbitmqApplicationTests {
    7. @Autowired
    8. private MessageSender sender;
    9. @Test
    10. public void testReceiver() {
    11. sender.send();
    12. }
    13. }
    1. 可以在消费者接收到信息,并且发送端将打出日志 成功发送消息的记录,也可以测试下 `Publisher Confirms and Returns机制` 主要是测试 `ConfirmCallback` `ReturnCallback` 这两个方法。
    2. * `ConfirmCallback` ,确认消息是否到达交换器,例如我们发送一个消息到一个你没有创建过的 交换器上面去,看看情况,
    3. * `ReturnCallback`,确认消息是否到达队列,我们可以这样测试,定义一个路由键,不会被任何队列订阅到,最后查看结果就可以了。
    1. ##### 公平转发(Fair dispatch
    2. 设置 RabbitMQ 往**空闲的工作线程**中发送任务,避免某些工作线程的任务过高,而部分工作线程空闲的问题。
    3. 在生产者的管道设置参数:
    4. ```java
    5. int prefetchCount = 1;
    6. channel.basicQos(prefetchCount) ;