最近刷到小红书里一位极客时间里的大佬的话,其大概意思:“任何代码都不是写一次就可以完美的写成功。C++的STL的每个API平均下来都重写了14次以上。”。文章又何尝不是呢?所以这里我将我之前写过的文章重新阅读整改。这就是突破自己呀! 2022年4月3日
本篇文章的要点罗列:
- 简介:一笔带过,因为是纯概念的内容。
- 基本概念:参考官方文章的概念章节
- 特性:参考官方文章的特性章节
- 安装:Windows下安装RocketMQ
- 消息发送3种方式:演示
同步发送
、异步发送
和oneway发送
- 简单消费消息:演示了一个简单的消费Topic消息的代码
1.简介
1.RocketMQ是一个消息队列
2.优点:可以参考官网的Why RocketMQ2.基本概念
- 消息模型:主要由Producer、Broker、Consumer组成。
- Broker(对应一台服务器资源)的组成
- Topic:一个Broker有多个不同Topic,每个Topic的消息会分片存储在不同Broker上
- MessageQueue:一个Topic有多个MessageQueue,存储的是指向真实消息的数据信息
- Producer生产者
- 生产者组:生产者组包含多个生产者
- 生产者
- Consumer消费者
- 消费者组:消费者组包含多个消费者
- 消费者
- Broker(对应一台服务器资源)的组成
- 生产者:向Broker服务器发送消息。并获取Brokker的一个确认信息(单向发送(oneway方式)不需要)
- 消费者:从Broker服务器拉取消息。一个消费者只能显式指定一个Topic进行消费。消费形式有推送和主动拉取两种模式
- 拉取模式(Pull):消费者主动从Broker服务器中拉取消息
- 推动模式(Push):Broker收到生产者消息,主动推送给消费者(RocketMQ推模式依赖拉取模式)
- 生产者组:同一类Producer集合。原始生产者发送之后崩溃。Broker会联系其他成员提交
- 消费者组:同一类消费者集合。
- 主题(Topic):一类消息的集合。
- 代理服务器(Broker Server):负责存储、转发消息。并且存储消费者组、消费进度、队列消息的等元数据信息
- 名字服务(Name Server):充当路由。消费者和生产者通过名字服务找到各个主题相应的BrokerIP列表。NameServer彼此之间相互独立BrokerIP信息不同步
- 集群消费:相同消费者组共同分摊消息
- 广播消费:相同消费者组的成员都得到全部消息
- 普通顺序消息:通过同一个Topic中的一个消费队列(MessageQueue)收到的消息是有顺序的
- 严格顺序消息:Topic中获取的消息都是顺序
- 消息:生产和消费的最小单位。每个消息有一个 Messageid,且可有一个 业务标识的 key。 MessageId和key可查询消息
标签(Tag):为消息设置的标志。用于同一主题下的不同类型消息。
3.特性
订阅预发布:订阅:消费者订阅某个主题。发布:生产者向某个主题发布消息
- 消息顺序:按照发送的顺序来消费。
- 全局顺序:从一个Topic中获取的消息时均是按照发送的先后顺序获得(需要将Topic的MessageQueue数量设置为1)
- 分区顺序:从一个Topic的同一个分区(MessageQueue)的消息消费保持顺序(默认就是这样)
- 消息过滤:减少无用消息
- 通过Tag标签过滤
- 通过SQL表达式过滤
- 消息的可靠性
- 单点暂时性故障(修复Broker即可):突然停电,运维拔网线
- 异步刷盘:基本不丢
- 同步刷盘:一定不丢
- 单点永久性故障(使用Slave):磁盘坏了
- 异步复制:基本不丢
- 同步复制:一定不丢
- 单点暂时性故障(修复Broker即可):突然停电,运维拔网线
- 消息语义:至少一次
- 生产者:消息收到Broker的成功响应才结束,否则有重投机制
- 消费者:只有消费者返回ACK时,Broker才会认为此消息被消费啦
- 回溯消费:支持按时间维度回退消费进度(毫秒级)
- 事务消息:应用本地事务和发送消息操作可以定义到全局事务中。要么同时成功,要么同时失败。事务消息提供类似X/OpenXA的分布事务功能,通过事务消息达到分布式事务一致性
- 定时消息:延迟队列是指被发送到Broker时,不会立即被消费,等待特定时间后才真正发送到topic。通过配置项messageDelayLevel。默认18个级别。定时消息会暂存到
SCHEDULE_TOPIC_XXXX
中。一个队列只存储相同延迟消息的消息。 - 消息重试:消费者消费失败后,提供重试机制,令消费者再消费一次。
- 2种消费消息失败的场景
- 由于消息自身的错误原因。一般应用程序编写跳过消息逻辑。
- 下游服务不可用。即使消费其他消息也不能成功。可以暂时停止消费消息一段时间后再开始消费
- RocketMQ提供的重试机制
- 每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列
- 重试次数约多下次重试等待时间越久
- 2种消费消息失败的场景
- 消息重投:同步消息会重投、异步消息会重试。可能会消费重复(无法避免)。
- 重投策略:
- retryTimesWhenSendFailed:针对同步发送的参数,默认2次重投(即最多3次选择Broker发送消息的机会)。超过次数,抛异常,客户端保证不丢失消息。重投触发逻辑必须时发消息时出现以下异常:
- RemotingException
- MQClientException
- 部分MQBrokerException
- retryTimeAsyncFailed:针对异步发送,设置重试次数(同一个broker重试)
- retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启
- retryTimesWhenSendFailed:针对同步发送的参数,默认2次重投(即最多3次选择Broker发送消息的机会)。超过次数,抛异常,客户端保证不丢失消息。重投触发逻辑必须时发消息时出现以下异常:
- 重投策略:
- 流量控制
- 生产者流控
- 若CommitLog文件被锁1s(可设置)则send失败
- 在异步刷盘时且transientStorePool=true:transientStorePool资源不足,则send失败
- 生产者每10ms检查请求队列头时间,若等待总时长大时,则send失败
- 消费者流控
- pullThresholdForQueue:最大拉取消息数量
- pullThresholdSizeForQueue:最大拉取消息大小
- consumeConcurrentlyMaxSpan:消息之间跨度
- 生产者流控
死信队列:就是存储消费不能成功的消息(达到重试次数的消息)的场所(Topic)。可以控制重发。
4.安装
下载bin的zip包并解压(最好全英文的目录下)
- 配置环境变量(这步可以省略)
开启NameServer
.\bin\mqnamesrv.cmd
开启Broker
.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
测试发送消息
tool.cmd org.apache.rocketmq.example.quickstart.Producer
消费者测试
tool.cmd org.apache.rocketmq.example.quickstart.Consumer
关闭服务
ctrl+c
5.消息发送的3种方式
同步发送代码
public static void main(String[] args) throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("syncProducerGroup");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTestSync" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
异步发送代码
@Test
public void testSendAsyncMsg() throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("sendAsyncMsgGroup");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("AsyncTopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
单向发送
@Test
public void testSendSingle() throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("SingleSendGroup");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("SingleTopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);
}
//Wait for sending to complete
Thread.sleep(5000);
producer.shutdown();
}
6.简单消费消息
@Test
public void testConsumeMsg() throws Exception{
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myConsumeGroup");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTestSync", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
Thread.sleep(50000);
System.out.printf("Consumer Started.%n");
}