概念
广播消息是:我需要这个消息被所有的消费者都消费到,不管你是哪个集群.
广播模式和生产者是没什么关系的,是在消费者这里设置
代码
操作说明
只需要在消费者里面设置 consumer.setMessageModel(MessageModel.BROADCASTING); 即可,
设置setMessageModel(MessageModel.BROADCASTING) 即可设置成广播模式,此时你发送的消息会在所有的Consumer都会收到,而不会只往一个组里面的一个消费者去消费,
这里可以设置两种模式: 默认是CLUSTERING(“CLUSTERING”),也就是集群模式, BROADCASTING(“BROADCASTING”) 是广播模式.
消费者
package org.apache.rocketmq.example.broadcast;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/**
* 广播模式
*/
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//根据情况修改消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultGroup");
consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//设置setMessageModel(MessageModel.BROADCASTING) 即可设置成广播模式
//此时你发送的消息会在所有的Consumer都会收到,而不会只往一个组里面的一个消费者去消费
/**这里可以设置两种模式: 默认都是CLUSTERING("CLUSTERING")
* BROADCASTING("BROADCASTING") 广播模式
* CLUSTERING("CLUSTERING") 集群模式
*/
consumer.setMessageModel(MessageModel.BROADCASTING);
//根据情况修改消费的topic
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
}
生产者
package org.apache.rocketmq.example.broadcast;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("defaultGroup");
//NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
producer.start();
try {
{
Message msg = new Message("TopicTest", // 发送的topic
"TagA", //tags
"OrderID", // keys
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
);
//同步传递消息,消息会发给集群中的一个Broker节点。
//这个发送方法是void方法,说明这个消息发送过去了之后,Producer是不知道的
//不知道消息是否发送成功,反正Producer发送完了就不管了 .
producer.sendOneway(msg);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
测试说明
我这里启动了四个消费者, 消费者topic配置和consumerGroup配置见下面表格,
然后启动生产者,生产者往 TopicTest 这个 topic 里面发送消息,此时只有 相同的topic 接收到了消息,即使这相同的topic的consumerGroup不同,也接收到了消息,
如果两个消费者都是同样的topic,同样的consumerGroup ,这两个消费者也都分别接收到了消息.
消费者名字 | topic | consumerGroup | 是否接收到消息 |
---|---|---|---|
PushConsumer1 | TopicTest | defaultGroup | 接收到了 |
PushConsumer2 | TopicTest | defaultGroup | 接收到了 |
PushConsumer3 | TopicTest | defaultGroup22 | 接收到了 |
PushConsumer4 | TopicTest2 | defaultGroup | 没 |