代码地址 https://github.com/pengbiaobeyond/rabbitmq

    1、之前两节讨论可以总结为对比,virtual host,五种类型(点对点分为普通,公平,发布订阅分为三种,direct,fanout,topic,headers);

    2、
    消费者如何确保消息一定 能够消费成功?
    通过应答形式的设置,默认是自动应答,无论消费是否正常都会将消息全队列中清除,但是可以更改为手动应答,只要不成功就不应答,则消息就会一直保存在队列中;

    如果RabbitMQ服务器宕机了,消息会丢失吗?
    RabbitMQ服务器支持消息持久化机制,会把消息持久在硬盘上;

    3、mq事务
    问题产生背景:
    生产者发送消息出去之后,不知道到底有没有发送到RabbitMQ服务器,默认是不知道的。而且有的时候我们在发送消息之后,后面的逻辑出问题了,我们不想要发送之前的消息了,需要撤回该怎么做。
    解决方案:
    1.AMQP事务机制
    2.Confirm模式
    事务模式:
    txSelect 将当前channel设置为transaction模式
    txCommit 提交当前事务
    txRollback 事务回滚

    1. public class Producer {
    2. private static final String QUEUE_NAME = "test_queue";
    3. public static void main(String[] args) throws IOException, TimeoutException {
    4. // 1.获取连接
    5. Connection newConnection = MQConnectionUtils.newConnection();
    6. // 2.创建通道
    7. Channel channel = newConnection.createChannel();
    8. // 3.创建队列声明
    9. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    10. // 将当前管道设置为 txSelect 将当前channel设置为transaction模式 开启事务
    11. channel.txSelect();
    12. String msg = "test_yushengjun110";
    13. try {
    14. // 4.发送消息
    15. channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
    16. // int i = 1 / 0;
    17. channel.txCommit();// 提交事务
    18. System.out.println("生产者发送消息:" + msg);
    19. } catch (Exception e) {
    20. System.out.println("消息进行回滚操作");
    21. channel.txRollback();// 回滚事务
    22. } finally {
    23. channel.close();
    24. newConnection.close();
    25. }
    26. }
    27. }

    4、SpringBoot整合RabbitMQ
    生产者:

    1. <parent>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-parent</artifactId>
    4. <version>2.0.0.RELEASE</version>
    5. </parent>
    6. <dependencies>
    7. <!-- springboot-web组件 -->
    8. <dependency>
    9. <groupId>org.springframework.boot</groupId>
    10. <artifactId>spring-boot-starter-web</artifactId>
    11. </dependency>
    12. <!-- 添加springbootamqp的支持 -->
    13. <dependency>
    14. <groupId>org.springframework.boot</groupId>
    15. <artifactId>spring-boot-starter-amqp</artifactId>
    16. </dependency>
    17. <dependency>
    18. <groupId>org.apache.commons</groupId>
    19. <artifactId>commons-lang3</artifactId>
    20. </dependency>
    21. <!--fastjson -->
    22. <dependency>
    23. <groupId>com.alibaba</groupId>
    24. <artifactId>fastjson</artifactId>
    25. <version>1.2.49</version>
    26. </dependency>
    27. </dependencies>
    1. spring:
    2. rabbitmq:
    3. ####连接地址
    4. host: 127.0.0.1
    5. ####端口号
    6. port: 5672
    7. ####账号
    8. username: guest
    9. ####密码
    10. password: guest
    11. ### 地址
    12. virtual-host: /


    1. @Component
    2. public class FanoutConfig {
    3. // 邮件队列
    4. private String FANOUT_EMAIL_QUEUE = "fanout_eamil_queue";
    5. // 短信队列
    6. private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
    7. // 短信队列
    8. private String EXCHANGE_NAME = "fanoutExchange";
    9. // 1.定义队列邮件
    10. @Bean
    11. public Queue fanOutEamilQueue() {
    12. return new Queue(FANOUT_EMAIL_QUEUE);
    13. }
    14. @Bean
    15. public Queue fanOutSmsQueue() {
    16. return new Queue(FANOUT_SMS_QUEUE);
    17. }
    18. // 2.定义交换机
    19. @Bean
    20. FanoutExchange fanoutExchange() {
    21. return new FanoutExchange(EXCHANGE_NAME);
    22. }
    23. // 3.队列与交换机绑定邮件队列
    24. @Bean
    25. Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
    26. return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
    27. }
    28. // 4.队列与交换机绑定短信队列
    29. @Bean
    30. Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
    31. return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
    32. }
    33. }


    1. @Component
    2. public class FanoutProducer {
    3. @Autowired
    4. private AmqpTemplate amqpTemplate;
    5. public void send(String queueName) {
    6. String msg = "my_fanout_msg:" + new Date();
    7. System.out.println(msg + ":" + msg);
    8. amqpTemplate.convertAndSend(queueName, msg);
    9. }
    10. }


    1. @RestController
    2. public class ProducerController {
    3. @Autowired
    4. private FanoutProducer fanoutProducer;
    5. @RequestMapping("/sendFanout")
    6. public String sendFanout(String queueName) {
    7. fanoutProducer.send(queueName);
    8. return "success";
    9. }
    10. }

    消费者:

    1. <parent>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-parent</artifactId>
    4. <version>2.0.0.RELEASE</version>
    5. </parent>
    6. <dependencies>
    7. <!-- springboot-web组件 -->
    8. <dependency>
    9. <groupId>org.springframework.boot</groupId>
    10. <artifactId>spring-boot-starter-web</artifactId>
    11. </dependency>
    12. <!-- 添加springbootamqp的支持 -->
    13. <dependency>
    14. <groupId>org.springframework.boot</groupId>
    15. <artifactId>spring-boot-starter-amqp</artifactId>
    16. </dependency>
    17. <dependency>
    18. <groupId>org.apache.commons</groupId>
    19. <artifactId>commons-lang3</artifactId>
    20. </dependency>
    21. <!--fastjson -->
    22. <dependency>
    23. <groupId>com.alibaba</groupId>
    24. <artifactId>fastjson</artifactId>
    25. <version>1.2.49</version>
    26. </dependency>
    27. </dependencies>


    1. @Component
    2. @RabbitListener(queues = "fanout_eamil_queue")
    3. public class FanoutEamilConsumer {
    4. @RabbitHandler
    5. public void process(String msg) throws Exception {
    6. System.out.println("邮件消费者获取生产者消息msg:" + msg);
    7. }
    8. }
    9. @Component
    10. @RabbitListener(queues = "fanout_sms_queue")
    11. public class FanoutSmsConsumer {
    12. @RabbitHandler
    13. public void process(String msg) {
    14. System.out.println("短信消费者获取生产者消息msg:" + msg);
    15. }
    16. }