购买
Topic 消息类型

创建topic时,指定消息的类型
/*** rocketmq 示例* <p>* 巨商汇开发MQ 实例ID:MQ_INST_1596633715004367_Bb10yCDU* 阿里云控制台: https://ons.console.aliyun.com/region/cn-qingdao/instance/MQ_INST_1596633715004367_Bb10yCDU/detail*/public class RocketMqDemo {//------定义常量类----///---阿里云身份校验-----String accessId = "";String accessKey = "";//--- 客户端接入点----///---TCP接入点----//内网 http://MQ_INST_1596633715004367_Bb10yCDU.cn-qingdao.mq-internal.aliyuncs.com:8080//tcp不提供公网//properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_1596633715004367_Bb10yCDU.cn-qingdao.mq-internal.aliyuncs.com:8080");///---HTTP接入点---//公网:http://1596633715004367.mqrest.cn-qingdao.aliyuncs.com//内网:http://1596633715004367.mqrest.cn-qingdao-internal.aliyuncs.comString accountEndpoint = "http://1596633715004367.mqrest.cn-qingdao.aliyuncs.com";// 所属的 TopicString topic = "TOPIC_TEST_GENGERAL";//普通话题String topicTestFenquShunxu = "TOPIC_TEST_FENQU_SHUNXU";//分区顺序话题String topicTestShunxu = "";//全局顺序话题String topicYanshi = "TOPIC_YANSHI";//延时或定时话题String topicShiwu = "TOPIC_SHIWU";//事务话题// 您在控制台创建的 Consumer ID(Group ID) 注意TCP和HTTP 分开创建String groupId = "GID_TEST_001";// String groupId = "GID_TEST_002";// Topic所属实例ID,默认实例为空String instanceId = "MQ_INST_1596633715004367_Bb10yCDU";//1.创建客户端MQClient mqClient = new MQClient(// 设置HTTP接入域名(此处以公共云生产环境为例)accountEndpoint,// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建accessId,// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建accessKey);/*** 生产消息步骤:* 1.创建生产者* 2.创建消息* 3.生产者发送消息*/@Testpublic void 普通消息_生产者() {MQProducer producer = mqClient.getProducer(instanceId, topic);TopicMessage message = new TopicMessage();message.setMessageKey("key");message.setMessageBody("普通消息测试".getBytes());// message.setMessageTag("tag01");// 设置属性// message.getProperties().put("a", "1");// 同步发送消息,只要不抛异常就是成功TopicMessage publishMessage = producer.publishMessage(message);System.out.println(publishMessage);}@Testpublic void 普通消息_生产者_异步生产() throws InterruptedException {MQProducer producer = mqClient.getProducer(instanceId, topic);TopicMessage message = new TopicMessage();message.setMessageKey("key02");message.setMessageBody("普通消息异步测试".getBytes());message.setMessageTag("tag02");// 设置属性// message.getProperties().put("a", "1");//异步发送消息 传入回调函数AsyncResult<TopicMessage> asyncResult = producer.asyncPublishMessage(//消息message,//回调函数new AsyncCallback<TopicMessage>() {@Overridepublic void onSuccess(TopicMessage result) {System.err.println("成功!");}@Overridepublic void onFail(Exception ex) {System.err.println("失败!");}});System.out.println(asyncResult);System.out.println(asyncResult.getResult());//防止junit执行完就杀死进程 等一会Thread.sleep(3_000);}/*** 消费消息流程:* 1.创建消费者* 2.消费消息* 3.消费确认*/@Testpublic void 普通消息_消费者() {//创建消费者//入参:String instanceId mq实例ID, String topicName topic名称, String consumer 消费者, String messageTag 消息标签MQConsumer consumer = mqClient.getConsumer(instanceId, topic, "GID_TEST_002", null);List<Message> messages = consumer.consumeMessage(3, 5);if (null != messages) {System.err.println("消息数量" + messages.size());List<String> receiptHandles = new ArrayList<>();for (Message message : messages) {receiptHandles.add(message.getReceiptHandle());System.err.println("message:" + message);System.err.println("消息体:" + message.getMessageBodyString());}consumer.ackMessage(receiptHandles);} else {System.err.println("消息为空");}}@Testpublic void 普通消息_消费者_循环消费() {MQConsumer consumer = mqClient.getConsumer(instanceId, topic, groupId, null);// 在当前线程循环消费消息,建议是多开个几个线程并发消费消息while (true) {// 长轮询消费消息// 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回List<Message> messages = consumer.consumeMessage(3,// 一次最多消费3条(最多可设置为16条)3// 长轮询时间3秒(最多可设置为30秒));System.err.println(messages);// 没有消息if (messages == null || messages.isEmpty()) {System.err.println(Thread.currentThread().getName() + ": no new message, continue!");continue;}// 处理业务逻辑for (Message message : messages) {System.err.println("Receive message: " + message);}// Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样List<String> handles = new ArrayList<>();for (Message message : messages) {handles.add(message.getReceiptHandle());}consumer.ackMessage(handles);}}@Testpublic void 顺序消息_生产者() throws InterruptedException {MQProducer producer = mqClient.getProducer(instanceId, topicTestFenquShunxu);TopicMessage message1 = new TopicMessage();message1.setMessageKey("k1");message1.setMessageBody("分区顺序消息1".getBytes());message1.setMessageTag("tag");// 设置顺序消息的分区KEYmessage1.setShardingKey("1");// 同步发送消息,只要不抛异常就是成功TopicMessage result1 = producer.publishMessage(message1);System.err.println(result1);TopicMessage message2 = new TopicMessage();message2.setMessageKey("k2");message2.setMessageBody("分区顺序消息2".getBytes());message2.setMessageTag("tag");// 设置顺序消息的分区KEYmessage2.setShardingKey("1");// 同步发送消息,只要不抛异常就是成功TopicMessage result2 = producer.publishMessage(message2);System.err.println(result2);}@Testpublic void 顺序消息_消费者() {MQConsumer consumer = mqClient.getConsumer(instanceId, topicTestFenquShunxu, groupId, null);// 在当前线程循环消费消息,建议是多开个几个线程并发消费消息do {// 长轮询顺序消费消息, 拿到的消息可能是多个分区的(对于分区顺序)一个分区的内的消息一定是顺序的// 对于顺序消费,如果一个分区内的消息只要有没有被确认消费成功的,则对于这个分区下次还会消费到相同的消息// 对于一个分区,只有所有消息确认消费成功才能消费下一批消息// 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回List<Message> messages = consumer.consumeMessageOrderly(3,// 一次最多消费3条(最多可设置为16条)3// 长轮询时间3秒(最多可设置为30秒));// 没有消息if (messages == null || messages.isEmpty()) {System.err.println(Thread.currentThread().getName() + ": no new message, continue!");} else {// 处理业务逻辑System.err.println("Receive " + messages.size() + " messages:");for (Message message : messages) {System.err.println(message);System.err.println("ShardingKey: " + message.getShardingKey() + ", a:" + message.getProperties().get("a"));}// Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样List<String> handles = new ArrayList<>();for (Message message : messages) {handles.add(message.getReceiptHandle());}consumer.ackMessage(handles);}} while (true);}@Testpublic void 延时或定时消息_生产者() {// 获取Topic的生产者MQProducer producer = mqClient.getProducer(instanceId, topicYanshi);// 循环发送4条消息for (int i = 0; i < 4; i++) {TopicMessage pubMsg;if (i % 2 == 0) {// 普通消息pubMsg = new TopicMessage(// 消息内容"hello mq!".getBytes(),// 消息标签"A");// 设置属性pubMsg.getProperties().put("a", String.valueOf(i));// 设置KEYpubMsg.setMessageKey("MessageKey");} else {pubMsg = new TopicMessage(// 消息内容"hello mq!".getBytes(),// 消息标签"A");// 设置属性pubMsg.getProperties().put("a", String.valueOf(i));// 定时消息, 定时时间为10s后pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);}// 同步发送消息,只要不抛异常就是成功TopicMessage pubResultMsg = producer.publishMessage(pubMsg);// 同步发送消息,只要不抛异常就是成功System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());}}@Testpublic void 延时或定时消息_消费者() {MQConsumer consumer = mqClient.getConsumer(instanceId, topicYanshi, groupId, null);// 在当前线程循环消费消息,建议是多开个几个线程并发消费消息do {// 长轮询顺序消费消息, 拿到的消息可能是多个分区的(对于分区顺序)一个分区的内的消息一定是顺序的// 对于顺序消费,如果一个分区内的消息只要有没有被确认消费成功的,则对于这个分区下次还会消费到相同的消息// 对于一个分区,只有所有消息确认消费成功才能消费下一批消息// 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回List<Message> messages = consumer.consumeMessageOrderly(3,// 一次最多消费3条(最多可设置为16条)3// 长轮询时间3秒(最多可设置为30秒));// 没有消息if (messages == null || messages.isEmpty()) {System.err.println(Thread.currentThread().getName() + ": no new message, continue!");} else {// 处理业务逻辑System.err.println("Receive " + messages.size() + " messages:");for (Message message : messages) {System.err.println(message);System.err.println("ShardingKey: " + message.getShardingKey() + ", a:" + message.getProperties().get("a"));}// Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样List<String> handles = new ArrayList<>();for (Message message : messages) {handles.add(message.getReceiptHandle());}consumer.ackMessage(handles);}} while (true);}@Testpublic void 事务消息_生产者() throws InterruptedException {final MQTransProducer mqTransProducer = mqClient.getTransProducer(instanceId, topicShiwu, groupId);for (int i = 0; i < 4; i++) {TopicMessage message = new TopicMessage();message.setMessageBody("trans_msg".getBytes());message.setMessageTag("tag");message.setMessageKey(String.valueOf(System.currentTimeMillis()));// 设置事务第一次回查的时间,为相对时间,单位:秒,范围为10~300s之间// 第一次事务回查后如果消息没有commit或者rollback,则之后每隔10s左右会回查一次,总共回查一天message.setTransCheckImmunityTime(10);message.getProperties().put("a", String.valueOf(i));TopicMessage pubResultMsg = mqTransProducer.publishMessage(message);System.err.println("Send---->msgId is: " + pubResultMsg.getMessageId()+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5()+ ", Handle: " + pubResultMsg.getReceiptHandle());if (pubResultMsg.getReceiptHandle() != null) {if (i == 0) {// 发送完事务消息后能获取到半消息句柄,可以直接commit/rollback事务消息mqTransProducer.commit(pubResultMsg.getReceiptHandle());System.err.printf("MessageId:%s, commit%n", pubResultMsg.getMessageId());}}}// 客户端需要有一个线程或者进程来消费没有确认的事务消息// 示例这里启动一个线程来检查没有确认的事务消息Thread t = new Thread(() -> {int count = 0;while (true) {if (count == 3) {break;}List<Message> messages = mqTransProducer.consumeHalfMessage(3, 3);if (messages == null) {System.out.println("No Half message!");continue;}System.out.printf("Half---->MessageId:%s,Properties:%s,Body:%s,Latency:%d%n",messages.get(0).getMessageId(),messages.get(0).getProperties(),messages.get(0).getMessageBodyString(),System.currentTimeMillis() - messages.get(0).getPublishTime());for (Message message : messages) {int i = Integer.parseInt(message.getProperties().get("a"));if (i == 1 ||i == 2 && message.getConsumedTimes() > 1 ||i == 3) {// 确认提交事务消息mqTransProducer.commit(message.getReceiptHandle());count++;System.out.printf("MessageId:%s, commit%n", message.getMessageId());} else {// 什么都不做,下次再检查System.out.println(String.format("MessageId:%s, unknown", message.getMessageId()));}}}});t.start();t.join();}@Testpublic void 事务消息_消费者() {MQConsumer consumer = mqClient.getConsumer(instanceId, topicYanshi, groupId, null);// 在当前线程循环消费消息,建议是多开个几个线程并发消费消息do {// 长轮询顺序消费消息, 拿到的消息可能是多个分区的(对于分区顺序)一个分区的内的消息一定是顺序的// 对于顺序消费,如果一个分区内的消息只要有没有被确认消费成功的,则对于这个分区下次还会消费到相同的消息// 对于一个分区,只有所有消息确认消费成功才能消费下一批消息// 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回List<Message> messages = consumer.consumeMessageOrderly(3,// 一次最多消费3条(最多可设置为16条)3// 长轮询时间3秒(最多可设置为30秒));// 没有消息if (messages == null || messages.isEmpty()) {System.err.println(Thread.currentThread().getName() + ": no new message, continue!");} else {// 处理业务逻辑System.err.println("Receive " + messages.size() + " messages:");for (Message message : messages) {System.err.println(message);System.err.println("ShardingKey: " + message.getShardingKey() + ", a:" + message.getProperties().get("a"));}// Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样List<String> handles = new ArrayList<>();for (Message message : messages) {handles.add(message.getReceiptHandle());}consumer.ackMessage(handles);}} while (true);}@Afterpublic void after() {mqClient.close();}}

