RocketMQ原生API使用

1:项目搭建

**maven

**

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.7.1</version>
  5. </dependency>

其中源码中 example项目有详细的测试代码

生产者消费者需要依赖的是NameServer
多个nameServer分号分隔

2、RocketMQ的编程模型

编码的固定步骤

生产者

  1. 创建生产者 producer,制定生产者组名
  2. 制定nameServer地址
  3. producer启动
  4. 创建消息对象,制定topic, tag ,消息体
  5. 发送消息
  6. 关闭producer

    消费者

  7. 创建consumer,制定消费组名

  8. 制定nameServer地址
  9. 订阅topic, tag
  10. 设置回调函数,处理消息
  11. 启动消费者

3、RocketMQ的消息样例

基本样例

发送方式

1:同步发送

  1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
  2. producer.setNamesrvAddr("192.168.232.128:9876");
  3. producer.setNamesrvAddr("localhost:9876");
  4. producer.start();
  5. for (int i = 0; i < 20; i++) {
  6. try {
  7. Message msg = new Message("lite_pull_consumer_test",
  8. "TagA",
  9. "OrderID188",
  10. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
  11. //同步传递消息,消息会发给集群中的一个Broker节点。
  12. SendResult sendResult = producer.send(msg);
  13. System.out.printf("%s%n", sendResult);
  14. //返回结果,执行下面的code
  15. } catch (Exception e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. producer.shutdown();

2:异步发送

  1. DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
  2. producer.setNamesrvAddr("localhost:9876");
  3. producer.start();
  4. producer.setRetryTimesWhenSendAsyncFailed(0);
  5. int messageCount = 100;
  6. //由于是异步发送,这里引入一个countDownLatch,
  7. //保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。
  8. final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
  9. for (int i = 0; i < messageCount; i++) {
  10. try {
  11. final int index = i;
  12. Message msg = new Message("Jodie_topic_1023",
  13. "TagA",
  14. "OrderID188",
  15. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
  16. producer.send(msg, new SendCallback() {
  17. // 成功的回调
  18. @Override
  19. public void onSuccess(SendResult sendResult) {
  20. countDownLatch.countDown();
  21. System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
  22. }
  23. //异常的回调
  24. @Override
  25. public void onException(Throwable e) {
  26. countDownLatch.countDown();
  27. System.out.printf("%-10d Exception %s %n", index, e);
  28. e.printStackTrace();
  29. }
  30. });
  31. } catch (Exception e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. countDownLatch.await(5, TimeUnit.SECONDS);
  36. producer.shutdown();

3:单向发送

没有结果,没有回调,只管发送出去

  1. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  2. producer.setNamesrvAddr("localhost:9876");
  3. producer.start();
  4. for (int i = 0; i < 100; i++) {
  5. //Create a message instance, specifying topic, tag and message body.
  6. Message msg = new Message("TopicTest" ,
  7. "TagA" ,
  8. ("Hello RocketMQ " +
  9. i).getBytes(RemotingHelper.DEFAULT_CHARSET)
  10. );
  11. //Call send message to deliver message to one of brokers.
  12. producer.sendOneway(msg);
  13. }
  14. //Wait for sending to complete
  15. Thread.sleep(5000);
  16. producer.shutdown();

消费模式:

1:主动拉模式

  1. //已经删除了,不推荐了
  2. DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
  3. consumer.setNamesrvAddr("127.0.0.1:9876");
  4. consumer.start();
  5. Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Jodie_topic_1023");
  6. for (MessageQueue mq : mqs) {
  7. System.out.printf("Consume from the queue: %s%n", mq);
  8. SINGLE_MQ:
  9. while (true) {
  10. try {
  11. PullResult pullResult =
  12. consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
  13. System.out.printf("%s%n", pullResult);
  14. putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
  15. switch (pullResult.getPullStatus()) {
  16. case FOUND:
  17. break;
  18. case NO_MATCHED_MSG:
  19. break;
  20. case NO_NEW_MSG:
  21. break SINGLE_MQ;
  22. case OFFSET_ILLEGAL:
  23. break;
  24. default:
  25. break;
  26. }
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }
  32. consumer.shutdown();

2:等待broker推模式push

实际是pull模式封装的

  1. //也已经过期了,替换的类是DefaultLitePullConsumerImpl
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
  3. consumer.setNamesrvAddr("localhost:9876");
  4. consumer.subscribe("lite_pull_consumer_test", "*");
  5. //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  6. //wrong time format 2017_0422_221800
  7. //consumer.setConsumeTimestamp("20181109221800");
  8. consumer.registerMessageListener(new MessageListenerConcurrently() {
  9. @Override
  10. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  11. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  12. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  13. }
  14. });
  15. consumer.start();
  16. System.out.printf("Consumer Started.%n");

消息类型

1:顺序消息

只保证局部有序,不能保证圈住有序
发送者默认会round robin轮询把消息发送到不同的MessageQueue(分区队列)
消费者也会从messageQueue上拉取,只有一组有序的消息发在同一个MessageQueue才能利用MessageQueue
先进先出保证有序

brokerz中一个队列内的消息是保证有序的
消费者:会从多个消息队列上取消息,所以多个队列上的消息是无序的。
消费者要有序,需要一个队列一个队列的取数据。
消费者注入MessageListenerOrderly 会通过锁队列,一个一个的取消息
MessageListenerConcurrently 不会锁队列,只会随机取一批无法保证顺序

  1. try {
  2. MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  3. producer.start();
  4. String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
  5. for (int i = 0; i < 100; i++) {
  6. int orderId = i % 10;
  7. Message msg =
  8. new Message("TopicTest", tags[i % tags.length], "KEY" + i,
  9. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  10. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
  11. @Override
  12. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  13. Integer id = (Integer) arg;
  14. int index = id % mqs.size();
  15. return mqs.get(index);
  16. }
  17. }, orderId);
  18. System.out.printf("%s%n", sendResult);
  19. }
  20. producer.shutdown();
  21. } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
  22. e.printStackTrace();
  23. }
  1. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
  2. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  3. consumer.subscribe("TopicTest", "TagA || TagC || TagD");
  4. consumer.registerMessageListener(new MessageListenerOrderly() {
  5. AtomicLong consumeTimes = new AtomicLong(0);
  6. @Override
  7. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  8. context.setAutoCommit(true);
  9. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  10. this.consumeTimes.incrementAndGet();
  11. if ((this.consumeTimes.get() % 2) == 0) {
  12. return ConsumeOrderlyStatus.SUCCESS;
  13. } else if ((this.consumeTimes.get() % 3) == 0) {
  14. return ConsumeOrderlyStatus.ROLLBACK;
  15. } else if ((this.consumeTimes.get() % 4) == 0) {
  16. return ConsumeOrderlyStatus.COMMIT;
  17. } else if ((this.consumeTimes.get() % 5) == 0) {
  18. context.setSuspendCurrentQueueTimeMillis(3000);
  19. return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  20. }
  21. return ConsumeOrderlyStatus.SUCCESS;
  22. }
  23. });
  24. consumer.start();
  25. System.out.printf("Consumer Started.%n");

2:广播消息

默认是集群状态MessageModel.CLUSTERING: 一个消息只会被消费组中的一个实例消费
广播模式MessageModel.BROADCASTING:消费组里的所有实例都会消费这个消息

  1. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
  2. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  3. consumer.setMessageModel(MessageModel.BROADCASTING);
  4. consumer.subscribe("TopicTest", "TagA || TagC || TagD");
  5. consumer.registerMessageListener(new MessageListenerConcurrently() {
  6. @Override
  7. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  8. ConsumeConcurrentlyContext context) {
  9. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  10. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  11. }
  12. });
  13. consumer.start();
  14. System.out.printf("Broadcast Consumer Started.%n");

3:延迟消息

是等一会
message.setDelayTimeLevel(3); 3:是隔离级别
分别对应:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
开源版本一共18个隔离级别,底层实现是18个队列实现的

  1. public static void main(String[] args) throws Exception {
  2. // Instantiate a producer to send scheduled messages
  3. DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
  4. // Launch producer
  5. producer.start();
  6. int totalMessagesToSend = 100;
  7. for (int i = 0; i < totalMessagesToSend; i++) {
  8. Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
  9. // This message will be delivered to consumer 10 seconds later.
  10. message.setDelayTimeLevel(3);
  11. // Send the message
  12. producer.send(message);
  13. }
  14. // Shutdown producer after use.
  15. producer.shutdown();
  16. }

4:批量消息

把多个消息一次发送出去,减少IO
如果批量消息大于1M就不要一个批次了,可以拆分
实际情况是不要大于4M,需要是同一个topic的消息,等

  1. DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
  2. producer.start();
  3. //If you just send messages of no more than 1MiB at a time, it is easy to use batch
  4. //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
  5. String topic = "BatchTest";
  6. List<Message> messages = new ArrayList<>();
  7. messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
  8. messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
  9. messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
  10. producer.send(messages);

5:过滤消息

message的tag属性简单快速过滤消息

tag过滤

生产者:

  1. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  2. producer.start();
  3. String[] tags = new String[] {"TagA", "TagB", "TagC"};
  4. for (int i = 0; i < 60; i++) {
  5. Message msg = new Message("TagFilterTest",
  6. tags[i % tags.length],
  7. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
  8. SendResult sendResult = producer.send(msg);
  9. System.out.printf("%s%n", sendResult);
  10. }
  11. producer.shutdown();

消费者:
RocketMQ最佳实践 ,一个应用可以用一个Topic,不同的业务用tag区分

  1. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
  2. //订阅 tagA tagC的消息
  3. consumer.subscribe("TagFilterTest", "TagA || TagC");
  4. consumer.registerMessageListener(new MessageListenerConcurrently() {
  5. @Override
  6. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  7. ConsumeConcurrentlyContext context) {
  8. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  9. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  10. }
  11. });
  12. consumer.start();
  13. System.out.printf("Consumer Started.%n");

SQL过滤

s生产者,没啥区别的

  1. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  2. producer.start();
  3. String[] tags = new String[] {"TagA", "TagB", "TagC"};
  4. for (int i = 0; i < 10; i++) {
  5. Message msg = new Message("SqlFilterTest",
  6. tags[i % tags.length],
  7. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
  8. );
  9. msg.putUserProperty("a", String.valueOf(i));
  10. SendResult sendResult = producer.send(msg);
  11. System.out.printf("%s%n", sendResult);
  12. }
  13. producer.shutdown();

消费者
使用MessageSelector.bySql
按照SQL92标准,
SQL可以使用的tage和生产者加入的属性

  1. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
  2. // Don't forget to set enablePropertyFilter=true in broker
  3. consumer.subscribe("SqlFilterTest",
  4. MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
  5. "and (a is not null and a between 0 and 3)"));
  6. consumer.registerMessageListener(new MessageListenerConcurrently() {
  7. @Override
  8. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  9. ConsumeConcurrentlyContext context) {
  10. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  11. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  12. }
  13. });
  14. consumer.start();
  15. System.out.printf("Consumer Started.%n");

SQL92语法

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型:

  • 数值,比如:123,3.1415;
  • 字符,比如:‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUEFALSE

只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。

6:事务消息

image.png
这是事务消息被分成了两个事务:发送时,消费时
只能保证发送消息与本地事务的两个操作的原子性
类: TransactionMQProducer,TransactionListener

  1. TransactionListener transactionListener = new TransactionListenerImpl();
  2. TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
  3. producer.setNamesrvAddr("127.0.0.1:9876");
  4. ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
  5. @Override
  6. public Thread newThread(Runnable r) {
  7. Thread thread = new Thread(r);
  8. thread.setName("client-transaction-msg-check-thread");
  9. return thread;
  10. }
  11. });
  12. producer.setExecutorService(executorService);
  13. producer.setTransactionListener(transactionListener);
  14. producer.start();
  15. String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
  16. for (int i = 0; i < 10; i++) {
  17. try {
  18. Message msg =
  19. new Message("TestTopic", tags[i % tags.length], "KEY" + i,
  20. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  21. SendResult sendResult = producer.sendMessageInTransaction(msg, null);
  22. System.out.printf("%s%n", sendResult);
  23. Thread.sleep(10);
  24. } catch (MQClientException | UnsupportedEncodingException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. for (int i = 0; i < 100000; i++) {
  29. Thread.sleep(1000);
  30. }
  31. producer.shutdown();

需要有个监听的实现

  1. class TransactionListenerImpl implements TransactionListener {
  2. private AtomicInteger transactionIndex = new AtomicInteger(0);
  3. private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
  4. //half消息发送成功后回调此方法,执行本地事务
  5. @Override
  6. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  7. String tags = msg.getTags();
  8. //do 本地事务,本地数据库的操作
  9. boolean isSuccess = true;
  10. //本地事务crud成功,就提交消息
  11. if(isSuccess){
  12. //会立刻被消费者消费
  13. return LocalTransactionState.COMMIT_MESSAGE;
  14. }else if(!isSuccess){
  15. //如果本地事务失败,就回滚mq消息
  16. return LocalTransactionState.ROLLBACK_MESSAGE;
  17. }else{
  18. return LocalTransactionState.UNKNOW;
  19. }
  20. }
  21. //COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
  22. @Override
  23. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
  24. //回调,查询本地事务的状态吧,算是对上面的一种补充
  25. Integer status = localTrans.get(msg.getTransactionId());
  26. if (null != status) {
  27. switch (status) {
  28. case 0:
  29. return LocalTransactionState.UNKNOW;
  30. case 1:
  31. return LocalTransactionState.COMMIT_MESSAGE;
  32. case 2:
  33. return LocalTransactionState.ROLLBACK_MESSAGE;
  34. default:
  35. return LocalTransactionState.COMMIT_MESSAGE;
  36. }
  37. }
  38. return LocalTransactionState.COMMIT_MESSAGE;
  39. }
  40. }

不支持延迟消息,不支持批量消息
单个消息检查的次数是15次,可以在broker tansactionCheckMax配置
默认超过max会丢弃,也是重写AbstractTransactionCheckListener改变行为
实际检查的次数会在messge保持的用户属性MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES

配置文件中transactionMsgTimeout 指定特定时间后被检查
可以在用户属性CHECK_IMMUNITY_TIME_IN_SECONDS 修改时间限制
BrokerConfig.transactionTimeOut参数配置,默认6秒,可在broker.conf配置

msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, “10000”);
设置未为10s

事务消息可能不止一次被检查消费

建议使用双重写入机制
事务机制的顺序图:
image.png

在发送half办消息时:其实是吧消息存入 RMQ_SYS_TRANS_HALF_TOPIC 这个Topic
对消费者不可见,最终提交才会转存到对应的topic

4:ACL权限控制

文档说明: docs/cn/acl/user_guide.md
主要在broker.conf配置,
打开acl标志:aclEnable=true
plain_acl.yml进行权限配置,热加载不需要重启

全局白名单,不受ACL控制
#通常需要将主从架构中的所有节点加进来
globalWhiteRemoteAddresses:
- 10.10.103.
- 192.168.0.

accounts:
#第一个账户
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY #默认Topic访问策略是拒绝
defaultGroupPerm: SUB #默认Group访问策略是只允许订阅
topicPerms:
- topicA=DENY #topicA拒绝
- topicB=PUB|SUB #topicB允许发布和订阅消息
- topicC=SUB #topicC只允许订阅
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
#第二个账户,只要是来自192.168.1.的IP,就可以访问所有资源
- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.

# if it is admin, it could access all resources
admin: true

  1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
  2. producer.setNamesrvAddr("127.0.0.1:9876");
  3. producer.start();
  4. producer.shutdown();
  5. //设置用户和密码
  6. static RPCHook getAclRPCHook() {
  7. return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
  8. }

源码:

rocketmq-all-4.7.1-source-release-带注释.zip