1.1 错乱的消息顺序
原因:
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为 分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的 queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。 但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取, 则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分 区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消 息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列 
1.2 顺序消息
1.2.1订单步骤实体类
/*** 订单创建的步骤* @author zhengpei* @date 2022/1/3*/public class OrderStep {private long orderId;private String desc;// get set toString}
1.2.2发送消息
package cn.com.beyond.rocketmqdemo.controller;import cn.com.beyond.rocketmqdemo.model.OrderStep;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.MessageQueueSelector;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageQueue;import java.util.ArrayList;import java.util.List;public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("group1");producer.setNamesrvAddr("localhost:9876");producer.start();List<OrderStep> orderList = new Producer().buildOrders();//设置消息进入到指定的消息队列中for (final OrderStep order : orderList) {Message msg = new Message("topic1", order.toString().getBytes());//发送时要指定对应的消息队列选择器SendResult result = producer.send(msg, new MessageQueueSelector() {//设置当前消息发送时使用哪一个消息队列@Overridepublic MessageQueue select(List<MessageQueue> list, Messagemessage, Object o) {//根据发送的信息不同,选择不同的消息队列//根据id来选择一个消息队列的对象,并返回->id得到int值long orderId = order.getOrderId();long mqIndex = orderId % list.size();return list.get((int) mqIndex);}}, null);System.out.println(result);}producer.shutdown();}/*** 生成模拟订单数据*/private List<OrderStep> buildOrders() {List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(1L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(2L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(3L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(2L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(3L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(2L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(3L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}}
1.2.3接收消息
//使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列consumer.registerMessageListener(new MessageListenerOrderly() {//使用MessageListenerOrderly接口后,对消息队列的处理由一个消息队列多个线程服务,转化为一个消息队列一个线程服务public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt msg : list) {System.out.println(Thread.currentThread().getName()+"。消息:"+ new String(msg.getBody())+"。queueId:"+msg.getQueueId());}return ConsumeOrderlyStatus.SUCCESS;}});
1.2.4使用SpringBoot 保证消息顺序消费demo
@GetMapping("/sequentialConsumption")public String sequentialConsumption(){List<OrderStep> orderList = buildOrders();//设置消息进入到指定的消息队列中for (final OrderStep order : orderList) {template.syncSendOrderly("topic:" ,order,String.valueOf(order.getOrderId()));}return JSON.toJSONString(orderList);}
1.3 事务消息
- 正常事务过程
2. 事务补偿过程
1.4 事务消息状态
- 提交状态:允许进入队列,此消息与非事务消息无区别
2. 回滚状态:不允许进入队列,此消息等同于未发送过
3. 中间状态:完成了half消息的发送,未对MQ进行二次状态确认
4. 注意:事务消息仅与生产者有关,与消费者无关1.5 事务消息
提交状态
回滚状态//事务消息使用的生产者是TransactionMQProducerTransactionMQProducer producer = new TransactionMQProducer("group1");producer.setNamesrvAddr("localhost:9876");//添加本地事务对应的监听producer.setTransactionListener(new TransactionListener() {//正常事务过程public LocalTransactionState executeLocalTransaction(Message message, Object o){return LocalTransactionState.COMMIT_MESSAGE;}//事务补偿过程public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {return null;}});producer.start();Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF8"));SendResult result = producer.sendMessageInTransaction(msg,null);System.out.println("返回结果:"+result);producer.shutdown();
中间状态producer.setTransactionListener(new TransactionListener() {//正常事务@Overridepublic LocalTransactionState executeLocalTransaction(Message msg,Object arg) {return LocalTransactionState.ROLLBACK_MESSAGE;}//事务补偿@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {return null;}});
快读读写:public static void main(String[] args) throws Exception {TransactionMQProducer producer=new TransactionMQProducer("group1");producer.setNamesrvAddr("localhost:9876");producer.setTransactionListener(new TransactionListener() {//正常事务@Overridepublic LocalTransactionState executeLocalTransaction(Message msg,Object arg) {return LocalTransactionState.UNKNOW;}//事务补偿 正常执行UNKNOW才会触发@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println("事务补偿");return LocalTransactionState.COMMIT_MESSAGE;}});producer.start();Message msg = new Message("topic13", "hello rocketmq".getBytes("UTF8"));SendResult result = producer.sendMessageInTransaction(msg, null);System.out.println("返回结果:" + result);//事务补偿生产者一定要一直启动着//producer.shutdown();}
1.零拷贝技术
2.磁盘的读写方式是预留一块位置顺序读写,空间换取时间
- 提交状态:允许进入队列,此消息与非事务消息无区别
集群搭建见附件
RocketMQ-02.docx
