最近刷到小红书里一位极客时间里的大佬的话,其大概意思:“任何代码都不是写一次就可以完美的写成功。C++的STL的每个API平均下来都重写了14次以上。”。文章又何尝不是呢?所以这里我将我之前写过的文章重新阅读整改。这就是突破自己呀! 2022年4月3日

本篇文章的要点罗列:

  1. 消息模型:主要由Producer、Broker、Consumer组成。
    • Broker(对应一台服务器资源)的组成
      • Topic:一个Broker有多个不同Topic,每个Topic的消息会分片存储在不同Broker上
      • MessageQueue:一个Topic有多个MessageQueue,存储的是指向真实消息的数据信息
    • Producer生产者
      • 生产者组:生产者组包含多个生产者
      • 生产者
    • Consumer消费者
      • 消费者组:消费者组包含多个消费者
      • 消费者

image.png
image.png

  1. 生产者:向Broker服务器发送消息。并获取Brokker的一个确认信息(单向发送(oneway方式)不需要)
  2. 消费者:从Broker服务器拉取消息。一个消费者只能显式指定一个Topic进行消费。消费形式有推送和主动拉取两种模式
    • 拉取模式(Pull):消费者主动从Broker服务器中拉取消息
    • 推动模式(Push):Broker收到生产者消息,主动推送给消费者(RocketMQ推模式依赖拉取模式)
  3. 生产者组:同一类Producer集合。原始生产者发送之后崩溃。Broker会联系其他成员提交
  4. 消费者组:同一类消费者集合。
  5. 主题(Topic):一类消息的集合。
  6. 代理服务器(Broker Server):负责存储、转发消息。并且存储消费者组、消费进度、队列消息的等元数据信息
  7. 名字服务(Name Server):充当路由。消费者和生产者通过名字服务找到各个主题相应的BrokerIP列表。NameServer彼此之间相互独立BrokerIP信息不同步
  8. 集群消费:相同消费者组共同分摊消息
  9. 广播消费:相同消费者组的成员都得到全部消息
  10. 普通顺序消息:通过同一个Topic中的一个消费队列(MessageQueue)收到的消息是有顺序的
  11. 严格顺序消息:Topic中获取的消息都是顺序
  12. 消息:生产和消费的最小单位。每个消息有一个 Messageid,且可有一个 业务标识的 key。 MessageId和key可查询消息
  13. 标签(Tag):为消息设置的标志。用于同一主题下的不同类型消息。

    3.特性

  14. 订阅预发布:订阅:消费者订阅某个主题。发布:生产者向某个主题发布消息

  15. 消息顺序:按照发送的顺序来消费。
    • 全局顺序:从一个Topic中获取的消息时均是按照发送的先后顺序获得(需要将Topic的MessageQueue数量设置为1)
    • 分区顺序:从一个Topic的同一个分区(MessageQueue)的消息消费保持顺序(默认就是这样)
  16. 消息过滤:减少无用消息
    • 通过Tag标签过滤
    • 通过SQL表达式过滤
  17. 消息的可靠性
    • 单点暂时性故障(修复Broker即可):突然停电,运维拔网线
      • 异步刷盘:基本不丢
      • 同步刷盘:一定不丢
    • 单点永久性故障(使用Slave):磁盘坏了
      • 异步复制:基本不丢
      • 同步复制:一定不丢
  18. 消息语义:至少一次
    • 生产者:消息收到Broker的成功响应才结束,否则有重投机制
    • 消费者:只有消费者返回ACK时,Broker才会认为此消息被消费啦
  19. 回溯消费:支持按时间维度回退消费进度(毫秒级)
  20. 事务消息:应用本地事务和发送消息操作可以定义到全局事务中。要么同时成功,要么同时失败。事务消息提供类似X/OpenXA的分布事务功能,通过事务消息达到分布式事务一致性
  21. 定时消息:延迟队列是指被发送到Broker时,不会立即被消费,等待特定时间后才真正发送到topic。通过配置项messageDelayLevel。默认18个级别。定时消息会暂存到SCHEDULE_TOPIC_XXXX中。一个队列只存储相同延迟消息的消息。
  22. 消息重试:消费者消费失败后,提供重试机制,令消费者再消费一次。
    • 2种消费消息失败的场景
      • 由于消息自身的错误原因。一般应用程序编写跳过消息逻辑。
      • 下游服务不可用。即使消费其他消息也不能成功。可以暂时停止消费消息一段时间后再开始消费
    • RocketMQ提供的重试机制
      • 每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列
      • 重试次数约多下次重试等待时间越久
  23. 消息重投:同步消息会重投、异步消息会重试。可能会消费重复(无法避免)。
    • 重投策略:
      • retryTimesWhenSendFailed:针对同步发送的参数,默认2次重投(即最多3次选择Broker发送消息的机会)。超过次数,抛异常,客户端保证不丢失消息。重投触发逻辑必须时发消息时出现以下异常:
        • RemotingException
        • MQClientException
        • 部分MQBrokerException
      • retryTimeAsyncFailed:针对异步发送,设置重试次数(同一个broker重试)
      • retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启
  24. 流量控制
    • 生产者流控
      • 若CommitLog文件被锁1s(可设置)则send失败
      • 在异步刷盘时且transientStorePool=true:transientStorePool资源不足,则send失败
      • 生产者每10ms检查请求队列头时间,若等待总时长大时,则send失败
    • 消费者流控
      • pullThresholdForQueue:最大拉取消息数量
      • pullThresholdSizeForQueue:最大拉取消息大小
      • consumeConcurrentlyMaxSpan:消息之间跨度
  25. 死信队列:就是存储消费不能成功的消息(达到重试次数的消息)的场所(Topic)。可以控制重发。

    4.安装

  26. 下载bin的zip包并解压(最好全英文的目录下)

  27. 配置环境变量(这步可以省略)

image.png
image.png

  1. 开启NameServer

    1. .\bin\mqnamesrv.cmd

    image.png

  2. 开启Broker

    1. .\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

    image.png

  3. 测试发送消息

    1. tool.cmd org.apache.rocketmq.example.quickstart.Producer
  4. 消费者测试

    1. tool.cmd org.apache.rocketmq.example.quickstart.Consumer
  5. 关闭服务

ctrl+c

5.消息发送的3种方式

  1. 同步发送代码

    1. public static void main(String[] args) throws Exception{
    2. //Instantiate with a producer group name.
    3. DefaultMQProducer producer = new
    4. DefaultMQProducer("syncProducerGroup");
    5. // Specify name server addresses.
    6. producer.setNamesrvAddr("localhost:9876");
    7. //Launch the instance.
    8. producer.start();
    9. for (int i = 0; i < 100; i++) {
    10. //Create a message instance, specifying topic, tag and message body.
    11. Message msg = new Message("TopicTestSync" /* Topic */,
    12. "TagA" /* Tag */,
    13. ("Hello RocketMQ " +
    14. i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
    15. );
    16. //Call send message to deliver message to one of brokers.
    17. SendResult sendResult = producer.send(msg);
    18. System.out.printf("%s%n", sendResult);
    19. }
    20. //Shut down once the producer instance is not longer in use.
    21. producer.shutdown();
    22. }
  2. 异步发送代码

    1. @Test
    2. public void testSendAsyncMsg() throws Exception{
    3. //Instantiate with a producer group name.
    4. DefaultMQProducer producer = new DefaultMQProducer("sendAsyncMsgGroup");
    5. // Specify name server addresses.
    6. producer.setNamesrvAddr("localhost:9876");
    7. //Launch the instance.
    8. producer.start();
    9. producer.setRetryTimesWhenSendAsyncFailed(0);
    10. int messageCount = 100;
    11. final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
    12. for (int i = 0; i < messageCount; i++) {
    13. try {
    14. final int index = i;
    15. Message msg = new Message("AsyncTopicTest",
    16. "TagA",
    17. "OrderID188",
    18. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    19. producer.send(msg, new SendCallback() {
    20. @Override
    21. public void onSuccess(SendResult sendResult) {
    22. countDownLatch.countDown();
    23. System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
    24. }
    25. @Override
    26. public void onException(Throwable e) {
    27. countDownLatch.countDown();
    28. System.out.printf("%-10d Exception %s %n", index, e);
    29. e.printStackTrace();
    30. }
    31. });
    32. } catch (Exception e) {
    33. e.printStackTrace();
    34. }
    35. }
    36. countDownLatch.await(5, TimeUnit.SECONDS);
    37. producer.shutdown();
    38. }
  3. 单向发送

    1. @Test
    2. public void testSendSingle() throws Exception{
    3. //Instantiate with a producer group name.
    4. DefaultMQProducer producer = new DefaultMQProducer("SingleSendGroup");
    5. // Specify name server addresses.
    6. producer.setNamesrvAddr("localhost:9876");
    7. //Launch the instance.
    8. producer.start();
    9. for (int i = 0; i < 100; i++) {
    10. //Create a message instance, specifying topic, tag and message body.
    11. Message msg = new Message("SingleTopicTest" /* Topic */,
    12. "TagA" /* Tag */,
    13. ("Hello RocketMQ " +
    14. i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
    15. );
    16. //Call send message to deliver message to one of brokers.
    17. producer.sendOneway(msg);
    18. }
    19. //Wait for sending to complete
    20. Thread.sleep(5000);
    21. producer.shutdown();
    22. }

6.简单消费消息

  1. @Test
  2. public void testConsumeMsg() throws Exception{
  3. // Instantiate with specified consumer group name.
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myConsumeGroup");
  5. // Specify name server addresses.
  6. consumer.setNamesrvAddr("localhost:9876");
  7. // Subscribe one more more topics to consume.
  8. consumer.subscribe("TopicTestSync", "*");
  9. // Register callback to execute on arrival of messages fetched from brokers.
  10. consumer.registerMessageListener(new MessageListenerConcurrently() {
  11. @Override
  12. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  13. ConsumeConcurrentlyContext context) {
  14. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  15. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  16. }
  17. });
  18. //Launch the consumer instance.
  19. consumer.start();
  20. Thread.sleep(50000);
  21. System.out.printf("Consumer Started.%n");
  22. }

参考资料