使用RocketMQ原生API操作RocketMQ

普通消息

生产者

创建生产者使用new DefaultMQProducer()新建,参数为生产者名称,
创建消息,创建消息一定要设置Topic,Tag和key-value的设置可选,Topic相当于一节分类,Tag相当于二级分类

  1. public class Producer {
  2. public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
  3. //1.新建生产者实例
  4. DefaultMQProducer producer = new DefaultMQProducer("producer1");
  5. //2.设置name server地址
  6. producer.setNamesrvAddr("192.168.64.141:9876");
  7. //3.启动生产者,连接服务器
  8. producer.start();
  9. //4.将消息数据封装至Message对象并发送
  10. while (true) {
  11. System.out.print("请输入消息: ");
  12. String s = new Scanner(System.in).nextLine();
  13. /*
  14. topic主题---一级分类
  15. tag标签-----二级分类(可选)
  16. */
  17. //封装至Message对象
  18. Message msg = new Message("Topic1", "TagA",s.getBytes(StandardCharsets.UTF_8));
  19. //发送
  20. SendResult result = producer.send(msg);
  21. System.out.println("result==" + result.toString());
  22. }
  23. }
  24. }

消费者

  1. push 和 pull

消费者有两种模式:push 和 pull。
push 模式由服务器主动向消费者发送消息;pull 模式由消费者主动向服务器请求消息。
在消费者处理能力有限时,为了减轻消费者的压力,可以采用pull模式。多数情况下都采用 pull 模式。

  1. NameServer

消费者需要向 NameServer 询问 Topic 的路由信息。

  1. Topic

从指定的Topic接收消息。Topic相当于是一级分类。

  1. Tag

Topic 相当于是一级分类,Tag 相当于是2级分类。
多个 Tag 可以这样写: TagA || TagB || TagC
不指定 Tag,或者说接收所有的 Tag,可以写星号: *

消费者接受消息时需要设置消息监听器,消息监听器会启动多个线程,进行并行的梳理多条消息
消息监听器需要返回接受消息状态其中:ConsumeConcurrentlyStatus.RECONSUME_LATER;表示消费消息失败会进行再次重试,最多会重试18次(18个演示级别),重试时间会越来越长.
ConsumeConcurrentlyStatus.CONSUME_SUCCESS;表示成功消费消息

  1. ublic class Consumer {
  2. public static void main(String[] args) throws MQClientException {
  3. //新建消费者实例
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
  5. //设置name server
  6. consumer.setNamesrvAddr("192.168.64.141:9876");
  7. //订阅消息
  8. consumer.subscribe("Topic1", "TagA");
  9. /*
  10. 设置消息监听器
  11. MessageListenerConcurrently---会启动多个线程,可以并行的梳理多条消息
  12. */
  13. consumer.registerMessageListener(new MessageListenerConcurrently() {
  14. @Override
  15. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
  16. ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  17. for (MessageExt msg : list) {
  18. System.out.println("收到消息" + new String(msg.getBody()));
  19. }
  20. // return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  21. /*
  22. 消息处理失败时告知服务器稍后重新发送消息
  23. 消息处理多次失败,最多会重试18次(18个演示级别),重试时间会越来越长
  24. */
  25. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  26. }
  27. });
  28. //启动
  29. consumer.start();
  30. System.out.println("接收数据");
  31. }
  32. }

延时消息

延时消息的发送使用setDelayTimeLevel(arg);方法,参数为延时的时间级别如下所示:

级别 延时时长
1 1s
2 5s
3 10s
4 30s
5 1m
6 2m
7 3m
8 4m
9 5m
10 6m
11 7m
12 8m
13 9m
14 10m
15 20m
16 30m
17 1h
18 2h

创建生产者

  1. public class Producer {
  2. public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
  3. //1.新建生产者实例
  4. DefaultMQProducer producer = new DefaultMQProducer("producer1");
  5. //2.设置name server地址
  6. producer.setNamesrvAddr("192.168.64.141:9876");
  7. //3.启动生产者,连接服务器
  8. producer.start();
  9. //4.将消息数据封装至Message对象并发送
  10. while (true) {
  11. System.out.print("请输入消息: ");
  12. String s = new Scanner(System.in).nextLine();
  13. /*
  14. topic主题---一级分类
  15. tag标签-----二级分类(可选)
  16. */
  17. //封装至Message对象
  18. Message msg = new Message("Topic1", "TagA", s.getBytes(StandardCharsets.UTF_8));
  19. if (Math.random() < 0.5){
  20. msg.setDelayTimeLevel(3);
  21. System.out.println("这条消息延时10秒");
  22. }
  23. //发送
  24. SendResult result = producer.send(msg);
  25. System.out.println("result==" + result.toString());
  26. }
  27. }
  28. }

消费者

延时消息消费者消费消息与普通消息一致

顺序消息

顺序消息是消息队列RocketMQ版提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。

  1. 同一组有序的消息序列,消息序列必须发送至同一个队列,按照FIFO的方式处理
  2. 消费者必须使用同一个线程处理所有消息

顺序消息发送需要借助队列选择其对象进行消息发送.

创建生产者:

  1. public class Producer {
  2. static String[] msgs = {
  3. "15103111039,创建",
  4. "15103111065,创建",
  5. "15103111039,付款",
  6. "15103117235,创建",
  7. "15103111065,付款",
  8. "15103117235,付款",
  9. "15103111065,完成",
  10. "15103111039,推送",
  11. "15103117235,完成",
  12. "15103111039,完成"
  13. };
  14. public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
  15. //创建生产者实例
  16. DefaultMQProducer producer = new DefaultMQProducer("producer2");
  17. //指定name server 地址
  18. producer.setNamesrvAddr("192.168.64.141:9876");
  19. //启动
  20. producer.start();
  21. //发送消息,设置队列选择器对象
  22. for (String msg : msgs) {
  23. //获取消息id
  24. Long orderId = Long.valueOf(msg.split(",")[0]);
  25. //Topic2可以自动创建,队列数量为4
  26. Message message = new Message("Topic2", msg.getBytes(StandardCharsets.UTF_8));
  27. //参数 1. 消息 2. 消息队列选择器 ,选择依据
  28. SendResult result = producer.send(message, new MessageQueueSelector() {
  29. @Override
  30. public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
  31. /*
  32. 参数:
  33. 1.服务器端Topic2的队列列表
  34. 2.消息
  35. 3.选择依据
  36. */
  37. Long orderId = (Long) o;
  38. int index = (int) (orderId % list.size());
  39. return list.get(index);
  40. }
  41. }, (Object) orderId);
  42. System.out.println(result);
  43. }
  44. }
  45. }

参数说明:

  1. 发送消息
  • producer.send(Message msg, MessageQueueSelector selector, Object arg)
    • msg – 要发送的消息。
    • selector – 消息队列选择器,通过它我们得到目标消息队列来传递消息。
    • arg – 与消息队列选择器一起使用的参数。
  1. 创建消息队列选择器
  • MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
    • mqs—服务器端Topic2的队列列表
    • msg—要发送的消息
    • arg—选择依据

创建消费者:

  1. public class Consumer {
  2. public static void main(String[] args) throws MQClientException {
  3. //创建消费者
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer2");
  5. //设置name server地址
  6. consumer.setNamesrvAddr("192.168.64.141:9876");
  7. //从Topic2订阅消息
  8. consumer.subscribe("Topic2", "*");
  9. //设置消息监听器
  10. consumer.setMessageListener(new MessageListenerOrderly() {
  11. @Override
  12. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  13. for (MessageExt msg : msgs) {
  14. String s = new String(msg.getBody());
  15. System.out.println(s);
  16. }
  17. return ConsumeOrderlyStatus.SUCCESS;
  18. }
  19. });
  20. //启动
  21. consumer.start();
  22. }
  23. }

事务消息

事务消息的原理

image.png
image.png

下面来看 RocketMQ 的事务消息是如何来发送“可靠消息”的,只需要以下三步:

  1. 发送半消息(半消息不会发送给消费者)
  2. 执行本地事务
  3. 提交消息

image.png
image.png
image.png
完成事务消息发送后,消费者就可以以正常的方式来消费数据。

RocketMQ 的自动重发机制在绝大多数情况下,都可以保证消息被正确消费。

假如消息最终消费失败了,还可以由人工处理进行托底。
image.png

上面分析的是正常情况下的执行流程。下面再来看两种错误情况:

事务执行失败时回滚消息
服务器无法得知消息状态时,需要主动回查消息状态

回滚:
image.png
消息回查:
image.png
image.png

  • 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列RocketMQ版服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。

    分布式事务消息的优势

    消息队列RocketMQ版分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。
    RocketMQ - 图10

    交互流程

    事务消息交互流程如下图所示。RocketMQ - 图11
    事务消息发送步骤如下:
  1. 发送方将半事务消息发送至消息队列RocketMQ版服务端。
  2. 消息队列RocketMQ版服务端将消息持久化成功之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤如下:

  1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对消息发送方即生产者集群中任意一生产者实例发起消息回查。
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

示例代码:

  1. public class Producer {
  2. public static void main(String[] args) throws MQClientException {
  3. //新建事务消息生产者
  4. TransactionMQProducer producer = new TransactionMQProducer("producer3");
  5. //设置name server
  6. producer.setNamesrvAddr("192.168.64.141:9876");
  7. //设置事务消息监听器
  8. producer.setTransactionListener(new TransactionListener() {
  9. //执行本地事务
  10. @Override
  11. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  12. if (Math.random() < 1) {
  13. System.out.println("测试本地事务无法告知服务器的情况");
  14. return LocalTransactionState.UNKNOW;
  15. }
  16. if (Math.random() < 0.5) {
  17. //撤回消息
  18. System.out.println("执行本地事务失败,参数:" + arg);
  19. return LocalTransactionState.ROLLBACK_MESSAGE;
  20. }
  21. //告知服务器可以投递消息
  22. System.out.println("执行本地事务成功,参数:" + arg);
  23. return LocalTransactionState.COMMIT_MESSAGE;
  24. /*本地事务执行状态需要存储*/
  25. }
  26. //处理rocketmq的反向回查
  27. @Override
  28. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
  29. /*
  30. 查询事务状态,发送给服务器
  31. 直接返回unknown,测试服务器一次次反复回查的操作
  32. */
  33. System.out.println("服务器正在回查事务状态");
  34. return LocalTransactionState.UNKNOW;
  35. }
  36. });
  37. //启动
  38. producer.start();
  39. //发送事务消息,出发监听器执行本地事务(业务)
  40. while (true) {
  41. System.out.println("请输入消息:");
  42. String s = new Scanner(System.in).nextLine();
  43. Message message = new Message("Topic4", s.getBytes(StandardCharsets.UTF_8));
  44. //参数 1. 消息 2. 触发执行本地事务使用的业务参数数据
  45. producer.sendMessageInTransaction(message, "触发执行本地事务使用的业务参数数据");
  46. }
  47. }
  48. }

使用事务消息需要设置事务消息监听器,事务消息监听器接口有两个方法分别为:executeLocalTransaction用于执行本地事务,checkLocalTransaction用于处理RocketMQ服务器的反向回查.

  1. executeLocalTransaction接口有二个参数,分别为Message(消息)和arg执行本地事务的参数,该接口需要返回消息发送状态,
    • LocalTransactionState.ROLLBACK_MESSAGE;表示撤回消息,
    • LocalTransactionState.COMMIT_MESSAGE;表示提交消息
    • LocalTransactionState.UNKNOW;本地事务无法告知服务器
  2. checkLocalTransaction接口参数为Message表示发送的消息,返回值与executeLocalTransaction接口返回值相同
  3. sendMessageInTransaction()方法可以进行事务消息发送,该方法有两个参数,第一个为Message消息,第二个为执行本地事务所需要的参数.