1.1 错乱的消息顺序

原因:
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为 分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的 queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。 但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取, 则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分 区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消 息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列
image.png
image.png

1.2 顺序消息

1.2.1订单步骤实体类

  1. /**
  2. * 订单创建的步骤
  3. * @author zhengpei
  4. * @date 2022/1/3
  5. */
  6. public class OrderStep {
  7. private long orderId;
  8. private String desc;
  9. // get set toString
  10. }

1.2.2发送消息

  1. package cn.com.beyond.rocketmqdemo.controller;
  2. import cn.com.beyond.rocketmqdemo.model.OrderStep;
  3. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  4. import org.apache.rocketmq.client.producer.MessageQueueSelector;
  5. import org.apache.rocketmq.client.producer.SendResult;
  6. import org.apache.rocketmq.common.message.Message;
  7. import org.apache.rocketmq.common.message.MessageQueue;
  8. import java.util.ArrayList;
  9. import java.util.List;
  10. public class Producer {
  11. public static void main(String[] args) throws Exception {
  12. DefaultMQProducer producer = new DefaultMQProducer("group1");
  13. producer.setNamesrvAddr("localhost:9876");
  14. producer.start();
  15. List<OrderStep> orderList = new Producer().buildOrders();
  16. //设置消息进入到指定的消息队列中
  17. for (final OrderStep order : orderList) {
  18. Message msg = new Message("topic1", order.toString().getBytes());
  19. //发送时要指定对应的消息队列选择器
  20. SendResult result = producer.send(msg, new MessageQueueSelector() {
  21. //设置当前消息发送时使用哪一个消息队列
  22. @Override
  23. public MessageQueue select(List<MessageQueue> list, Message
  24. message, Object o) {
  25. //根据发送的信息不同,选择不同的消息队列
  26. //根据id来选择一个消息队列的对象,并返回->id得到int值
  27. long orderId = order.getOrderId();
  28. long mqIndex = orderId % list.size();
  29. return list.get((int) mqIndex);
  30. }
  31. }, null);
  32. System.out.println(result);
  33. }
  34. producer.shutdown();
  35. }
  36. /**
  37. * 生成模拟订单数据
  38. */
  39. private List<OrderStep> buildOrders() {
  40. List<OrderStep> orderList = new ArrayList<OrderStep>();
  41. OrderStep orderDemo = new OrderStep();
  42. orderDemo.setOrderId(1L);
  43. orderDemo.setDesc("创建");
  44. orderList.add(orderDemo);
  45. orderDemo = new OrderStep();
  46. orderDemo.setOrderId(2L);
  47. orderDemo.setDesc("创建");
  48. orderList.add(orderDemo);
  49. orderDemo = new OrderStep();
  50. orderDemo.setOrderId(1L);
  51. orderDemo.setDesc("付款");
  52. orderList.add(orderDemo);
  53. orderDemo = new OrderStep();
  54. orderDemo.setOrderId(3L);
  55. orderDemo.setDesc("创建");
  56. orderList.add(orderDemo);
  57. orderDemo = new OrderStep();
  58. orderDemo.setOrderId(2L);
  59. orderDemo.setDesc("付款");
  60. orderList.add(orderDemo);
  61. orderDemo = new OrderStep();
  62. orderDemo.setOrderId(3L);
  63. orderDemo.setDesc("付款");
  64. orderList.add(orderDemo);
  65. orderDemo = new OrderStep();
  66. orderDemo.setOrderId(2L);
  67. orderDemo.setDesc("完成");
  68. orderList.add(orderDemo);
  69. orderDemo = new OrderStep();
  70. orderDemo.setOrderId(1L);
  71. orderDemo.setDesc("推送");
  72. orderList.add(orderDemo);
  73. orderDemo = new OrderStep();
  74. orderDemo.setOrderId(3L);
  75. orderDemo.setDesc("完成");
  76. orderList.add(orderDemo);
  77. orderDemo = new OrderStep();
  78. orderDemo.setOrderId(1L);
  79. orderDemo.setDesc("完成");
  80. orderList.add(orderDemo);
  81. return orderList;
  82. }
  83. }

1.2.3接收消息

  1. //使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列
  2. consumer.registerMessageListener(new MessageListenerOrderly() {
  3. //使用MessageListenerOrderly接口后,对消息队列的处理由一个消息队列多个线程服务,转化为一个消息队列一个线程服务
  4. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,
  5. ConsumeOrderlyContext consumeOrderlyContext) {
  6. for (MessageExt msg : list) {
  7. System.out.println(Thread.currentThread().getName()+"。消息:"
  8. + new String(msg.getBody())+"。queueId:"+msg.getQueueId());
  9. }
  10. return ConsumeOrderlyStatus.SUCCESS;
  11. }
  12. });

1.2.4使用SpringBoot 保证消息顺序消费demo

  1. @GetMapping("/sequentialConsumption")
  2. public String sequentialConsumption(){
  3. List<OrderStep> orderList = buildOrders();
  4. //设置消息进入到指定的消息队列中
  5. for (final OrderStep order : orderList) {
  6. template.syncSendOrderly("topic:" ,order,String.valueOf(order.getOrderId()));
  7. }
  8. return JSON.toJSONString(orderList);
  9. }

1.3 事务消息

  1. 正常事务过程
    2. 事务补偿过程
    image.png

    1.4 事务消息状态

    1. 提交状态:允许进入队列,此消息与非事务消息无区别
      2. 回滚状态:不允许进入队列,此消息等同于未发送过
      3. 中间状态:完成了half消息的发送,未对MQ进行二次状态确认
      4. 注意:事务消息仅与生产者有关,与消费者无关

      1.5 事务消息

      提交状态
      1. //事务消息使用的生产者是TransactionMQProducer
      2. TransactionMQProducer producer = new TransactionMQProducer("group1");
      3. producer.setNamesrvAddr("localhost:9876");
      4. //添加本地事务对应的监听
      5. producer.setTransactionListener(new TransactionListener() {
      6. //正常事务过程
      7. public LocalTransactionState executeLocalTransaction(Message message, Object o)
      8. {
      9. return LocalTransactionState.COMMIT_MESSAGE;
      10. }
      11. //事务补偿过程
      12. public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
      13. return null;
      14. }
      15. });
      16. producer.start();
      17. Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF8"));
      18. SendResult result = producer.sendMessageInTransaction(msg,null);
      19. System.out.println("返回结果:"+result);
      20. producer.shutdown();
      回滚状态
      1. producer.setTransactionListener(new TransactionListener() {
      2. //正常事务
      3. @Override
      4. public LocalTransactionState executeLocalTransaction(Message msg,
      5. Object arg) {
      6. return LocalTransactionState.ROLLBACK_MESSAGE;
      7. }
      8. //事务补偿
      9. @Override
      10. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
      11. return null;
      12. }
      13. });
      中间状态
      1. public static void main(String[] args) throws Exception {
      2. TransactionMQProducer producer=new TransactionMQProducer("group1");
      3. producer.setNamesrvAddr("localhost:9876");
      4. producer.setTransactionListener(new TransactionListener() {
      5. //正常事务
      6. @Override
      7. public LocalTransactionState executeLocalTransaction(Message msg,
      8. Object arg) {
      9. return LocalTransactionState.UNKNOW;
      10. }
      11. //事务补偿 正常执行UNKNOW才会触发
      12. @Override
      13. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
      14. System.out.println("事务补偿");
      15. return LocalTransactionState.COMMIT_MESSAGE;
      16. }
      17. });
      18. producer.start();
      19. Message msg = new Message("topic13", "hello rocketmq".getBytes("UTF8"));
      20. SendResult result = producer.sendMessageInTransaction(msg, null);
      21. System.out.println("返回结果:" + result);
      22. //事务补偿生产者一定要一直启动着
      23. //producer.shutdown();
      24. }
      快读读写:
      1.零拷贝技术
      2.磁盘的读写方式是预留一块位置顺序读写,空间换取时间

集群搭建见附件
RocketMQ-02.docx