一、同步发送消息
适用场景:
适用于一些对响应时间无要求的场景,但是对消息的可靠性要求极高
使用案例:
同步等待send函数的执行结果,通过返回值获取发送的结果
public class SyncProducer {public static void main(String[] args) throws Exception {// 1.创建消息生产者producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("syncGroup");// 2.指定NameServer地址producer.setNamesrvAddr("127.0.0.1:9876");// 3.启动producerproducer.start();for (int i = 0; i < 10; i++) {// 4.创建消息对象,指定主题Topic、Tag和消息体// 参数一:主题的名称// 参数二:标签名// 参数三,实际的消息内容Message message = new Message("syncTopic","TagA", ("Hello" +i).getBytes());// 5.发送消息SendResult sendResult = producer.send(message);System.out.println("sendResult = " + sendResult);}// 6.关闭生产者producerproducer.shutdown();}}
二、异步发送消息
适用场景:
使用案例:
通过SendCallback 中的函数,获取发送消息的响应结果
public class AsyncProducer {public static void main(String[] args) throws Exception{// 1.创建消息生产者producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("asyncGroup");// 2.指定NameServer地址producer.setNamesrvAddr("127.0.0.1:9876");producer.setRetryTimesWhenSendAsyncFailed(0);// 3.启动producerproducer.start();for (int i = 0; i < 10; i++) {// 4.创建消息对象,指定主题Topic、Tag和消息体// 参数一:主题的名称// 参数二:标签名// 参数三,实际的消息内容Message message = new Message("asyncTopic","TagB", ("Hello" +i).getBytes());// 5.发送消息producer.send(message, new SendCallback() {public void onSuccess(SendResult sendResult) {System.out.println("success, sendResult = " + sendResult);}public void onException(Throwable throwable) {throwable.printStackTrace();}});}// 6.关闭生产者producerproducer.shutdown();}}
三、单向发送消息(即不需要服务端的响应)
适用场景:
适用于不关心服务端是否已经接收到消息,即对发送结果不感兴趣的场景。例如日志收集(N多的日志,丢几条就丢几条吧)
使用案例:
public class OneWayProducer {public static void main(String[] args) throws Exception{// 1.创建消息生产者producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("onewayGroup");// 2.指定NameServer地址producer.setNamesrvAddr("127.0.0.1:9876");// 3.启动producerproducer.start();for (int i = 0; i < 10; i++) {// 4.创建消息对象,指定主题Topic、Tag和消息体// 参数一:主题的名称// 参数二:标签名// 参数三,实际的消息内容Message message = new Message("oneWayTopic","TagC", ("Hello" +i).getBytes());// 5.发送消息producer.sendOneway(message);}// 6.关闭生产者producerproducer.shutdown();}}
四、消费端消费消息
消费模式:
- 负载均衡模式: 即多个消费者共同处理生产者发送的所有消息(总3条,A消费者1条,B消费者2条),各个消费端处理的消息总和为生产者发送的消息数量
- 广播模式:即多个消费者全量消费生产者的所有消息,类似于订阅的公众号,所有的订阅者都可以看到。
使用案例:
通过 consumer.setMessageModel(MessageModel.CLUSTERING); 这行代码,就可以设置消费者的消费模式。
负载均衡模式:
public class LoadBalancerConsumer {public static void main(String[] args) throws MQClientException {// 1.创建消费者Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("syncGroup");// 2.指定NameServer地址consumer.setNamesrvAddr("127.0.0.1:9876");// 指定consumer的消费模式,默认是负载均衡模式consumer.setMessageModel(MessageModel.CLUSTERING);// 3.订阅主题Topic和Tagconsumer.subscribe("syncTopic", "TagA");// 4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {System.out.println("messageExt = " + messageExt);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5.启动消费者consumerconsumer.start();}}
广播模式
public class BroadcastingConsumer {public static void main(String[] args) throws MQClientException {// 1.创建消费者Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("syncGroup");// 2.指定NameServer地址consumer.setNamesrvAddr("127.0.0.1:9876");// 指定consumer的消费模式,默认是负载均衡模式consumer.setMessageModel(MessageModel.BROADCASTING);// 3.订阅主题Topic和Tagconsumer.subscribe("syncTopic", "TagA");// 4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {System.out.println("messageExt = " + messageExt);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5.启动消费者consumerconsumer.start();}}
五、顺序消息
适用场景:
顺序消息指的是,按照消息发送的顺序进行消费,RocketMQ中分为分区有序以及全局有序两种。
在RocketMQ默认中,往topic发送消息时,采用轮询的方式发送到不同的Queue中(默认8个),消费者从不同的Queue中拉取数据,这就无法保证消息的有序性,这种情况下,可以通过将需要保证有序的消息,发送到同一个Queue中,消费者只从这个队列中拉取,就可以保证顺序性。如果发送端和消费端都只有一个queue,则全局有序,如果有多个queue,则对于同一个queue中有序,即分区有序。
使用案例:
首先构建几个订单对象
10001号订单 - > 创建 -> 付款 -> 推送-> 完成->
10002号订单 -> 创建 -> 付款 -> 完成
10003号订单 -> 创建 -> 付款 -> 完成
package cn.spectrumrpc.rocketmq.order.producer;import java.util.ArrayList;import java.util.List;public class Order {private long orderId;private String desc;public long getOrderId() {return orderId;}public void setOrderId(long orderId) {this.orderId = orderId;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}/*** 生成模拟订单数据*/private List<Order> buildOrders() {List<Order> orderList = new ArrayList<Order>();Order orderDemo = new Order();orderDemo.setOrderId(10001);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(10002);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(10001);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(10003);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(10002);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(10003);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(10002);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(10001);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(10003);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(10001);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}}
顺序生产者:
相比于简单案例中,只在Send方法中,多添加了一个MessageQueueSelector,用于选择此条消息发送到哪个队列当中,这里采用id取模的方式,保证同一个id落入同一个队列。
public class OrderProducer {public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {DefaultMQProducer producer = new DefaultMQProducer("orderGroup");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();List<Order> orders = Order.buildOrders();for (int i = 0; i < orders.size(); i++) {String messageBody = "orderId: " + orders.get(i).getOrderId();Message message = new Message("OrderTopic", messageBody.getBytes());SendResult sendResult = producer.send(message, new MessageQueueSelector() {/*** 第一个参数,消息队列的集合,默认8个* 第二个参数,发送的消息对象* 第三个参数,为send方法中,传入的第三个参数,在此例子中,为Order的id* @param list 消息队列的集合* @param message 消息对象* @param o send的第三个参数* @return 选中的消息队列*/public MessageQueue select(List<MessageQueue> list, Message message, Object o) {long orderId = (Long) o;// 这里使用OrderId与队列的个数进行取模,保证同一个OrderId,落入到同一个MessageQueue中,//从而保证同一个订单中:消息的有序性return list.get((int) (orderId % list.size()));}}, orders.get(i).getOrderId());System.out.println("message:" + messageBody + " sendResult.getMessageQueue().getQueueId()" + sendResult.getMessageQueue().getQueueId());}producer.shutdown();}}
顺序消费者:
public class OrderConsumer {public static void main(String[] args) throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderGroup");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("OrderTopic", "*");consumer.registerMessageListener(new MessageListenerOrderly() {public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt message : list) {consumeOrderlyContext.setAutoCommit(true);String threadName = Thread.currentThread().getName();System.out.println("threadName = " + threadName + " queueId:" + message.getQueueId() + ",content: " + new String(message.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();}}
测试结果:
顺序生产者:
顺序消费者:
可以看到,同一个id的消息,都是被同一个线程进行处理,订单对象对某一个Queue而言有序
总结:
顺序消费,主要依靠 MessageListenerOrderly(消费者) 以及 MessageQueueSelector (消费者)来实现
六、延时发送消息
适用场景:
例如电商场景中,生成了一个未支付的订单,半小时之后去查询,如果还未支付,则设置为已取消,释放库存。
使用案例:
延迟生产者:
通过 message.setDelayLevel() 来设置消息的延时级别。
public class DelayProducer {public static void main(String[] args) throws Exception{// 实例化一个生产者来产生延时消息DefaultMQProducer producer = new DefaultMQProducer("DelayGroup");producer.setNamesrvAddr("127.0.0.1:9876");// 启动生产者producer.start();int totalMessagesToSend = 1;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("DelayTopic", ("Hello delay message " + i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息producer.send(message);}// 关闭生产者producer.shutdown();}}
延时消费者:
public class DelayConsumer {public static void main(String[] args) throws Exception{// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DelayGroup");consumer.setNamesrvAddr("127.0.0.1:9876");// 订阅Topicsconsumer.subscribe("DelayTopic", "*");// 注册消息监听者consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {for (MessageExt message : messages) {// Print approximate delay time periodSystem.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动消费者consumer.start();}}
测试结果:
可以看出,从消息的发生者启动,到消费者打印消费信息,其间大概过了10s
延时消息的使用限制:
rocketmq目前不支持任意时间的延时消息,只能设置固定的几个级别。在 org.apache.rocketmq.store.config.MessageStoreConfig 这个类中定义了延时队列的级别,总共从1s到2H不等。总共18个级别。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
七、批量发送消息
适用场景:
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
使用案例:
如果每次发送的消息不超过4MB,则可以一次性发送,示例代码如下:
public class SimpleBatchProducer {public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("batchGroup");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();List<Message> messageList = new ArrayList<>();for (int i = 0; i < 5; i++) {messageList.add(new Message("batchTopic", ("message" + i).getBytes()));}producer.send(messageList);producer.shutdown();}}
如果发送的消息总量大于4MB,建议将消息进行拆分之后,再进行发送,下面为拆分的示例代码
将List进行分割的类ListSplitter
public class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Overridepublic boolean hasNext() {return currIndex < messages.size();}@Overridepublic List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);int tmpSize = message.getTopic().length() + message.getBody().length;Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();}// 增加日志的开销20字节tmpSize = tmpSize + 20;if (tmpSize > SIZE_LIMIT) {//单个消息超过了最大的限制//忽略,否则会阻塞分裂的进程if (nextIndex - currIndex == 0) {//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环nextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}}
生产者代码:
public class SplitBatchProducer {public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("batchGroup");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();List<Message> messageList = new ArrayList<>();for (int i = 0; i < 1000; i++) {messageList.add(new Message("batchTopic", ("message" + i).getBytes()));}ListSplitter splitter = new ListSplitter(messageList);while (splitter.hasNext()) {List<Message> listItem = splitter.next();producer.send(listItem);}producer.shutdown();}}
八、过滤消息
适用场景:
过滤消息的分类:
- Tag标签过滤
例如如下的代码,消费者接收TAGA 或者TAGB或者TAGC标签的数据。但是一个消息,只能有一个标签,这对于复杂的业务场景来讲的话,就不是特别的适用了,所以在这种情况下,就引申出了SQL语法过滤消息,即第二种消息过滤。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
- SQL语法过滤
上述说到,SQL语法是为了应付复杂的业务场景,而SQL可以根据发送消息时的属性进行计算,从而筛选出想要的消息。
例如如下,消息包含A、B、C三个属性,可以通过以下的语法,过滤出所期望的消息,获取到消息体。
------------| message ||----------| a > 5 AND b = 'abc'| a = 10 | --------------------> MessageA| b = 'abc'|| c = true |------------------------| message ||----------| a > 5 AND b = 'abc'| a = 1 | --------------------> MessageB| b = 'abc'|| c = true |------------
SQL过滤的基本语法:
在RocketMQ中只定义了一些基础的语法来支持这个语法的特性,有如下几种:
- 数值比较,比如:>,>=,<,<=,BETWEEN,=
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量类型支持:
- 数值型:123,1.23
- 字符串: ‘123456’,必须使用单引号
- 空值,NULL,特殊的常量
- Boolean值,TRUE/FALSE
只有在PUSH模式下的消费者,才支持SQL92标准的sql语句,其API为:
public void subscribe(finalString topic, final MessageSelector messageSelector)
使用前提:
在conf/broker.conf 中添加如下的配置,开启属性过滤,开启完成之后,重启broker
enablePropertyFilter=true
如果没有添加如下的属性,进行sql查询时,则会报如下的错误。
使用案例:
生产者:
public class SQLProducer {public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("sqlGroup");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();for (int i = 0; i < 10; i++) {Message msg = new Message("sqlTopic","tagA",("Hello RocketMQ " + i).getBytes());// 设置消息的属性,用于过滤msg.putUserProperty("a", String.valueOf(i));producer.send(msg);}producer.shutdown();}}
消费者:
public class SQLConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlGroup");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("sqlTopic", MessageSelector.bySql("a between 0 and 3"));consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("msg = " + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}}
