title: Broadcast Message Example
date: 2017/12/21
categories: Documentation translation
Broadcasting
What is broadcasting
Broadcasting sends each message to every subscriber of a topic. If you want all subscribers to receive the messages published to a topic, broadcasting is a good choice.
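To make the delivery semantics concrete, here is a minimal in-memory sketch that contrasts broadcasting with a load-balanced (clustering-style) delivery, where each message goes to only one subscriber. It does not use RocketMQ at all: `MessageModelSketch`, `Subscriber`, `broadcast`, and `cluster` are hypothetical names invented for this illustration, and the round-robin assignment is a simplification, not how the broker actually allocates messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MessageModelSketch {

    /** Hypothetical subscriber that records every message it receives. */
    static class Subscriber {
        final String name;
        final List<String> received = new ArrayList<>();

        Subscriber(String name) {
            this.name = name;
        }
    }

    /** Broadcasting: every subscriber receives its own copy of each message. */
    static void broadcast(List<Subscriber> subscribers, List<String> messages) {
        for (String message : messages) {
            for (Subscriber subscriber : subscribers) {
                subscriber.received.add(message);
            }
        }
    }

    /** Simplified load balancing (round-robin): each message reaches exactly one subscriber. */
    static void cluster(List<Subscriber> subscribers, List<String> messages) {
        for (int i = 0; i < messages.size(); i++) {
            subscribers.get(i % subscribers.size()).received.add(messages.get(i));
        }
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m0", "m1", "m2", "m3");

        List<Subscriber> broadcastGroup =
                Arrays.asList(new Subscriber("c1"), new Subscriber("c2"));
        broadcast(broadcastGroup, messages);
        for (Subscriber s : broadcastGroup) {
            System.out.printf("broadcast %s -> %s%n", s.name, s.received);
        }

        List<Subscriber> clusterGroup =
                Arrays.asList(new Subscriber("c1"), new Subscriber("c2"));
        cluster(clusterGroup, messages);
        for (Subscriber s : clusterGroup) {
            System.out.printf("cluster   %s -> %s%n", s.name, s.received);
        }
    }
}
```

With broadcasting, the total number of deliveries grows with the number of subscribers, so it suits cases such as cache invalidation or configuration refresh where every instance must see every message.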
Producer example
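    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class BroadcastProducer {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
            // The producer must be started before it can send messages.
            producer.start();

            for (int i = 0; i < 100; i++) {
                // Arguments: topic, tag, key, body.
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }

            // Release resources once all messages have been sent.
            producer.shutdown();
        }
    }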
Consumer example
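    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    // Package location as in RocketMQ 4.x.
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

    public class BroadcastConsumer {
        public static void main(String[] args) throws Exception {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

            // Set to broadcast mode.
            consumer.setMessageModel(MessageModel.BROADCASTING);

            // Receive only messages tagged TagA, TagC, or TagD.
            consumer.subscribe("TopicTest", "TagA || TagC || TagD");

            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    // Pass values as arguments so a '%' in a message cannot break the format string.
                    System.out.printf("%s Receive New Messages: %s%n",
                        Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            consumer.start();
            System.out.printf("Broadcast Consumer Started.%n");
        }
    }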
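The subscription expression `"TagA || TagC || TagD"` means a message is delivered when its tag equals any one of the listed tags, which is why the producer's `TagA` messages reach this consumer. The sketch below illustrates that matching rule only; `TagFilterSketch`, `parseTags`, and `matches` are hypothetical helpers written for this example and are not part of the RocketMQ API, whose actual filtering happens inside the broker and client.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class TagFilterSketch {

    /** Splits an expression such as "TagA || TagC" into the set of individual tags. */
    static Set<String> parseTags(String subExpression) {
        return Arrays.stream(subExpression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
    }

    /** Returns true when the message tag is accepted by the expression; "*" accepts every tag. */
    static boolean matches(String subExpression, String messageTag) {
        if ("*".equals(subExpression.trim())) {
            return true;
        }
        return parseTags(subExpression).contains(messageTag);
    }

    public static void main(String[] args) {
        String expression = "TagA || TagC || TagD";
        System.out.println("TagA accepted: " + matches(expression, "TagA"));
        System.out.println("TagB accepted: " + matches(expression, "TagB"));
    }
}
```

In this sketch a message tagged `TagB` would be filtered out, while one tagged `TagA`, `TagC`, or `TagD` would be handed to the listener.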
