消费者消费消息有两种模式,一种是消费者主动去Broker上拉取消息的拉模式,另一种是消费者等待Broker把消息推送过来的推模式。
两者区别
拉模式是主动 , Consumer主动去拉消息
推模式是被动, Consumer被动的接收消息.
通常情况下,用推模式比较简单。 实际上RocketMQ的推模式也是由拉模式封装出来的。 4.7.1版本中DefaultMQPullConsumerImpl这个消费者类已标记为过期,但是还是可以使用的。替换的类是DefaultLitePullConsumerImpl。
拉模式
Consumer主动发送请求到MQ去拉取消息
老版本
需要自己管理Offset偏移量,不建议使用.
package org.apache.rocketmq.example.simple;import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;import org.apache.rocketmq.client.consumer.PullResult;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageQueue;import java.util.HashMap;import java.util.Map;import java.util.Set;/*** 拉模式* DefaultMQPullConsumer 这个已经过时了,*/public class PullConsumer {/*** 偏移量队列Map*/private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");consumer.start();//从 topic拿的MessageQueues集合 ,这个MessageQueues是生产者发送消息和消费者接收消息订阅的最小单位Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");for (MessageQueue mq : mqs) {System.out.printf("Consume from the queue: %s%n", mq);SINGLE_MQ:while (true) {try {//拉取消息PullResult pullResult =/*参数1 : mq:其中一个队列*参数2 : subExpression: tag的过滤,如果为null就是表示不过滤* 参数3 : 队列偏移量指针, 这个是消费位置* 参数4 :最大拉取多少条消息, 32就是 最大拉取32条消息*/consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);System.out.printf("%s%n", pullResult);putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch (pullResult.getPullStatus()) {case FOUND:break;case NO_MATCHED_MSG:break;case NO_NEW_MSG:break SINGLE_MQ;case OFFSET_ILLEGAL:break;default:break;}} catch (Exception e) {e.printStackTrace();}}}consumer.shutdown();}/*** 获取队列的偏移量** @param mq 队列* @return 偏移量*/private static long getMessageQueueOffset(MessageQueue mq) {Long offset = OFFSE_TABLE.get(mq);if (offset != null)return offset;return 0;}/*** 将偏移量设置到队列上** @param mq 队列* @param offset 偏移量*/private static void putMessageQueueOffset(MessageQueue mq, long offset) {OFFSE_TABLE.put(mq, offset);}}
新版本
优先使用新版本 新版本也可以手动管理偏移量.下面案例是自动偏移量的不需要咱们管理
package org.apache.rocketmq.example.simple;import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** 拉模式* 这种模式不用管Offset ,优先推荐使用这个*/public class LitePullConsumerSubscribe {public static volatile boolean running = true;public static void main(String[] args) throws Exception {DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);litePullConsumer.subscribe("TopicTest", "*");litePullConsumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");litePullConsumer.start();try {//循环获取while (running) {List<MessageExt> messageExts = litePullConsumer.poll();//打印拉取结果System.out.printf("%s%n", messageExts);}} finally {litePullConsumer.shutdown();}}}
推模式
相当于被动的方式,由Broker收到消息之后主动推送给消费者.
package org.apache.rocketmq.example.simple;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** 推模式*/public class PushConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");/***参数2表示按tag过滤的过滤表达式,如果* 代表接收所有的tag*/consumer.subscribe("TopicTest", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setConsumeTimestamp("20181109221800");/*** 注册了消息监听*/consumer.registerMessageListener(new MessageListenerConcurrently() {/*** 消费消息 ,这里的消息是Broker主动往这里推的* @param msgs* @param context* @return*/@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//这里是执行业务逻辑System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 返回给Broker状态, 表明消费者消费是否成功// CONSUME_SUCCESS: 表示消费成功// RECONSUME_LATER:表示消费失败,重新消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}}
输出:
ConsumeMessageThread_6 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=3, storeSize=214, queueOffset=396, sysFlag=0, bornTimestamp=1634462026024, bornHost=/172.16.10.1:58143, storeTimestamp=1634462054202, storeHost=/172.16.10.103:10911, msgId=AC100A6700002A9F000000000009AA51, commitLogOffset=633425, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=397, KEYS=OrderID188, CONSUME_START_TIME=1634462026031, UNIQ_KEY=AC100A0130FC18B4AAC2561831280001, CLUSTER=rocketmq-cluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]]
生产者代码
package org.apache.rocketmq.example.simple;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;/*** 简单样例:单向发送* Producer通过NameServer找到broker,把消息发送给broker,* 然后broker把消息推送给了消费者*/public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");//NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");producer.start();for (int i = 0; i < 20; i++)try {{Message msg = new Message("TopicTest", // 发送的topic"TagA", //tags"OrderID188", // keys3"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容);//同步传递消息,消息会发给集群中的一个Broker节点。//这个发送方法是void方法,说明这个消息发送过去了之后,Producer是不知道的//不知道消息是否发送成功,反正Producer发送完了就不管了 .producer.sendOneway(msg);}} catch (Exception e) {e.printStackTrace();}producer.shutdown();}}
