单向(OneWay)发送
Producer只要把消息往Mq里面一推,Producer就不管了,至于消息是否成功到达MQ,Producer不管,这种吞吐量是最高的,
package org.apache.rocketmq.example.simple;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;/*** 简单样例:同步发送消息* Producer通过NameServer找到broker,把消息发送给broker,* 然后broker把消息推送给了消费者*/public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");//NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");producer.start();for (int i = 0; i < 20; i++)try {{Message msg = new Message("TopicTest", // 发送的topic"TagA", //tags"OrderID188", // keys3"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容);//同步传递消息,消息会发给集群中的一个Broker节点。// SendResult sendResult = producer.send(msg);// System.out.printf("%s%n", sendResult);//这个发送方法是void方法,说明这个消息发送过去了之后,Producer是不知道的//不知道消息是否发送成功,反正Producer发送完了就不管了 .producer.sendOneway(msg);}} catch (Exception e) {e.printStackTrace();}producer.shutdown();}}
同步发送
  Producer 往MQ发送消息之后,会等待MQ给他自己响应,然后Producer那里的代码再往下继续执行自己的逻辑.
 好处就是 Producer发送消息有没有发送成功,Producer自己是知道的,如果发送失败了话,Producer内部会有代码进行重试.

SendResult sendResult = producer.send(msg); 就是同步发送
public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");//NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");producer.start();try {{Message msg = new Message("TopicTest", // 发送的topic"TagA", //tags"OrderID188", // keys3"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容);//同步传递消息,消息会发给集群中的一个Broker节点。//如果发送失败了 ,这里会有重试机制SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}} catch (Exception e) {e.printStackTrace();}producer.shutdown();}
打印结果
SendResult [sendStatus=SEND_OK, msgId=AC100A010AD818B4AAC255F0E3B50000, offsetMsgId=AC100A6600002A9F00000000000974CF, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=380]
其中就能通过sendStatus字段来判断是否发送成功了
异步发送
Producer发送完了消息给MQ之后就继续执行下面的逻辑了,然后Producer会给MQ一个回调函数,
MQ在完成消息之后会回过来请求Producer的回调方法,在回调方法去执行逻辑, 但是这个逻辑什么时候执行Producer就不管了.

 异步模式就是一个双向的交互,Producer既发消息也会接收MQ传过来的回调结果
 MQ既接收消息也会发送 处理结果给Producer 
需要注意一点就是如果Producer服务停了,就无法接收到MQ发过来的回调了.
package org.apache.rocketmq.example.simple;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/*** 简单样例:异步发送消息*/public class AsyncProducer {public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");producer.start();//重试次数producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount = 100;//由于是异步发送,这里引入一个countDownLatch,保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。final CountDownLatch countDownLatch = new CountDownLatch(messageCount);for (int i = 0; i < messageCount; i++) { // 发送100条消息try {final int index = i;Message msg = new Message("TopicTest","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));// SendCallback 是回调函数producer.send(msg, new SendCallback() {/*** 消息成功执行这里* @param sendResult*/@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}/*** 消息失败执行这里* @param e*/@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});System.out.println("消息发送完成");} catch (Exception e) {e.printStackTrace();}}//保证100条消息的回调都处理完了,再结束producercountDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();}}
输出结果:
日志输出太多了,下面只是粘贴了部分的输出结果
消息发送完成消息发送完成消息发送完成18 OK AC100A012F3018B4AAC255F8658700112 OK AC100A012F3018B4AAC255F86581000987 OK AC100A012F3018B4AAC255F8658A0056
三种方式总结
吞吐量:
1.单向发送模式因为发送过去之后不用接收结果,所以这种方式吞吐量是最高的,
2.同步发送模式是吞吐量最慢的方式,因为它需要等待MQ给它响应结果,它才能继续往下执行
3.异步发送模式吞吐量是比同步发送高比单向发送低的.
安全性:
1.同步发送模式安全性是最高的
2.异步发送模式和单向发送模式都容易丢失消息
发送指的是消息从Producer发送给MQ, 至于消费者是否消费消息,Producer是不管的.
消费者代码,三种方式公用这一个消费者
package org.apache.rocketmq.example.quickstart;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** 此示例显示如何使用提供 {@link DefaultMQPushConsumer} 订阅和使用消息。*/public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {/*使用指定的消费者组名称实例化*/DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");/** 指定名称服务器地址。* <p/>** 或者,您可以通过导出环境变量来指定名称服务器地址:NAMESRV_ADDR* <pre>* {@code* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");* }* </pre>*/consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");/*如果指定的消费者组是全新的,请指定从哪里开始*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);/*再订阅一个主题来消费。*/consumer.subscribe("TopicTest", "*");/** 注册回调以在从代理获取的消息到达时执行。*/consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});/** 启动消费者实例*/consumer.start();System.out.printf("Consumer Started.%n");}}
