一、安装
参考官网:
https://rocketmq.apache.org/docs/quick-start/
Start Name Server
nohup sh bin/mqnamesrv &
Start Broker
nohup sh bin/mqbroker -n localhost:9876 &
Start console
nohup java -jar rocketmq-console-ng-1.0.0.jar > out.log &
控制台安装:
1.下载源码
2.进入rocketmq-externals\rocketmq-console 工程修改application.properties中的配置
rocketmq.config.namesrvAddr=localhost:9876
3.编译源码
mvn clean package -Dmaven.test.skip=true
- 运行jar包
nohup java -jar rocketmq-console-ng-1.0.0.jar -> out.log &
二、 消息发送-基于Java环境构建消息发送与消息接收基础程序
引入依赖
org.apache.rocketmq
rocketmq-client
4.8.0
1.单生产者单消费者 (OneToOne)
构建生产者
public class Producer {
public static void main(String[] args) throws Exception {
/**
1. 谁来发?
2. 发给谁?
3. 怎么发?
4. 发什么?
5. 发的结果是什么?
4消费者
6. 打扫战场
**/
//1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr("192.168.128.102:9876");
//3.1启动发送的服务
producer.start();
//4.创建要发送的消息对象,指定topic,指定内容body
String string = LocalDateTime.now() + "rocketMq中文";
Message msg = new Message("topic1", "tag1", string.getBytes(StandardCharsets.UTF_8));
SendResult send = producer.send(msg);
System.out.println(send);
//5.关闭连接
producer.shutdown();
}
}
构建消费者
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("192.168.128.102:9876");
//3.设置接收消息对应的topic,对应的sub标签为任意
consumer.subscribe("topic1", "*");
//3.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//遍历消息
for (MessageExt msg : msgs) {
byte[] body = msg.getBody();
System.out.println("收到消息:" + new String(body, StandardCharsets.UTF_8));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//4.启动接收消息的服务
consumer.start();
System.out.println("接受消息服务已经开启!");
//5 不要关闭消费者!
}
}
2. 单生产者多消费者(OneToMany)
2.1不同组别订阅一个topic
生产者:
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("group-many-1");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr("192.168.128.102:9876");
//3.1启动发送的服务
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建要发送的消息对象,指定topic,指定内容body
Message msg = new Message("topic", ("hello rocketmq" + i).getBytes(StandardCharsets.UTF_8));
//3.2发送消息
SendResult result = producer.send(msg);
System.out.println("返回结果:" + result);
}
//5.关闭连接
producer.shutdown();
}
}
注意设置消费者:消息的topic相同、group不同.消费者会分别消费生产者的消息
多消费者:
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group-many2");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("192.168.128.102:9876");
//3.设置接收消息对应的topic,对应的sub标签为任意
consumer.subscribe("topic", "*");
//设置当前消费者的消费模式(默认模式:负载均衡CLUSTERING)
// consumer.setMessageModel(MessageModel.CLUSTERING);
//3.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//遍历消息
for (MessageExt msg : list) {
// System.out.println("收到消息:" + msg);
System.out.println("消息是:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//4.启动接收消息的服务
consumer.start();
System.out.println("接受消息服务已经开启!");
//5 不要关闭消费者!
}
}
public class Consumer2 {
public static void main(String[] args) throws Exception {
//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group-many1");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("192.168.128.102:9876");
//3.设置接收消息对应的topic,对应的sub标签为任意
consumer.subscribe("topic", "*");
// 设置当前消费者的消费模式(默认模式:负载均衡)
// consumer.setMessageModel(MessageModel.CLUSTERING);
//3.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//遍历消息
for (MessageExt msg : list) {
// System.out.println("收到消息:" + msg);
System.out.println("消息是:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//4.启动接收消息的服务
consumer.start();
System.out.println("接受消息服务2已经开启!");
//5 不要关闭消费者!
}
}
2.1相同组别订阅一个topic(广播模式)
生产者不变,消费者代码均相同,修改消费模式为MessageModel.BROADCASTING既可。
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group-many");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("192.168.128.102:9876");
//3.设置接收消息对应的topic,对应的sub标签为任意
consumer.subscribe("topic", "*");
//设置当前消费者的消费模式(默认模式:负载均衡)
consumer.setMessageModel(MessageModel.BROADCASTING);
//3.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//遍历消息
for (MessageExt msg : list) {
// System.out.println("收到消息:" + msg);
System.out.println("消息是:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//4.启动接收消息的服务
consumer.start();
System.out.println("接受消息服务已经开启!");
//5 不要关闭消费者!
}
}
2.3 轮询模式(负载均衡模式)
注意:同一个消费者多份。争抢topic数据。
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group-many");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("192.168.128.102:9876");
//3.设置接收消息对应的topic,对应的sub标签为任意
consumer.subscribe("topic", "*");
//设置当前消费者的消费模式(默认模式:负载均衡)
consumer.setMessageModel(MessageModel.CLUSTERING);
//3.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//遍历消息
for (MessageExt msg : list) {
// System.out.println("收到消息:" + msg);
System.out.println("消息是:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//4.启动接收消息的服务
consumer.start();
System.out.println("接受消息服务已经开启!");
//5 不要关闭消费者!
}
}
2.4 多生产者多消费者消息发送(ManyToMany)
多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费
三. 消息类别
- 同步消息
2. 异步消息
3. 单向消息3.1 同步消息
特征:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)
代码实现 :SendResult result = producer.send(msg);
3.2 异步消息
特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("async");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr("192.168.128.102:9876");
//3.1启动发送的服务
producer.start();
//4.创建要发送的消息对象,指定topic,指定内容body
String string = LocalDateTime.now() + "rocketMq中文";
Message msg = new Message("topic-async", "tag1", string.getBytes(StandardCharsets.UTF_8));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步消息发送成功" + sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("异步消息发送失败" + e);
}
});
//休眠10秒
TimeUnit.SECONDS.sleep(10);
//5.关闭连接
producer.shutdown();
}
}
3.3 单向消息
特征:不需要有回执的消息,例如日志类消息
代码实现 :
producer.sendOneway(msg);
3.4 延时消息
消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用
Message msg = new Message("topic1", "tag1", string.getBytes(StandardCharsets.UTF_8));
//设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
msg.setDelayTimeLevel(3);
SendResult result = producer.send(msg);
System.out.println("返回结果:" + result);
目前支持的消息时间
private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
3.5 批量消息
批量发送消息能显著提高传递小消息的性能.
发送批量消息:
List<Message> msgList = new ArrayList<Message>();
Message msg1 = new Message("topic1", ("hello rocketmq1").getBytes("UTF8"));
Message msg2 = new Message("topic1", ("hello rocketmq2").getBytes("UTF8"));
Message msg3 = new Message("topic1", ("hello rocketmq3").getBytes("UTF8"));
msgList.add(msg1);
msgList.add(msg2);
msgList.add(msg3);
SendResult result = producer.send(msgList);
注意限制:
1这些批量消息应该有相同的topic
2相同的waitStoreMsgOK
3不能是延时消息
4消息内容总长度不超过4M 消息内容总长度包含如下:
- topic(字符串字节数)
- body (字节数组长度)
- 消息追加的属性(key与value对应字符串字节数)
-
3.6 消息过滤
3.6.1分类过滤
按照tag过滤信息
生产者String msg = "消息过滤按照tag: hello rocketmq 2";
Message msg = new Message("topic6","tag2",msg.getBytes("UTF-8"));
消费者
//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
consumer.subscribe("topic6","tag1 || tag2");
3.6.2语法过滤(属性过滤/语法过滤/SQL过滤)
基本语法
数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;
常量支持类型为:
数值,比如:123,3.1415;
- 字符,比如:‘abc’,必须用单引号包裹起来;
- NULL,特殊的常量 布尔值,TRUE 或 FALSE
生产者
//为消息添加属性
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");
消费者
//使用消息选择器来过滤对应的属性,语法格式为类SQL语法
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
consumer.subscribe("topic6", MessageSelector.bySql("name = 'litiedan'"));
注意:SQL过滤需要依赖服务器的功能支持,在broker.conf配置文件中添加对应的功能项,并开启对应功能
enablePropertyFilter=true
查看页面是否开启