RocketMQ提供了事务消息—->高可靠消息
首先需要搭建RocketMQ服务器:文章地址

导入无事务项目

  1. 将项目解压至rocketmq-dtx工程目录
  2. 导入
    • 添加模块
    • 导入pom文件导入模块

订单服务使用事务消息异步调用账户服务

添加事务消息

步骤如下:

  1. 父项目添加spring rocketMQ依赖

    1. <dependency>
    2. <groupId>org.apache.rocketmq</groupId>
    3. <artifactId>rocketmq-spring-boot-starter</artifactId>
    4. <version>2.2.1</version>
    5. </dependency>
  2. yml配置rocketmq连接和生产者组名

    1. rocketmq:
    2. name-server: 192.168.64.141.:9876
    3. producer:
    4. group: order_producer
  3. 添加新的数据表tx_table记录事务执行状态

修改 db-init 项目中的 order.sql 文件,创建 tx_table 表
order_rocketmq.sql

  1. 添加实体类TXInfo与Mapper

    1. @Data
    2. @NoArgsConstructor
    3. @AllArgsConstructor
    4. @Accessors(chain = true)
    5. public class TxInfo {
    6. /**
    7. * 事务id
    8. */
    9. private String xid;
    10. /**
    11. * 事务执行状态,0成功,1失败,2未知
    12. */
    13. private Integer status;
    14. /**
    15. * 存储时间
    16. */
    17. private Long created;
    18. }

    mapper:

    1. <?xml version="1.0" encoding="UTF-8" ?>
    2. <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
    3. <mapper namespace="cn.tedu.order.mapper.TxMapper">
    4. <resultMap id="BaseResultMap" type="cn.tedu.order.entity.TxInfo" >
    5. <id column="xid" property="xid" jdbcType="CHAR" />
    6. <result column="created_at" property="created" jdbcType="BIGINT" />
    7. <result column="status" property="status" jdbcType="INTEGER"/>
    8. </resultMap>
    9. <insert id="insert">
    10. INSERT INTO `tx_table`(`xid`,`created_at`,`status`) VALUES(#{xid},#{created},#{status});
    11. </insert>
    12. <select id="selectById" resultMap="BaseResultMap">
    13. SELECT `xid`,`created_at`,`status` FROM tx_table WHERE xid=#{xid};
    14. </select>
    15. </mapper>
  2. 创建AccountMessage,封装发给用户的调用消息:UserId,money,txId

    1. @Data
    2. @NoArgsConstructor
    3. @AllArgsConstructor
    4. @Accessors(chain = true)
    5. public class AccountMessage {
    6. private Long userId;
    7. private BigDecimal money;
    8. private String xid;
    9. }
  3. 将封装的数据转换为Json字符串使用JsonUtil

    1. @Slf4j
    2. public class JsonUtil {
    3. private static ObjectMapper mapper;
    4. private static JsonInclude.Include DEFAULT_PROPERTY_INCLUSION = JsonInclude.Include.NON_DEFAULT;
    5. private static boolean IS_ENABLE_INDENT_OUTPUT = false;
    6. private static String CSV_DEFAULT_COLUMN_SEPARATOR = ",";
    7. static {
    8. try {
    9. initMapper();
    10. configPropertyInclusion();
    11. configIndentOutput();
    12. configCommon();
    13. } catch (Exception e) {
    14. log.error("jackson config error", e);
    15. }
    16. }
    17. private static void initMapper() {
    18. mapper = new ObjectMapper();
    19. }
    20. private static void configCommon() {
    21. config(mapper);
    22. }
    23. private static void configPropertyInclusion() {
    24. mapper.setSerializationInclusion(DEFAULT_PROPERTY_INCLUSION);
    25. }
    26. private static void configIndentOutput() {
    27. mapper.configure(SerializationFeature.INDENT_OUTPUT, IS_ENABLE_INDENT_OUTPUT);
    28. }
    29. private static void config(ObjectMapper objectMapper) {
    30. objectMapper.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN);
    31. objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
    32. objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
    33. objectMapper.enable(DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY);
    34. objectMapper.enable(DeserializationFeature.FAIL_ON_NUMBERS_FOR_ENUMS);
    35. objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
    36. objectMapper.disable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);
    37. objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
    38. objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
    39. objectMapper.disable(JsonGenerator.Feature.ESCAPE_NON_ASCII);
    40. objectMapper.enable(JsonGenerator.Feature.IGNORE_UNKNOWN);
    41. objectMapper.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES);
    42. objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    43. objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
    44. objectMapper.enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);
    45. objectMapper.registerModule(new ParameterNamesModule());
    46. objectMapper.registerModule(new Jdk8Module());
    47. objectMapper.registerModule(new JavaTimeModule());
    48. }
    49. public static void setSerializationInclusion(JsonInclude.Include inclusion) {
    50. DEFAULT_PROPERTY_INCLUSION = inclusion;
    51. configPropertyInclusion();
    52. }
    53. public static void setIndentOutput(boolean isEnable) {
    54. IS_ENABLE_INDENT_OUTPUT = isEnable;
    55. configIndentOutput();
    56. }
    57. public static <V> V from(URL url, Class<V> c) {
    58. try {
    59. return mapper.readValue(url, c);
    60. } catch (IOException e) {
    61. log.error("jackson from error, url: {}, type: {}", url.getPath(), c, e);
    62. return null;
    63. }
    64. }
    65. public static <V> V from(InputStream inputStream, Class<V> c) {
    66. try {
    67. return mapper.readValue(inputStream, c);
    68. } catch (IOException e) {
    69. log.error("jackson from error, type: {}", c, e);
    70. return null;
    71. }
    72. }
    73. public static <V> V from(File file, Class<V> c) {
    74. try {
    75. return mapper.readValue(file, c);
    76. } catch (IOException e) {
    77. log.error("jackson from error, file path: {}, type: {}", file.getPath(), c, e);
    78. return null;
    79. }
    80. }
    81. public static <V> V from(Object jsonObj, Class<V> c) {
    82. try {
    83. return mapper.readValue(jsonObj.toString(), c);
    84. } catch (IOException e) {
    85. log.error("jackson from error, json: {}, type: {}", jsonObj.toString(), c, e);
    86. return null;
    87. }
    88. }
    89. public static <V> V from(String json, Class<V> c) {
    90. try {
    91. return mapper.readValue(json, c);
    92. } catch (IOException e) {
    93. log.error("jackson from error, json: {}, type: {}", json, c, e);
    94. return null;
    95. }
    96. }
    97. public static <V> V from(URL url, TypeReference<V> type) {
    98. try {
    99. return mapper.readValue(url, type);
    100. } catch (IOException e) {
    101. log.error("jackson from error, url: {}, type: {}", url.getPath(), type, e);
    102. return null;
    103. }
    104. }
    105. public static <V> V from(InputStream inputStream, TypeReference<V> type) {
    106. try {
    107. return mapper.readValue(inputStream, type);
    108. } catch (IOException e) {
    109. log.error("jackson from error, type: {}", type, e);
    110. return null;
    111. }
    112. }
    113. public static <V> V from(File file, TypeReference<V> type) {
    114. try {
    115. return mapper.readValue(file, type);
    116. } catch (IOException e) {
    117. log.error("jackson from error, file path: {}, type: {}", file.getPath(), type, e);
    118. return null;
    119. }
    120. }
    121. public static <V> V from(Object jsonObj, TypeReference<V> type) {
    122. try {
    123. return mapper.readValue(jsonObj.toString(), type);
    124. } catch (IOException e) {
    125. log.error("jackson from error, json: {}, type: {}", jsonObj.toString(), type, e);
    126. return null;
    127. }
    128. }
    129. public static <V> V from(String json, TypeReference<V> type) {
    130. try {
    131. return mapper.readValue(json, type);
    132. } catch (IOException e) {
    133. log.error("jackson from error, json: {}, type: {}", json, type, e);
    134. return null;
    135. }
    136. }
    137. public static <V> String to(List<V> list) {
    138. try {
    139. return mapper.writeValueAsString(list);
    140. } catch (JsonProcessingException e) {
    141. log.error("jackson to error, obj: {}", list, e);
    142. return null;
    143. }
    144. }
    145. public static <V> String to(V v) {
    146. try {
    147. return mapper.writeValueAsString(v);
    148. } catch (JsonProcessingException e) {
    149. log.error("jackson to error, obj: {}", v, e);
    150. return null;
    151. }
    152. }
    153. public static <V> void toFile(String path, List<V> list) {
    154. try (Writer writer = new FileWriter(new File(path), true)) {
    155. mapper.writer().writeValues(writer).writeAll(list);
    156. writer.flush();
    157. } catch (Exception e) {
    158. log.error("jackson to file error, path: {}, list: {}", path, list, e);
    159. }
    160. }
    161. public static <V> void toFile(String path, V v) {
    162. try (Writer writer = new FileWriter(new File(path), true)) {
    163. mapper.writer().writeValues(writer).write(v);
    164. writer.flush();
    165. } catch (Exception e) {
    166. log.error("jackson to file error, path: {}, obj: {}", path, v, e);
    167. }
    168. }
    169. public static String getString(String json, String key) {
    170. if (StringUtils.isEmpty(json)) {
    171. return null;
    172. }
    173. try {
    174. JsonNode node = mapper.readTree(json);
    175. if (null != node) {
    176. return node.get(key).asText();
    177. } else {
    178. return null;
    179. }
    180. } catch (IOException e) {
    181. log.error("jackson get string error, json: {}, key: {}", json, key, e);
    182. return null;
    183. }
    184. }
    185. public static Integer getInt(String json, String key) {
    186. if (StringUtils.isEmpty(json)) {
    187. return null;
    188. }
    189. try {
    190. JsonNode node = mapper.readTree(json);
    191. if (null != node) {
    192. return node.get(key).intValue();
    193. } else {
    194. return null;
    195. }
    196. } catch (IOException e) {
    197. log.error("jackson get int error, json: {}, key: {}", json, key, e);
    198. return null;
    199. }
    200. }
    201. public static Long getLong(String json, String key) {
    202. if (StringUtils.isEmpty(json)) {
    203. return null;
    204. }
    205. try {
    206. JsonNode node = mapper.readTree(json);
    207. if (null != node) {
    208. return node.get(key).longValue();
    209. } else {
    210. return null;
    211. }
    212. } catch (IOException e) {
    213. log.error("jackson get long error, json: {}, key: {}", json, key, e);
    214. return null;
    215. }
    216. }
    217. public static Double getDouble(String json, String key) {
    218. if (StringUtils.isEmpty(json)) {
    219. return null;
    220. }
    221. try {
    222. JsonNode node = mapper.readTree(json);
    223. if (null != node) {
    224. return node.get(key).doubleValue();
    225. } else {
    226. return null;
    227. }
    228. } catch (IOException e) {
    229. log.error("jackson get double error, json: {}, key: {}", json, key, e);
    230. return null;
    231. }
    232. }
    233. public static BigInteger getBigInteger(String json, String key) {
    234. if (StringUtils.isEmpty(json)) {
    235. return new BigInteger(String.valueOf(0.00));
    236. }
    237. try {
    238. JsonNode node = mapper.readTree(json);
    239. if (null != node) {
    240. return node.get(key).bigIntegerValue();
    241. } else {
    242. return null;
    243. }
    244. } catch (IOException e) {
    245. log.error("jackson get biginteger error, json: {}, key: {}", json, key, e);
    246. return null;
    247. }
    248. }
    249. public static BigDecimal getBigDecimal(String json, String key) {
    250. if (StringUtils.isEmpty(json)) {
    251. return null;
    252. }
    253. try {
    254. JsonNode node = mapper.readTree(json);
    255. if (null != node) {
    256. return node.get(key).decimalValue();
    257. } else {
    258. return null;
    259. }
    260. } catch (IOException e) {
    261. log.error("jackson get bigdecimal error, json: {}, key: {}", json, key, e);
    262. return null;
    263. }
    264. }
    265. public static boolean getBoolean(String json, String key) {
    266. if (StringUtils.isEmpty(json)) {
    267. return false;
    268. }
    269. try {
    270. JsonNode node = mapper.readTree(json);
    271. if (null != node) {
    272. return node.get(key).booleanValue();
    273. } else {
    274. return false;
    275. }
    276. } catch (IOException e) {
    277. log.error("jackson get boolean error, json: {}, key: {}", json, key, e);
    278. return false;
    279. }
    280. }
    281. public static byte[] getByte(String json, String key) {
    282. if (StringUtils.isEmpty(json)) {
    283. return null;
    284. }
    285. try {
    286. JsonNode node = mapper.readTree(json);
    287. if (null != node) {
    288. return node.get(key).binaryValue();
    289. } else {
    290. return null;
    291. }
    292. } catch (IOException e) {
    293. log.error("jackson get byte error, json: {}, key: {}", json, key, e);
    294. return null;
    295. }
    296. }
    297. public static <T> ArrayList<T> getList(String json, String key) {
    298. if (StringUtils.isEmpty(json)) {
    299. return null;
    300. }
    301. String string = getString(json, key);
    302. return from(string, new TypeReference<ArrayList<T>>() {});
    303. }
    304. public static <T> String add(String json, String key, T value) {
    305. try {
    306. JsonNode node = mapper.readTree(json);
    307. add(node, key, value);
    308. return node.toString();
    309. } catch (IOException e) {
    310. log.error("jackson add error, json: {}, key: {}, value: {}", json, key, value, e);
    311. return json;
    312. }
    313. }
    314. private static <T> void add(JsonNode jsonNode, String key, T value) {
    315. if (value instanceof String) {
    316. ((ObjectNode) jsonNode).put(key, (String) value);
    317. } else if (value instanceof Short) {
    318. ((ObjectNode) jsonNode).put(key, (Short) value);
    319. } else if (value instanceof Integer) {
    320. ((ObjectNode) jsonNode).put(key, (Integer) value);
    321. } else if (value instanceof Long) {
    322. ((ObjectNode) jsonNode).put(key, (Long) value);
    323. } else if (value instanceof Float) {
    324. ((ObjectNode) jsonNode).put(key, (Float) value);
    325. } else if (value instanceof Double) {
    326. ((ObjectNode) jsonNode).put(key, (Double) value);
    327. } else if (value instanceof BigDecimal) {
    328. ((ObjectNode) jsonNode).put(key, (BigDecimal) value);
    329. } else if (value instanceof BigInteger) {
    330. ((ObjectNode) jsonNode).put(key, (BigInteger) value);
    331. } else if (value instanceof Boolean) {
    332. ((ObjectNode) jsonNode).put(key, (Boolean) value);
    333. } else if (value instanceof byte[]) {
    334. ((ObjectNode) jsonNode).put(key, (byte[]) value);
    335. } else {
    336. ((ObjectNode) jsonNode).put(key, to(value));
    337. }
    338. }
    339. public static String remove(String json, String key) {
    340. try {
    341. JsonNode node = mapper.readTree(json);
    342. ((ObjectNode) node).remove(key);
    343. return node.toString();
    344. } catch (IOException e) {
    345. log.error("jackson remove error, json: {}, key: {}", json, key, e);
    346. return json;
    347. }
    348. }
    349. public static <T> String update(String json, String key, T value) {
    350. try {
    351. JsonNode node = mapper.readTree(json);
    352. ((ObjectNode) node).remove(key);
    353. add(node, key, value);
    354. return node.toString();
    355. } catch (IOException e) {
    356. log.error("jackson update error, json: {}, key: {}, value: {}", json, key, value, e);
    357. return json;
    358. }
    359. }
    360. public static String format(String json) {
    361. try {
    362. JsonNode node = mapper.readTree(json);
    363. return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(node);
    364. } catch (IOException e) {
    365. log.error("jackson format json error, json: {}", json, e);
    366. return json;
    367. }
    368. }
    369. public static boolean isJson(String json) {
    370. try {
    371. mapper.readTree(json);
    372. return true;
    373. } catch (Exception e) {
    374. log.error("jackson check json error, json: {}", json, e);
    375. return false;
    376. }
    377. }
    378. private static InputStream getResourceStream(String name) {
    379. return JsonUtil.class.getClassLoader().getResourceAsStream(name);
    380. }
    381. private static InputStreamReader getResourceReader(InputStream inputStream) {
    382. if (null == inputStream) {
    383. return null;
    384. }
    385. return new InputStreamReader(inputStream, StandardCharsets.UTF_8);
    386. }
    387. }
  4. OrderServiceImpl发送事务消息

    1. @Override
    2. public void createOrder(Order order) {
    3. //准备消息数据
    4. String xid = UUID.randomUUID().toString().replace("-", "");
    5. AccountMessage message = new AccountMessage(order.getUserId(), order.getMoney(), xid);
    6. String json = JsonUtil.to(message);
    7. //将json字符串封装至spring的同用message对象
    8. assert json != null;
    9. Message<String> msg = MessageBuilder.withPayload(json).build();
    10. /*参数 1. 消息主题 2. 消息 3. 触发监听器执行业务时需要的业务数据参数*/
    11. rocketMQTemplate.sendMessageInTransaction("orderTopic", msg, order);
    12. }
  5. 实现事务监听器

    1. /**
    2. * 执行本地事务
    3. *
    4. * @param message 消息
    5. * @param o 参数
    6. * @return 状态
    7. */
    8. @Transactional
    9. @Override
    10. public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    11. //用于返回状态
    12. RocketMQLocalTransactionState state;
    13. //用于在数据库中保存状态
    14. Integer status;
    15. try {
    16. doCreateOrder((Order) o);
    17. state = RocketMQLocalTransactionState.COMMIT;
    18. status = 0;
    19. } catch (Exception e) {
    20. log.error("创建订单失败", e);
    21. state = RocketMQLocalTransactionState.ROLLBACK;
    22. status = 1;
    23. }
    24. String json = new String((byte[]) message.getPayload());
    25. String xid = JsonUtil.getString(json, "xid");
    26. txMapper.insert(new TxInfo(xid, status, System.currentTimeMillis()));
    27. return state;
    28. }
    29. /**
    30. * 处理事务回查
    31. *
    32. * @param message 消息
    33. * @return 状态
    34. */
    35. @Override
    36. public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
    37. String json = new String((byte[]) message.getPayload());
    38. String xid = JsonUtil.getString(json, "xid");
    39. TxInfo txInfo = txMapper.selectById(xid);
    40. if (txInfo == null) {
    41. return RocketMQLocalTransactionState.UNKNOWN;
    42. }
    43. switch (txInfo.getStatus()) {
    44. case 0:
    45. return RocketMQLocalTransactionState.COMMIT;
    46. case 1:
    47. return RocketMQLocalTransactionState.ROLLBACK;
    48. default:
    49. return RocketMQLocalTransactionState.UNKNOWN;
    50. }
    51. }

    账户接受事务消息,扣减账户

  6. 添加依赖

  7. 修改yml配置,添加rocketMQ的name server连接地址
  8. 创建消息对象AccountMessage
  9. JsonUtil
  10. 新建消费者类,AccountConsumer,实现消费者接口
  11. 通过注解配置接收消息
  12. 扣减账户

    1. @Component
    2. @RocketMQMessageListener(topic = "orderTopic", consumerGroup = "account-consumer")
    3. public class AccountConsumer implements RocketMQListener<String> {
    4. private final AccountService accountService;
    5. @Autowired
    6. public AccountConsumer(AccountService accountService) {
    7. this.accountService = accountService;
    8. }
    9. @Override
    10. public void onMessage(String json) {
    11. AccountMessage accountMessage = JsonUtil.from(json, AccountMessage.class);
    12. assert accountMessage != null;
    13. accountService.decrease(accountMessage.getUserId(), accountMessage.getMoney());
    14. }
    15. }