title: Broadcast Message Example
date: 2017/12/21
categories: Documentation Translation

Broadcasting

What is broadcasting

Broadcasting sends the same message to every subscriber of a topic. If you want all subscribers to receive every message published to a topic, broadcasting is a good choice.
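To make the delivery rule concrete, here is a minimal in-memory sketch of broadcast semantics. It does not use RocketMQ at all: the `BroadcastSketch` class, its `subscribe`, `broadcast` and `received` methods, and the subscriber names are hypothetical, chosen only to illustrate that each subscriber ends up with its own copy of every message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of broadcast delivery; not part of the RocketMQ API.
class BroadcastSketch {
    // Maps each subscriber name to the messages it has received so far.
    private final Map<String, List<String>> inboxes = new LinkedHashMap<>();

    // Register a subscriber with an empty inbox (no effect if already registered).
    void subscribe(String subscriber) {
        inboxes.putIfAbsent(subscriber, new ArrayList<>());
    }

    // Broadcasting: every current subscriber gets its own copy of the message.
    void broadcast(String message) {
        for (List<String> inbox : inboxes.values()) {
            inbox.add(message);
        }
    }

    // Messages delivered to the given subscriber, in send order.
    List<String> received(String subscriber) {
        return inboxes.get(subscriber);
    }

    public static void main(String[] args) {
        BroadcastSketch topic = new BroadcastSketch();
        topic.subscribe("consumer-1");
        topic.subscribe("consumer-2");
        for (int i = 0; i < 3; i++) {
            topic.broadcast("Hello world " + i);
        }
        System.out.println("consumer-1 received " + topic.received("consumer-1"));
        System.out.println("consumer-2 received " + topic.received("consumer-2"));
    }
}
```

Running `main` shows both subscribers holding the same three messages. A real broker also has to deal with offsets, retries and subscribers that join late, none of which this sketch attempts to model.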

Producer example

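    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class BroadcastProducer {
        public static void main(String[] args) throws Exception {
            // Assumes the name server address is configured externally
            // (for example via the NAMESRV_ADDR environment variable).
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
            producer.start();
            for (int i = 0; i < 100; i++) {
                // Arguments: topic, tag, key, body.
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            // Release the producer's resources once all messages are sent.
            producer.shutdown();
        }
    }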

Consumer example

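    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    // Package path as in RocketMQ 4.x.
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

    public class BroadcastConsumer {
        public static void main(String[] args) throws Exception {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // Set to broadcast mode so this instance receives every message.
            consumer.setMessageModel(MessageModel.BROADCASTING);
            // Only messages tagged TagA, TagC or TagD are delivered.
            consumer.subscribe("TopicTest", "TagA || TagC || TagD");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    // Pass values as arguments so a '%' in a message cannot break the format string.
                    System.out.printf("%s Receive New Messages: %s%n",
                        Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.printf("Broadcast Consumer Started.%n");
        }
    }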
