public interface MQAdmin {
* 创建Topic
* @param key 索引key
* @param newTopic topic名称
* @param queueNum Topic队列数量
void createTopic(final String key, final String newTopic, final int queueNum)
throws MQClientException;
* 创建Topic
* @param key 索引key
* @param newTopic topic名称
* @param queueNum Topic队列数量
* @param topicSysFlag Topic系统标记
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
throws MQClientException;
* 根据时间戳获取消息队列偏移量
* @param 消息队列实例
* @param 时间戳(毫秒)
* @return 偏移量
long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
* 获取最大偏移量
* @param 消息队列实例
* @return 最大偏移量
long maxOffset(final MessageQueue mq) throws MQClientException;
* 获取最小偏移量
* @param 消息队列实例
* @return 获取最小偏移量
long minOffset(final MessageQueue mq) throws MQClientException;
* 获取最早存储消息的时间
* @param 消息队列实例
* @return 消息时间戳
long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
* 根据消息ID获取消息
* @param 偏移量消息ID
* @return 消息
MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
* 查询消息
* @param topic 消息Topic
* @param key 消息索引
* @param maxNum 最大消息数
* @param begin 开始时间
* @param end 结束时间
* @return 结果集
QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end) throws MQClientException, InterruptedException;
* 根据Topic和消息ID返回消息扩展实例
MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
public interface MQProducer extends MQAdmin {
/**
 * Starts this producer.
 *
 * <p>NOTE(review): what starting entails is not visible in this interface; see the
 * implementation. Failure conditions for the declared exception are implementation-defined.
 */
void start() throws MQClientException;
/**
 * Shuts this producer down. Declares no checked exceptions.
 *
 * <p>NOTE(review): which resources are released is not visible in this interface.
 */
void shutdown();
/**
 * Fetches the message queues used for publishing to a topic.
 *
 * <p>Failure conditions for the declared exception are implementation-defined.
 *
 * @param topic topic whose queues are requested
 * @return list of message queues; NOTE(review): emptiness/null behaviour is not visible here
 */
List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
/**
 * Sends a message with a caller-supplied timeout and returns its {@link SendResult}.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msg message to send
 * @param timeout timeout for the call; unit not shown here (presumably milliseconds — TODO confirm)
 * @return result of the send
 */
SendResult send(final Message msg, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
/**
 * Sends a message, taking a callback instead of returning a result.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msg message to send
 * @param sendCallback callback supplied by the caller; presumably notified of the outcome
 *     asynchronously — TODO confirm against the implementation
 */
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException;
/**
 * Sends a message with a callback and a caller-supplied timeout.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msg message to send
 * @param sendCallback callback supplied by the caller; presumably notified of the outcome
 *     asynchronously — TODO confirm against the implementation
 * @param timeout timeout for the call; unit not shown here (presumably milliseconds — TODO confirm)
 */
void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException;
void sendOneway(final Message msg) throws MQClientException, RemotingException,
/**
 * Sends a message together with an explicit message queue and returns its {@link SendResult}.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msg message to send
 * @param mq message queue passed with the message; presumably the target queue — TODO confirm
 * @return result of the send
 */
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
/**
 * Sends a message together with an explicit message queue and a caller-supplied timeout.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msg message to send
 * @param mq message queue passed with the message; presumably the target queue — TODO confirm
 * @param timeout timeout for the call; unit not shown here (presumably milliseconds — TODO confirm)
 * @return result of the send
 */
SendResult send(final Message msg, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
/**
 * Sends a message together with an explicit message queue, taking a callback instead of
 * returning a result.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msg message to send
 * @param mq message queue passed with the message; presumably the target queue — TODO confirm
 * @param sendCallback callback supplied by the caller; presumably notified of the outcome
 *     asynchronously — TODO confirm against the implementation
 */
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException;
/**
 * Sends a message together with an explicit message queue, a callback and a timeout.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msg message to send
 * @param mq message queue passed with the message; presumably the target queue — TODO confirm
 * @param sendCallback callback supplied by the caller; presumably notified of the outcome
 *     asynchronously — TODO confirm against the implementation
 * @param timeout timeout for the call; unit not shown here (presumably milliseconds — TODO confirm)
 */
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException;
/**
 * Sends a message one-way together with an explicit message queue: the method returns nothing
 * and takes no callback.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msg message to send
 * @param mq message queue passed with the message; presumably the target queue — TODO confirm
 */
void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, InterruptedException;
/**
 * Sends a message using a queue selector and returns its {@link SendResult}.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msg message to send
 * @param selector queue selector; presumably chooses the target queue — TODO confirm
 * @param arg caller-supplied argument; presumably handed to {@code selector} — TODO confirm
 * @return result of the send
 */
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws MQClientException, RemotingException, MQBrokerException,
void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback) throws MQClientException, RemotingException,
void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
/**
 * Sends a message one-way using a queue selector: the method returns nothing and takes no
 * callback.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msg message to send
 * @param selector queue selector; presumably chooses the target queue — TODO confirm
 * @param arg caller-supplied argument; presumably handed to {@code selector} — TODO confirm
 */
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, InterruptedException;
/**
 * Sends a message in a transaction with an explicit local-transaction executer and returns a
 * {@link TransactionSendResult}.
 *
 * <p>Failure conditions for the declared exception are implementation-defined.
 *
 * @param msg message to send
 * @param tranExecuter local transaction executer; NOTE(review): when it is invoked is not
 *     visible in this interface
 * @param arg caller-supplied argument; presumably handed to {@code tranExecuter} — TODO confirm
 * @return result of the transactional send
 */
TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
/**
 * Sends a message in a transaction and returns a {@link TransactionSendResult}.
 *
 * <p>Failure conditions for the declared exception are implementation-defined.
 *
 * @param msg message to send
 * @param arg caller-supplied argument; NOTE(review): its consumer is not visible in this
 *     interface
 * @return result of the transactional send
 */
TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException;
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
/**
 * Sends a collection of messages with a caller-supplied timeout and returns a single
 * {@link SendResult}.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msgs messages to send
 * @param timeout timeout for the call; unit not shown here (presumably milliseconds — TODO confirm)
 * @return result of the send
 */
SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
/**
 * Sends a collection of messages together with an explicit message queue and returns a single
 * {@link SendResult}.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msgs messages to send
 * @param mq message queue passed with the messages; presumably the target queue — TODO confirm
 * @return result of the send
 */
SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
/**
 * Sends a collection of messages together with an explicit message queue and a caller-supplied
 * timeout, returning a single {@link SendResult}.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msgs messages to send
 * @param mq message queue passed with the messages; presumably the target queue — TODO confirm
 * @param timeout timeout for the call; unit not shown here (presumably milliseconds — TODO confirm)
 * @return result of the send
 */
SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
/**
 * Sends a collection of messages with a callback and a caller-supplied timeout.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msgs messages to send
 * @param sendCallback callback supplied by the caller; presumably notified of the outcome
 *     asynchronously — TODO confirm against the implementation
 * @param timeout timeout for the call; unit not shown here (presumably milliseconds — TODO confirm)
 */
void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
/**
 * Sends a collection of messages together with an explicit message queue, taking a callback
 * instead of returning a result.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msgs messages to send
 * @param mq message queue passed with the messages; presumably the target queue — TODO confirm
 * @param sendCallback callback supplied by the caller; presumably notified of the outcome
 *     asynchronously — TODO confirm against the implementation
 */
void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
/**
 * Sends a collection of messages together with an explicit message queue, a callback and a
 * caller-supplied timeout.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msgs messages to send
 * @param mq message queue passed with the messages; presumably the target queue — TODO confirm
 * @param sendCallback callback supplied by the caller; presumably notified of the outcome
 *     asynchronously — TODO confirm against the implementation
 * @param timeout timeout for the call; unit not shown here (presumably milliseconds — TODO confirm)
 */
void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
/**
 * Sends a request message in synchronous mode. This method returns only when the consumer
 * has consumed the request message and replied with a message.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msg request message to send
 * @param timeout timeout for the call; unit not shown here (presumably milliseconds — TODO confirm)
 * @return the reply message
 */
Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
RemotingException, MQBrokerException, InterruptedException;
/**
 * Sends a request message, taking a callback instead of returning the reply.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msg request message to send
 * @param requestCallback callback supplied by the caller; presumably receives the reply or the
 *     failure asynchronously — TODO confirm against the implementation
 * @param timeout timeout for the call; unit not shown here (presumably milliseconds — TODO confirm)
 */
void request(final Message msg, final RequestCallback requestCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException, MQBrokerException;
Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,
/**
 * Sends a request message using a queue selector, taking a callback instead of returning the
 * reply.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msg request message to send
 * @param selector queue selector; presumably chooses the target queue — TODO confirm
 * @param arg caller-supplied argument; presumably handed to {@code selector} — TODO confirm
 * @param requestCallback callback supplied by the caller; presumably receives the reply or the
 *     failure asynchronously — TODO confirm against the implementation
 * @param timeout timeout for the call; unit not shown here (presumably milliseconds — TODO confirm)
 */
void request(final Message msg, final MessageQueueSelector selector, final Object arg,
final RequestCallback requestCallback,
final long timeout) throws MQClientException, RemotingException,
InterruptedException, MQBrokerException;
/**
 * Sends a request message together with an explicit message queue and returns a
 * {@link Message}.
 *
 * <p>NOTE(review): presumably the returned message is the consumer's reply — confirm. Failure
 * conditions for the declared exceptions are implementation-defined.
 *
 * @param msg request message to send
 * @param mq message queue passed with the request; presumably the target queue — TODO confirm
 * @param timeout timeout for the call; unit not shown here (presumably milliseconds — TODO confirm)
 * @return the returned message
 */
Message request(final Message msg, final MessageQueue mq, final long timeout)
throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;
/**
 * Sends a request message together with an explicit message queue, taking a callback instead
 * of returning the reply.
 *
 * <p>Failure conditions for the declared exceptions are implementation-defined.
 *
 * @param msg request message to send
 * @param mq message queue passed with the request; presumably the target queue — TODO confirm
 * @param requestCallback callback supplied by the caller; presumably receives the reply or the
 *     failure asynchronously — TODO confirm against the implementation
 * @param timeout timeout for the call; unit not shown here (presumably milliseconds — TODO confirm)
 */
void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
// Producer configuration fields and their defaults. The enclosing class header is
// outside this view, so how each field is consumed is not shown here.
// Producer group name; no initial value is assigned.
private String producerGroup;
// Key used when creating topics; initialised from TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC.
private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
// Default topic queue count, initially 4; declared volatile.
private volatile int defaultTopicQueueNums = 4;
// Send timeout, initially 3000; unit presumably milliseconds — TODO confirm.
private int sendMsgTimeout = 3000;
// Body-size threshold, initially 4096; presumably bodies larger than this (in bytes)
// are compressed — TODO confirm against the sending code.
private int compressMsgBodyOverHowmuch = 1024 * 4;
// Retry count after a failed send, initially 2; presumably applies to synchronous sends — confirm.
private int retryTimesWhenSendFailed = 2;
// Retry count after a failed asynchronous send, initially 2.
private int retryTimesWhenSendAsyncFailed = 2;
// Whether to retry on another broker when the store result is not OK; off by default.
private boolean retryAnotherBrokerWhenNotStoreOK = false;
// Maximum message size, initially 4 * 1024 * 1024 = 4194304; unit presumably bytes.
private int maxMessageSize = 1024 * 1024 * 4; // 4M
// Trace dispatcher; null until one is assigned.
private TraceDispatcher traceDispatcher = null;