序言

metaq 支持两种消息过滤,一个是 tag 过滤;一个是 SQL 过滤。本文用于确定这两种过滤是在服务端还是在客户端,并附上相关代码。

一、tag 过滤

1. 使用方式

使用方式为在 subscribe 时除了 topic 外,设置 tag 即可。

  1. consumer.subscribe("TopicTest", "*");
  2. consumer.registerMessageListener(new MessageListenerConcurrently() {
  3. @Override
  4. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  5. ConsumeConcurrentlyContext context) {
  6. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  7. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  8. }
  9. });

2. 客户端代码

2.1 初始化代码

初始化时,其会将 subExpression 封装为 SubscriptionData 设置在RebalanceImpl中

  1. //org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#subscribe(java.lang.String, java.lang.String)
  2. public void subscribe(String topic, String subExpression) throws MQClientException {
  3. try {
  4. SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
  5. topic, subExpression);
  6. this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
  7. if (this.mQClientFactory != null) {
  8. this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  9. }
  10. } catch (Exception e) {
  11. throw new MQClientException("subscription exception", e);
  12. }
  13. }

2.2 拉取消息代码

拉取入口在 org.apache.rocketmq.client.impl.consumer.RebalanceService 中,其经过PullMessageService->netty 去拉取代码。其 rpc 调用码为 org.apache.rocketmq.common.protocol.RequestCode#PULL_MESSAGE。会带入 subscription 参数。

  1. // org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessage
  2. private void pullMessage(final PullRequest pullRequest) {
  3. final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
  4. if (consumer != null) {
  5. DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
  6. impl.pullMessage(pullRequest);
  7. } else {
  8. log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
  9. }
  10. }

3. 服务端代码

服务端代码见org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean),具体逻辑待补充,但既然 subscription 带入到服务端,则过滤是在服务端执行

  1. //org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)

二、SQL 过滤

1. 使用方式

  1. consumer.subscribe("SqlFilterTest",
  2. MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
  3. "and (a is not null and a between 0 and 3)"));
  4. consumer.registerMessageListener(new MessageListenerConcurrently() {
  5. @Override
  6. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  7. ConsumeConcurrentlyContext context) {
  8. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  9. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  10. }
  11. });

2. 客户端代码

2.1 初始化代码

其方式和 tag 过滤一致,封装为 SubscriptionData 存储,并在后服务端通信时带给服务端

  1. // org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#subscribe(java.lang.String, org.apache.rocketmq.client.consumer.MessageSelector)
  2. public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
  3. try {
  4. if (messageSelector == null) {
  5. subscribe(topic, SubscriptionData.SUB_ALL);
  6. return;
  7. }
  8. SubscriptionData subscriptionData = FilterAPI.build(topic,
  9. messageSelector.getExpression(), messageSelector.getExpressionType());
  10. this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
  11. if (this.mQClientFactory != null) {
  12. this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  13. }
  14. } catch (Exception e) {
  15. throw new MQClientException("subscription exception", e);
  16. }
  17. }

三、结论

两种过滤方式都是在服务端过滤,对客户端性能无影响,但是要注意SQL 过滤的服务端默认处理。

参考

  1. 通过MetaQ订阅Notify TRADE消息代码例子
  2. 通过MetaQ订阅Notify TRADE消息注意事项
  3. Metaq使用SQL语法做消息过滤简介
  4. 12. 消息过滤(tag)
  5. 16. 消息过滤(SQL)