说明

这里使用的是自定义消息队列选择器保证消息的有序, 其实可以使用SelectMessageQueueByHash 来保证消息的有序,

使用SelectMessageQueueByHash 保证顺序有序的帖子看: https://zjj1994.blog.csdn.net/article/details/120883178

怎么保证全局有序?

保证全局有序有个最简单的方式,就是topic里面只有一个队列,这样就可以保证全局有序,但是有人会这样用么? 肯定不会的,因为这样用的话,性能吞吐量安全性都会非常的差.
所以都是保证局部有序,而不是全局有序.

保证顺序有序使用场景

订单支付,一个订单下来必须是有顺序的,比如说 必须先支付 然后再营销 ,然后进入 物流发订单 , 这样顺序是不能乱的.
或者聊天功能: 我们所有人发的顺序需要保证有序,不能说是你先发的,然后再别人后面才到.
所以这就是要保证局部有序.

案例

生产者发送10个订单,每个订单里面有六个步骤,每个步骤都会发送一个消息过去, 这六个步骤要求得是顺序性的.
consumer要保证消费的顺序是一样的.
所有的MQ只能保证在一个queue里面消息是有序的, 如果是Kafka的话就是Partition.
RocketMQ保证的是局部有序,而不是全局有序
什么是顺序局部有序? 顺序局部有序需要你的生产者和消费者一起配合才能做到.

消费者

注意: 注册Listener的时候 ,registerMessageListener 需要使用 MessageListenerOrderly ,这种取出来的消息是有序的. 如果用的是别的MessageListener,比如说MessageListenerConcurrently就不能保证有序了.

  1. package org.apache.rocketmq.example.ordermessage;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  8. import org.apache.rocketmq.common.message.MessageExt;
  9. import java.util.List;
  10. public class Consumer {
  11. public static void main(String[] args) throws MQClientException {
  12. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
  13. consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  14. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  15. consumer.subscribe("OrderTopicTest", "*");
  16. /**
  17. *消费者端注册一个监听器,MessageListenerOrderly ,
  18. * 从队列里面拿
  19. */
  20. consumer.registerMessageListener(new MessageListenerOrderly() {
  21. /***
  22. *
  23. * @param msgs
  24. * @param context
  25. * @return
  26. */
  27. @Override
  28. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  29. context.setAutoCommit(true);
  30. for (MessageExt msg : msgs) {
  31. System.out.println("收到消息内容 " + new String(msg.getBody()));
  32. }
  33. return ConsumeOrderlyStatus.SUCCESS; // 返回成功消费标识
  34. }
  35. });
  36. // MessageListenerConcurrently 是乱拿的,保证不了消费顺序
  37. // 这样是保证不了最终消费顺序的。
  38. // consumer.registerMessageListener(new MessageListenerConcurrently() {
  39. // @Override
  40. // public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  41. // for(MessageExt msg:msgs){
  42. // System.out.println("收到消息内容 "+new String(msg.getBody()));
  43. // }
  44. // return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  45. // }
  46. // });
  47. consumer.start();
  48. System.out.printf("Consumer Started.%n");
  49. }
  50. }


自定义消息队列选择器

里面的注解我写的很详细了, 就不多做解释了,
另外需要注意一个坑,就是String字符串在hashCode的时候可能会出现负数,这个需要注意.
为什么会string hashCode 完了之后会出现负数? 看这个博客 :https://zjj1994.blog.csdn.net/article/details/120875055

  1. package org.apache.rocketmq.example.ordermessage;
  2. import org.apache.rocketmq.client.producer.MessageQueueSelector;
  3. import org.apache.rocketmq.common.message.Message;
  4. import org.apache.rocketmq.common.message.MessageQueue;
  5. import java.util.List;
  6. /**
  7. * 自定义消息队列选择器
  8. */
  9. public class MyMessageQueueSelector implements MessageQueueSelector {
  10. /**
  11. * 选择队列
  12. *
  13. * @param mqs 这个topic下面所有messageQueue
  14. * @param msg 发送的消息
  15. * @param arg 这个参数是 生产者producer 调用 send 方法的时候第三个参数传过来的
  16. * @return
  17. */
  18. @Override
  19. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  20. // orderId取模MessageQueue的Size,获取到一个索引,这样就保证了同一个Order里面的消息存到了同一个队列
  21. String orderId = (String) arg; // 获取传过来的orderId
  22. // 进行hash操作
  23. //下面 & Integer.MAX_VALUE 的目的是复制hashCode出来的是负数
  24. int orderIdHashCode = orderId.hashCode() & Integer.MAX_VALUE;
  25. // hashCode值和队列的长度进行取余数,取出来一个整数
  26. int index = orderIdHashCode % mqs.size();
  27. // 通过上面取余数获取一个队列, 往这个队列里面投递消息,这样就能保证
  28. MessageQueue messageQueue = mqs.get(index);
  29. return messageQueue;
  30. }
  31. }

生产者

生成500个订单, 每个订单都有4个步骤 ,比如说 下单 支付 确认收货 ,评价 ,每个步骤都会发送一个消息过去,并且这个消息不允许顺序乱,也就是 不能 支付的消息 在 下单消息 之前过来, 这样就出现业务bug了.

  1. package org.apache.rocketmq.example.ordermessage;
  2. import org.apache.rocketmq.client.exception.MQBrokerException;
  3. import org.apache.rocketmq.client.exception.MQClientException;
  4. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  5. import org.apache.rocketmq.client.producer.SendResult;
  6. import org.apache.rocketmq.common.message.Message;
  7. import org.apache.rocketmq.remoting.common.RemotingHelper;
  8. import org.apache.rocketmq.remoting.exception.RemotingException;
  9. import java.io.UnsupportedEncodingException;
  10. import java.util.UUID;
  11. public class Producer {
  12. public static void main(String[] args) throws UnsupportedEncodingException {
  13. try {
  14. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  15. producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  16. producer.start();
  17. for (int i = 0; i < 500; i++) { // 生成500个订单
  18. //生成50个订单,这里订单号就用uuid来代替了, 实际情况下每个公司的订单id生成方案是不一样的
  19. String orderId = UUID.randomUUID().toString();
  20. // 每个订单有4个步骤,比如说 下单 支付 确认收货 ,评价 ,每个步骤都会发送一个消息过去,并且这个消息不允许顺序乱,也就是 不能 支付在下单之前过来
  21. for (int j = 1; j <= 4; j++) {
  22. // 实例化一个消息
  23. Message msg =
  24. new Message("OrderTopicTest", "orderTag", "KEY" + orderId,
  25. ("订单Id:" + orderId + " 步骤:" + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
  26. //实例化 自己编写的消息队列选择器
  27. MyMessageQueueSelector myMessageQueueSelector = new MyMessageQueueSelector();
  28. /*
  29. * MessageQueueSelector 是消息队列选择器, 这个作用是选择消息发送到哪个队列里面去
  30. * @param 参数1 : msg 你发送的消息
  31. * @param 参数2 : selector 消息Queue选择器,
  32. * @param 参数3: args 传给 消息队列选择器 使用的参数,myMessageQueueSelector 中select方法的第三个参数就是这个值传过去的
  33. */
  34. SendResult sendResult = producer.send(msg, myMessageQueueSelector, orderId);
  35. System.out.printf("%s%n", sendResult);
  36. }
  37. }
  38. producer.shutdown();
  39. } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. }


执行结果

启动两个消费者

先启动两个消费者, 勾选允许多个实例,这样一个消费者就能启动两个了
image.png
一套代码启动了两个实例出来
image.png

启动生产者生产消息

这里不演示了,自行启动

查看消费者控制台

随便查看一个Consumer的控制台发现
image.png
image.png
随便看几个,发现步骤顺序都是有序的.

特殊情况!部分有序

RocketMQ保证消息有序之自定义消息队列选择器 - 图5
8cf0133a-9e8d-44b6-96d9-4f30286f761a 订单中间还夹杂着一个 a2c588f8-1ec3-4de1-997b-545c43b15a33 步骤:1的消息 ,但是 8cf0133a-9e8d-44b6-96d9-4f30286f761a 步骤顺序还是有序的

虽然说 同一个订单id 的消息没有挨在一起消费, 但是 他们的顺序还是有序的. 这就是部分有序.

a2c588f8-1ec3-4de1-997b-545c43b15a33 订单也是有序的
RocketMQ保证消息有序之自定义消息队列选择器 - 图6