消费者消费消息有两种模式,一种是消费者主动去Broker上拉取消息的拉模式,另一种是消费者等待Broker把消息推送过来的推模式。
两者区别
拉模式是主动 , Consumer主动去拉消息
推模式是被动, Consumer被动的接收消息.
通常情况下,用推模式比较简单。 实际上RocketMQ的推模式也是由拉模式封装出来的。 4.7.1版本中DefaultMQPullConsumerImpl这个消费者类已标记为过期,但是还是可以使用的。替换的类是DefaultLitePullConsumerImpl。
拉模式
Consumer主动发送请求到MQ去拉取消息
老版本
需要自己管理Offset偏移量,不建议使用.
package org.apache.rocketmq.example.simple;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* 拉模式
* DefaultMQPullConsumer 这个已经过时了,
*/
public class PullConsumer {
/**
* 偏移量队列Map
*/
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
consumer.start();
//从 topic拿的MessageQueues集合 ,这个MessageQueues是生产者发送消息和消费者接收消息订阅的最小单位
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
//拉取消息
PullResult pullResult =
/*参数1 : mq:其中一个队列
*参数2 : subExpression: tag的过滤,如果为null就是表示不过滤
* 参数3 : 队列偏移量指针, 这个是消费位置
* 参数4 :最大拉取多少条消息, 32就是 最大拉取32条消息
*/
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
/**
* 获取队列的偏移量
*
* @param mq 队列
* @return 偏移量
*/
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
/**
* 将偏移量设置到队列上
*
* @param mq 队列
* @param offset 偏移量
*/
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
}
新版本
优先使用新版本 新版本也可以手动管理偏移量.下面案例是自动偏移量的不需要咱们管理
package org.apache.rocketmq.example.simple;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 拉模式
* 这种模式不用管Offset ,优先推荐使用这个
*/
public class LitePullConsumerSubscribe {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
litePullConsumer.start();
try {
//循环获取
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
//打印拉取结果
System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}
}
}
推模式
相当于被动的方式,由Broker收到消息之后主动推送给消费者.
package org.apache.rocketmq.example.simple;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 推模式
*/
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
/**
*参数2表示按tag过滤的过滤表达式,如果* 代表接收所有的tag
*/
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
/**
* 注册了消息监听
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 消费消息 ,这里的消息是Broker主动往这里推的
* @param msgs
* @param context
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//这里是执行业务逻辑
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回给Broker状态, 表明消费者消费是否成功
// CONSUME_SUCCESS: 表示消费成功
// RECONSUME_LATER:表示消费失败,重新消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
输出:
ConsumeMessageThread_6 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=3, storeSize=214, queueOffset=396, sysFlag=0, bornTimestamp=1634462026024, bornHost=/172.16.10.1:58143, storeTimestamp=1634462054202, storeHost=/172.16.10.103:10911, msgId=AC100A6700002A9F000000000009AA51, commitLogOffset=633425, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=397, KEYS=OrderID188, CONSUME_START_TIME=1634462026031, UNIQ_KEY=AC100A0130FC18B4AAC2561831280001, CLUSTER=rocketmq-cluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]]
生产者代码
package org.apache.rocketmq.example.simple;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* 简单样例:单向发送
* Producer通过NameServer找到broker,把消息发送给broker,
* 然后broker把消息推送给了消费者
*/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
producer.start();
for (int i = 0; i < 20; i++)
try {
{
Message msg = new Message("TopicTest", // 发送的topic
"TagA", //tags
"OrderID188", // keys3
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
);
//同步传递消息,消息会发给集群中的一个Broker节点。
//这个发送方法是void方法,说明这个消息发送过去了之后,Producer是不知道的
//不知道消息是否发送成功,反正Producer发送完了就不管了 .
producer.sendOneway(msg);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}