1. import org.apache.rocketmq.client.exception.MQClientException;
    2. import org.apache.rocketmq.client.producer.DefaultMQProducer;
    3. import org.apache.rocketmq.client.producer.SendCallback;
    4. import org.apache.rocketmq.client.producer.SendResult;
    5. import org.apache.rocketmq.common.message.Message;
    6. import org.apache.rocketmq.remoting.common.RemotingHelper;
    7. import java.util.concurrent.CountDownLatch;
    8. import java.util.concurrent.TimeUnit;
    9. /**
    10. * 异步发送
    11. */
    12. public class AsyncProducer {
    13. public static void main(
    14. String[] args) throws MQClientException, InterruptedException {
    15. //生产者实例化
    16. DefaultMQProducer producer = new DefaultMQProducer("async");
    17. //指定rocket服务器地址
    18. producer.setNamesrvAddr("zjj101:9876");
    19. //启动实例
    20. producer.start();
    21. //发送异步失败时的重试次数(这里不重试)
    22. //这就参数设置为0就是不重试,设置成1的话,如果发送失败了最多重试一次.
    23. producer.setRetryTimesWhenSendAsyncFailed(0);
    24. int messageCount = 10;
    25. final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
    26. for (int i = 0; i < messageCount; i++) {
    27. try {
    28. final int index = i;
    29. Message msg = new Message("TopicTest",
    30. "TagC",
    31. "OrderID"+index,
    32. ("Hello world "+index).getBytes(RemotingHelper.DEFAULT_CHARSET));
    33. //生产者异步发送
    34. producer.send(msg, new SendCallback() {
    35. @Override
    36. public void onSuccess(SendResult sendResult) {
    37. countDownLatch.countDown();
    38. System.out.printf("%-10d OK %s %n", index, new String(msg.getBody()));
    39. }
    40. @Override
    41. public void onException(Throwable e) {
    42. countDownLatch.countDown();
    43. System.out.printf("%-10d Exception %s %n", index, e);
    44. e.printStackTrace();
    45. }
    46. });
    47. } catch (Exception e) {
    48. e.printStackTrace();
    49. }
    50. }
    51. countDownLatch.await(5, TimeUnit.SECONDS);
    52. producer.shutdown();
    53. }
    54. }