Producer跟Consumer对于RocketMQ来说都是客户端,我们在使用的过程中初始化其中的一个实例来进行发送或消费消息。
DefaultMQProducer
DefaultMQProducer是默认的Producer实现类,实现了MQProducer接口,而MQProducer又实现了MQAdmin接口。
org.apache.rocketmq.client.MQAdmin接口结构
MQAdmin是RocketMQ管理的基本接口public interface MQAdmin {/*** 创建Topic** @param key 索引key* @param newTopic topic名称* @param queueNum Topic队列数量*/void createTopic(final String key, final String newTopic, final int queueNum)throws MQClientException;/*** 创建Topic** @param key 索引key* @param newTopic topic名称* @param queueNum Topic队列数量* @param topicSysFlag Topic系统标记*/void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)throws MQClientException;/*** 根据时间戳获取消息队列偏移量** @param 消息队列实例* @param 时间戳(毫秒)* @return 偏移量*/long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;/*** 获取最大偏移量** @param 消息队列实例* @return 最大偏移量*/long maxOffset(final MessageQueue mq) throws MQClientException;/*** 获取最小偏移量** @param 消息队列实例* @return 获取最小偏移量*/long minOffset(final MessageQueue mq) throws MQClientException;/*** 获取最早存储消息的时间** @param 消息队列实例* @return 消息时间戳*/long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;/*** 根据消息ID获取消息** @param 偏移量消息ID* @return 消息*/MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,InterruptedException, MQClientException;/*** 查询消息** @param topic 消息Topic* @param key 消息索引* @param maxNum 最大消息数* @param begin 开始时间* @param end 结束时间* @return 结果集*/QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,final long end) throws MQClientException, InterruptedException;/*** 根据Topic和消息ID返回消息扩展实例*/MessageExt viewMessage(String topic,String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;}
org.apache.rocketmq.client.producer.MQProducer结构
MQProducer的方法总结:
MQProducer中消息发送大致分为三种:单向,异步,同步
其中对消息发送有多种扩展机制:超时,指定队列,指定消息选择算法
request:表示该结果的返回条件为consumer已经消费了该消息并进行回复,producer利用同步或异步的方式获得consumer的回复消息,注:该模式效率低下。
public interface MQProducer extends MQAdmin {//启动void start() throws MQClientException;//停止void shutdown();//查找主题下所有消息队列List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;//同步发送消息SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;//同步发送消息(有超时机制)SendResult send(final Message msg, final long timeout) throws MQClientException,RemotingException, MQBrokerException, InterruptedException;//异步发送消息+发送回调void send(final Message msg, final SendCallback sendCallback) throws MQClientException,RemotingException, InterruptedException;//异步发送消息+发送回调(有超时机制)void send(final Message msg, final SendCallback sendCallback, final long timeout)throws MQClientException, RemotingException, InterruptedException;//单向发送消息void sendOneway(final Message msg) throws MQClientException, RemotingException,InterruptedException;//同步发送消息(指定消息队列)SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,RemotingException, MQBrokerException, InterruptedException;//同步发送消息(指定消息队列+超时机制)SendResult send(final Message msg, final MessageQueue mq, final long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException;//异步发送消息+发送回调(指定消息队列)void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)throws MQClientException, RemotingException, InterruptedException;//异步发送消息+发送回调(指定消息队列+超时机制)void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout)throws MQClientException, RemotingException, InterruptedException;//单向发送消息(指定消息队列)void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,RemotingException, InterruptedException;//同步发送消息(指定消息选择算法,会覆盖默认的消息队列负载)SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)throws MQClientException, RemotingException, MQBrokerException, InterruptedException;//同步发送消息(指定消息选择算法,会覆盖默认的消息队列负载+超时机制)SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg,final long timeout) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;//异步发送消息+发送回调(指定消息选择算法,会覆盖默认的消息队列负载)void send(final Message msg, final MessageQueueSelector selector, final Object arg,final SendCallback sendCallback) throws MQClientException, RemotingException,InterruptedException;//异步发送消息+发送回调(指定消息选择算法,会覆盖默认的消息队列负载+超时机制)void send(final Message msg, final MessageQueueSelector selector, final Object arg,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,InterruptedException;//单向发送消息(指定消息选择算法,会覆盖默认的消息队列负载)void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)throws MQClientException, RemotingException, InterruptedException;//同步事务发送消息(指定事务执行器)TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;//同步事务发送消息TransactionSendResult sendMessageInTransaction(final Message msg,final Object arg) throws MQClientException;//同步批量发送消息SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;//同步批量发送消息(超时机制)SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,RemotingException, MQBrokerException, InterruptedException;//同步批量发送消息(指定队列)SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws MQClientException,RemotingException, MQBrokerException, InterruptedException;//同步批量发送消息(指定队列+超时机制)SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException;//异步批量发送消息void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;//异步批量发送消息(超时机制)void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,MQBrokerException, InterruptedException;//异步批量发送消息(指定队列)void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException,MQBrokerException, InterruptedException;//异步批量发送消息(指定队列+超时机制)void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException,RemotingException, MQBrokerException, InterruptedException;//request方法解释:使用request方法的返回结果时表示消费者已经消费该消息并且进行回复//Send request message in synchronous mode. This method returns// only when the consumer consume the request message and reply a message.//同步+超时Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,RemotingException, MQBrokerException, InterruptedException;//异步+超时void request(final Message msg, final RequestCallback requestCallback, final long timeout)throws MQClientException, RemotingException, InterruptedException, MQBrokerException;//同步+消息选择算法+超时Message request(final Message msg, final MessageQueueSelector selector, final Object arg,final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,InterruptedException;//异步+消息选择算法+超时void request(final Message msg, final MessageQueueSelector selector, final Object arg,final RequestCallback requestCallback,final long timeout) throws MQClientException, RemotingException,InterruptedException, MQBrokerException;//同步+指定队列+超时Message request(final Message msg, final MessageQueue mq, final long timeout)throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;//异步+指定队列+超时void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException;}
DefaultMQProducer核心属性
//生产者组private String producerGroup;//默认TopicKeyprivate String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;//默认Topic队列数private volatile int defaultTopicQueueNums = 4;//发送消息超时时间,默认3sprivate int sendMsgTimeout = 3000;//压缩消息(超过默认值则启动压缩,默认4kb)private int compressMsgBodyOverHowmuch = 1024 * 4;//失败重试次数(同步)private int retryTimesWhenSendFailed = 2;//失败重试次数(异步)private int retryTimesWhenSendAsyncFailed = 2;//发送失败时重试发送给其他Broker开关private boolean retryAnotherBrokerWhenNotStoreOK = false;//消息最大长度private int maxMessageSize = 1024 * 1024 * 4; // 4M//消息追踪器private TraceDispatcher traceDispatcher = null;
