概念
广播消息是:我需要这个消息被所有的消费者都消费到,不管你是哪个集群.
广播模式和生产者是没什么关系的,是在消费者这里设置
代码
操作说明
只需要在消费者里面设置 consumer.setMessageModel(MessageModel.BROADCASTING); 即可,
设置setMessageModel(MessageModel.BROADCASTING) 即可设置成广播模式,此时你发送的消息会在所有的Consumer都会收到,而不会只往一个组里面的一个消费者去消费,
这里可以设置两种模式: 默认是CLUSTERING(“CLUSTERING”),也就是集群模式, BROADCASTING(“BROADCASTING”) 是广播模式.
消费者
package org.apache.rocketmq.example.broadcast;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;/*** 广播模式*/public class PushConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {//根据情况修改消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultGroup");consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//设置setMessageModel(MessageModel.BROADCASTING) 即可设置成广播模式//此时你发送的消息会在所有的Consumer都会收到,而不会只往一个组里面的一个消费者去消费/**这里可以设置两种模式: 默认都是CLUSTERING("CLUSTERING")* BROADCASTING("BROADCASTING") 广播模式* CLUSTERING("CLUSTERING") 集群模式*/consumer.setMessageModel(MessageModel.BROADCASTING);//根据情况修改消费的topicconsumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Broadcast Consumer Started.%n");}}
生产者
package org.apache.rocketmq.example.broadcast;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("defaultGroup");//NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");producer.start();try {{Message msg = new Message("TopicTest", // 发送的topic"TagA", //tags"OrderID", // keys"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容);//同步传递消息,消息会发给集群中的一个Broker节点。//这个发送方法是void方法,说明这个消息发送过去了之后,Producer是不知道的//不知道消息是否发送成功,反正Producer发送完了就不管了 .producer.sendOneway(msg);}} catch (Exception e) {e.printStackTrace();}producer.shutdown();}}
测试说明
我这里启动了四个消费者, 消费者topic配置和consumerGroup配置见下面表格,
然后启动生产者,生产者往 TopicTest 这个 topic 里面发送消息,此时只有 相同的topic 接收到了消息,即使这相同的topic的consumerGroup不同,也接收到了消息,
如果两个消费者都是同样的topic,同样的consumerGroup ,这两个消费者也都分别接收到了消息.
| 消费者名字 | topic | consumerGroup | 是否接收到消息 | 
|---|---|---|---|
| PushConsumer1 | TopicTest | defaultGroup | 接收到了 | 
| PushConsumer2 | TopicTest | defaultGroup | 接收到了 | 
| PushConsumer3 | TopicTest | defaultGroup22 | 接收到了 | 
| PushConsumer4 | TopicTest2 | defaultGroup | 没 | 
