import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 异步发送
*/
public class AsyncProducer {
public static void main(
String[] args) throws MQClientException, InterruptedException {
//生产者实例化
DefaultMQProducer producer = new DefaultMQProducer("async");
//指定rocket服务器地址
producer.setNamesrvAddr("zjj101:9876");
//启动实例
producer.start();
//发送异步失败时的重试次数(这里不重试)
//这就参数设置为0就是不重试,设置成1的话,如果发送失败了最多重试一次.
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("TopicTest",
"TagC",
"OrderID"+index,
("Hello world "+index).getBytes(RemotingHelper.DEFAULT_CHARSET));
//生产者异步发送
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, new String(msg.getBody()));
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}