概念

广播消息是:我需要这个消息被所有的消费者都消费到,不管你是哪个集群.

广播模式和生产者是没什么关系的,是在消费者这里设置

代码

操作说明

只需要在消费者里面设置 consumer.setMessageModel(MessageModel.BROADCASTING); 即可,

设置setMessageModel(MessageModel.BROADCASTING) 即可设置成广播模式,此时你发送的消息会在所有的Consumer都会收到,而不会只往一个组里面的一个消费者去消费,

这里可以设置两种模式: 默认是CLUSTERING(“CLUSTERING”),也就是集群模式, BROADCASTING(“BROADCASTING”) 是广播模式.

消费者

  1. package org.apache.rocketmq.example.broadcast;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  8. import org.apache.rocketmq.common.message.MessageExt;
  9. import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  10. import java.util.List;
  11. /**
  12. * 广播模式
  13. */
  14. public class PushConsumer {
  15. public static void main(String[] args) throws InterruptedException, MQClientException {
  16. //根据情况修改消费者组
  17. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultGroup");
  18. consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  19. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  20. //设置setMessageModel(MessageModel.BROADCASTING) 即可设置成广播模式
  21. //此时你发送的消息会在所有的Consumer都会收到,而不会只往一个组里面的一个消费者去消费
  22. /**这里可以设置两种模式: 默认都是CLUSTERING("CLUSTERING")
  23. * BROADCASTING("BROADCASTING") 广播模式
  24. * CLUSTERING("CLUSTERING") 集群模式
  25. */
  26. consumer.setMessageModel(MessageModel.BROADCASTING);
  27. //根据情况修改消费的topic
  28. consumer.subscribe("TopicTest", "*");
  29. consumer.registerMessageListener(new MessageListenerConcurrently() {
  30. @Override
  31. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  32. ConsumeConcurrentlyContext context) {
  33. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  34. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  35. }
  36. });
  37. consumer.start();
  38. System.out.printf("Broadcast Consumer Started.%n");
  39. }
  40. }

生产者

  1. package org.apache.rocketmq.example.broadcast;
  2. import org.apache.rocketmq.client.exception.MQClientException;
  3. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  4. import org.apache.rocketmq.common.message.Message;
  5. import org.apache.rocketmq.remoting.common.RemotingHelper;
  6. public class Producer {
  7. public static void main(String[] args) throws MQClientException, InterruptedException {
  8. DefaultMQProducer producer = new DefaultMQProducer("defaultGroup");
  9. //NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
  10. producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  11. producer.start();
  12. try {
  13. {
  14. Message msg = new Message("TopicTest", // 发送的topic
  15. "TagA", //tags
  16. "OrderID", // keys
  17. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
  18. );
  19. //同步传递消息,消息会发给集群中的一个Broker节点。
  20. //这个发送方法是void方法,说明这个消息发送过去了之后,Producer是不知道的
  21. //不知道消息是否发送成功,反正Producer发送完了就不管了 .
  22. producer.sendOneway(msg);
  23. }
  24. } catch (Exception e) {
  25. e.printStackTrace();
  26. }
  27. producer.shutdown();
  28. }
  29. }

测试说明

我这里启动了四个消费者, 消费者topic配置和consumerGroup配置见下面表格,

然后启动生产者,生产者往 TopicTest 这个 topic 里面发送消息,此时只有 相同的topic 接收到了消息,即使这相同的topic的consumerGroup不同,也接收到了消息,

如果两个消费者都是同样的topic,同样的consumerGroup ,这两个消费者也都分别接收到了消息.

消费者名字 topic consumerGroup 是否接收到消息
PushConsumer1 TopicTest defaultGroup 接收到了
PushConsumer2 TopicTest defaultGroup 接收到了
PushConsumer3 TopicTest defaultGroup22 接收到了
PushConsumer4 TopicTest2 defaultGroup