一、同步发送消息

适用场景:

适用于一些对响应时间无要求的场景,但是对消息的可靠性要求极高

使用案例:

同步等待send函数的执行结果,通过返回值获取发送的结果

  1. public class SyncProducer {
  2. public static void main(String[] args) throws Exception {
  3. // 1.创建消息生产者producer,并制定生产者组名
  4. DefaultMQProducer producer = new DefaultMQProducer("syncGroup");
  5. // 2.指定NameServer地址
  6. producer.setNamesrvAddr("127.0.0.1:9876");
  7. // 3.启动producer
  8. producer.start();
  9. for (int i = 0; i < 10; i++) {
  10. // 4.创建消息对象,指定主题Topic、Tag和消息体
  11. // 参数一:主题的名称
  12. // 参数二:标签名
  13. // 参数三,实际的消息内容
  14. Message message = new Message("syncTopic","TagA", ("Hello" +i).getBytes());
  15. // 5.发送消息
  16. SendResult sendResult = producer.send(message);
  17. System.out.println("sendResult = " + sendResult);
  18. }
  19. // 6.关闭生产者producer
  20. producer.shutdown();
  21. }
  22. }

二、异步发送消息

适用场景:

对响应时间比较敏感的场景

使用案例:

通过SendCallback 中的函数,获取发送消息的响应结果

  1. public class AsyncProducer {
  2. public static void main(String[] args) throws Exception{
  3. // 1.创建消息生产者producer,并制定生产者组名
  4. DefaultMQProducer producer = new DefaultMQProducer("asyncGroup");
  5. // 2.指定NameServer地址
  6. producer.setNamesrvAddr("127.0.0.1:9876");
  7. producer.setRetryTimesWhenSendAsyncFailed(0);
  8. // 3.启动producer
  9. producer.start();
  10. for (int i = 0; i < 10; i++) {
  11. // 4.创建消息对象,指定主题Topic、Tag和消息体
  12. // 参数一:主题的名称
  13. // 参数二:标签名
  14. // 参数三,实际的消息内容
  15. Message message = new Message("asyncTopic","TagB", ("Hello" +i).getBytes());
  16. // 5.发送消息
  17. producer.send(message, new SendCallback() {
  18. public void onSuccess(SendResult sendResult) {
  19. System.out.println("success, sendResult = " + sendResult);
  20. }
  21. public void onException(Throwable throwable) {
  22. throwable.printStackTrace();
  23. }
  24. });
  25. }
  26. // 6.关闭生产者producer
  27. producer.shutdown();
  28. }
  29. }

三、单向发送消息(即不需要服务端的响应)

适用场景:

适用于不关心服务端是否已经接收到消息,即对发送结果不感兴趣的场景。例如日志收集(N多的日志,丢几条就丢几条吧)

使用案例:

  1. public class OneWayProducer {
  2. public static void main(String[] args) throws Exception{
  3. // 1.创建消息生产者producer,并制定生产者组名
  4. DefaultMQProducer producer = new DefaultMQProducer("onewayGroup");
  5. // 2.指定NameServer地址
  6. producer.setNamesrvAddr("127.0.0.1:9876");
  7. // 3.启动producer
  8. producer.start();
  9. for (int i = 0; i < 10; i++) {
  10. // 4.创建消息对象,指定主题Topic、Tag和消息体
  11. // 参数一:主题的名称
  12. // 参数二:标签名
  13. // 参数三,实际的消息内容
  14. Message message = new Message("oneWayTopic","TagC", ("Hello" +i).getBytes());
  15. // 5.发送消息
  16. producer.sendOneway(message);
  17. }
  18. // 6.关闭生产者producer
  19. producer.shutdown();
  20. }
  21. }

四、消费端消费消息

消费模式:

  • 负载均衡模式: 即多个消费者共同处理生产者发送的所有消息(总3条,A消费者1条,B消费者2条),各个消费端处理的消息总和为生产者发送的消息数量
  • 广播模式:即多个消费者全量消费生产者的所有消息,类似于订阅的公众号,所有的订阅者都可以看到。

默认消费者是负载均衡的模式

使用案例:

通过 consumer.setMessageModel(MessageModel.CLUSTERING); 这行代码,就可以设置消费者的消费模式。

负载均衡模式:

  1. public class LoadBalancerConsumer {
  2. public static void main(String[] args) throws MQClientException {
  3. // 1.创建消费者Consumer,制定消费者组名
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("syncGroup");
  5. // 2.指定NameServer地址
  6. consumer.setNamesrvAddr("127.0.0.1:9876");
  7. // 指定consumer的消费模式,默认是负载均衡模式
  8. consumer.setMessageModel(MessageModel.CLUSTERING);
  9. // 3.订阅主题Topic和Tag
  10. consumer.subscribe("syncTopic", "TagA");
  11. // 4.设置回调函数,处理消息
  12. consumer.registerMessageListener(new MessageListenerConcurrently() {
  13. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  14. for (MessageExt messageExt : list) {
  15. System.out.println("messageExt = " + messageExt);
  16. }
  17. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  18. }
  19. });
  20. // 5.启动消费者consumer
  21. consumer.start();
  22. }
  23. }

广播模式

  1. public class BroadcastingConsumer {
  2. public static void main(String[] args) throws MQClientException {
  3. // 1.创建消费者Consumer,制定消费者组名
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("syncGroup");
  5. // 2.指定NameServer地址
  6. consumer.setNamesrvAddr("127.0.0.1:9876");
  7. // 指定consumer的消费模式,默认是负载均衡模式
  8. consumer.setMessageModel(MessageModel.BROADCASTING);
  9. // 3.订阅主题Topic和Tag
  10. consumer.subscribe("syncTopic", "TagA");
  11. // 4.设置回调函数,处理消息
  12. consumer.registerMessageListener(new MessageListenerConcurrently() {
  13. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  14. for (MessageExt messageExt : list) {
  15. System.out.println("messageExt = " + messageExt);
  16. }
  17. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  18. }
  19. });
  20. // 5.启动消费者consumer
  21. consumer.start();
  22. }
  23. }

五、顺序消息

适用场景:

顺序消息指的是,按照消息发送的顺序进行消费,RocketMQ中分为分区有序以及全局有序两种。
在RocketMQ默认中,往topic发送消息时,采用轮询的方式发送到不同的Queue中(默认8个),消费者从不同的Queue中拉取数据,这就无法保证消息的有序性,这种情况下,可以通过将需要保证有序的消息,发送到同一个Queue中,消费者只从这个队列中拉取,就可以保证顺序性。如果发送端和消费端都只有一个queue,则全局有序,如果有多个queue,则对于同一个queue中有序,即分区有序。

使用案例:

首先构建几个订单对象

10001号订单 - > 创建 -> 付款 -> 推送-> 完成->
10002号订单 -> 创建 -> 付款 -> 完成
10003号订单 -> 创建 -> 付款 -> 完成

  1. package cn.spectrumrpc.rocketmq.order.producer;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. public class Order {
  5. private long orderId;
  6. private String desc;
  7. public long getOrderId() {
  8. return orderId;
  9. }
  10. public void setOrderId(long orderId) {
  11. this.orderId = orderId;
  12. }
  13. public String getDesc() {
  14. return desc;
  15. }
  16. public void setDesc(String desc) {
  17. this.desc = desc;
  18. }
  19. @Override
  20. public String toString() {
  21. return "OrderStep{" +
  22. "orderId=" + orderId +
  23. ", desc='" + desc + '\'' +
  24. '}';
  25. }
  26. /**
  27. * 生成模拟订单数据
  28. */
  29. private List<Order> buildOrders() {
  30. List<Order> orderList = new ArrayList<Order>();
  31. Order orderDemo = new Order();
  32. orderDemo.setOrderId(10001);
  33. orderDemo.setDesc("创建");
  34. orderList.add(orderDemo);
  35. orderDemo = new Order();
  36. orderDemo.setOrderId(10002);
  37. orderDemo.setDesc("创建");
  38. orderList.add(orderDemo);
  39. orderDemo = new Order();
  40. orderDemo.setOrderId(10001);
  41. orderDemo.setDesc("付款");
  42. orderList.add(orderDemo);
  43. orderDemo = new Order();
  44. orderDemo.setOrderId(10003);
  45. orderDemo.setDesc("创建");
  46. orderList.add(orderDemo);
  47. orderDemo = new Order();
  48. orderDemo.setOrderId(10002);
  49. orderDemo.setDesc("付款");
  50. orderList.add(orderDemo);
  51. orderDemo = new Order();
  52. orderDemo.setOrderId(10003);
  53. orderDemo.setDesc("付款");
  54. orderList.add(orderDemo);
  55. orderDemo = new Order();
  56. orderDemo.setOrderId(10002);
  57. orderDemo.setDesc("完成");
  58. orderList.add(orderDemo);
  59. orderDemo = new Order();
  60. orderDemo.setOrderId(10001);
  61. orderDemo.setDesc("推送");
  62. orderList.add(orderDemo);
  63. orderDemo = new Order();
  64. orderDemo.setOrderId(10003);
  65. orderDemo.setDesc("完成");
  66. orderList.add(orderDemo);
  67. orderDemo = new Order();
  68. orderDemo.setOrderId(10001);
  69. orderDemo.setDesc("完成");
  70. orderList.add(orderDemo);
  71. return orderList;
  72. }
  73. }

顺序生产者:

相比于简单案例中,只在Send方法中,多添加了一个MessageQueueSelector,用于选择此条消息发送到哪个队列当中,这里采用id取模的方式,保证同一个id落入同一个队列。

  1. public class OrderProducer {
  2. public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
  3. DefaultMQProducer producer = new DefaultMQProducer("orderGroup");
  4. producer.setNamesrvAddr("127.0.0.1:9876");
  5. producer.start();
  6. List<Order> orders = Order.buildOrders();
  7. for (int i = 0; i < orders.size(); i++) {
  8. String messageBody = "orderId: " + orders.get(i).getOrderId();
  9. Message message = new Message("OrderTopic", messageBody.getBytes());
  10. SendResult sendResult = producer.send(message, new MessageQueueSelector() {
  11. /**
  12. * 第一个参数,消息队列的集合,默认8个
  13. * 第二个参数,发送的消息对象
  14. * 第三个参数,为send方法中,传入的第三个参数,在此例子中,为Order的id
  15. * @param list 消息队列的集合
  16. * @param message 消息对象
  17. * @param o send的第三个参数
  18. * @return 选中的消息队列
  19. */
  20. public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
  21. long orderId = (Long) o;
  22. // 这里使用OrderId与队列的个数进行取模,保证同一个OrderId,落入到同一个MessageQueue中,
  23. //从而保证同一个订单中:消息的有序性
  24. return list.get((int) (orderId % list.size()));
  25. }
  26. }, orders.get(i).getOrderId());
  27. System.out.println("message:" + messageBody + " sendResult.getMessageQueue().getQueueId()" + sendResult.getMessageQueue().getQueueId());
  28. }
  29. producer.shutdown();
  30. }
  31. }

顺序消费者:

  1. public class OrderConsumer {
  2. public static void main(String[] args) throws Exception{
  3. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderGroup");
  4. consumer.setNamesrvAddr("127.0.0.1:9876");
  5. consumer.subscribe("OrderTopic", "*");
  6. consumer.registerMessageListener(new MessageListenerOrderly() {
  7. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
  8. for (MessageExt message : list) {
  9. consumeOrderlyContext.setAutoCommit(true);
  10. String threadName = Thread.currentThread().getName();
  11. System.out.println("threadName = " + threadName + " queueId:" + message.getQueueId() + ",content: " + new String(message.getBody()));
  12. }
  13. return ConsumeOrderlyStatus.SUCCESS;
  14. }
  15. });
  16. consumer.start();
  17. }
  18. }

测试结果:

顺序生产者:

可以看到,同一个id的消息,落入到同一个队列中
image.png

顺序消费者:

可以看到,同一个id的消息,都是被同一个线程进行处理,订单对象对某一个Queue而言有序
image.png

总结:

顺序消费,主要依靠 MessageListenerOrderly(消费者) 以及 MessageQueueSelector (消费者)来实现

六、延时发送消息

适用场景:

例如电商场景中,生成了一个未支付的订单,半小时之后去查询,如果还未支付,则设置为已取消,释放库存。

使用案例:

延迟生产者:

通过 message.setDelayLevel() 来设置消息的延时级别。

  1. public class DelayProducer {
  2. public static void main(String[] args) throws Exception{
  3. // 实例化一个生产者来产生延时消息
  4. DefaultMQProducer producer = new DefaultMQProducer("DelayGroup");
  5. producer.setNamesrvAddr("127.0.0.1:9876");
  6. // 启动生产者
  7. producer.start();
  8. int totalMessagesToSend = 1;
  9. for (int i = 0; i < totalMessagesToSend; i++) {
  10. Message message = new Message("DelayTopic", ("Hello delay message " + i).getBytes());
  11. // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
  12. message.setDelayTimeLevel(3);
  13. // 发送消息
  14. producer.send(message);
  15. }
  16. // 关闭生产者
  17. producer.shutdown();
  18. }
  19. }

延时消费者:

  1. public class DelayConsumer {
  2. public static void main(String[] args) throws Exception{
  3. // 实例化消费者
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DelayGroup");
  5. consumer.setNamesrvAddr("127.0.0.1:9876");
  6. // 订阅Topics
  7. consumer.subscribe("DelayTopic", "*");
  8. // 注册消息监听者
  9. consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
  10. for (MessageExt message : messages) {
  11. // Print approximate delay time period
  12. System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
  13. }
  14. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  15. });
  16. // 启动消费者
  17. consumer.start();
  18. }
  19. }

测试结果:

可以看出,从消息的发生者启动,到消费者打印消费信息,其间大概过了10s

延时消息的使用限制:

rocketmq目前不支持任意时间的延时消息,只能设置固定的几个级别。在 org.apache.rocketmq.store.config.MessageStoreConfig 这个类中定义了延时队列的级别,总共从1s到2H不等。总共18个级别。

  1. private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

七、批量发送消息

适用场景:

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。

使用案例:

如果每次发送的消息不超过4MB,则可以一次性发送,示例代码如下:

  1. public class SimpleBatchProducer {
  2. public static void main(String[] args) throws Exception{
  3. DefaultMQProducer producer = new DefaultMQProducer("batchGroup");
  4. producer.setNamesrvAddr("127.0.0.1:9876");
  5. producer.start();
  6. List<Message> messageList = new ArrayList<>();
  7. for (int i = 0; i < 5; i++) {
  8. messageList.add(new Message("batchTopic", ("message" + i).getBytes()));
  9. }
  10. producer.send(messageList);
  11. producer.shutdown();
  12. }
  13. }

如果发送的消息总量大于4MB,建议将消息进行拆分之后,再进行发送,下面为拆分的示例代码
将List进行分割的类ListSplitter

  1. public class ListSplitter implements Iterator<List<Message>> {
  2. private final int SIZE_LIMIT = 1024 * 1024 * 4;
  3. private final List<Message> messages;
  4. private int currIndex;
  5. public ListSplitter(List<Message> messages) {
  6. this.messages = messages;
  7. }
  8. @Override
  9. public boolean hasNext() {
  10. return currIndex < messages.size();
  11. }
  12. @Override
  13. public List<Message> next() {
  14. int nextIndex = currIndex;
  15. int totalSize = 0;
  16. for (; nextIndex < messages.size(); nextIndex++) {
  17. Message message = messages.get(nextIndex);
  18. int tmpSize = message.getTopic().length() + message.getBody().length;
  19. Map<String, String> properties = message.getProperties();
  20. for (Map.Entry<String, String> entry : properties.entrySet()) {
  21. tmpSize += entry.getKey().length() + entry.getValue().length();
  22. }
  23. // 增加日志的开销20字节
  24. tmpSize = tmpSize + 20;
  25. if (tmpSize > SIZE_LIMIT) {
  26. //单个消息超过了最大的限制
  27. //忽略,否则会阻塞分裂的进程
  28. if (nextIndex - currIndex == 0) {
  29. //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
  30. nextIndex++;
  31. }
  32. break;
  33. }
  34. if (tmpSize + totalSize > SIZE_LIMIT) {
  35. break;
  36. } else {
  37. totalSize += tmpSize;
  38. }
  39. }
  40. List<Message> subList = messages.subList(currIndex, nextIndex);
  41. currIndex = nextIndex;
  42. return subList;
  43. }
  44. }

生产者代码:

  1. public class SplitBatchProducer {
  2. public static void main(String[] args) throws Exception{
  3. DefaultMQProducer producer = new DefaultMQProducer("batchGroup");
  4. producer.setNamesrvAddr("127.0.0.1:9876");
  5. producer.start();
  6. List<Message> messageList = new ArrayList<>();
  7. for (int i = 0; i < 1000; i++) {
  8. messageList.add(new Message("batchTopic", ("message" + i).getBytes()));
  9. }
  10. ListSplitter splitter = new ListSplitter(messageList);
  11. while (splitter.hasNext()) {
  12. List<Message> listItem = splitter.next();
  13. producer.send(listItem);
  14. }
  15. producer.shutdown();
  16. }
  17. }

八、过滤消息

适用场景:

用于从海量的消息中,筛选出自己想要的数据。

过滤消息的分类:

  • Tag标签过滤

例如如下的代码,消费者接收TAGA 或者TAGB或者TAGC标签的数据。但是一个消息,只能有一个标签,这对于复杂的业务场景来讲的话,就不是特别的适用了,所以在这种情况下,就引申出了SQL语法过滤消息,即第二种消息过滤。

  1. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
  2. consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
  • SQL语法过滤

上述说到,SQL语法是为了应付复杂的业务场景,而SQL可以根据发送消息时的属性进行计算,从而筛选出想要的消息。
例如如下,消息包含A、B、C三个属性,可以通过以下的语法,过滤出所期望的消息,获取到消息体。

  1. ------------
  2. | message |
  3. |----------| a > 5 AND b = 'abc'
  4. | a = 10 | --------------------> MessageA
  5. | b = 'abc'|
  6. | c = true |
  7. ------------
  8. ------------
  9. | message |
  10. |----------| a > 5 AND b = 'abc'
  11. | a = 1 | --------------------> MessageB
  12. | b = 'abc'|
  13. | c = true |
  14. ------------

SQL过滤的基本语法:

在RocketMQ中只定义了一些基础的语法来支持这个语法的特性,有如下几种:

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量类型支持:

  • 数值型:123,1.23
  • 字符串: ‘123456’,必须使用单引号
  • 空值,NULL,特殊的常量
  • Boolean值,TRUE/FALSE

只有在PUSH模式下的消费者,才支持SQL92标准的sql语句,其API为:

  1. public void subscribe(finalString topic, final MessageSelector messageSelector)

使用前提:

在conf/broker.conf 中添加如下的配置,开启属性过滤,开启完成之后,重启broker

  1. enablePropertyFilter=true

如果没有添加如下的属性,进行sql查询时,则会报如下的错误。
image.png

使用案例:

生产者:

  1. public class SQLProducer {
  2. public static void main(String[] args) throws Exception{
  3. DefaultMQProducer producer = new DefaultMQProducer("sqlGroup");
  4. producer.setNamesrvAddr("127.0.0.1:9876");
  5. producer.start();
  6. for (int i = 0; i < 10; i++) {
  7. Message msg = new Message("sqlTopic",
  8. "tagA",
  9. ("Hello RocketMQ " + i).getBytes()
  10. );
  11. // 设置消息的属性,用于过滤
  12. msg.putUserProperty("a", String.valueOf(i));
  13. producer.send(msg);
  14. }
  15. producer.shutdown();
  16. }
  17. }

消费者:

  1. public class SQLConsumer {
  2. public static void main(String[] args) throws Exception {
  3. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlGroup");
  4. consumer.setNamesrvAddr("127.0.0.1:9876");
  5. consumer.subscribe("sqlTopic", MessageSelector.bySql("a between 0 and 3"));
  6. consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
  7. for (MessageExt msg : msgs) {
  8. System.out.println("msg = " + new String(msg.getBody()));
  9. }
  10. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  11. });
  12. consumer.start();
  13. }
  14. }

测试结果:

消费者端打印根据过滤条件筛选出的几条消息
image.png

九、事务消息

适用场景:

使用案例: