购买

image.png

Topic 消息类型

image.png
创建topic时,指定消息的类型

  • 普通消息
  • 事务消息
  • 分区顺序消息
  • 全局顺序消息
  • 定时/延时消息

    架构图

    image.png
  1. /**
  2. * rocketmq 示例
  3. * <p>
  4. * 巨商汇开发MQ 实例ID:MQ_INST_1596633715004367_Bb10yCDU
  5. * 阿里云控制台: https://ons.console.aliyun.com/region/cn-qingdao/instance/MQ_INST_1596633715004367_Bb10yCDU/detail
  6. */
  7. public class RocketMqDemo {
  8. //------定义常量类----
  9. ///---阿里云身份校验-----
  10. String accessId = "";
  11. String accessKey = "";
  12. //--- 客户端接入点----
  13. ///---TCP接入点----
  14. //内网 http://MQ_INST_1596633715004367_Bb10yCDU.cn-qingdao.mq-internal.aliyuncs.com:8080
  15. //tcp不提供公网
  16. //properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_1596633715004367_Bb10yCDU.cn-qingdao.mq-internal.aliyuncs.com:8080");
  17. ///---HTTP接入点---
  18. //公网:http://1596633715004367.mqrest.cn-qingdao.aliyuncs.com
  19. //内网:http://1596633715004367.mqrest.cn-qingdao-internal.aliyuncs.com
  20. String accountEndpoint = "http://1596633715004367.mqrest.cn-qingdao.aliyuncs.com";
  21. // 所属的 Topic
  22. String topic = "TOPIC_TEST_GENGERAL";//普通话题
  23. String topicTestFenquShunxu = "TOPIC_TEST_FENQU_SHUNXU";//分区顺序话题
  24. String topicTestShunxu = "";//全局顺序话题
  25. String topicYanshi = "TOPIC_YANSHI";//延时或定时话题
  26. String topicShiwu = "TOPIC_SHIWU";//事务话题
  27. // 您在控制台创建的 Consumer ID(Group ID) 注意TCP和HTTP 分开创建
  28. String groupId = "GID_TEST_001";
  29. // String groupId = "GID_TEST_002";
  30. // Topic所属实例ID,默认实例为空
  31. String instanceId = "MQ_INST_1596633715004367_Bb10yCDU";
  32. //1.创建客户端
  33. MQClient mqClient = new MQClient(
  34. // 设置HTTP接入域名(此处以公共云生产环境为例)
  35. accountEndpoint,
  36. // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  37. accessId,
  38. // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  39. accessKey
  40. );
  41. /**
  42. * 生产消息步骤:
  43. * 1.创建生产者
  44. * 2.创建消息
  45. * 3.生产者发送消息
  46. */
  47. @Test
  48. public void 普通消息_生产者() {
  49. MQProducer producer = mqClient.getProducer(instanceId, topic);
  50. TopicMessage message = new TopicMessage();
  51. message.setMessageKey("key");
  52. message.setMessageBody("普通消息测试".getBytes());
  53. // message.setMessageTag("tag01");
  54. // 设置属性
  55. // message.getProperties().put("a", "1");
  56. // 同步发送消息,只要不抛异常就是成功
  57. TopicMessage publishMessage = producer.publishMessage(message);
  58. System.out.println(publishMessage);
  59. }
  60. @Test
  61. public void 普通消息_生产者_异步生产() throws InterruptedException {
  62. MQProducer producer = mqClient.getProducer(instanceId, topic);
  63. TopicMessage message = new TopicMessage();
  64. message.setMessageKey("key02");
  65. message.setMessageBody("普通消息异步测试".getBytes());
  66. message.setMessageTag("tag02");
  67. // 设置属性
  68. // message.getProperties().put("a", "1");
  69. //异步发送消息 传入回调函数
  70. AsyncResult<TopicMessage> asyncResult = producer.asyncPublishMessage(
  71. //消息
  72. message,
  73. //回调函数
  74. new AsyncCallback<TopicMessage>() {
  75. @Override
  76. public void onSuccess(TopicMessage result) {
  77. System.err.println("成功!");
  78. }
  79. @Override
  80. public void onFail(Exception ex) {
  81. System.err.println("失败!");
  82. }
  83. });
  84. System.out.println(asyncResult);
  85. System.out.println(asyncResult.getResult());
  86. //防止junit执行完就杀死进程 等一会
  87. Thread.sleep(3_000);
  88. }
  89. /**
  90. * 消费消息流程:
  91. * 1.创建消费者
  92. * 2.消费消息
  93. * 3.消费确认
  94. */
  95. @Test
  96. public void 普通消息_消费者() {
  97. //创建消费者
  98. //入参:String instanceId mq实例ID, String topicName topic名称, String consumer 消费者, String messageTag 消息标签
  99. MQConsumer consumer = mqClient.getConsumer(instanceId, topic, "GID_TEST_002", null);
  100. List<Message> messages = consumer.consumeMessage(3, 5);
  101. if (null != messages) {
  102. System.err.println("消息数量" + messages.size());
  103. List<String> receiptHandles = new ArrayList<>();
  104. for (Message message : messages) {
  105. receiptHandles.add(message.getReceiptHandle());
  106. System.err.println("message:" + message);
  107. System.err.println("消息体:" + message.getMessageBodyString());
  108. }
  109. consumer.ackMessage(receiptHandles);
  110. } else {
  111. System.err.println("消息为空");
  112. }
  113. }
  114. @Test
  115. public void 普通消息_消费者_循环消费() {
  116. MQConsumer consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
  117. // 在当前线程循环消费消息,建议是多开个几个线程并发消费消息
  118. while (true) {
  119. // 长轮询消费消息
  120. // 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
  121. List<Message> messages = consumer.consumeMessage(
  122. 3,// 一次最多消费3条(最多可设置为16条)
  123. 3// 长轮询时间3秒(最多可设置为30秒)
  124. );
  125. System.err.println(messages);
  126. // 没有消息
  127. if (messages == null || messages.isEmpty()) {
  128. System.err.println(Thread.currentThread().getName() + ": no new message, continue!");
  129. continue;
  130. }
  131. // 处理业务逻辑
  132. for (Message message : messages) {
  133. System.err.println("Receive message: " + message);
  134. }
  135. // Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费
  136. // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样
  137. List<String> handles = new ArrayList<>();
  138. for (Message message : messages) {
  139. handles.add(message.getReceiptHandle());
  140. }
  141. consumer.ackMessage(handles);
  142. }
  143. }
  144. @Test
  145. public void 顺序消息_生产者() throws InterruptedException {
  146. MQProducer producer = mqClient.getProducer(instanceId, topicTestFenquShunxu);
  147. TopicMessage message1 = new TopicMessage();
  148. message1.setMessageKey("k1");
  149. message1.setMessageBody("分区顺序消息1".getBytes());
  150. message1.setMessageTag("tag");
  151. // 设置顺序消息的分区KEY
  152. message1.setShardingKey("1");
  153. // 同步发送消息,只要不抛异常就是成功
  154. TopicMessage result1 = producer.publishMessage(message1);
  155. System.err.println(result1);
  156. TopicMessage message2 = new TopicMessage();
  157. message2.setMessageKey("k2");
  158. message2.setMessageBody("分区顺序消息2".getBytes());
  159. message2.setMessageTag("tag");
  160. // 设置顺序消息的分区KEY
  161. message2.setShardingKey("1");
  162. // 同步发送消息,只要不抛异常就是成功
  163. TopicMessage result2 = producer.publishMessage(message2);
  164. System.err.println(result2);
  165. }
  166. @Test
  167. public void 顺序消息_消费者() {
  168. MQConsumer consumer = mqClient.getConsumer(instanceId, topicTestFenquShunxu, groupId, null);
  169. // 在当前线程循环消费消息,建议是多开个几个线程并发消费消息
  170. do {
  171. // 长轮询顺序消费消息, 拿到的消息可能是多个分区的(对于分区顺序)一个分区的内的消息一定是顺序的
  172. // 对于顺序消费,如果一个分区内的消息只要有没有被确认消费成功的,则对于这个分区下次还会消费到相同的消息
  173. // 对于一个分区,只有所有消息确认消费成功才能消费下一批消息
  174. // 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
  175. List<Message> messages = consumer.consumeMessageOrderly(
  176. 3,// 一次最多消费3条(最多可设置为16条)
  177. 3// 长轮询时间3秒(最多可设置为30秒)
  178. );
  179. // 没有消息
  180. if (messages == null || messages.isEmpty()) {
  181. System.err.println(Thread.currentThread().getName() + ": no new message, continue!");
  182. } else {
  183. // 处理业务逻辑
  184. System.err.println("Receive " + messages.size() + " messages:");
  185. for (Message message : messages) {
  186. System.err.println(message);
  187. System.err.println("ShardingKey: " + message.getShardingKey() + ", a:" + message.getProperties().get("a"));
  188. }
  189. // Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费
  190. // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样
  191. List<String> handles = new ArrayList<>();
  192. for (Message message : messages) {
  193. handles.add(message.getReceiptHandle());
  194. }
  195. consumer.ackMessage(handles);
  196. }
  197. } while (true);
  198. }
  199. @Test
  200. public void 延时或定时消息_生产者() {
  201. // 获取Topic的生产者
  202. MQProducer producer = mqClient.getProducer(instanceId, topicYanshi);
  203. // 循环发送4条消息
  204. for (int i = 0; i < 4; i++) {
  205. TopicMessage pubMsg;
  206. if (i % 2 == 0) {
  207. // 普通消息
  208. pubMsg = new TopicMessage(
  209. // 消息内容
  210. "hello mq!".getBytes(),
  211. // 消息标签
  212. "A"
  213. );
  214. // 设置属性
  215. pubMsg.getProperties().put("a", String.valueOf(i));
  216. // 设置KEY
  217. pubMsg.setMessageKey("MessageKey");
  218. } else {
  219. pubMsg = new TopicMessage(
  220. // 消息内容
  221. "hello mq!".getBytes(),
  222. // 消息标签
  223. "A"
  224. );
  225. // 设置属性
  226. pubMsg.getProperties().put("a", String.valueOf(i));
  227. // 定时消息, 定时时间为10s后
  228. pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
  229. }
  230. // 同步发送消息,只要不抛异常就是成功
  231. TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
  232. // 同步发送消息,只要不抛异常就是成功
  233. System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
  234. + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
  235. }
  236. }
  237. @Test
  238. public void 延时或定时消息_消费者() {
  239. MQConsumer consumer = mqClient.getConsumer(instanceId, topicYanshi, groupId, null);
  240. // 在当前线程循环消费消息,建议是多开个几个线程并发消费消息
  241. do {
  242. // 长轮询顺序消费消息, 拿到的消息可能是多个分区的(对于分区顺序)一个分区的内的消息一定是顺序的
  243. // 对于顺序消费,如果一个分区内的消息只要有没有被确认消费成功的,则对于这个分区下次还会消费到相同的消息
  244. // 对于一个分区,只有所有消息确认消费成功才能消费下一批消息
  245. // 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
  246. List<Message> messages = consumer.consumeMessageOrderly(
  247. 3,// 一次最多消费3条(最多可设置为16条)
  248. 3// 长轮询时间3秒(最多可设置为30秒)
  249. );
  250. // 没有消息
  251. if (messages == null || messages.isEmpty()) {
  252. System.err.println(Thread.currentThread().getName() + ": no new message, continue!");
  253. } else {
  254. // 处理业务逻辑
  255. System.err.println("Receive " + messages.size() + " messages:");
  256. for (Message message : messages) {
  257. System.err.println(message);
  258. System.err.println("ShardingKey: " + message.getShardingKey() + ", a:" + message.getProperties().get("a"));
  259. }
  260. // Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费
  261. // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样
  262. List<String> handles = new ArrayList<>();
  263. for (Message message : messages) {
  264. handles.add(message.getReceiptHandle());
  265. }
  266. consumer.ackMessage(handles);
  267. }
  268. } while (true);
  269. }
  270. @Test
  271. public void 事务消息_生产者() throws InterruptedException {
  272. final MQTransProducer mqTransProducer = mqClient.getTransProducer(instanceId, topicShiwu, groupId);
  273. for (int i = 0; i < 4; i++) {
  274. TopicMessage message = new TopicMessage();
  275. message.setMessageBody("trans_msg".getBytes());
  276. message.setMessageTag("tag");
  277. message.setMessageKey(String.valueOf(System.currentTimeMillis()));
  278. // 设置事务第一次回查的时间,为相对时间,单位:秒,范围为10~300s之间
  279. // 第一次事务回查后如果消息没有commit或者rollback,则之后每隔10s左右会回查一次,总共回查一天
  280. message.setTransCheckImmunityTime(10);
  281. message.getProperties().put("a", String.valueOf(i));
  282. TopicMessage pubResultMsg = mqTransProducer.publishMessage(message);
  283. System.err.println("Send---->msgId is: " + pubResultMsg.getMessageId()
  284. + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5()
  285. + ", Handle: " + pubResultMsg.getReceiptHandle()
  286. );
  287. if (pubResultMsg.getReceiptHandle() != null) {
  288. if (i == 0) {
  289. // 发送完事务消息后能获取到半消息句柄,可以直接commit/rollback事务消息
  290. mqTransProducer.commit(pubResultMsg.getReceiptHandle());
  291. System.err.printf("MessageId:%s, commit%n", pubResultMsg.getMessageId());
  292. }
  293. }
  294. }
  295. // 客户端需要有一个线程或者进程来消费没有确认的事务消息
  296. // 示例这里启动一个线程来检查没有确认的事务消息
  297. Thread t = new Thread(() -> {
  298. int count = 0;
  299. while (true) {
  300. if (count == 3) {
  301. break;
  302. }
  303. List<Message> messages = mqTransProducer.consumeHalfMessage(3, 3);
  304. if (messages == null) {
  305. System.out.println("No Half message!");
  306. continue;
  307. }
  308. System.out.printf("Half---->MessageId:%s,Properties:%s,Body:%s,Latency:%d%n",
  309. messages.get(0).getMessageId(),
  310. messages.get(0).getProperties(),
  311. messages.get(0).getMessageBodyString(),
  312. System.currentTimeMillis() - messages.get(0).getPublishTime());
  313. for (Message message : messages) {
  314. int i = Integer.parseInt(message.getProperties().get("a"));
  315. if (i == 1 ||
  316. i == 2 && message.getConsumedTimes() > 1 ||
  317. i == 3) {
  318. // 确认提交事务消息
  319. mqTransProducer.commit(message.getReceiptHandle());
  320. count++;
  321. System.out.printf("MessageId:%s, commit%n", message.getMessageId());
  322. } else {
  323. // 什么都不做,下次再检查
  324. System.out.println(String.format("MessageId:%s, unknown", message.getMessageId()));
  325. }
  326. }
  327. }
  328. });
  329. t.start();
  330. t.join();
  331. }
  332. @Test
  333. public void 事务消息_消费者() {
  334. MQConsumer consumer = mqClient.getConsumer(instanceId, topicYanshi, groupId, null);
  335. // 在当前线程循环消费消息,建议是多开个几个线程并发消费消息
  336. do {
  337. // 长轮询顺序消费消息, 拿到的消息可能是多个分区的(对于分区顺序)一个分区的内的消息一定是顺序的
  338. // 对于顺序消费,如果一个分区内的消息只要有没有被确认消费成功的,则对于这个分区下次还会消费到相同的消息
  339. // 对于一个分区,只有所有消息确认消费成功才能消费下一批消息
  340. // 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
  341. List<Message> messages = consumer.consumeMessageOrderly(
  342. 3,// 一次最多消费3条(最多可设置为16条)
  343. 3// 长轮询时间3秒(最多可设置为30秒)
  344. );
  345. // 没有消息
  346. if (messages == null || messages.isEmpty()) {
  347. System.err.println(Thread.currentThread().getName() + ": no new message, continue!");
  348. } else {
  349. // 处理业务逻辑
  350. System.err.println("Receive " + messages.size() + " messages:");
  351. for (Message message : messages) {
  352. System.err.println(message);
  353. System.err.println("ShardingKey: " + message.getShardingKey() + ", a:" + message.getProperties().get("a"));
  354. }
  355. // Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费
  356. // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样
  357. List<String> handles = new ArrayList<>();
  358. for (Message message : messages) {
  359. handles.add(message.getReceiptHandle());
  360. }
  361. consumer.ackMessage(handles);
  362. }
  363. } while (true);
  364. }
  365. @After
  366. public void after() {
  367. mqClient.close();
  368. }
  369. }