1.环境准备
1.创建Maven工程,quickstart的就好
2.在自己的机器上启动rocketmq,windows的上篇文章已经讲过
3.maven中导入rocketmq的连接jar包和测试包
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
2.RocketMQ整体存储原理
2.1消息存储的相关文件
2.1.1CommitLog文件
①存储消息的主体内容
②一个CommitLog文件大小为1G
③文件的命名为20位的二进制表示(文件位置:${HOME}/store/commitlog/),如下图
④如果第一文件0000**0000文件存满1G后,会生成名称为 :00000000001073741824文件(第二个文件)。
2.1.2ConsumeQueue文件
①消费者要消费消息时,需要到2.1.1中的commitlog文件中寻找数据,那么如何定位消息在commitLog文件的位置呢??
②ConsumeQueue文件中存贮每个消息在commitLog中的物理偏移量,消息的大小和消息Tag的hashCode
③图形展示ConsumeQueue文件和CommitLog文件的关系
2.1.3IndexFile文件
①提供了可以根据时间和key值来查询commitlog消息的索引
②提供的就是另一种查询消息的方式,官方说是个IndexFile实现的是hashMap结构。
③文件名称为 时间戳,按照时间查询就到时间对应的文件查询即可
④按照key查询,那么大概的原理图应该是:
3.RokectMQ相关概念
2.各个概念的图形化
3.一条消息包含的内容
①所属的Topic名称
②Tag标签
③key值
④消息内容
4.一个消息保存到RocketMq的一个过程
①生产者向nameserver请求发送消息
②nameserver将返回brokerserver的一个ip:port
③生产者向这个brokerserver发送消息(包含topic tag key和消息主题)
④这台brokerserver的commitlog文件会记录这个消息
⑤indexfile中会记录这个消息索引。
⑥消息会自动选择Topic下的一个consumequeue,将消息索引放入这个队列中。
⑦broker返回ack
4.RockectMQ的使用
4.1消息发送
发送一个消息需要确定3个问题。
①同步、异步或单项发送。
②发送普通消息、顺序消息还是批量发送(只能同步发)
③消息设不设置属性、设不设置延时。
4.1.1发送同步消息
- 步骤
①创建一个生产者,并指定生产者组
②为生产者指定 nameserver的ip和端口
③启动生产者实例
④创建消息实例
⑤通过生产者将消息发送。
⑥关闭生产者
2.代码
@Test
public void testSendSyncMsg() throws Exception{
//1.创建一个生产者,并指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("syncMsgProducerGroup");
//2.为生产者指定 nameserver的ip和端口
producer.setNamesrvAddr("localhost:9876");
//3.启动生产者实例
producer.start();
//4.创建消息实例
Message message = new Message("syncMsgTopic", "Tag", "key1", "Hello RocketMq".getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.通过生产者将消息发送。
producer.send(message);
//6.关闭生产者
producer.shutdown();
}
4.1.2发送异步消息
- 步骤
①创建一个生产者,并指定生产者组
②为生产者指定 nameserver的ip和端口
③启动生产者实例
④创建消息实例
⑤通过生产者将消息发送,并设置回调方法。
⑥关闭生产者
2.代码
@Test
public void testSendAsyncMsg() throws Exception{
//1.创建一个生产者,并指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("aSyncMsgProducerGroup");
//2.为生产者指定 nameserver的ip和端口
producer.setNamesrvAddr("localhost:9876");
//3.启动生产者实例
producer.start();
//4.创建消息实例
Message message = new Message("aSyncMsgTopic", "Tag", "key2", "Hello RocketMq Async".getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.通过生产者将消息发送,并设置回调方法
producer.send(message, new SendCallback() {
//成功时的回调
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送异步消息成功了!");
System.out.println(sendResult);
}
//异常时的回调
@Override
public void onException(Throwable throwable) {
System.out.println("发送异步消息失败了");
throwable.printStackTrace();
}
});
//这里我可以使用闭锁,但是为了简单直接就睡一会
Thread.sleep(8000);
//6.关闭生产者
producer.shutdown();
}
4.1.3发送单向消息
- 步骤
①创建一个生产者,并指定生产者组
②为生产者指定 nameserver的ip和端口
③启动生产者实例
④创建消息实例
⑤通过生产者发送单向消息,没有任何的返回值
⑥关闭生产者
2.代码
@Test
public void testSendSingleMsg() throws Exception{
//1.创建一个生产者,并指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("singleMsgProducerGroup");
//2.为生产者指定 nameserver的ip和端口
producer.setNamesrvAddr("localhost:9876");
//3.启动生产者实例
producer.start();
//4.创建消息实例
Message message = new Message("singleMsgTopic", "Tag", "key3", "Hello RocketMq Single".getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.通过生产者发送单向消息,没有任何的返回值
producer.sendOneway(message);
//6.关闭生产者
producer.shutdown();
}
4.1.4同步顺序消息(异步和单向的不再举例)
1.步骤
①创建生产者实例
②设置nameserver的ip和端口
③启动生产者实例
④创建3条消息
⑤通过producer发送3条消息,并设置序号
⑥关闭生产者
2.代码
@Test
public void testSendSequenceMsg() throws Exception{
//1.创建一个生产者,并指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("sequenceMsgProducerGroup");
//2.为生产者指定 nameserver的ip和端口
producer.setNamesrvAddr("localhost:9876");
//3.启动生产者实例
producer.start();
//4.创建3个消息实例
ArrayList<Message> messages = new ArrayList<>();
Message message = new Message("sequenceMsgTopic", "Tag", "key1", "Hello RocketMq Sequence".getBytes(RemotingHelper.DEFAULT_CHARSET));
Message message1 = new Message("sequenceMsgTopic", "Tag", "key2", "Hello RocketMq Sequence".getBytes(RemotingHelper.DEFAULT_CHARSET));
Message message2 = new Message("sequenceMsgTopic", "Tag", "key3", "Hello RocketMq Sequence".getBytes(RemotingHelper.DEFAULT_CHARSET));
messages.add(message);
messages.add(message1);
messages.add(message2);
//5.遍历消息并发送在第一消息队列中
for(Message msg:messages){
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//都发在第一个队列中
return list.get(0);
}
}, messages.indexOf(msg)+1);
System.out.println(sendResult);
}
//6.关闭生产者
producer.shutdown();
}
3.实际上就是下面这个send使用
// 消息 指定消息队列 指定排序的
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
4.1.5延时消息
1.代码
@Test
public void testSendDlayMsg() throws Exception{
//1.创建一个生产者,并指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("delayMsgProducerGroup");
//2.为生产者指定 nameserver的ip和端口
producer.setNamesrvAddr("localhost:9876");
//3.启动生产者实例
producer.start();
//4.创建消息实例
Message message = new Message("delayMsgTopic", "Tag", "key1", "Hello RocketMq Delay".getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.为消息设置延时等级 1~18级
message.setDelayTimeLevel(3);
//6.通过生产者将消息发送。
SendResult send = producer.send(message);
System.out.println(send);
//7.关闭生产者
producer.shutdown();
}
2.1~18级对应的时间
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
4.1.6批量消息(只能以同步方式发送)
1.代码
@Test
public void testSendBatchMsg()throws Exception{
//1.创建一个生产者,并指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("batchMsgProducerGroup");
//2.为生产者指定 nameserver的ip和端口
producer.setNamesrvAddr("localhost:9876");
//3.启动生产者实例
producer.start();
//4.创建消息实例
ArrayList<Message> messages = new ArrayList<>();
messages.add(new Message("batchMsgTopic", "Tag", "key1", "Hello RocketMq Batch1".getBytes(RemotingHelper.DEFAULT_CHARSET)));
messages.add(new Message("batchMsgTopic", "Tag", "key2", "Hello RocketMq Batch2".getBytes(RemotingHelper.DEFAULT_CHARSET)));
messages.add(new Message("batchMsgTopic", "Tag", "key3", "Hello RocketMq Batch3".getBytes(RemotingHelper.DEFAULT_CHARSET)));
messages.add(new Message("batchMsgTopic", "Tag", "key4", "Hello RocketMq Batch4".getBytes(RemotingHelper.DEFAULT_CHARSET)));
//5.发送
SendResult send = producer.send(messages);
System.out.println(send);
//6.关闭生产者
producer.shutdown();
}
2.注意:批量消息大小要小于 4M
3.当消息多了后使用官方提供的切list的类
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List<Message> next() {
int startIndex = getStartIndex();
int nextIndex = startIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = calcMessageSize(message);
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
private int getStartIndex() {
Message currMessage = messages.get(currIndex);
int tmpSize = calcMessageSize(currMessage);
while(tmpSize > SIZE_LIMIT) {
currIndex += 1;
Message message = messages.get(curIndex);
tmpSize = calcMessageSize(message);
}
return currIndex;
}
private int calcMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length();
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节
return tmpSize;
}
}
===============================================================
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}
4.1.7带有属性的消息
@Test
public void testSendHasPropertyMsg()throws Exception{
//1.创建一个生产者,并指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("propertyMsgProducerGroup");
//2.为生产者指定 nameserver的ip和端口
producer.setNamesrvAddr("localhost:9876");
//3.启动生产者实例
producer.start();
//4.创建消息实例
Message message = new Message("propertyMsgTopic", "Tag", "key1", "Hello RocketMq Property1".getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.设置属性
message.putUserProperty("name","JamesLeBron");
//6.发送
SendResult send = producer.send(message);
System.out.println(send);
//7.关闭生产者
producer.shutdown();
}
4.2消费消息
4.2.1集群推动式普通消费
1.集群消费:同一个消费者组下均摊消息
2.推动式:Broker主动向消费者推送消息
3.代码
@Test
public void testClusterPushCommonConsume() throws Exception{
//1.实例化推动式消费模式
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("syncConsume");
//2.设置消费者消费模式,默认也是Clustering
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
//3.设置nameServer的ip和端口和读取的偏移量(默认最后)
defaultMQPushConsumer.setNamesrvAddr("localhost:9876");
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//4.设置订阅的主题和标签
defaultMQPushConsumer.subscribe("syncTopic","");
//5.注册监听器
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println(list.size());
System.out.println(list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//6.启动消费者实例
defaultMQPushConsumer.start();
//7.睡眠
Thread.sleep(100000);
}