一、同步发送消息
适用场景:
适用于一些对响应时间无要求的场景,但是对消息的可靠性要求极高
使用案例:
同步等待send函数的执行结果,通过返回值获取发送的结果
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("syncGroup");
// 2.指定NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
// 4.创建消息对象,指定主题Topic、Tag和消息体
// 参数一:主题的名称
// 参数二:标签名
// 参数三,实际的消息内容
Message message = new Message("syncTopic","TagA", ("Hello" +i).getBytes());
// 5.发送消息
SendResult sendResult = producer.send(message);
System.out.println("sendResult = " + sendResult);
}
// 6.关闭生产者producer
producer.shutdown();
}
}
二、异步发送消息
适用场景:
使用案例:
通过SendCallback 中的函数,获取发送消息的响应结果
public class AsyncProducer {
public static void main(String[] args) throws Exception{
// 1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("asyncGroup");
// 2.指定NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setRetryTimesWhenSendAsyncFailed(0);
// 3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
// 4.创建消息对象,指定主题Topic、Tag和消息体
// 参数一:主题的名称
// 参数二:标签名
// 参数三,实际的消息内容
Message message = new Message("asyncTopic","TagB", ("Hello" +i).getBytes());
// 5.发送消息
producer.send(message, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println("success, sendResult = " + sendResult);
}
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
}
// 6.关闭生产者producer
producer.shutdown();
}
}
三、单向发送消息(即不需要服务端的响应)
适用场景:
适用于不关心服务端是否已经接收到消息,即对发送结果不感兴趣的场景。例如日志收集(N多的日志,丢几条就丢几条吧)
使用案例:
public class OneWayProducer {
public static void main(String[] args) throws Exception{
// 1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("onewayGroup");
// 2.指定NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
// 4.创建消息对象,指定主题Topic、Tag和消息体
// 参数一:主题的名称
// 参数二:标签名
// 参数三,实际的消息内容
Message message = new Message("oneWayTopic","TagC", ("Hello" +i).getBytes());
// 5.发送消息
producer.sendOneway(message);
}
// 6.关闭生产者producer
producer.shutdown();
}
}
四、消费端消费消息
消费模式:
- 负载均衡模式: 即多个消费者共同处理生产者发送的所有消息(总3条,A消费者1条,B消费者2条),各个消费端处理的消息总和为生产者发送的消息数量
- 广播模式:即多个消费者全量消费生产者的所有消息,类似于订阅的公众号,所有的订阅者都可以看到。
使用案例:
通过 consumer.setMessageModel(MessageModel.CLUSTERING); 这行代码,就可以设置消费者的消费模式。
负载均衡模式:
public class LoadBalancerConsumer {
public static void main(String[] args) throws MQClientException {
// 1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("syncGroup");
// 2.指定NameServer地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 指定consumer的消费模式,默认是负载均衡模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 3.订阅主题Topic和Tag
consumer.subscribe("syncTopic", "TagA");
// 4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println("messageExt = " + messageExt);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5.启动消费者consumer
consumer.start();
}
}
广播模式
public class BroadcastingConsumer {
public static void main(String[] args) throws MQClientException {
// 1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("syncGroup");
// 2.指定NameServer地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 指定consumer的消费模式,默认是负载均衡模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 3.订阅主题Topic和Tag
consumer.subscribe("syncTopic", "TagA");
// 4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println("messageExt = " + messageExt);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5.启动消费者consumer
consumer.start();
}
}
五、顺序消息
适用场景:
顺序消息指的是,按照消息发送的顺序进行消费,RocketMQ中分为分区有序以及全局有序两种。
在RocketMQ默认中,往topic发送消息时,采用轮询的方式发送到不同的Queue中(默认8个),消费者从不同的Queue中拉取数据,这就无法保证消息的有序性,这种情况下,可以通过将需要保证有序的消息,发送到同一个Queue中,消费者只从这个队列中拉取,就可以保证顺序性。如果发送端和消费端都只有一个queue,则全局有序,如果有多个queue,则对于同一个queue中有序,即分区有序。
使用案例:
首先构建几个订单对象
10001号订单 - > 创建 -> 付款 -> 推送-> 完成->
10002号订单 -> 创建 -> 付款 -> 完成
10003号订单 -> 创建 -> 付款 -> 完成
package cn.spectrumrpc.rocketmq.order.producer;
import java.util.ArrayList;
import java.util.List;
public class Order {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
/**
* 生成模拟订单数据
*/
private List<Order> buildOrders() {
List<Order> orderList = new ArrayList<Order>();
Order orderDemo = new Order();
orderDemo.setOrderId(10001);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(10002);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(10001);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(10003);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(10002);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(10003);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(10002);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(10001);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(10003);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(10001);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
顺序生产者:
相比于简单案例中,只在Send方法中,多添加了一个MessageQueueSelector,用于选择此条消息发送到哪个队列当中,这里采用id取模的方式,保证同一个id落入同一个队列。
public class OrderProducer {
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("orderGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
List<Order> orders = Order.buildOrders();
for (int i = 0; i < orders.size(); i++) {
String messageBody = "orderId: " + orders.get(i).getOrderId();
Message message = new Message("OrderTopic", messageBody.getBytes());
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
/**
* 第一个参数,消息队列的集合,默认8个
* 第二个参数,发送的消息对象
* 第三个参数,为send方法中,传入的第三个参数,在此例子中,为Order的id
* @param list 消息队列的集合
* @param message 消息对象
* @param o send的第三个参数
* @return 选中的消息队列
*/
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
long orderId = (Long) o;
// 这里使用OrderId与队列的个数进行取模,保证同一个OrderId,落入到同一个MessageQueue中,
//从而保证同一个订单中:消息的有序性
return list.get((int) (orderId % list.size()));
}
}, orders.get(i).getOrderId());
System.out.println("message:" + messageBody + " sendResult.getMessageQueue().getQueueId()" + sendResult.getMessageQueue().getQueueId());
}
producer.shutdown();
}
}
顺序消费者:
public class OrderConsumer {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("OrderTopic", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt message : list) {
consumeOrderlyContext.setAutoCommit(true);
String threadName = Thread.currentThread().getName();
System.out.println("threadName = " + threadName + " queueId:" + message.getQueueId() + ",content: " + new String(message.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
测试结果:
顺序生产者:
顺序消费者:
可以看到,同一个id的消息,都是被同一个线程进行处理,订单对象对某一个Queue而言有序
总结:
顺序消费,主要依靠 MessageListenerOrderly(消费者) 以及 MessageQueueSelector (消费者)来实现
六、延时发送消息
适用场景:
例如电商场景中,生成了一个未支付的订单,半小时之后去查询,如果还未支付,则设置为已取消,释放库存。
使用案例:
延迟生产者:
通过 message.setDelayLevel() 来设置消息的延时级别。
public class DelayProducer {
public static void main(String[] args) throws Exception{
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("DelayGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动生产者
producer.start();
int totalMessagesToSend = 1;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("DelayTopic", ("Hello delay message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}
延时消费者:
public class DelayConsumer {
public static void main(String[] args) throws Exception{
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DelayGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅Topics
consumer.subscribe("DelayTopic", "*");
// 注册消息监听者
consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
}
}
测试结果:
可以看出,从消息的发生者启动,到消费者打印消费信息,其间大概过了10s
延时消息的使用限制:
rocketmq目前不支持任意时间的延时消息,只能设置固定的几个级别。在 org.apache.rocketmq.store.config.MessageStoreConfig 这个类中定义了延时队列的级别,总共从1s到2H不等。总共18个级别。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
七、批量发送消息
适用场景:
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
使用案例:
如果每次发送的消息不超过4MB,则可以一次性发送,示例代码如下:
public class SimpleBatchProducer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("batchGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
List<Message> messageList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
messageList.add(new Message("batchTopic", ("message" + i).getBytes()));
}
producer.send(messageList);
producer.shutdown();
}
}
如果发送的消息总量大于4MB,建议将消息进行拆分之后,再进行发送,下面为拆分的示例代码
将List进行分割的类ListSplitter
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
// 增加日志的开销20字节
tmpSize = tmpSize + 20;
if (tmpSize > SIZE_LIMIT) {
//单个消息超过了最大的限制
//忽略,否则会阻塞分裂的进程
if (nextIndex - currIndex == 0) {
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
生产者代码:
public class SplitBatchProducer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("batchGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
List<Message> messageList = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
messageList.add(new Message("batchTopic", ("message" + i).getBytes()));
}
ListSplitter splitter = new ListSplitter(messageList);
while (splitter.hasNext()) {
List<Message> listItem = splitter.next();
producer.send(listItem);
}
producer.shutdown();
}
}
八、过滤消息
适用场景:
过滤消息的分类:
- Tag标签过滤
例如如下的代码,消费者接收TAGA 或者TAGB或者TAGC标签的数据。但是一个消息,只能有一个标签,这对于复杂的业务场景来讲的话,就不是特别的适用了,所以在这种情况下,就引申出了SQL语法过滤消息,即第二种消息过滤。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
- SQL语法过滤
上述说到,SQL语法是为了应付复杂的业务场景,而SQL可以根据发送消息时的属性进行计算,从而筛选出想要的消息。
例如如下,消息包含A、B、C三个属性,可以通过以下的语法,过滤出所期望的消息,获取到消息体。
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> MessageA
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> MessageB
| b = 'abc'|
| c = true |
------------
SQL过滤的基本语法:
在RocketMQ中只定义了一些基础的语法来支持这个语法的特性,有如下几种:
- 数值比较,比如:>,>=,<,<=,BETWEEN,=
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量类型支持:
- 数值型:123,1.23
- 字符串: ‘123456’,必须使用单引号
- 空值,NULL,特殊的常量
- Boolean值,TRUE/FALSE
只有在PUSH模式下的消费者,才支持SQL92标准的sql语句,其API为:
public void subscribe(finalString topic, final MessageSelector messageSelector)
使用前提:
在conf/broker.conf 中添加如下的配置,开启属性过滤,开启完成之后,重启broker
enablePropertyFilter=true
如果没有添加如下的属性,进行sql查询时,则会报如下的错误。
使用案例:
生产者:
public class SQLProducer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("sqlGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message msg = new Message("sqlTopic",
"tagA",
("Hello RocketMQ " + i).getBytes()
);
// 设置消息的属性,用于过滤
msg.putUserProperty("a", String.valueOf(i));
producer.send(msg);
}
producer.shutdown();
}
}
消费者:
public class SQLConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("sqlTopic", MessageSelector.bySql("a between 0 and 3"));
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.println("msg = " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}