Producer跟Consumer对于RocketMQ来说都是客户端,我们在使用的过程中初始化其中的一个实例来进行发送或消费消息。

DefaultMQProducer

DefaultMQProducer是默认的Producer实现类,实现了MQProducer接口,而MQProducer又实现了MQAdmin接口。

org.apache.rocketmq.client.MQAdmin接口结构

  1. MQAdminRocketMQ管理的基本接口
  2. public interface MQAdmin {
  3. /**
  4. * 创建Topic
  5. *
  6. * @param key 索引key
  7. * @param newTopic topic名称
  8. * @param queueNum Topic队列数量
  9. */
  10. void createTopic(final String key, final String newTopic, final int queueNum)
  11. throws MQClientException;
  12. /**
  13. * 创建Topic
  14. *
  15. * @param key 索引key
  16. * @param newTopic topic名称
  17. * @param queueNum Topic队列数量
  18. * @param topicSysFlag Topic系统标记
  19. */
  20. void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
  21. throws MQClientException;
  22. /**
  23. * 根据时间戳获取消息队列偏移量
  24. *
  25. * @param 消息队列实例
  26. * @param 时间戳(毫秒)
  27. * @return 偏移量
  28. */
  29. long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
  30. /**
  31. * 获取最大偏移量
  32. *
  33. * @param 消息队列实例
  34. * @return 最大偏移量
  35. */
  36. long maxOffset(final MessageQueue mq) throws MQClientException;
  37. /**
  38. * 获取最小偏移量
  39. *
  40. * @param 消息队列实例
  41. * @return 获取最小偏移量
  42. */
  43. long minOffset(final MessageQueue mq) throws MQClientException;
  44. /**
  45. * 获取最早存储消息的时间
  46. *
  47. * @param 消息队列实例
  48. * @return 消息时间戳
  49. */
  50. long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
  51. /**
  52. * 根据消息ID获取消息
  53. *
  54. * @param 偏移量消息ID
  55. * @return 消息
  56. */
  57. MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
  58. InterruptedException, MQClientException;
  59. /**
  60. * 查询消息
  61. *
  62. * @param topic 消息Topic
  63. * @param key 消息索引
  64. * @param maxNum 最大消息数
  65. * @param begin 开始时间
  66. * @param end 结束时间
  67. * @return 结果集
  68. */
  69. QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
  70. final long end) throws MQClientException, InterruptedException;
  71. /**
  72. * 根据Topic和消息ID返回消息扩展实例
  73. */
  74. MessageExt viewMessage(String topic,
  75. String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
  76. }

org.apache.rocketmq.client.producer.MQProducer结构

MQProducer的方法总结:
MQProducer中消息发送大致分为三种:单向,异步,同步
其中对消息发送有多种扩展机制:超时,指定队列,指定消息选择算法
request:表示该结果的返回条件为consumer已经消费了该消息并进行回复,producer利用同步或异步的方式获得consumer的回复消息,注:该模式效率低下。

  1. public interface MQProducer extends MQAdmin {
  2. //启动
  3. void start() throws MQClientException;
  4. //停止
  5. void shutdown();
  6. //查找主题下所有消息队列
  7. List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
  8. //同步发送消息
  9. SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
  10. InterruptedException;
  11. //同步发送消息(有超时机制)
  12. SendResult send(final Message msg, final long timeout) throws MQClientException,
  13. RemotingException, MQBrokerException, InterruptedException;
  14. //异步发送消息+发送回调
  15. void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
  16. RemotingException, InterruptedException;
  17. //异步发送消息+发送回调(有超时机制)
  18. void send(final Message msg, final SendCallback sendCallback, final long timeout)
  19. throws MQClientException, RemotingException, InterruptedException;
  20. //单向发送消息
  21. void sendOneway(final Message msg) throws MQClientException, RemotingException,
  22. InterruptedException;
  23. //同步发送消息(指定消息队列)
  24. SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
  25. RemotingException, MQBrokerException, InterruptedException;
  26. //同步发送消息(指定消息队列+超时机制)
  27. SendResult send(final Message msg, final MessageQueue mq, final long timeout)
  28. throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
  29. //异步发送消息+发送回调(指定消息队列)
  30. void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
  31. throws MQClientException, RemotingException, InterruptedException;
  32. //异步发送消息+发送回调(指定消息队列+超时机制)
  33. void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout)
  34. throws MQClientException, RemotingException, InterruptedException;
  35. //单向发送消息(指定消息队列)
  36. void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
  37. RemotingException, InterruptedException;
  38. //同步发送消息(指定消息选择算法,会覆盖默认的消息队列负载)
  39. SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
  40. throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
  41. //同步发送消息(指定消息选择算法,会覆盖默认的消息队列负载+超时机制)
  42. SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg,
  43. final long timeout) throws MQClientException, RemotingException, MQBrokerException,
  44. InterruptedException;
  45. //异步发送消息+发送回调(指定消息选择算法,会覆盖默认的消息队列负载)
  46. void send(final Message msg, final MessageQueueSelector selector, final Object arg,
  47. final SendCallback sendCallback) throws MQClientException, RemotingException,
  48. InterruptedException;
  49. //异步发送消息+发送回调(指定消息选择算法,会覆盖默认的消息队列负载+超时机制)
  50. void send(final Message msg, final MessageQueueSelector selector, final Object arg,
  51. final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
  52. InterruptedException;
  53. //单向发送消息(指定消息选择算法,会覆盖默认的消息队列负载)
  54. void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
  55. throws MQClientException, RemotingException, InterruptedException;
  56. //同步事务发送消息(指定事务执行器)
  57. TransactionSendResult sendMessageInTransaction(final Message msg,
  58. final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
  59. //同步事务发送消息
  60. TransactionSendResult sendMessageInTransaction(final Message msg,
  61. final Object arg) throws MQClientException;
  62. //同步批量发送消息
  63. SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
  64. InterruptedException;
  65. //同步批量发送消息(超时机制)
  66. SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,
  67. RemotingException, MQBrokerException, InterruptedException;
  68. //同步批量发送消息(指定队列)
  69. SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws MQClientException,
  70. RemotingException, MQBrokerException, InterruptedException;
  71. //同步批量发送消息(指定队列+超时机制)
  72. SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
  73. throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
  74. //异步批量发送消息
  75. void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
  76. InterruptedException;
  77. //异步批量发送消息(超时机制)
  78. void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
  79. MQBrokerException, InterruptedException;
  80. //异步批量发送消息(指定队列)
  81. void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException,
  82. MQBrokerException, InterruptedException;
  83. //异步批量发送消息(指定队列+超时机制)
  84. void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException,
  85. RemotingException, MQBrokerException, InterruptedException;
  86. //request方法解释:使用request方法的返回结果时表示消费者已经消费该消息并且进行回复
  87. //Send request message in synchronous mode. This method returns
  88. // only when the consumer consume the request message and reply a message.
  89. //同步+超时
  90. Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
  91. RemotingException, MQBrokerException, InterruptedException;
  92. //异步+超时
  93. void request(final Message msg, final RequestCallback requestCallback, final long timeout)
  94. throws MQClientException, RemotingException, InterruptedException, MQBrokerException;
  95. //同步+消息选择算法+超时
  96. Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
  97. final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,
  98. InterruptedException;
  99. //异步+消息选择算法+超时
  100. void request(final Message msg, final MessageQueueSelector selector, final Object arg,
  101. final RequestCallback requestCallback,
  102. final long timeout) throws MQClientException, RemotingException,
  103. InterruptedException, MQBrokerException;
  104. //同步+指定队列+超时
  105. Message request(final Message msg, final MessageQueue mq, final long timeout)
  106. throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;
  107. //异步+指定队列+超时
  108. void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
  109. throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
  110. }

DefaultMQProducer核心属性

  1. //生产者组
  2. private String producerGroup;
  3. //默认TopicKey
  4. private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
  5. //默认Topic队列数
  6. private volatile int defaultTopicQueueNums = 4;
  7. //发送消息超时时间,默认3s
  8. private int sendMsgTimeout = 3000;
  9. //压缩消息(超过默认值则启动压缩,默认4kb)
  10. private int compressMsgBodyOverHowmuch = 1024 * 4;
  11. //失败重试次数(同步)
  12. private int retryTimesWhenSendFailed = 2;
  13. //失败重试次数(异步)
  14. private int retryTimesWhenSendAsyncFailed = 2;
  15. //发送失败时重试发送给其他Broker开关
  16. private boolean retryAnotherBrokerWhenNotStoreOK = false;
  17. //消息最大长度
  18. private int maxMessageSize = 1024 * 1024 * 4; // 4M
  19. //消息追踪器
  20. private TraceDispatcher traceDispatcher = null;