概述
官方推荐的是一个系统就用一个topic,然后系统下的类目就用tag来区分,比如说订单系统,整个订单就放到一个topic下,是下单还是支付还是退款等等发消息的时候通过不同的tag来区分.
broker再推送消息给consumer的时候就提前通过tag来判断是否推送到这个consumer了,如果你这个consumer没配置这个tag,那么broker就不会把这个tag推送给你.这样的好处就可以减少网络IO.
SQL92语法
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
 - 字符比较,比如:=,<>,IN;
 - IS NULL 或者 IS NOT NULL;
 - 逻辑符号 AND,OR,NOT;
 
常量支持类型为:
- 数值,比如:123,3.1415;
 - 字符,比如:’abc’,必须用单引号包裹起来;
 - NULL,特殊的常量
 - 布尔值,TRUE 或 FALSE
 
使用注意!
只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。
代码案例
生产者
package org.apache.rocketmq.example.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;public class TagFilterProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");producer.start();//发送的时候设置tagString[] tags = new String[]{"TagA", "TagB", "TagC"};for (int i = 0; i < 15; i++) {Message msg = new Message("TagFilterTest",tags[i % tags.length],"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}producer.shutdown();}}
消费者
package org.apache.rocketmq.example.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import java.io.IOException;import java.util.List;public class TagFilterConsumer {public static void main(String[] args) throws InterruptedException, MQClientException, IOException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");/* TagA || TagC ,意思是 只有 TagA 或者 TagC 的消息会被接收 */consumer.subscribe("TagFilterTest", "TagA || TagC");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}}
sql过滤
“TagA || TagC” 的写法一个消息只能有一个tag,如果想用多个tag来过滤的话,”TagA || TagC”这样的写法就做不到了.所以就使用sql过滤.
代码演示
Linux配置
在broker.properties那里配置,配置完了需要重启Broker集群
#是否支持根据属性过滤。如果使用基于标准的sql92模式过滤消息则改参数必须设置为true。enablePropertyFilter=true
生产者
package org.apache.rocketmq.example.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;public class SqlFilterProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");producer.start();String[] tags = new String[]{"TagA", "TagB", "TagC"};for (int i = 0; i < 15; i++) {// 发送消息的时候指定tagMessage msg = new Message("SqlFilterTest",tags[i % tags.length],("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 设置用户属性msg.putUserProperty("a", String.valueOf(i));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}producer.shutdown();}}
消费者
package org.apache.rocketmq.example.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.MessageSelector;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class SqlFilterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");//sql选择器/*下面的sql选择器是 tags不为空,并且tags 是 'TagA' 或者'TagB' ,并且MQ的UserProperty里面的a属性不能是null 并且 a属性必须在 0~3范围 , 包括0和3*/consumer.subscribe("SqlFilterTest",MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +"and (a is not null and a between 0 and 3)"));consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}}
查看效果
先执行消费者, 再执行生产者
控制台消息我就不贴了, 符合sql表达式的消息会被consumer接收到,不符合的就不会被接收到.
