Producer跟Consumer对于RocketMQ来说都是客户端,我们在使用的过程中初始化其中的一个实例来进行发送或消费消息。
DefaultMQProducer
DefaultMQProducer是默认的Producer实现类,实现了MQProducer接口,而MQProducer又实现了MQAdmin接口。
org.apache.rocketmq.client.MQAdmin接口结构
MQAdmin是RocketMQ管理的基本接口
public interface MQAdmin {
/**
* 创建Topic
*
* @param key 索引key
* @param newTopic topic名称
* @param queueNum Topic队列数量
*/
void createTopic(final String key, final String newTopic, final int queueNum)
throws MQClientException;
/**
* 创建Topic
*
* @param key 索引key
* @param newTopic topic名称
* @param queueNum Topic队列数量
* @param topicSysFlag Topic系统标记
*/
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
throws MQClientException;
/**
* 根据时间戳获取消息队列偏移量
*
* @param 消息队列实例
* @param 时间戳(毫秒)
* @return 偏移量
*/
long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
/**
* 获取最大偏移量
*
* @param 消息队列实例
* @return 最大偏移量
*/
long maxOffset(final MessageQueue mq) throws MQClientException;
/**
* 获取最小偏移量
*
* @param 消息队列实例
* @return 获取最小偏移量
*/
long minOffset(final MessageQueue mq) throws MQClientException;
/**
* 获取最早存储消息的时间
*
* @param 消息队列实例
* @return 消息时间戳
*/
long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
/**
* 根据消息ID获取消息
*
* @param 偏移量消息ID
* @return 消息
*/
MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
/**
* 查询消息
*
* @param topic 消息Topic
* @param key 消息索引
* @param maxNum 最大消息数
* @param begin 开始时间
* @param end 结束时间
* @return 结果集
*/
QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end) throws MQClientException, InterruptedException;
/**
* 根据Topic和消息ID返回消息扩展实例
*/
MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
}
org.apache.rocketmq.client.producer.MQProducer结构
MQProducer的方法总结:
MQProducer中消息发送大致分为三种:单向,异步,同步
其中对消息发送有多种扩展机制:超时,指定队列,指定消息选择算法
request:表示该结果的返回条件为consumer已经消费了该消息并进行回复,producer利用同步或异步的方式获得consumer的回复消息,注:该模式效率低下。
public interface MQProducer extends MQAdmin {
//启动
void start() throws MQClientException;
//停止
void shutdown();
//查找主题下所有消息队列
List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
//同步发送消息
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
//同步发送消息(有超时机制)
SendResult send(final Message msg, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
//异步发送消息+发送回调
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException;
//异步发送消息+发送回调(有超时机制)
void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException;
//单向发送消息
void sendOneway(final Message msg) throws MQClientException, RemotingException,
InterruptedException;
//同步发送消息(指定消息队列)
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
//同步发送消息(指定消息队列+超时机制)
SendResult send(final Message msg, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
//异步发送消息+发送回调(指定消息队列)
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException;
//异步发送消息+发送回调(指定消息队列+超时机制)
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException;
//单向发送消息(指定消息队列)
void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, InterruptedException;
//同步发送消息(指定消息选择算法,会覆盖默认的消息队列负载)
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
//同步发送消息(指定消息选择算法,会覆盖默认的消息队列负载+超时机制)
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
//异步发送消息+发送回调(指定消息选择算法,会覆盖默认的消息队列负载)
void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback) throws MQClientException, RemotingException,
InterruptedException;
//异步发送消息+发送回调(指定消息选择算法,会覆盖默认的消息队列负载+超时机制)
void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
InterruptedException;
//单向发送消息(指定消息选择算法,会覆盖默认的消息队列负载)
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, InterruptedException;
//同步事务发送消息(指定事务执行器)
TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
//同步事务发送消息
TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException;
//同步批量发送消息
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
//同步批量发送消息(超时机制)
SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
//同步批量发送消息(指定队列)
SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
//同步批量发送消息(指定队列+超时机制)
SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
//异步批量发送消息
void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
//异步批量发送消息(超时机制)
void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
//异步批量发送消息(指定队列)
void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
//异步批量发送消息(指定队列+超时机制)
void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
//request方法解释:使用request方法的返回结果时表示消费者已经消费该消息并且进行回复
//Send request message in synchronous mode. This method returns
// only when the consumer consume the request message and reply a message.
//同步+超时
Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
RemotingException, MQBrokerException, InterruptedException;
//异步+超时
void request(final Message msg, final RequestCallback requestCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException, MQBrokerException;
//同步+消息选择算法+超时
Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,
InterruptedException;
//异步+消息选择算法+超时
void request(final Message msg, final MessageQueueSelector selector, final Object arg,
final RequestCallback requestCallback,
final long timeout) throws MQClientException, RemotingException,
InterruptedException, MQBrokerException;
//同步+指定队列+超时
Message request(final Message msg, final MessageQueue mq, final long timeout)
throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;
//异步+指定队列+超时
void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}
DefaultMQProducer核心属性
//生产者组
private String producerGroup;
//默认TopicKey
private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
//默认Topic队列数
private volatile int defaultTopicQueueNums = 4;
//发送消息超时时间,默认3s
private int sendMsgTimeout = 3000;
//压缩消息(超过默认值则启动压缩,默认4kb)
private int compressMsgBodyOverHowmuch = 1024 * 4;
//失败重试次数(同步)
private int retryTimesWhenSendFailed = 2;
//失败重试次数(异步)
private int retryTimesWhenSendAsyncFailed = 2;
//发送失败时重试发送给其他Broker开关
private boolean retryAnotherBrokerWhenNotStoreOK = false;
//消息最大长度
private int maxMessageSize = 1024 * 1024 * 4; // 4M
//消息追踪器
private TraceDispatcher traceDispatcher = null;