1.1 错乱的消息顺序
原因:
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为 分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的 queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。 但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取, 则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分 区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消 息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列
1.2 顺序消息
1.2.1订单步骤实体类
/**
* 订单创建的步骤
* @author zhengpei
* @date 2022/1/3
*/
public class OrderStep {
private long orderId;
private String desc;
// get set toString
}
1.2.2发送消息
package cn.com.beyond.rocketmqdemo.controller;
import cn.com.beyond.rocketmqdemo.model.OrderStep;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.ArrayList;
import java.util.List;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
List<OrderStep> orderList = new Producer().buildOrders();
//设置消息进入到指定的消息队列中
for (final OrderStep order : orderList) {
Message msg = new Message("topic1", order.toString().getBytes());
//发送时要指定对应的消息队列选择器
SendResult result = producer.send(msg, new MessageQueueSelector() {
//设置当前消息发送时使用哪一个消息队列
@Override
public MessageQueue select(List<MessageQueue> list, Message
message, Object o) {
//根据发送的信息不同,选择不同的消息队列
//根据id来选择一个消息队列的对象,并返回->id得到int值
long orderId = order.getOrderId();
long mqIndex = orderId % list.size();
return list.get((int) mqIndex);
}
}, null);
System.out.println(result);
}
producer.shutdown();
}
/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(1L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(2L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(1L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(3L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(2L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(3L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(2L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(1L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(3L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(1L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
1.2.3接收消息
//使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列
consumer.registerMessageListener(new MessageListenerOrderly() {
//使用MessageListenerOrderly接口后,对消息队列的处理由一个消息队列多个线程服务,转化为一个消息队列一个线程服务
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,
ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt msg : list) {
System.out.println(Thread.currentThread().getName()+"。消息:"
+ new String(msg.getBody())+"。queueId:"+msg.getQueueId());
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
1.2.4使用SpringBoot 保证消息顺序消费demo
@GetMapping("/sequentialConsumption")
public String sequentialConsumption(){
List<OrderStep> orderList = buildOrders();
//设置消息进入到指定的消息队列中
for (final OrderStep order : orderList) {
template.syncSendOrderly("topic:" ,order,String.valueOf(order.getOrderId()));
}
return JSON.toJSONString(orderList);
}
1.3 事务消息
- 正常事务过程
2. 事务补偿过程
1.4 事务消息状态
- 提交状态:允许进入队列,此消息与非事务消息无区别
2. 回滚状态:不允许进入队列,此消息等同于未发送过
3. 中间状态:完成了half消息的发送,未对MQ进行二次状态确认
4. 注意:事务消息仅与生产者有关,与消费者无关1.5 事务消息
提交状态
回滚状态//事务消息使用的生产者是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
//添加本地事务对应的监听
producer.setTransactionListener(new TransactionListener() {
//正常事务过程
public LocalTransactionState executeLocalTransaction(Message message, Object o)
{
return LocalTransactionState.COMMIT_MESSAGE;
}
//事务补偿过程
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return null;
}
});
producer.start();
Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF8"));
SendResult result = producer.sendMessageInTransaction(msg,null);
System.out.println("返回结果:"+result);
producer.shutdown();
中间状态producer.setTransactionListener(new TransactionListener() {
//正常事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg,
Object arg) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
//事务补偿
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return null;
}
});
快读读写:public static void main(String[] args) throws Exception {
TransactionMQProducer producer=new TransactionMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(new TransactionListener() {
//正常事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg,
Object arg) {
return LocalTransactionState.UNKNOW;
}
//事务补偿 正常执行UNKNOW才会触发
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("事务补偿");
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message msg = new Message("topic13", "hello rocketmq".getBytes("UTF8"));
SendResult result = producer.sendMessageInTransaction(msg, null);
System.out.println("返回结果:" + result);
//事务补偿生产者一定要一直启动着
//producer.shutdown();
}
1.零拷贝技术
2.磁盘的读写方式是预留一块位置顺序读写,空间换取时间
- 提交状态:允许进入队列,此消息与非事务消息无区别
集群搭建见附件
RocketMQ-02.docx