RocketMQ提供了事务消息 → 高可靠消息
首先需要搭建RocketMQ服务器:文章地址
导入无事务项目
- 将项目解压至rocketmq-dtx工程目录
- 导入
- 添加模块
- 通过导入pom文件来导入模块
订单服务使用事务消息异步调用账户服务
添加事务消息
步骤如下:
父项目添加spring rocketMQ依赖
<!-- Spring Boot starter for Apache RocketMQ. -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>
yml配置rocketmq连接和生产者组名
rocketmq:
  # Name server address in host:port form.
  name-server: 192.168.64.141:9876
  producer:
    # Producer group name.
    group: order_producer
添加新的数据表tx_table记录事务执行状态
修改 db-init 项目中的 order.sql 文件,创建 tx_table 表
order_rocketmq.sql
添加实体类TxInfo与Mapper
/**
 * Records the execution status of one transaction, identified by its xid.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class TxInfo {

    /** Transaction id. */
    private String xid;

    /** Transaction execution status: 0 = success, 1 = failure, 2 = unknown. */
    private Integer status;

    /** Storage time of the record. */
    private Long created;
}
mapper:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<!-- Mapper for tx_table: inserts a transaction status row and looks one up by xid. -->
<mapper namespace="cn.tedu.order.mapper.TxMapper">

    <!-- Maps tx_table columns onto TxInfo; note created_at is bound to the "created" property. -->
    <resultMap id="BaseResultMap" type="cn.tedu.order.entity.TxInfo" >
        <id column="xid" property="xid" jdbcType="CHAR" />
        <result column="created_at" property="created" jdbcType="BIGINT" />
        <result column="status" property="status" jdbcType="INTEGER"/>
    </resultMap>

    <!-- Stores one transaction status row. -->
    <insert id="insert">INSERT INTO `tx_table`(`xid`,`created_at`,`status`) VALUES(#{xid},#{created},#{status});</insert>

    <!-- Loads the status row for the given xid. -->
    <select id="selectById" resultMap="BaseResultMap">SELECT `xid`,`created_at`,`status` FROM tx_table WHERE xid=#{xid};</select>

</mapper>
创建AccountMessage,封装发给用户的调用消息:
UserId,money,txId@Data@NoArgsConstructor@AllArgsConstructor@Accessors(chain = true)public class AccountMessage {private Long userId;private BigDecimal money;private String xid;}
使用JsonUtil将封装的数据转换为Json字符串
@Slf4jpublic class JsonUtil {private static ObjectMapper mapper;private static JsonInclude.Include DEFAULT_PROPERTY_INCLUSION = JsonInclude.Include.NON_DEFAULT;private static boolean IS_ENABLE_INDENT_OUTPUT = false;private static String CSV_DEFAULT_COLUMN_SEPARATOR = ",";static {try {initMapper();configPropertyInclusion();configIndentOutput();configCommon();} catch (Exception e) {log.error("jackson config error", e);}}private static void initMapper() {mapper = new ObjectMapper();}private static void configCommon() {config(mapper);}private static void configPropertyInclusion() {mapper.setSerializationInclusion(DEFAULT_PROPERTY_INCLUSION);}private static void configIndentOutput() {mapper.configure(SerializationFeature.INDENT_OUTPUT, IS_ENABLE_INDENT_OUTPUT);}private static void config(ObjectMapper objectMapper) {objectMapper.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN);objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);objectMapper.enable(DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY);objectMapper.enable(DeserializationFeature.FAIL_ON_NUMBERS_FOR_ENUMS);objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);objectMapper.disable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);objectMapper.disable(JsonGenerator.Feature.ESCAPE_NON_ASCII);objectMapper.enable(JsonGenerator.Feature.IGNORE_UNKNOWN);objectMapper.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES);objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));objectMapper.enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);objectMapper.registerModule(new ParameterNamesModule());objectMapper.registerModule(new Jdk8Module());objectMapper.registerModule(new JavaTimeModule());}public 
static void setSerializationInclusion(JsonInclude.Include inclusion) {DEFAULT_PROPERTY_INCLUSION = inclusion;configPropertyInclusion();}public static void setIndentOutput(boolean isEnable) {IS_ENABLE_INDENT_OUTPUT = isEnable;configIndentOutput();}public static <V> V from(URL url, Class<V> c) {try {return mapper.readValue(url, c);} catch (IOException e) {log.error("jackson from error, url: {}, type: {}", url.getPath(), c, e);return null;}}public static <V> V from(InputStream inputStream, Class<V> c) {try {return mapper.readValue(inputStream, c);} catch (IOException e) {log.error("jackson from error, type: {}", c, e);return null;}}public static <V> V from(File file, Class<V> c) {try {return mapper.readValue(file, c);} catch (IOException e) {log.error("jackson from error, file path: {}, type: {}", file.getPath(), c, e);return null;}}public static <V> V from(Object jsonObj, Class<V> c) {try {return mapper.readValue(jsonObj.toString(), c);} catch (IOException e) {log.error("jackson from error, json: {}, type: {}", jsonObj.toString(), c, e);return null;}}public static <V> V from(String json, Class<V> c) {try {return mapper.readValue(json, c);} catch (IOException e) {log.error("jackson from error, json: {}, type: {}", json, c, e);return null;}}public static <V> V from(URL url, TypeReference<V> type) {try {return mapper.readValue(url, type);} catch (IOException e) {log.error("jackson from error, url: {}, type: {}", url.getPath(), type, e);return null;}}public static <V> V from(InputStream inputStream, TypeReference<V> type) {try {return mapper.readValue(inputStream, type);} catch (IOException e) {log.error("jackson from error, type: {}", type, e);return null;}}public static <V> V from(File file, TypeReference<V> type) {try {return mapper.readValue(file, type);} catch (IOException e) {log.error("jackson from error, file path: {}, type: {}", file.getPath(), type, e);return null;}}public static <V> V from(Object jsonObj, TypeReference<V> type) {try {return 
mapper.readValue(jsonObj.toString(), type);} catch (IOException e) {log.error("jackson from error, json: {}, type: {}", jsonObj.toString(), type, e);return null;}}public static <V> V from(String json, TypeReference<V> type) {try {return mapper.readValue(json, type);} catch (IOException e) {log.error("jackson from error, json: {}, type: {}", json, type, e);return null;}}public static <V> String to(List<V> list) {try {return mapper.writeValueAsString(list);} catch (JsonProcessingException e) {log.error("jackson to error, obj: {}", list, e);return null;}}public static <V> String to(V v) {try {return mapper.writeValueAsString(v);} catch (JsonProcessingException e) {log.error("jackson to error, obj: {}", v, e);return null;}}public static <V> void toFile(String path, List<V> list) {try (Writer writer = new FileWriter(new File(path), true)) {mapper.writer().writeValues(writer).writeAll(list);writer.flush();} catch (Exception e) {log.error("jackson to file error, path: {}, list: {}", path, list, e);}}public static <V> void toFile(String path, V v) {try (Writer writer = new FileWriter(new File(path), true)) {mapper.writer().writeValues(writer).write(v);writer.flush();} catch (Exception e) {log.error("jackson to file error, path: {}, obj: {}", path, v, e);}}public static String getString(String json, String key) {if (StringUtils.isEmpty(json)) {return null;}try {JsonNode node = mapper.readTree(json);if (null != node) {return node.get(key).asText();} else {return null;}} catch (IOException e) {log.error("jackson get string error, json: {}, key: {}", json, key, e);return null;}}public static Integer getInt(String json, String key) {if (StringUtils.isEmpty(json)) {return null;}try {JsonNode node = mapper.readTree(json);if (null != node) {return node.get(key).intValue();} else {return null;}} catch (IOException e) {log.error("jackson get int error, json: {}, key: {}", json, key, e);return null;}}public static Long getLong(String json, String key) {if (StringUtils.isEmpty(json)) 
{return null;}try {JsonNode node = mapper.readTree(json);if (null != node) {return node.get(key).longValue();} else {return null;}} catch (IOException e) {log.error("jackson get long error, json: {}, key: {}", json, key, e);return null;}}public static Double getDouble(String json, String key) {if (StringUtils.isEmpty(json)) {return null;}try {JsonNode node = mapper.readTree(json);if (null != node) {return node.get(key).doubleValue();} else {return null;}} catch (IOException e) {log.error("jackson get double error, json: {}, key: {}", json, key, e);return null;}}public static BigInteger getBigInteger(String json, String key) {if (StringUtils.isEmpty(json)) {return new BigInteger(String.valueOf(0.00));}try {JsonNode node = mapper.readTree(json);if (null != node) {return node.get(key).bigIntegerValue();} else {return null;}} catch (IOException e) {log.error("jackson get biginteger error, json: {}, key: {}", json, key, e);return null;}}public static BigDecimal getBigDecimal(String json, String key) {if (StringUtils.isEmpty(json)) {return null;}try {JsonNode node = mapper.readTree(json);if (null != node) {return node.get(key).decimalValue();} else {return null;}} catch (IOException e) {log.error("jackson get bigdecimal error, json: {}, key: {}", json, key, e);return null;}}public static boolean getBoolean(String json, String key) {if (StringUtils.isEmpty(json)) {return false;}try {JsonNode node = mapper.readTree(json);if (null != node) {return node.get(key).booleanValue();} else {return false;}} catch (IOException e) {log.error("jackson get boolean error, json: {}, key: {}", json, key, e);return false;}}public static byte[] getByte(String json, String key) {if (StringUtils.isEmpty(json)) {return null;}try {JsonNode node = mapper.readTree(json);if (null != node) {return node.get(key).binaryValue();} else {return null;}} catch (IOException e) {log.error("jackson get byte error, json: {}, key: {}", json, key, e);return null;}}public static <T> ArrayList<T> getList(String 
json, String key) {if (StringUtils.isEmpty(json)) {return null;}String string = getString(json, key);return from(string, new TypeReference<ArrayList<T>>() {});}public static <T> String add(String json, String key, T value) {try {JsonNode node = mapper.readTree(json);add(node, key, value);return node.toString();} catch (IOException e) {log.error("jackson add error, json: {}, key: {}, value: {}", json, key, value, e);return json;}}private static <T> void add(JsonNode jsonNode, String key, T value) {if (value instanceof String) {((ObjectNode) jsonNode).put(key, (String) value);} else if (value instanceof Short) {((ObjectNode) jsonNode).put(key, (Short) value);} else if (value instanceof Integer) {((ObjectNode) jsonNode).put(key, (Integer) value);} else if (value instanceof Long) {((ObjectNode) jsonNode).put(key, (Long) value);} else if (value instanceof Float) {((ObjectNode) jsonNode).put(key, (Float) value);} else if (value instanceof Double) {((ObjectNode) jsonNode).put(key, (Double) value);} else if (value instanceof BigDecimal) {((ObjectNode) jsonNode).put(key, (BigDecimal) value);} else if (value instanceof BigInteger) {((ObjectNode) jsonNode).put(key, (BigInteger) value);} else if (value instanceof Boolean) {((ObjectNode) jsonNode).put(key, (Boolean) value);} else if (value instanceof byte[]) {((ObjectNode) jsonNode).put(key, (byte[]) value);} else {((ObjectNode) jsonNode).put(key, to(value));}}public static String remove(String json, String key) {try {JsonNode node = mapper.readTree(json);((ObjectNode) node).remove(key);return node.toString();} catch (IOException e) {log.error("jackson remove error, json: {}, key: {}", json, key, e);return json;}}public static <T> String update(String json, String key, T value) {try {JsonNode node = mapper.readTree(json);((ObjectNode) node).remove(key);add(node, key, value);return node.toString();} catch (IOException e) {log.error("jackson update error, json: {}, key: {}, value: {}", json, key, value, e);return json;}}public 
static String format(String json) {try {JsonNode node = mapper.readTree(json);return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(node);} catch (IOException e) {log.error("jackson format json error, json: {}", json, e);return json;}}public static boolean isJson(String json) {try {mapper.readTree(json);return true;} catch (Exception e) {log.error("jackson check json error, json: {}", json, e);return false;}}private static InputStream getResourceStream(String name) {return JsonUtil.class.getClassLoader().getResourceAsStream(name);}private static InputStreamReader getResourceReader(InputStream inputStream) {if (null == inputStream) {return null;}return new InputStreamReader(inputStream, StandardCharsets.UTF_8);}}
OrderServiceImpl发送事务消息
@Overridepublic void createOrder(Order order) {//准备消息数据String xid = UUID.randomUUID().toString().replace("-", "");AccountMessage message = new AccountMessage(order.getUserId(), order.getMoney(), xid);String json = JsonUtil.to(message);//将json字符串封装至spring的同用message对象assert json != null;Message<String> msg = MessageBuilder.withPayload(json).build();/*参数 1. 消息主题 2. 消息 3. 触发监听器执行业务时需要的业务数据参数*/rocketMQTemplate.sendMessageInTransaction("orderTopic", msg, order);}
实现事务监听器
/*** 执行本地事务** @param message 消息* @param o 参数* @return 状态*/@Transactional@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {//用于返回状态RocketMQLocalTransactionState state;//用于在数据库中保存状态Integer status;try {doCreateOrder((Order) o);state = RocketMQLocalTransactionState.COMMIT;status = 0;} catch (Exception e) {log.error("创建订单失败", e);state = RocketMQLocalTransactionState.ROLLBACK;status = 1;}String json = new String((byte[]) message.getPayload());String xid = JsonUtil.getString(json, "xid");txMapper.insert(new TxInfo(xid, status, System.currentTimeMillis()));return state;}/*** 处理事务回查** @param message 消息* @return 状态*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {String json = new String((byte[]) message.getPayload());String xid = JsonUtil.getString(json, "xid");TxInfo txInfo = txMapper.selectById(xid);if (txInfo == null) {return RocketMQLocalTransactionState.UNKNOWN;}switch (txInfo.getStatus()) {case 0:return RocketMQLocalTransactionState.COMMIT;case 1:return RocketMQLocalTransactionState.ROLLBACK;default:return RocketMQLocalTransactionState.UNKNOWN;}}
账户接收事务消息,扣减账户
添加依赖
- 修改yml配置,添加rocketMQ的name server连接地址
- 创建消息对象AccountMessage
- JsonUtil
- 新建消费者类,AccountConsumer,实现消费者接口
- 通过注解配置接收消息
扣减账户
@Component@RocketMQMessageListener(topic = "orderTopic", consumerGroup = "account-consumer")public class AccountConsumer implements RocketMQListener<String> {private final AccountService accountService;@Autowiredpublic AccountConsumer(AccountService accountService) {this.accountService = accountService;}@Overridepublic void onMessage(String json) {AccountMessage accountMessage = JsonUtil.from(json, AccountMessage.class);assert accountMessage != null;accountService.decrease(accountMessage.getUserId(), accountMessage.getMoney());}}
