购买
Topic 消息类型
创建topic时,指定消息的类型
/**
* rocketmq 示例
* <p>
* 巨商汇开发MQ 实例ID:MQ_INST_1596633715004367_Bb10yCDU
* 阿里云控制台: https://ons.console.aliyun.com/region/cn-qingdao/instance/MQ_INST_1596633715004367_Bb10yCDU/detail
*/
public class RocketMqDemo {
//------定义常量类----
///---阿里云身份校验-----
String accessId = "";
String accessKey = "";
//--- 客户端接入点----
///---TCP接入点----
//内网 http://MQ_INST_1596633715004367_Bb10yCDU.cn-qingdao.mq-internal.aliyuncs.com:8080
//tcp不提供公网
//properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_1596633715004367_Bb10yCDU.cn-qingdao.mq-internal.aliyuncs.com:8080");
///---HTTP接入点---
//公网:http://1596633715004367.mqrest.cn-qingdao.aliyuncs.com
//内网:http://1596633715004367.mqrest.cn-qingdao-internal.aliyuncs.com
String accountEndpoint = "http://1596633715004367.mqrest.cn-qingdao.aliyuncs.com";
// 所属的 Topic
String topic = "TOPIC_TEST_GENGERAL";//普通话题
String topicTestFenquShunxu = "TOPIC_TEST_FENQU_SHUNXU";//分区顺序话题
String topicTestShunxu = "";//全局顺序话题
String topicYanshi = "TOPIC_YANSHI";//延时或定时话题
String topicShiwu = "TOPIC_SHIWU";//事务话题
// 您在控制台创建的 Consumer ID(Group ID) 注意TCP和HTTP 分开创建
String groupId = "GID_TEST_001";
// String groupId = "GID_TEST_002";
// Topic所属实例ID,默认实例为空
String instanceId = "MQ_INST_1596633715004367_Bb10yCDU";
//1.创建客户端
MQClient mqClient = new MQClient(
// 设置HTTP接入域名(此处以公共云生产环境为例)
accountEndpoint,
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
accessId,
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
accessKey
);
/**
* 生产消息步骤:
* 1.创建生产者
* 2.创建消息
* 3.生产者发送消息
*/
@Test
public void 普通消息_生产者() {
MQProducer producer = mqClient.getProducer(instanceId, topic);
TopicMessage message = new TopicMessage();
message.setMessageKey("key");
message.setMessageBody("普通消息测试".getBytes());
// message.setMessageTag("tag01");
// 设置属性
// message.getProperties().put("a", "1");
// 同步发送消息,只要不抛异常就是成功
TopicMessage publishMessage = producer.publishMessage(message);
System.out.println(publishMessage);
}
@Test
public void 普通消息_生产者_异步生产() throws InterruptedException {
MQProducer producer = mqClient.getProducer(instanceId, topic);
TopicMessage message = new TopicMessage();
message.setMessageKey("key02");
message.setMessageBody("普通消息异步测试".getBytes());
message.setMessageTag("tag02");
// 设置属性
// message.getProperties().put("a", "1");
//异步发送消息 传入回调函数
AsyncResult<TopicMessage> asyncResult = producer.asyncPublishMessage(
//消息
message,
//回调函数
new AsyncCallback<TopicMessage>() {
@Override
public void onSuccess(TopicMessage result) {
System.err.println("成功!");
}
@Override
public void onFail(Exception ex) {
System.err.println("失败!");
}
});
System.out.println(asyncResult);
System.out.println(asyncResult.getResult());
//防止junit执行完就杀死进程 等一会
Thread.sleep(3_000);
}
/**
* 消费消息流程:
* 1.创建消费者
* 2.消费消息
* 3.消费确认
*/
@Test
public void 普通消息_消费者() {
//创建消费者
//入参:String instanceId mq实例ID, String topicName topic名称, String consumer 消费者, String messageTag 消息标签
MQConsumer consumer = mqClient.getConsumer(instanceId, topic, "GID_TEST_002", null);
List<Message> messages = consumer.consumeMessage(3, 5);
if (null != messages) {
System.err.println("消息数量" + messages.size());
List<String> receiptHandles = new ArrayList<>();
for (Message message : messages) {
receiptHandles.add(message.getReceiptHandle());
System.err.println("message:" + message);
System.err.println("消息体:" + message.getMessageBodyString());
}
consumer.ackMessage(receiptHandles);
} else {
System.err.println("消息为空");
}
}
@Test
public void 普通消息_消费者_循环消费() {
MQConsumer consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
// 在当前线程循环消费消息,建议是多开个几个线程并发消费消息
while (true) {
// 长轮询消费消息
// 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
List<Message> messages = consumer.consumeMessage(
3,// 一次最多消费3条(最多可设置为16条)
3// 长轮询时间3秒(最多可设置为30秒)
);
System.err.println(messages);
// 没有消息
if (messages == null || messages.isEmpty()) {
System.err.println(Thread.currentThread().getName() + ": no new message, continue!");
continue;
}
// 处理业务逻辑
for (Message message : messages) {
System.err.println("Receive message: " + message);
}
// Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样
List<String> handles = new ArrayList<>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}
consumer.ackMessage(handles);
}
}
@Test
public void 顺序消息_生产者() throws InterruptedException {
MQProducer producer = mqClient.getProducer(instanceId, topicTestFenquShunxu);
TopicMessage message1 = new TopicMessage();
message1.setMessageKey("k1");
message1.setMessageBody("分区顺序消息1".getBytes());
message1.setMessageTag("tag");
// 设置顺序消息的分区KEY
message1.setShardingKey("1");
// 同步发送消息,只要不抛异常就是成功
TopicMessage result1 = producer.publishMessage(message1);
System.err.println(result1);
TopicMessage message2 = new TopicMessage();
message2.setMessageKey("k2");
message2.setMessageBody("分区顺序消息2".getBytes());
message2.setMessageTag("tag");
// 设置顺序消息的分区KEY
message2.setShardingKey("1");
// 同步发送消息,只要不抛异常就是成功
TopicMessage result2 = producer.publishMessage(message2);
System.err.println(result2);
}
@Test
public void 顺序消息_消费者() {
MQConsumer consumer = mqClient.getConsumer(instanceId, topicTestFenquShunxu, groupId, null);
// 在当前线程循环消费消息,建议是多开个几个线程并发消费消息
do {
// 长轮询顺序消费消息, 拿到的消息可能是多个分区的(对于分区顺序)一个分区的内的消息一定是顺序的
// 对于顺序消费,如果一个分区内的消息只要有没有被确认消费成功的,则对于这个分区下次还会消费到相同的消息
// 对于一个分区,只有所有消息确认消费成功才能消费下一批消息
// 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
List<Message> messages = consumer.consumeMessageOrderly(
3,// 一次最多消费3条(最多可设置为16条)
3// 长轮询时间3秒(最多可设置为30秒)
);
// 没有消息
if (messages == null || messages.isEmpty()) {
System.err.println(Thread.currentThread().getName() + ": no new message, continue!");
} else {
// 处理业务逻辑
System.err.println("Receive " + messages.size() + " messages:");
for (Message message : messages) {
System.err.println(message);
System.err.println("ShardingKey: " + message.getShardingKey() + ", a:" + message.getProperties().get("a"));
}
// Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样
List<String> handles = new ArrayList<>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}
consumer.ackMessage(handles);
}
} while (true);
}
@Test
public void 延时或定时消息_生产者() {
// 获取Topic的生产者
MQProducer producer = mqClient.getProducer(instanceId, topicYanshi);
// 循环发送4条消息
for (int i = 0; i < 4; i++) {
TopicMessage pubMsg;
if (i % 2 == 0) {
// 普通消息
pubMsg = new TopicMessage(
// 消息内容
"hello mq!".getBytes(),
// 消息标签
"A"
);
// 设置属性
pubMsg.getProperties().put("a", String.valueOf(i));
// 设置KEY
pubMsg.setMessageKey("MessageKey");
} else {
pubMsg = new TopicMessage(
// 消息内容
"hello mq!".getBytes(),
// 消息标签
"A"
);
// 设置属性
pubMsg.getProperties().put("a", String.valueOf(i));
// 定时消息, 定时时间为10s后
pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
}
// 同步发送消息,只要不抛异常就是成功
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
// 同步发送消息,只要不抛异常就是成功
System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
}
}
@Test
public void 延时或定时消息_消费者() {
MQConsumer consumer = mqClient.getConsumer(instanceId, topicYanshi, groupId, null);
// 在当前线程循环消费消息,建议是多开个几个线程并发消费消息
do {
// 长轮询顺序消费消息, 拿到的消息可能是多个分区的(对于分区顺序)一个分区的内的消息一定是顺序的
// 对于顺序消费,如果一个分区内的消息只要有没有被确认消费成功的,则对于这个分区下次还会消费到相同的消息
// 对于一个分区,只有所有消息确认消费成功才能消费下一批消息
// 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
List<Message> messages = consumer.consumeMessageOrderly(
3,// 一次最多消费3条(最多可设置为16条)
3// 长轮询时间3秒(最多可设置为30秒)
);
// 没有消息
if (messages == null || messages.isEmpty()) {
System.err.println(Thread.currentThread().getName() + ": no new message, continue!");
} else {
// 处理业务逻辑
System.err.println("Receive " + messages.size() + " messages:");
for (Message message : messages) {
System.err.println(message);
System.err.println("ShardingKey: " + message.getShardingKey() + ", a:" + message.getProperties().get("a"));
}
// Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样
List<String> handles = new ArrayList<>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}
consumer.ackMessage(handles);
}
} while (true);
}
@Test
public void 事务消息_生产者() throws InterruptedException {
final MQTransProducer mqTransProducer = mqClient.getTransProducer(instanceId, topicShiwu, groupId);
for (int i = 0; i < 4; i++) {
TopicMessage message = new TopicMessage();
message.setMessageBody("trans_msg".getBytes());
message.setMessageTag("tag");
message.setMessageKey(String.valueOf(System.currentTimeMillis()));
// 设置事务第一次回查的时间,为相对时间,单位:秒,范围为10~300s之间
// 第一次事务回查后如果消息没有commit或者rollback,则之后每隔10s左右会回查一次,总共回查一天
message.setTransCheckImmunityTime(10);
message.getProperties().put("a", String.valueOf(i));
TopicMessage pubResultMsg = mqTransProducer.publishMessage(message);
System.err.println("Send---->msgId is: " + pubResultMsg.getMessageId()
+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5()
+ ", Handle: " + pubResultMsg.getReceiptHandle()
);
if (pubResultMsg.getReceiptHandle() != null) {
if (i == 0) {
// 发送完事务消息后能获取到半消息句柄,可以直接commit/rollback事务消息
mqTransProducer.commit(pubResultMsg.getReceiptHandle());
System.err.printf("MessageId:%s, commit%n", pubResultMsg.getMessageId());
}
}
}
// 客户端需要有一个线程或者进程来消费没有确认的事务消息
// 示例这里启动一个线程来检查没有确认的事务消息
Thread t = new Thread(() -> {
int count = 0;
while (true) {
if (count == 3) {
break;
}
List<Message> messages = mqTransProducer.consumeHalfMessage(3, 3);
if (messages == null) {
System.out.println("No Half message!");
continue;
}
System.out.printf("Half---->MessageId:%s,Properties:%s,Body:%s,Latency:%d%n",
messages.get(0).getMessageId(),
messages.get(0).getProperties(),
messages.get(0).getMessageBodyString(),
System.currentTimeMillis() - messages.get(0).getPublishTime());
for (Message message : messages) {
int i = Integer.parseInt(message.getProperties().get("a"));
if (i == 1 ||
i == 2 && message.getConsumedTimes() > 1 ||
i == 3) {
// 确认提交事务消息
mqTransProducer.commit(message.getReceiptHandle());
count++;
System.out.printf("MessageId:%s, commit%n", message.getMessageId());
} else {
// 什么都不做,下次再检查
System.out.println(String.format("MessageId:%s, unknown", message.getMessageId()));
}
}
}
});
t.start();
t.join();
}
@Test
public void 事务消息_消费者() {
MQConsumer consumer = mqClient.getConsumer(instanceId, topicYanshi, groupId, null);
// 在当前线程循环消费消息,建议是多开个几个线程并发消费消息
do {
// 长轮询顺序消费消息, 拿到的消息可能是多个分区的(对于分区顺序)一个分区的内的消息一定是顺序的
// 对于顺序消费,如果一个分区内的消息只要有没有被确认消费成功的,则对于这个分区下次还会消费到相同的消息
// 对于一个分区,只有所有消息确认消费成功才能消费下一批消息
// 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
List<Message> messages = consumer.consumeMessageOrderly(
3,// 一次最多消费3条(最多可设置为16条)
3// 长轮询时间3秒(最多可设置为30秒)
);
// 没有消息
if (messages == null || messages.isEmpty()) {
System.err.println(Thread.currentThread().getName() + ": no new message, continue!");
} else {
// 处理业务逻辑
System.err.println("Receive " + messages.size() + " messages:");
for (Message message : messages) {
System.err.println(message);
System.err.println("ShardingKey: " + message.getShardingKey() + ", a:" + message.getProperties().get("a"));
}
// Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样
List<String> handles = new ArrayList<>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}
consumer.ackMessage(handles);
}
} while (true);
}
@After
public void after() {
mqClient.close();
}
}