1. 消费者消费消息有两种模式,一种是消费者主动去Broker上拉取消息的拉模式,另一种是消费者等待Broker把消息推送过来的推模式。

两者区别

拉模式是主动 , Consumer主动去拉消息
推模式是被动, Consumer被动的接收消息.

通常情况下,用推模式比较简单。 实际上RocketMQ的推模式也是由拉模式封装出来的。 4.7.1版本中DefaultMQPullConsumerImpl这个消费者类已标记为过期,但是还是可以使用的。替换的类是DefaultLitePullConsumerImpl。

拉模式

Consumer主动发送请求到MQ去拉取消息

老版本

需要自己管理Offset偏移量,不建议使用.

  1. package org.apache.rocketmq.example.simple;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
  3. import org.apache.rocketmq.client.consumer.PullResult;
  4. import org.apache.rocketmq.client.exception.MQClientException;
  5. import org.apache.rocketmq.common.message.MessageQueue;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. import java.util.Set;
  9. /**
  10. * 拉模式
  11. * DefaultMQPullConsumer 这个已经过时了,
  12. */
  13. public class PullConsumer {
  14. /**
  15. * 偏移量队列Map
  16. */
  17. private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
  18. public static void main(String[] args) throws MQClientException {
  19. DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
  20. consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  21. consumer.start();
  22. //从 topic拿的MessageQueues集合 ,这个MessageQueues是生产者发送消息和消费者接收消息订阅的最小单位
  23. Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
  24. for (MessageQueue mq : mqs) {
  25. System.out.printf("Consume from the queue: %s%n", mq);
  26. SINGLE_MQ:
  27. while (true) {
  28. try {
  29. //拉取消息
  30. PullResult pullResult =
  31. /*参数1 : mq:其中一个队列
  32. *参数2 : subExpression: tag的过滤,如果为null就是表示不过滤
  33. * 参数3 : 队列偏移量指针, 这个是消费位置
  34. * 参数4 :最大拉取多少条消息, 32就是 最大拉取32条消息
  35. */
  36. consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
  37. System.out.printf("%s%n", pullResult);
  38. putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
  39. switch (pullResult.getPullStatus()) {
  40. case FOUND:
  41. break;
  42. case NO_MATCHED_MSG:
  43. break;
  44. case NO_NEW_MSG:
  45. break SINGLE_MQ;
  46. case OFFSET_ILLEGAL:
  47. break;
  48. default:
  49. break;
  50. }
  51. } catch (Exception e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. }
  56. consumer.shutdown();
  57. }
  58. /**
  59. * 获取队列的偏移量
  60. *
  61. * @param mq 队列
  62. * @return 偏移量
  63. */
  64. private static long getMessageQueueOffset(MessageQueue mq) {
  65. Long offset = OFFSE_TABLE.get(mq);
  66. if (offset != null)
  67. return offset;
  68. return 0;
  69. }
  70. /**
  71. * 将偏移量设置到队列上
  72. *
  73. * @param mq 队列
  74. * @param offset 偏移量
  75. */
  76. private static void putMessageQueueOffset(MessageQueue mq, long offset) {
  77. OFFSE_TABLE.put(mq, offset);
  78. }
  79. }

新版本

优先使用新版本 新版本也可以手动管理偏移量.下面案例是自动偏移量的不需要咱们管理

  1. package org.apache.rocketmq.example.simple;
  2. import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
  3. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  4. import org.apache.rocketmq.common.message.MessageExt;
  5. import java.util.List;
  6. /**
  7. * 拉模式
  8. * 这种模式不用管Offset ,优先推荐使用这个
  9. */
  10. public class LitePullConsumerSubscribe {
  11. public static volatile boolean running = true;
  12. public static void main(String[] args) throws Exception {
  13. DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
  14. litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  15. litePullConsumer.subscribe("TopicTest", "*");
  16. litePullConsumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  17. litePullConsumer.start();
  18. try {
  19. //循环获取
  20. while (running) {
  21. List<MessageExt> messageExts = litePullConsumer.poll();
  22. //打印拉取结果
  23. System.out.printf("%s%n", messageExts);
  24. }
  25. } finally {
  26. litePullConsumer.shutdown();
  27. }
  28. }
  29. }

推模式

相当于被动的方式,由Broker收到消息之后主动推送给消费者.

  1. package org.apache.rocketmq.example.simple;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  8. import org.apache.rocketmq.common.message.MessageExt;
  9. import java.util.List;
  10. /**
  11. * 推模式
  12. */
  13. public class PushConsumer {
  14. public static void main(String[] args) throws InterruptedException, MQClientException {
  15. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
  16. consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  17. /**
  18. *参数2表示按tag过滤的过滤表达式,如果* 代表接收所有的tag
  19. */
  20. consumer.subscribe("TopicTest", "*");
  21. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  22. consumer.setConsumeTimestamp("20181109221800");
  23. /**
  24. * 注册了消息监听
  25. */
  26. consumer.registerMessageListener(new MessageListenerConcurrently() {
  27. /**
  28. * 消费消息 ,这里的消息是Broker主动往这里推的
  29. * @param msgs
  30. * @param context
  31. * @return
  32. */
  33. @Override
  34. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  35. //这里是执行业务逻辑
  36. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  37. // 返回给Broker状态, 表明消费者消费是否成功
  38. // CONSUME_SUCCESS: 表示消费成功
  39. // RECONSUME_LATER:表示消费失败,重新消费
  40. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  41. }
  42. });
  43. consumer.start();
  44. System.out.printf("Consumer Started.%n");
  45. }
  46. }

输出:

  1. ConsumeMessageThread_6 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=3, storeSize=214, queueOffset=396, sysFlag=0, bornTimestamp=1634462026024, bornHost=/172.16.10.1:58143, storeTimestamp=1634462054202, storeHost=/172.16.10.103:10911, msgId=AC100A6700002A9F000000000009AA51, commitLogOffset=633425, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=397, KEYS=OrderID188, CONSUME_START_TIME=1634462026031, UNIQ_KEY=AC100A0130FC18B4AAC2561831280001, CLUSTER=rocketmq-cluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]]

生产者代码

  1. package org.apache.rocketmq.example.simple;
  2. import org.apache.rocketmq.client.exception.MQClientException;
  3. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  4. import org.apache.rocketmq.common.message.Message;
  5. import org.apache.rocketmq.remoting.common.RemotingHelper;
  6. /**
  7. * 简单样例:单向发送
  8. * Producer通过NameServer找到broker,把消息发送给broker,
  9. * 然后broker把消息推送给了消费者
  10. */
  11. public class Producer {
  12. public static void main(String[] args) throws MQClientException, InterruptedException {
  13. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
  14. //NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
  15. producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  16. producer.start();
  17. for (int i = 0; i < 20; i++)
  18. try {
  19. {
  20. Message msg = new Message("TopicTest", // 发送的topic
  21. "TagA", //tags
  22. "OrderID188", // keys3
  23. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
  24. );
  25. //同步传递消息,消息会发给集群中的一个Broker节点。
  26. //这个发送方法是void方法,说明这个消息发送过去了之后,Producer是不知道的
  27. //不知道消息是否发送成功,反正Producer发送完了就不管了 .
  28. producer.sendOneway(msg);
  29. }
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. }
  33. producer.shutdown();
  34. }
  35. }