单向(OneWay)发送

Producer只要把消息往Mq里面一推,Producer就不管了,至于消息是否成功到达MQ,Producer不管,这种吞吐量是最高的,

image.png

  1. package org.apache.rocketmq.example.simple;
  2. import org.apache.rocketmq.client.exception.MQClientException;
  3. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  4. import org.apache.rocketmq.common.message.Message;
  5. import org.apache.rocketmq.remoting.common.RemotingHelper;
  6. /**
  7. * 简单样例:同步发送消息
  8. * Producer通过NameServer找到broker,把消息发送给broker,
  9. * 然后broker把消息推送给了消费者
  10. */
  11. public class Producer {
  12. public static void main(String[] args) throws MQClientException, InterruptedException {
  13. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
  14. //NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
  15. producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  16. producer.start();
  17. for (int i = 0; i < 20; i++)
  18. try {
  19. {
  20. Message msg = new Message("TopicTest", // 发送的topic
  21. "TagA", //tags
  22. "OrderID188", // keys3
  23. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
  24. );
  25. //同步传递消息,消息会发给集群中的一个Broker节点。
  26. // SendResult sendResult = producer.send(msg);
  27. // System.out.printf("%s%n", sendResult);
  28. //这个发送方法是void方法,说明这个消息发送过去了之后,Producer是不知道的
  29. //不知道消息是否发送成功,反正Producer发送完了就不管了 .
  30. producer.sendOneway(msg);
  31. }
  32. } catch (Exception e) {
  33. e.printStackTrace();
  34. }
  35. producer.shutdown();
  36. }
  37. }

同步发送

Producer 往MQ发送消息之后,会等待MQ给他自己响应,然后Producer那里的代码再往下继续执行自己的逻辑.
好处就是 Producer发送消息有没有发送成功,Producer自己是知道的,如果发送失败了话,Producer内部会有代码进行重试.

image.png

SendResult sendResult = producer.send(msg); 就是同步发送

  1. public static void main(String[] args) throws MQClientException, InterruptedException {
  2. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
  3. //NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
  4. producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  5. producer.start();
  6. try {
  7. {
  8. Message msg = new Message("TopicTest", // 发送的topic
  9. "TagA", //tags
  10. "OrderID188", // keys3
  11. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
  12. );
  13. //同步传递消息,消息会发给集群中的一个Broker节点。
  14. //如果发送失败了 ,这里会有重试机制
  15. SendResult sendResult = producer.send(msg);
  16. System.out.printf("%s%n", sendResult);
  17. }
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. }
  21. producer.shutdown();
  22. }

打印结果

  1. SendResult [sendStatus=SEND_OK, msgId=AC100A010AD818B4AAC255F0E3B50000, offsetMsgId=AC100A6600002A9F00000000000974CF, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=380]

其中就能通过sendStatus字段来判断是否发送成功了

异步发送

Producer发送完了消息给MQ之后就继续执行下面的逻辑了,然后Producer会给MQ一个回调函数,
MQ在完成消息之后会回过来请求Producer的回调方法,在回调方法去执行逻辑, 但是这个逻辑什么时候执行Producer就不管了.

image.png
异步模式就是一个双向的交互,Producer既发消息也会接收MQ传过来的回调结果
MQ既接收消息也会发送 处理结果给Producer

需要注意一点就是如果Producer服务停了,就无法接收到MQ发过来的回调了.

  1. package org.apache.rocketmq.example.simple;
  2. import org.apache.rocketmq.client.exception.MQClientException;
  3. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  4. import org.apache.rocketmq.client.producer.SendCallback;
  5. import org.apache.rocketmq.client.producer.SendResult;
  6. import org.apache.rocketmq.common.message.Message;
  7. import org.apache.rocketmq.remoting.common.RemotingHelper;
  8. import java.io.UnsupportedEncodingException;
  9. import java.util.concurrent.CountDownLatch;
  10. import java.util.concurrent.TimeUnit;
  11. /**
  12. * 简单样例:异步发送消息
  13. */
  14. public class AsyncProducer {
  15. public static void main(
  16. String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
  17. DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
  18. producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  19. producer.start();
  20. //重试次数
  21. producer.setRetryTimesWhenSendAsyncFailed(0);
  22. int messageCount = 100;
  23. //由于是异步发送,这里引入一个countDownLatch,保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。
  24. final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
  25. for (int i = 0; i < messageCount; i++) { // 发送100条消息
  26. try {
  27. final int index = i;
  28. Message msg = new Message("TopicTest",
  29. "TagA",
  30. "OrderID188",
  31. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
  32. // SendCallback 是回调函数
  33. producer.send(msg, new SendCallback() {
  34. /**
  35. * 消息成功执行这里
  36. * @param sendResult
  37. */
  38. @Override
  39. public void onSuccess(SendResult sendResult) {
  40. countDownLatch.countDown();
  41. System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
  42. }
  43. /**
  44. * 消息失败执行这里
  45. * @param e
  46. */
  47. @Override
  48. public void onException(Throwable e) {
  49. countDownLatch.countDown();
  50. System.out.printf("%-10d Exception %s %n", index, e);
  51. e.printStackTrace();
  52. }
  53. });
  54. System.out.println("消息发送完成");
  55. } catch (Exception e) {
  56. e.printStackTrace();
  57. }
  58. }
  59. //保证100条消息的回调都处理完了,再结束producer
  60. countDownLatch.await(5, TimeUnit.SECONDS);
  61. producer.shutdown();
  62. }
  63. }

输出结果:

日志输出太多了,下面只是粘贴了部分的输出结果

  1. 消息发送完成
  2. 消息发送完成
  3. 消息发送完成
  4. 18 OK AC100A012F3018B4AAC255F865870011
  5. 2 OK AC100A012F3018B4AAC255F865810009
  6. 87 OK AC100A012F3018B4AAC255F8658A0056

三种方式总结

吞吐量:
1.单向发送模式因为发送过去之后不用接收结果,所以这种方式吞吐量是最高的,
2.同步发送模式是吞吐量最慢的方式,因为它需要等待MQ给它响应结果,它才能继续往下执行
3.异步发送模式吞吐量是比同步发送高比单向发送低的.

安全性:
1.同步发送模式安全性是最高的
2.异步发送模式和单向发送模式都容易丢失消息

发送指的是消息从Producer发送给MQ, 至于消费者是否消费消息,Producer是不管的.

消费者代码,三种方式公用这一个消费者

  1. package org.apache.rocketmq.example.quickstart;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  8. import org.apache.rocketmq.common.message.MessageExt;
  9. import java.util.List;
  10. /**
  11. * 此示例显示如何使用提供 {@link DefaultMQPushConsumer} 订阅和使用消息。
  12. */
  13. public class Consumer {
  14. public static void main(String[] args) throws InterruptedException, MQClientException {
  15. /*
  16. 使用指定的消费者组名称实例化
  17. */
  18. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
  19. /*
  20. * 指定名称服务器地址。
  21. * <p/>
  22. *
  23. * 或者,您可以通过导出环境变量来指定名称服务器地址:NAMESRV_ADDR
  24. * <pre>
  25. * {@code
  26. * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
  27. * }
  28. * </pre>
  29. */
  30. consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  31. /*如果指定的消费者组是全新的,请指定从哪里开始
  32. */
  33. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  34. /*
  35. 再订阅一个主题来消费。
  36. */
  37. consumer.subscribe("TopicTest", "*");
  38. /*
  39. * 注册回调以在从代理获取的消息到达时执行。
  40. */
  41. consumer.registerMessageListener(new MessageListenerConcurrently() {
  42. @Override
  43. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  44. ConsumeConcurrentlyContext context) {
  45. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  46. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  47. }
  48. });
  49. /*
  50. * 启动消费者实例
  51. */
  52. consumer.start();
  53. System.out.printf("Consumer Started.%n");
  54. }
  55. }