序言
metaq 支持两种消息过滤,一个是 tag 过滤;一个是 SQL 过滤。本文用于确定这两种过滤是在服务端还是在客户端,并附上相关代码。
一、tag 过滤
1. 使用方式
使用方式为在 subscribe 时除了 topic 外,设置 tag 即可。
consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});
2. 客户端代码
2.1 初始化代码
初始化时,其会将 subExpression 封装为 SubscriptionData 设置在RebalanceImpl中
//org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#subscribe(java.lang.String, java.lang.String)public void subscribe(String topic, String subExpression) throws MQClientException {try {SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic, subExpression);this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);if (this.mQClientFactory != null) {this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();}} catch (Exception e) {throw new MQClientException("subscription exception", e);}}
2.2 拉取消息代码
拉取入口在 org.apache.rocketmq.client.impl.consumer.RebalanceService 中,其经过PullMessageService->netty 去拉取代码。其 rpc 调用码为 org.apache.rocketmq.common.protocol.RequestCode#PULL_MESSAGE。会带入 subscription 参数。
// org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessageprivate void pullMessage(final PullRequest pullRequest) {final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}
3. 服务端代码
服务端代码见org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean),具体逻辑待补充,但既然 subscription 带入到服务端,则过滤是在服务端执行
//org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
二、SQL 过滤
1. 使用方式
consumer.subscribe("SqlFilterTest",MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +"and (a is not null and a between 0 and 3)"));consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});
2. 客户端代码
2.1 初始化代码
其方式和 tag 过滤一致,封装为 SubscriptionData 存储,并在后服务端通信时带给服务端
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#subscribe(java.lang.String, org.apache.rocketmq.client.consumer.MessageSelector)public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {try {if (messageSelector == null) {subscribe(topic, SubscriptionData.SUB_ALL);return;}SubscriptionData subscriptionData = FilterAPI.build(topic,messageSelector.getExpression(), messageSelector.getExpressionType());this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);if (this.mQClientFactory != null) {this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();}} catch (Exception e) {throw new MQClientException("subscription exception", e);}}
三、结论
两种过滤方式都是在服务端过滤,对客户端性能无影响,但是要注意SQL 过滤的服务端默认处理。
