概述

官方推荐的是一个系统就用一个topic,然后系统下的类目就用tag来区分,比如说订单系统,整个订单就放到一个topic下,是下单还是支付还是退款等等发消息的时候通过不同的tag来区分.

broker再推送消息给consumer的时候就提前通过tag来判断是否推送到这个consumer了,如果你这个consumer没配置这个tag,那么broker就不会把这个tag推送给你.这样的好处就可以减少网络IO.

SQL92语法

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:’abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUE 或 FALSE

使用注意!

只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。

代码案例

生产者

  1. package org.apache.rocketmq.example.filter;
  2. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  3. import org.apache.rocketmq.client.producer.SendResult;
  4. import org.apache.rocketmq.common.message.Message;
  5. import org.apache.rocketmq.remoting.common.RemotingHelper;
  6. public class TagFilterProducer {
  7. public static void main(String[] args) throws Exception {
  8. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  9. producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  10. producer.start();
  11. //发送的时候设置tag
  12. String[] tags = new String[]{"TagA", "TagB", "TagC"};
  13. for (int i = 0; i < 15; i++) {
  14. Message msg = new Message("TagFilterTest",
  15. tags[i % tags.length],
  16. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
  17. SendResult sendResult = producer.send(msg);
  18. System.out.printf("%s%n", sendResult);
  19. }
  20. producer.shutdown();
  21. }
  22. }

消费者

  1. package org.apache.rocketmq.example.filter;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.common.message.MessageExt;
  8. import java.io.IOException;
  9. import java.util.List;
  10. public class TagFilterConsumer {
  11. public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
  12. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
  13. consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  14. /* TagA || TagC ,意思是 只有 TagA 或者 TagC 的消息会被接收 */
  15. consumer.subscribe("TagFilterTest", "TagA || TagC");
  16. consumer.registerMessageListener(new MessageListenerConcurrently() {
  17. @Override
  18. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  19. ConsumeConcurrentlyContext context) {
  20. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  21. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  22. }
  23. });
  24. consumer.start();
  25. System.out.printf("Consumer Started.%n");
  26. }
  27. }

sql过滤

“TagA || TagC” 的写法一个消息只能有一个tag,如果想用多个tag来过滤的话,”TagA || TagC”这样的写法就做不到了.所以就使用sql过滤.

代码演示

Linux配置

在broker.properties那里配置,配置完了需要重启Broker集群

  1. #是否支持根据属性过滤。如果使用基于标准的sql92模式过滤消息则改参数必须设置为true
  2. enablePropertyFilter=true

生产者

  1. package org.apache.rocketmq.example.filter;
  2. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  3. import org.apache.rocketmq.client.producer.SendResult;
  4. import org.apache.rocketmq.common.message.Message;
  5. import org.apache.rocketmq.remoting.common.RemotingHelper;
  6. public class SqlFilterProducer {
  7. public static void main(String[] args) throws Exception {
  8. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  9. producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  10. producer.start();
  11. String[] tags = new String[]{"TagA", "TagB", "TagC"};
  12. for (int i = 0; i < 15; i++) {
  13. // 发送消息的时候指定tag
  14. Message msg = new Message("SqlFilterTest",
  15. tags[i % tags.length],
  16. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
  17. );
  18. // 设置用户属性
  19. msg.putUserProperty("a", String.valueOf(i));
  20. SendResult sendResult = producer.send(msg);
  21. System.out.printf("%s%n", sendResult);
  22. }
  23. producer.shutdown();
  24. }
  25. }

消费者

  1. package org.apache.rocketmq.example.filter;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.MessageSelector;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  5. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  6. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  7. import org.apache.rocketmq.common.message.MessageExt;
  8. import java.util.List;
  9. public class SqlFilterConsumer {
  10. public static void main(String[] args) throws Exception {
  11. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
  12. consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  13. //sql选择器
  14. /*下面的sql选择器是 tags不为空,并且tags 是 'TagA' 或者'TagB' ,
  15. 并且MQ的UserProperty里面的a属性不能是null 并且 a属性必须在 0~3范围 , 包括0和3
  16. */
  17. consumer.subscribe("SqlFilterTest",
  18. MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
  19. "and (a is not null and a between 0 and 3)"));
  20. consumer.registerMessageListener(new MessageListenerConcurrently() {
  21. @Override
  22. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  23. ConsumeConcurrentlyContext context) {
  24. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  25. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  26. }
  27. });
  28. consumer.start();
  29. System.out.printf("Consumer Started.%n");
  30. }
  31. }

查看效果

先执行消费者, 再执行生产者

控制台消息我就不贴了, 符合sql表达式的消息会被consumer接收到,不符合的就不会被接收到.