XA 协议

1.在 mysql 实例中开启一个 XA 事务,指定一个全局唯一标识

  1. XA START 'any_unique_id';

2.执行业务操作


3.XA 事务的操作结束

  1. XA END 'any_unique_id';

4.告知 mysql 准备提交这个 xa 事务

  1. XA PREPARE 'any_unique_id';

5.告知 mysql 提交这个 xa 事务

  1. XA COMMIT 'any_unique_id';

6.告知 mysql 回滚这个 xa 事务

  1. XA ROLLBACK 'any_unique_id';

7.查看本机 mysql 目前有哪些 xa 事务处于 prepare 状态

  1. XA RECOVER;

XA 协议的特点
1、同一个会话(同一个用户)跟普通是互斥的
2、有隔离性
3、xa prepare xid 有返回值 0

XA执行流程

1.开启XA事务
xa start xid1

2.连接1执行操作
Connection1.execulte(xa start xid1 )
Connection1.insert()
Connection1.end;

3.连接2执行操作
Connection2 。。。

3.拿到两个连接的返回结果
Result1 = Connection1.prepare();
Result2 = Conneciont2.parepare();

4.根据结果判断是提交还是回滚
If( ALL OK) {
connection1.commit;
connection2.commit;
} else {
Connection1.rollback;
Connection2.rollback;
}

image.png
image.png

  1. import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
  2. import com.mysql.jdbc.jdbc2.optional.MysqlXid;
  3. import javax.sql.XAConnection;
  4. import javax.transaction.xa.XAResource;
  5. import javax.transaction.xa.Xid;
  6. import java.sql.Connection;
  7. import java.sql.Statement;
  8. public class XADemo {
  9. public static MysqlXADataSource getDataSource(String connStr, String user, String pwd) {
  10. try {
  11. MysqlXADataSource ds = new MysqlXADataSource();
  12. ds.setUrl(connStr);
  13. ds.setUser(user);
  14. ds.setPassword(pwd);
  15. return ds;
  16. } catch (Exception e) {
  17. e.printStackTrace();
  18. }
  19. return null;
  20. }
  21. public static void main(String[] arg) {
  22. String connStr1 = "jdbc:mysql://118.89.107.162:3306/wjq";
  23. String connStr2 = "jdbc:mysql://118.89.107.162:3307/wjq";
  24. try {
  25. //从不同数据库获取数据库数据源
  26. MysqlXADataSource ds1 = getDataSource(connStr1, "root", "XXXXXXXX");
  27. MysqlXADataSource ds2 = getDataSource(connStr2, "root", "XXXXXXXX");
  28. //数据库1获取连接
  29. XAConnection xaConnection1 = ds1.getXAConnection();
  30. XAResource xaResource1 = xaConnection1.getXAResource();
  31. Connection connection1 = xaConnection1.getConnection();
  32. Statement statement1 = connection1.createStatement();
  33. //数据库2获取连接
  34. XAConnection xaConnection2 = ds2.getXAConnection();
  35. XAResource xaResource2 = xaConnection2.getXAResource();
  36. Connection connection2 = xaConnection2.getConnection();
  37. Statement statement2 = connection2.createStatement();
  38. //创建事务分支的xid
  39. Xid xid1 = new MysqlXid(new byte[]{0x01}, new byte[]{0x02}, 100);
  40. Xid xid2 = new MysqlXid(new byte[]{0x011}, new byte[]{0x012}, 100);
  41. try {
  42. //事务分支1关联分支事务sql语句
  43. xaResource1.start(xid1, XAResource.TMNOFLAGS);
  44. int update1Result = statement1.executeUpdate("UPDATE accounts SET BALANCE = CAST('9700.00' AS DECIMAL) WHERE CUSTOMER_NO = '001'");
  45. xaResource1.end(xid1, XAResource.TMSUCCESS);
  46. //事务分支2关联分支事务sql语句
  47. xaResource2.start(xid2, XAResource.TMNOFLAGS);
  48. int update2Result = statement2.executeUpdate("INSERT INTO user_purchase_his(CUSTOMER_NO, SERIAL_NO, AMOUNT, CURRENCY, REMARK) "
  49. + " VALUES ('001', '20190303204700000001', 200, 'CNY', '购物消费')");
  50. xaResource2.end(xid2, XAResource.TMSUCCESS);
  51. // 两阶段提交协议第一阶段
  52. int ret1 = xaResource1.prepare(xid1);
  53. int ret2 = xaResource2.prepare(xid2);
  54. // 两阶段提交协议第二阶段
  55. if (XAResource.XA_OK == ret1 && XAResource.XA_OK == ret2) {
  56. //引擎级别提交 mysql 5.6 之前没有补偿提交 5.7 repare会写 binlog日志,可以通过程序补偿提交
  57. xaResource1.commit(xid1, false);
  58. xaResource2.commit(xid2, false);
  59. System.out.println("reslut1:" + update1Result + ", result2:" + update2Result);
  60. } else {
  61. xaResource1.rollback(xid1);
  62. xaResource2.rollback(xid2);
  63. }
  64. } catch (Exception e) {
  65. e.printStackTrace();
  66. }
  67. } catch (Exception e) {
  68. e.printStackTrace();
  69. }
  70. }
  71. }