单向(OneWay)发送
Producer只要把消息往Mq里面一推,Producer就不管了,至于消息是否成功到达MQ,Producer不管,这种吞吐量是最高的,
package org.apache.rocketmq.example.simple;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* 简单样例:同步发送消息
* Producer通过NameServer找到broker,把消息发送给broker,
* 然后broker把消息推送给了消费者
*/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
producer.start();
for (int i = 0; i < 20; i++)
try {
{
Message msg = new Message("TopicTest", // 发送的topic
"TagA", //tags
"OrderID188", // keys3
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
);
//同步传递消息,消息会发给集群中的一个Broker节点。
// SendResult sendResult = producer.send(msg);
// System.out.printf("%s%n", sendResult);
//这个发送方法是void方法,说明这个消息发送过去了之后,Producer是不知道的
//不知道消息是否发送成功,反正Producer发送完了就不管了 .
producer.sendOneway(msg);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
同步发送
Producer 往MQ发送消息之后,会等待MQ给他自己响应,然后Producer那里的代码再往下继续执行自己的逻辑.
好处就是 Producer发送消息有没有发送成功,Producer自己是知道的,如果发送失败了话,Producer内部会有代码进行重试.
SendResult sendResult = producer.send(msg); 就是同步发送
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
producer.start();
try {
{
Message msg = new Message("TopicTest", // 发送的topic
"TagA", //tags
"OrderID188", // keys3
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
);
//同步传递消息,消息会发给集群中的一个Broker节点。
//如果发送失败了 ,这里会有重试机制
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
打印结果
SendResult [sendStatus=SEND_OK, msgId=AC100A010AD818B4AAC255F0E3B50000, offsetMsgId=AC100A6600002A9F00000000000974CF, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=380]
其中就能通过sendStatus字段来判断是否发送成功了
异步发送
Producer发送完了消息给MQ之后就继续执行下面的逻辑了,然后Producer会给MQ一个回调函数,
MQ在完成消息之后会回过来请求Producer的回调方法,在回调方法去执行逻辑, 但是这个逻辑什么时候执行Producer就不管了.
异步模式就是一个双向的交互,Producer既发消息也会接收MQ传过来的回调结果
MQ既接收消息也会发送 处理结果给Producer
需要注意一点就是如果Producer服务停了,就无法接收到MQ发过来的回调了.
package org.apache.rocketmq.example.simple;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 简单样例:异步发送消息
*/
public class AsyncProducer {
public static void main(
String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
producer.start();
//重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
//由于是异步发送,这里引入一个countDownLatch,保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) { // 发送100条消息
try {
final int index = i;
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback 是回调函数
producer.send(msg, new SendCallback() {
/**
* 消息成功执行这里
* @param sendResult
*/
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
/**
* 消息失败执行这里
* @param e
*/
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
System.out.println("消息发送完成");
} catch (Exception e) {
e.printStackTrace();
}
}
//保证100条消息的回调都处理完了,再结束producer
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
输出结果:
日志输出太多了,下面只是粘贴了部分的输出结果
消息发送完成
消息发送完成
消息发送完成
18 OK AC100A012F3018B4AAC255F865870011
2 OK AC100A012F3018B4AAC255F865810009
87 OK AC100A012F3018B4AAC255F8658A0056
三种方式总结
吞吐量:
1.单向发送模式因为发送过去之后不用接收结果,所以这种方式吞吐量是最高的,
2.同步发送模式是吞吐量最慢的方式,因为它需要等待MQ给它响应结果,它才能继续往下执行
3.异步发送模式吞吐量是比同步发送高比单向发送低的.
安全性:
1.同步发送模式安全性是最高的
2.异步发送模式和单向发送模式都容易丢失消息
发送指的是消息从Producer发送给MQ, 至于消费者是否消费消息,Producer是不管的.
消费者代码,三种方式公用这一个消费者
package org.apache.rocketmq.example.quickstart;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 此示例显示如何使用提供 {@link DefaultMQPushConsumer} 订阅和使用消息。
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
/*
使用指定的消费者组名称实例化
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
/*
* 指定名称服务器地址。
* <p/>
*
* 或者,您可以通过导出环境变量来指定名称服务器地址:NAMESRV_ADDR
* <pre>
* {@code
* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
/*如果指定的消费者组是全新的,请指定从哪里开始
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
/*
再订阅一个主题来消费。
*/
consumer.subscribe("TopicTest", "*");
/*
* 注册回调以在从代理获取的消息到达时执行。
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
* 启动消费者实例
*/
consumer.start();
System.out.printf("Consumer Started.%n");
}
}