前言

这里是官方文档的介绍部分,省略了不少内容,官网链接:http://rocketmq.apache.org/docs/motivation/

中文的翻译可以从这里查询到:《Apache RocketMQ用户指南》官方文档

Quick Start

Download & Build from Release

  1. > unzip rocketmq-all-4.7.0-source-release.zip
  2. > cd rocketmq-all-4.7.0/
  3. > mvn -Prelease-all -DskipTests clean install -U
  4. > cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0

Start Name Server

  1. > nohup sh bin/mqnamesrv &
  2. > tail -f ~/logs/rocketmqlogs/namesrv.log
  3. The Name Server boot success...

Start Broker

  1. > nohup sh bin/mqbroker -n localhost:9876 &
  2. > tail -f ~/logs/rocketmqlogs/broker.log
  3. The broker[%s, 172.30.30.233:10911] boot success...

Send & Receive Messages

在发送和接受消息之前,首先需要给客户端提供name servers地址,可以使用环境变量 NAMESRV_ADDR

  1. > export NAMESRV_ADDR=localhost:9876
  2. > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
  3. SendResult [sendStatus=SEND_OK, msgId= ...
  4. > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
  5. ConsumeMessageThread_%d Receive New Messages: [MessageExt...

Shutdown Servers

  1. > sh bin/mqshutdown broker
  2. The mqbroker(36695) is running...
  3. Send shutdown request to mqbroker(36695) OK
  4. > sh bin/mqshutdown namesrv
  5. The mqnamesrv(36664) is running...
  6. Send shutdown request to mqnamesrv(36664) OK

Simple Message Example

rocketmq通过三种方式来发送RocketMQ消息使用: 可靠的同步发送, 可靠的异步发送和单向传输。

Add Dependency

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.3.0</version>
  5. </dependency>

Send Messages Synchronously

可靠的同步传输:可靠的同步传输广泛应用于重要通知消息,短信通知,短信营销系统等..

  1. public class SyncProducer {
  2. public static void main(String[] args) throws Exception {
  3. //Instantiate with a producer group name.
  4. DefaultMQProducer producer = new
  5. DefaultMQProducer("please_rename_unique_group_name");
  6. // Specify name server addresses.
  7. producer.setNamesrvAddr("localhost:9876");
  8. //Launch the instance.
  9. producer.start();
  10. for (int i = 0; i < 100; i++) {
  11. //Create a message instance, specifying topic, tag and message body.
  12. Message msg = new Message("TopicTest" /* Topic */,
  13. "TagA" /* Tag */,
  14. ("Hello RocketMQ " +
  15. i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
  16. );
  17. //Call send message to deliver message to one of brokers.
  18. SendResult sendResult = producer.send(msg);
  19. System.out.printf("%s%n", sendResult);
  20. }
  21. //Shut down once the producer instance is not longer in use.
  22. producer.shutdown();
  23. }
  24. }

Send Messages Asynchronously

可靠的异步传输应用:异步传输通常用于响应时间敏感的业务场景。

  1. public class AsyncProducer {
  2. public static void main(String[] args) throws Exception {
  3. //Instantiate with a producer group name.
  4. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  5. // Specify name server addresses.
  6. producer.setNamesrvAddr("localhost:9876");
  7. //Launch the instance.
  8. producer.start();
  9. producer.setRetryTimesWhenSendAsyncFailed(0);
  10. for (int i = 0; i < 100; i++) {
  11. final int index = i;
  12. //Create a message instance, specifying topic, tag and message body.
  13. Message msg = new Message("TopicTest",
  14. "TagA",
  15. "OrderID188",
  16. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
  17. producer.send(msg, new SendCallback() {
  18. @Override
  19. public void onSuccess(SendResult sendResult) {
  20. System.out.printf("%-10d OK %s %n", index,
  21. sendResult.getMsgId());
  22. }
  23. @Override
  24. public void onException(Throwable e) {
  25. System.out.printf("%-10d Exception %s %n", index, e);
  26. e.printStackTrace();
  27. }
  28. });
  29. }
  30. //Shut down once the producer instance is not longer in use.
  31. producer.shutdown();
  32. }
  33. }

Send Messages in One-way Mode

单向传输应用:单向传输用于需要中等可靠性的情况,例如日志收集.

  1. public class OnewayProducer {
  2. public static void main(String[] args) throws Exception{
  3. //Instantiate with a producer group name.
  4. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  5. // Specify name server addresses.
  6. producer.setNamesrvAddr("localhost:9876");
  7. //Launch the instance.
  8. producer.start();
  9. for (int i = 0; i < 100; i++) {
  10. //Create a message instance, specifying topic, tag and message body.
  11. Message msg = new Message("TopicTest" /* Topic */,
  12. "TagA" /* Tag */,
  13. ("Hello RocketMQ " +
  14. i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
  15. );
  16. //Call send message to deliver message to one of brokers.
  17. producer.sendOneway(msg);
  18. }
  19. //Shut down once the producer instance is not longer in use.
  20. producer.shutdown();
  21. }
  22. }

Order Message

RocketMQ使用FIFO队列提供有序消息. 以下示例演示发送/接收全局和分区有序消息。

Send message sample code

  1. public class OrderedProducer {
  2. public static void main(String[] args) throws Exception {
  3. //Instantiate with a producer group name.
  4. MQProducer producer = new DefaultMQProducer("example_group_name");
  5. //Launch the instance.
  6. producer.start();
  7. String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
  8. for (int i = 0; i < 100; i++) {
  9. int orderId = i % 10;
  10. //Create a message instance, specifying topic, tag and message body.
  11. Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
  12. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  13. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
  14. @Override
  15. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  16. Integer id = (Integer) arg;
  17. int index = id % mqs.size();
  18. return mqs.get(index);
  19. }
  20. }, orderId);
  21. System.out.printf("%s%n", sendResult);
  22. }
  23. //server shutdown
  24. producer.shutdown();
  25. }
  26. }

Subscription message sample code

  1. public class OrderedConsumer {
  2. public static void main(String[] args) throws Exception {
  3. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
  4. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  5. consumer.subscribe("TopicTest", "TagA || TagC || TagD");
  6. consumer.registerMessageListener(new MessageListenerOrderly() {
  7. AtomicLong consumeTimes = new AtomicLong(0);
  8. @Override
  9. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
  10. ConsumeOrderlyContext context) {
  11. context.setAutoCommit(false);
  12. System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
  13. this.consumeTimes.incrementAndGet();
  14. if ((this.consumeTimes.get() % 2) == 0) {
  15. return ConsumeOrderlyStatus.SUCCESS;
  16. } else if ((this.consumeTimes.get() % 3) == 0) {
  17. return ConsumeOrderlyStatus.ROLLBACK;
  18. } else if ((this.consumeTimes.get() % 4) == 0) {
  19. return ConsumeOrderlyStatus.COMMIT;
  20. } else if ((this.consumeTimes.get() % 5) == 0) {
  21. context.setSuspendCurrentQueueTimeMillis(3000);
  22. return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  23. }
  24. return ConsumeOrderlyStatus.SUCCESS;
  25. }
  26. });
  27. consumer.start();
  28. System.out.printf("Consumer Started.%n");
  29. }
  30. }

Broadcasting

广播是向所有用户发送消息。 如果您希望所有订阅者都能收到有关某个主题的消息,则广播是一个不错的选择。

Producer example

  1. public class BroadcastProducer {
  2. public static void main(String[] args) throws Exception {
  3. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
  4. producer.start();
  5. for (int i = 0; i < 100; i++){
  6. Message msg = new Message("TopicTest",
  7. "TagA",
  8. "OrderID188",
  9. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
  10. SendResult sendResult = producer.send(msg);
  11. System.out.printf("%s%n", sendResult);
  12. }
  13. producer.shutdown();
  14. }
  15. }

Consumer example

  1. public class BroadcastConsumer {
  2. public static void main(String[] args) throws Exception {
  3. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
  4. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  5. //set to broadcast mode
  6. consumer.setMessageModel(MessageModel.BROADCASTING);
  7. consumer.subscribe("TopicTest", "TagA || TagC || TagD");
  8. consumer.registerMessageListener(new MessageListenerConcurrently() {
  9. @Override
  10. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  11. ConsumeConcurrentlyContext context) {
  12. System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
  13. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  14. }
  15. });
  16. consumer.start();
  17. System.out.printf("Broadcast Consumer Started.%n");
  18. }
  19. }

Schedule example

定时消息与正常消息的不同之处在于,它是在指定的时间后执行。

Application

启动消费者等待传入的订阅消息

  1. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  2. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  5. import org.apache.rocketmq.common.message.MessageExt;
  6. import java.util.List;
  7. public class ScheduledMessageConsumer {
  8. public static void main(String[] args) throws Exception {
  9. // Instantiate message consumer
  10. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
  11. // Subscribe topics
  12. consumer.subscribe("TestTopic", "*");
  13. // Register message listener
  14. consumer.registerMessageListener(new MessageListenerConcurrently() {
  15. @Override
  16. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
  17. for (MessageExt message : messages) {
  18. // Print approximate delay time period
  19. System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
  20. + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
  21. }
  22. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  23. }
  24. });
  25. // Launch consumer
  26. consumer.start();
  27. }
  28. }

Send scheduled messages

  1. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  2. import org.apache.rocketmq.common.message.Message;
  3. public class ScheduledMessageProducer {
  4. public static void main(String[] args) throws Exception {
  5. // Instantiate a producer to send scheduled messages
  6. DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
  7. // Launch producer
  8. producer.start();
  9. int totalMessagesToSend = 100;
  10. for (int i = 0; i < totalMessagesToSend; i++) {
  11. Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
  12. // This message will be delivered to consumer 10 seconds later.
  13. message.setDelayTimeLevel(3);
  14. // Send the message
  15. producer.send(message);
  16. }
  17. // Shutdown producer after use.
  18. producer.shutdown();
  19. }
  20. }

这里消息的延迟是从message里面设置的

Batch Example

批量发送消息可提高单次发送消息的性能. 相同批次的消息应具有:相同的主题,相同的等待消息处理成功但是不支持定时处理. 此外,一个批量的消息的总大小不要错过1MB.

How to use batch

如果您一次只发送不超过1MB的消息,使用批量发送很方便:

  1. String topic = "BatchTest";
  2. List<Message> messages = new ArrayList<>();
  3. messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
  4. messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
  5. messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
  6. try {
  7. producer.send(messages);
  8. } catch (Exception e) {
  9. e.printStackTrace();
  10. //handle the error
  11. }

Split into lists

只有在发送大批量时才会增加复杂性,并且您可能不确定是否超出了大小限制(1MiB)。你最好分开列表:

  1. public class ListSplitter implements Iterator<List<Message>> {
  2. private final int SIZE_LIMIT = 1000 * 1000;
  3. private final List<Message> messages;
  4. private int currIndex;
  5. public ListSplitter(List<Message> messages) {
  6. this.messages = messages;
  7. }
  8. @Override public boolean hasNext() {
  9. return currIndex < messages.size();
  10. }
  11. @Override public List<Message> next() {
  12. int nextIndex = currIndex;
  13. int totalSize = 0;
  14. for (; nextIndex < messages.size(); nextIndex++) {
  15. Message message = messages.get(nextIndex);
  16. int tmpSize = message.getTopic().length() + message.getBody().length;
  17. Map<String, String> properties = message.getProperties();
  18. for (Map.Entry<String, String> entry : properties.entrySet()) {
  19. tmpSize += entry.getKey().length() + entry.getValue().length();
  20. }
  21. tmpSize = tmpSize + 20; //for log overhead
  22. if (tmpSize > SIZE_LIMIT) {
  23. //it is unexpected that single message exceeds the SIZE_LIMIT
  24. //here just let it go, otherwise it will block the splitting process
  25. if (nextIndex - currIndex == 0) {
  26. //if the next sublist has no element, add this one and then break, otherwise just break
  27. nextIndex++;
  28. }
  29. break;
  30. }
  31. if (tmpSize + totalSize > SIZE_LIMIT) {
  32. break;
  33. } else {
  34. totalSize += tmpSize;
  35. }
  36. }
  37. List<Message> subList = messages.subList(currIndex, nextIndex);
  38. currIndex = nextIndex;
  39. return subList;
  40. }
  41. }
  42. //then you could split the large list into small ones:
  43. ListSplitter splitter = new ListSplitter(messages);
  44. while (splitter.hasNext()) {
  45. try {
  46. List<Message> listItem = splitter.next();
  47. producer.send(listItem);
  48. } catch (Exception e) {
  49. e.printStackTrace();
  50. //handle the error
  51. }
  52. }

Filter Example

消息过滤,在大多数情况下,tag是一种简单而有用的设计,用于选择所需的信息。 消费者将收到包含TAGA或TAGB或TAGB的消息. 但限制是一条消息只能有一个标签,而这对于复杂的情况可能无效。 在这种情况下,您可以使用SQL表达式筛选出消息.

simple tags

  1. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
  2. consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

Principle

原则:SQL功能可以通过您在发送消息时放入的属性进行一些计算。 在RocketMQ定义的语法下,您可以实现一些有趣的逻辑。 这是一个例子:
image.png

Grammars

RocketMQ只定义了一些基本的语法来支持这个功能。 你也可以很容易地扩展它.

  1. 数字比较, 像 >, >=, <, <=, BETWEEN, =;
  2. 字符比较, 像 =, <>, IN;
  3. IS NULL 或者 IS NOT NULL;
  4. 逻辑运算AND, OR, NOT;

常量类型是:

  1. 数字, 像123, 3.1415;
  2. 字符串, 像‘abc’,必须使用单引号;
  3. NULL, 特殊常数;
  4. 布尔常量, TRUEFALSE;

Usage constraints

只有消费者可以通过SQL92选择消息。对应的接口是:

  1. public void subscribe(final String topic, final MessageSelector messageSelector)

Producer example

发送时,您可以通过putUserProperty方法在消息中放置属性.

  1. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  2. producer.start();
  3. Message msg = new Message("TopicTest",
  4. tag,
  5. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
  6. );
  7. // Set some properties.
  8. msg.putUserProperty("a", String.valueOf(i));
  9. SendResult sendResult = producer.send(msg);
  10. producer.shutdown();

Consumer example

消费时,使用Message Selector.by Sql通过SQL92选择消息.

  1. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
  2. // only subsribe messages have property a, also a >=0 and a <= 3
  3. consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
  4. consumer.registerMessageListener(new MessageListenerConcurrently() {
  5. @Override
  6. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  7. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  8. }
  9. });
  10. consumer.start();

Logappender Example

日志追加示例:RocketMQ logappender提供log4j appender,log4j2 appender和logback appender供业务使用,下面是配置示例.

log4j

  1. log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender
  2. log4j.appender.mq.Tag=yourTag
  3. log4j.appender.mq.Topic=yourLogTopic
  4. log4j.appender.mq.ProducerGroup=yourLogGroup
  5. log4j.appender.mq.NameServerAddress=yourRocketmqNameserverAddress
  6. log4j.appender.mq.layout=org.apache.log4j.PatternLayout
  7. log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n

如果使用log4j xml配置文件时,将其配置为此,并添加一个异步appender:

  1. <appender name="mqAppender1" class="org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender">
  2. <param name="Tag" value="yourTag" />
  3. <param name="Topic" value="yourLogTopic" />
  4. <param name="ProducerGroup" value="yourLogGroup" />
  5. <param name="NameServerAddress" value="yourRocketmqNameserverAddress"/>
  6. <layout class="org.apache.log4j.PatternLayout">
  7. <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n" />
  8. </layout>
  9. </appender>
  10. <appender name="mqAsyncAppender1" class="org.apache.log4j.AsyncAppender">
  11. <param name="BufferSize" value="1024" />
  12. <param name="Blocking" value="false" />
  13. <appender-ref ref="mqAppender1"/>
  14. </appender>

log4j2

当使用log4j2时,config为这个。如果你想要noneblock,只需要为ref配置一个asyncAppender.

  1. <RocketMQ name="rocketmqAppender" producerGroup="yourLogGroup" nameServerAddress="yourRocketmqNameserverAddress"
  2. topic="yourLogTopic" tag="yourTag">
  3. <PatternLayout pattern="%d [%p] hahahah %c %m%n"/>
  4. </RocketMQ>

logback

在使用logback时,还需要一个asyncAppender.

  1. <appender name="mqAppender1" class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender">
  2. <tag>yourTag</tag>
  3. <topic>yourLogTopic</topic>
  4. <producerGroup>yourLogGroup</producerGroup>
  5. <nameServerAddress>yourRocketmqNameserverAddress</nameServerAddress>
  6. <layout>
  7. <pattern>%date %p %t - %m%n</pattern>
  8. </layout>
  9. </appender>
  10. <appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender">
  11. <queueSize>1024</queueSize>
  12. <discardingThreshold>80</discardingThreshold>
  13. <maxFlushTime>2000</maxFlushTime>
  14. <neverBlock>true</neverBlock>
  15. <appender-ref ref="mqAppender1"/>
  16. </appender>

OpenMessaging Example

OpenMessaging,其中包括建立行业准则和消息传递,流式规范,为金融,电子商务,物联网和大数据领域提供通用框架。 设计原则是分布式异构环境中面向云,简单,灵活和独立于语言的设计原则。 符合这些规范将使在所有主要平台和操作系统上开发异构消息传递应用成为可能。
这里暂时用不到,就不举例了,原文如下:http://rocketmq.apache.org/docs/openmessaging-example/

Transaction example

What is transactional message?

它可以看作是两阶段提交消息实现,以确保分布式系统中的最终一致性。事务消息确保本地事务的执行和消息的发送可以原子化地执行。

Usage Constraint

使用约束

  1. 事务性的消息没有shedule计划延迟之类的,也没有批处理的支持。
  2. 为了避免单个消息被检查太多次,并导致半队列消息累积,我们将默认情况下对单个消息的检查次数限制为15次,但是用户可以通过更改代理配置中的“transactionCheckMax”参数来更改此限制(如果已检查了一条消息)“transactionCheckMax”次,默认情况下,broker将丢弃此消息并同时打印错误日志。用户可以通过重写“AbstractTransactionCheckListener”类来更改此行为。
  3. 事务性消息将在由代理配置中的参数“transactionTimeout”确定的特定时间段后进行检查。用户也可以在发送事务性消息时通过设置用户属性“CHECK_IMMUNITY_TIME_IN_SECONDS”来更改此限制,此参数优先于“transactionMsgTimeout”参数。
  4. 事务性消息可能被多次检查或使用。
  5. 向用户的目的地Topic提交的消息可能会失败。目前这取决于日志记录。RocketMQ本身的高可用机制保证了高可用性。如果要确保事务消息不会丢失,并且保证事务完整性,建议使用同步双写。机制。
  6. 事务性消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务性消息允许向后查询。MQ服务器按其生产者id查询客户机。

    Application

    1、 事务状态
    事务性消息有三种状态:

  7. TransactionStatus.CommitTransaction:commit transaction,表示允许消费者消费此消息。

  8. TransactionStatus.RollbackTransaction:回滚事务,表示消息将被删除,不允许消费。
  9. TransactionStatus.Unknown:中间状态,这意味着需要MQ进行检查以确定状态。

2、 发送事务性消息
(1) 创建事务生产者
使用TransactionMQProducer类创建producer客户机,并指定唯一的producerGroup,然后可以设置自定义线程池来处理检查请求。在执行本地事务后,需要根据执行结果回复MQ,回复状态在上一节描述。

  1. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  2. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  5. import org.apache.rocketmq.common.message.MessageExt;
  6. import java.util.List;
  7. public class TransactionProducer {
  8. public static void main(String[] args) throws MQClientException, InterruptedException {
  9. TransactionListener transactionListener = new TransactionListenerImpl();
  10. TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
  11. ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
  12. @Override
  13. public Thread newThread(Runnable r) {
  14. Thread thread = new Thread(r);
  15. thread.setName("client-transaction-msg-check-thread");
  16. return thread;
  17. }
  18. });
  19. producer.setExecutorService(executorService);
  20. producer.setTransactionListener(transactionListener);
  21. producer.start();
  22. String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
  23. for (int i = 0; i < 10; i++) {
  24. try {
  25. Message msg =
  26. new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
  27. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  28. SendResult sendResult = producer.sendMessageInTransaction(msg, null);
  29. System.out.printf("%s%n", sendResult);
  30. Thread.sleep(10);
  31. } catch (MQClientException | UnsupportedEncodingException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. for (int i = 0; i < 100000; i++) {
  36. Thread.sleep(1000);
  37. }
  38. producer.shutdown();
  39. }
  40. }

(2) 实现TransactionListener接口
executeLocalTransaction”方法用于在发送半消息成功时执行本地事务。它返回上一节提到的三种事务状态之一。
checkLocalTransaction”方法用于检查本地事务状态并响应MQ检查请求。它还返回上一节中提到的三种事务状态之一。

  1. import ...
  2. public class TransactionListenerImpl implements TransactionListener {
  3. private AtomicInteger transactionIndex = new AtomicInteger(0);
  4. private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
  5. @Override
  6. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  7. int value = transactionIndex.getAndIncrement();
  8. int status = value % 3;
  9. localTrans.put(msg.getTransactionId(), status);
  10. return LocalTransactionState.UNKNOW;
  11. }
  12. @Override
  13. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
  14. Integer status = localTrans.get(msg.getTransactionId());
  15. if (null != status) {
  16. switch (status) {
  17. case 0:
  18. return LocalTransactionState.UNKNOW;
  19. case 1:
  20. return LocalTransactionState.COMMIT_MESSAGE;
  21. case 2:
  22. return LocalTransactionState.ROLLBACK_MESSAGE;
  23. }
  24. }
  25. return LocalTransactionState.COMMIT_MESSAGE;
  26. }
  27. }