代码地址 https://github.com/pengbiaobeyond/rabbitmq
1、之前两节讨论可以总结为对比,virtual host,五种类型(点对点分为普通,公平,发布订阅分为三种,direct,fanout,topic,headers);
2、
消费者如何确保消息一定 能够消费成功?
通过应答形式的设置,默认是自动应答,无论消费是否正常都会将消息全队列中清除,但是可以更改为手动应答,只要不成功就不应答,则消息就会一直保存在队列中;
如果RabbitMQ服务器宕机了,消息会丢失吗?
RabbitMQ服务器支持消息持久化机制,会把消息持久在硬盘上;
3、mq事务
问题产生背景:
生产者发送消息出去之后,不知道到底有没有发送到RabbitMQ服务器,默认是不知道的。而且有的时候我们在发送消息之后,后面的逻辑出问题了,我们不想要发送之前的消息了,需要撤回该怎么做。
解决方案:
1.AMQP事务机制
2.Confirm模式
事务模式:
txSelect 将当前channel设置为transaction模式
txCommit 提交当前事务
txRollback 事务回滚
public class Producer {private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {// 1.获取连接Connection newConnection = MQConnectionUtils.newConnection();// 2.创建通道Channel channel = newConnection.createChannel();// 3.创建队列声明channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 将当前管道设置为 txSelect 将当前channel设置为transaction模式 开启事务channel.txSelect();String msg = "test_yushengjun110";try {// 4.发送消息channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());// int i = 1 / 0;channel.txCommit();// 提交事务System.out.println("生产者发送消息:" + msg);} catch (Exception e) {System.out.println("消息进行回滚操作");channel.txRollback();// 回滚事务} finally {channel.close();newConnection.close();}}}
4、SpringBoot整合RabbitMQ
生产者:
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.0.RELEASE</version></parent><dependencies><!-- springboot-web组件 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 添加springboot对amqp的支持 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><!--fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.49</version></dependency></dependencies>
spring:rabbitmq:####连接地址host: 127.0.0.1####端口号port: 5672####账号username: guest####密码password: guest### 地址virtual-host: /
@Componentpublic class FanoutConfig {// 邮件队列private String FANOUT_EMAIL_QUEUE = "fanout_eamil_queue";// 短信队列private String FANOUT_SMS_QUEUE = "fanout_sms_queue";// 短信队列private String EXCHANGE_NAME = "fanoutExchange";// 1.定义队列邮件@Beanpublic Queue fanOutEamilQueue() {return new Queue(FANOUT_EMAIL_QUEUE);}@Beanpublic Queue fanOutSmsQueue() {return new Queue(FANOUT_SMS_QUEUE);}// 2.定义交换机@BeanFanoutExchange fanoutExchange() {return new FanoutExchange(EXCHANGE_NAME);}// 3.队列与交换机绑定邮件队列@BeanBinding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);}// 4.队列与交换机绑定短信队列@BeanBinding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);}}
@Componentpublic class FanoutProducer {@Autowiredprivate AmqpTemplate amqpTemplate;public void send(String queueName) {String msg = "my_fanout_msg:" + new Date();System.out.println(msg + ":" + msg);amqpTemplate.convertAndSend(queueName, msg);}}
@RestControllerpublic class ProducerController {@Autowiredprivate FanoutProducer fanoutProducer;@RequestMapping("/sendFanout")public String sendFanout(String queueName) {fanoutProducer.send(queueName);return "success";}}
消费者:
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.0.RELEASE</version></parent><dependencies><!-- springboot-web组件 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 添加springboot对amqp的支持 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><!--fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.49</version></dependency></dependencies>
@Component@RabbitListener(queues = "fanout_eamil_queue")public class FanoutEamilConsumer {@RabbitHandlerpublic void process(String msg) throws Exception {System.out.println("邮件消费者获取生产者消息msg:" + msg);}}@Component@RabbitListener(queues = "fanout_sms_queue")public class FanoutSmsConsumer {@RabbitHandlerpublic void process(String msg) {System.out.println("短信消费者获取生产者消息msg:" + msg);}}
