一、安装

参考官网:
https://rocketmq.apache.org/docs/quick-start/

Start Name Server

nohup sh bin/mqnamesrv &

Start Broker

nohup sh bin/mqbroker -n localhost:9876 &

Start console

nohup java -jar rocketmq-console-ng-1.0.0.jar > out.log &

image.png

控制台安装:
1.下载源码

git clone https://github.com/apache/rocketmq-externals.git

2.进入rocketmq-externals\rocketmq-console 工程修改application.properties中的配置

rocketmq.config.namesrvAddr=localhost:9876

3.编译源码

mvn clean package -Dmaven.test.skip=true

  1. 运行jar包

    nohup java -jar rocketmq-console-ng-1.0.0.jar -> out.log &

image.png

二、 消息发送-基于Java环境构建消息发送与消息接收基础程序

引入依赖


org.apache.rocketmq
rocketmq-client
4.8.0

1.单生产者单消费者 (OneToOne)

构建生产者

  1. public class Producer {
  2. public static void main(String[] args) throws Exception {
  3. /**
  4. 1. 谁来发?
  5. 2. 发给谁?
  6. 3. 怎么发?
  7. 4. 发什么?
  8. 5. 发的结果是什么?
  9. 4消费者
  10. 6. 打扫战场
  11. **/
  12. //1.创建一个发送消息的对象Producer
  13. DefaultMQProducer producer = new DefaultMQProducer("group1");
  14. //2.设定发送的命名服务器地址
  15. producer.setNamesrvAddr("192.168.128.102:9876");
  16. //3.1启动发送的服务
  17. producer.start();
  18. //4.创建要发送的消息对象,指定topic,指定内容body
  19. String string = LocalDateTime.now() + "rocketMq中文";
  20. Message msg = new Message("topic1", "tag1", string.getBytes(StandardCharsets.UTF_8));
  21. SendResult send = producer.send(msg);
  22. System.out.println(send);
  23. //5.关闭连接
  24. producer.shutdown();
  25. }
  26. }

构建消费者

  1. public class Consumer {
  2. public static void main(String[] args) throws Exception {
  3. //1.创建一个接收消息的对象Consumer
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
  5. //2.设定接收的命名服务器地址
  6. consumer.setNamesrvAddr("192.168.128.102:9876");
  7. //3.设置接收消息对应的topic,对应的sub标签为任意
  8. consumer.subscribe("topic1", "*");
  9. //3.开启监听,用于接收消息
  10. consumer.registerMessageListener(new MessageListenerConcurrently() {
  11. @Override
  12. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  13. //遍历消息
  14. for (MessageExt msg : msgs) {
  15. byte[] body = msg.getBody();
  16. System.out.println("收到消息:" + new String(body, StandardCharsets.UTF_8));
  17. }
  18. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  19. }
  20. });
  21. //4.启动接收消息的服务
  22. consumer.start();
  23. System.out.println("接受消息服务已经开启!");
  24. //5 不要关闭消费者!
  25. }
  26. }

2. 单生产者多消费者(OneToMany)

2.1不同组别订阅一个topic

生产者:

  1. public class Producer {
  2. public static void main(String[] args) throws Exception {
  3. //1.创建一个发送消息的对象Producer
  4. DefaultMQProducer producer = new DefaultMQProducer("group-many-1");
  5. //2.设定发送的命名服务器地址
  6. producer.setNamesrvAddr("192.168.128.102:9876");
  7. //3.1启动发送的服务
  8. producer.start();
  9. for (int i = 0; i < 10; i++) {
  10. //4.创建要发送的消息对象,指定topic,指定内容body
  11. Message msg = new Message("topic", ("hello rocketmq" + i).getBytes(StandardCharsets.UTF_8));
  12. //3.2发送消息
  13. SendResult result = producer.send(msg);
  14. System.out.println("返回结果:" + result);
  15. }
  16. //5.关闭连接
  17. producer.shutdown();
  18. }
  19. }

注意设置消费者:消息的topic相同、group不同.消费者会分别消费生产者的消息
多消费者:

  1. public class Consumer {
  2. public static void main(String[] args) throws Exception {
  3. //1.创建一个接收消息的对象Consumer
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group-many2");
  5. //2.设定接收的命名服务器地址
  6. consumer.setNamesrvAddr("192.168.128.102:9876");
  7. //3.设置接收消息对应的topic,对应的sub标签为任意
  8. consumer.subscribe("topic", "*");
  9. //设置当前消费者的消费模式(默认模式:负载均衡CLUSTERING)
  10. // consumer.setMessageModel(MessageModel.CLUSTERING);
  11. //3.开启监听,用于接收消息
  12. consumer.registerMessageListener(new MessageListenerConcurrently() {
  13. @Override
  14. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
  15. list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  16. //遍历消息
  17. for (MessageExt msg : list) {
  18. // System.out.println("收到消息:" + msg);
  19. System.out.println("消息是:" + new String(msg.getBody()));
  20. }
  21. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  22. }
  23. });
  24. //4.启动接收消息的服务
  25. consumer.start();
  26. System.out.println("接受消息服务已经开启!");
  27. //5 不要关闭消费者!
  28. }
  29. }
  1. public class Consumer2 {
  2. public static void main(String[] args) throws Exception {
  3. //1.创建一个接收消息的对象Consumer
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group-many1");
  5. //2.设定接收的命名服务器地址
  6. consumer.setNamesrvAddr("192.168.128.102:9876");
  7. //3.设置接收消息对应的topic,对应的sub标签为任意
  8. consumer.subscribe("topic", "*");
  9. // 设置当前消费者的消费模式(默认模式:负载均衡)
  10. // consumer.setMessageModel(MessageModel.CLUSTERING);
  11. //3.开启监听,用于接收消息
  12. consumer.registerMessageListener(new MessageListenerConcurrently() {
  13. @Override
  14. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
  15. list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  16. //遍历消息
  17. for (MessageExt msg : list) {
  18. // System.out.println("收到消息:" + msg);
  19. System.out.println("消息是:" + new String(msg.getBody()));
  20. }
  21. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  22. }
  23. });
  24. //4.启动接收消息的服务
  25. consumer.start();
  26. System.out.println("接受消息服务2已经开启!");
  27. //5 不要关闭消费者!
  28. }
  29. }

image.png

2.1相同组别订阅一个topic(广播模式)

生产者不变,消费者代码均相同,修改消费模式为MessageModel.BROADCASTING既可。

  1. public class Consumer {
  2. public static void main(String[] args) throws Exception {
  3. //1.创建一个接收消息的对象Consumer
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group-many");
  5. //2.设定接收的命名服务器地址
  6. consumer.setNamesrvAddr("192.168.128.102:9876");
  7. //3.设置接收消息对应的topic,对应的sub标签为任意
  8. consumer.subscribe("topic", "*");
  9. //设置当前消费者的消费模式(默认模式:负载均衡)
  10. consumer.setMessageModel(MessageModel.BROADCASTING);
  11. //3.开启监听,用于接收消息
  12. consumer.registerMessageListener(new MessageListenerConcurrently() {
  13. @Override
  14. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
  15. list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  16. //遍历消息
  17. for (MessageExt msg : list) {
  18. // System.out.println("收到消息:" + msg);
  19. System.out.println("消息是:" + new String(msg.getBody()));
  20. }
  21. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  22. }
  23. });
  24. //4.启动接收消息的服务
  25. consumer.start();
  26. System.out.println("接受消息服务已经开启!");
  27. //5 不要关闭消费者!
  28. }
  29. }

2.3 轮询模式(负载均衡模式)

注意:同一个消费者多份。争抢topic数据。

  1. public class Consumer {
  2. public static void main(String[] args) throws Exception {
  3. //1.创建一个接收消息的对象Consumer
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group-many");
  5. //2.设定接收的命名服务器地址
  6. consumer.setNamesrvAddr("192.168.128.102:9876");
  7. //3.设置接收消息对应的topic,对应的sub标签为任意
  8. consumer.subscribe("topic", "*");
  9. //设置当前消费者的消费模式(默认模式:负载均衡)
  10. consumer.setMessageModel(MessageModel.CLUSTERING);
  11. //3.开启监听,用于接收消息
  12. consumer.registerMessageListener(new MessageListenerConcurrently() {
  13. @Override
  14. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  15. //遍历消息
  16. for (MessageExt msg : list) {
  17. // System.out.println("收到消息:" + msg);
  18. System.out.println("消息是:" + new String(msg.getBody()));
  19. }
  20. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  21. }
  22. });
  23. //4.启动接收消息的服务
  24. consumer.start();
  25. System.out.println("接受消息服务已经开启!");
  26. //5 不要关闭消费者!
  27. }
  28. }

2.4 多生产者多消费者消息发送(ManyToMany)

多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费

三. 消息类别

  1. 同步消息
    2. 异步消息
    3. 单向消息

    3.1 同步消息

    特征:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)
    image.png
    代码实现 :

    SendResult result = producer.send(msg);

3.2 异步消息

特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息
image.png

  1. public class Producer {
  2. public static void main(String[] args) throws Exception {
  3. //1.创建一个发送消息的对象Producer
  4. DefaultMQProducer producer = new DefaultMQProducer("async");
  5. //2.设定发送的命名服务器地址
  6. producer.setNamesrvAddr("192.168.128.102:9876");
  7. //3.1启动发送的服务
  8. producer.start();
  9. //4.创建要发送的消息对象,指定topic,指定内容body
  10. String string = LocalDateTime.now() + "rocketMq中文";
  11. Message msg = new Message("topic-async", "tag1", string.getBytes(StandardCharsets.UTF_8));
  12. producer.send(msg, new SendCallback() {
  13. @Override
  14. public void onSuccess(SendResult sendResult) {
  15. System.out.println("异步消息发送成功" + sendResult);
  16. }
  17. @Override
  18. public void onException(Throwable e) {
  19. System.out.println("异步消息发送失败" + e);
  20. }
  21. });
  22. //休眠10秒
  23. TimeUnit.SECONDS.sleep(10);
  24. //5.关闭连接
  25. producer.shutdown();
  26. }
  27. }

3.3 单向消息

特征:不需要有回执的消息,例如日志类消息
image.png
代码实现 :

producer.sendOneway(msg);

3.4 延时消息

消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用

  1. Message msg = new Message("topic1", "tag1", string.getBytes(StandardCharsets.UTF_8));
  2. //设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
  3. msg.setDelayTimeLevel(3);
  4. SendResult result = producer.send(msg);
  5. System.out.println("返回结果:" + result);

目前支持的消息时间

private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;

3.5 批量消息

批量发送消息能显著提高传递小消息的性能.
发送批量消息:

  1. List<Message> msgList = new ArrayList<Message>();
  2. Message msg1 = new Message("topic1", ("hello rocketmq1").getBytes("UTF8"));
  3. Message msg2 = new Message("topic1", ("hello rocketmq2").getBytes("UTF8"));
  4. Message msg3 = new Message("topic1", ("hello rocketmq3").getBytes("UTF8"));
  5. msgList.add(msg1);
  6. msgList.add(msg2);
  7. msgList.add(msg3);
  8. SendResult result = producer.send(msgList);

注意限制:
1这些批量消息应该有相同的topic
2相同的waitStoreMsgOK
3不能是延时消息
4消息内容总长度不超过4M 消息内容总长度包含如下:

  • topic(字符串字节数)
  • body (字节数组长度)
  • 消息追加的属性(key与value对应字符串字节数)
  • 日志(固定20字节)

    3.6 消息过滤

    3.6.1分类过滤

    按照tag过滤信息
    生产者

    1. String msg = "消息过滤按照tag: hello rocketmq 2";
    2. Message msg = new Message("topic6","tag2",msg.getBytes("UTF-8"));

    消费者

    1. //接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
    2. consumer.subscribe("topic6","tag1 || tag2");

    3.6.2语法过滤(属性过滤/语法过滤/SQL过滤)

    基本语法

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;

  • 字符比较,比如:=,<>,IN
  • IS NULL 或者 IS NOT NULL
  • 逻辑符号 ANDORNOT

    常量支持类型为:

  • 数值,比如:123,3.1415;

  • 字符,比如:‘abc’,必须用单引号包裹起来
  • NULL,特殊的常量 布尔值,TRUE FALSE

生产者

  1. //为消息添加属性
  2. msg.putUserProperty("vip","1");
  3. msg.putUserProperty("age","20");

消费者

  1. //使用消息选择器来过滤对应的属性,语法格式为类SQL语法
  2. consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
  3. consumer.subscribe("topic6", MessageSelector.bySql("name = 'litiedan'"));

注意:SQL过滤需要依赖服务器的功能支持,在broker.conf配置文件中添加对应的功能项,并开启对应功能

enablePropertyFilter=true

查看页面是否开启
image.png