XA 协议
1.在 mysql 实例中开启一个 XA 事务,指定一个全局唯一标识
XA START 'any_unique_id';
2.执行业务操作
3.XA 事务的操作结束
XA END 'any_unique_id';
4.告知 mysql 准备提交这个 xa 事务
XA PREPARE 'any_unique_id';
5.告知 mysql 提交这个 xa 事务
XA COMMIT 'any_unique_id';
6.告知 mysql 回滚这个 xa 事务
XA ROLLBACK 'any_unique_id';
7.查看本机 mysql 目前有哪些 xa 事务处于 prepare 状态
XA RECOVER;
XA 协议的特点
1、同一个会话(同一个用户)跟普通是互斥的
2、有隔离性
3、xa prepare xid 有返回值 0
XA执行流程
1.开启XA事务
xa start xid1
2.连接1执行操作
Connection1.execulte(xa start xid1 )
Connection1.insert()
Connection1.end;
3.连接2执行操作
Connection2 。。。
3.拿到两个连接的返回结果
Result1 = Connection1.prepare();
Result2 = Conneciont2.parepare();
4.根据结果判断是提交还是回滚
If( ALL OK) {
connection1.commit;
connection2.commit;
} else {
Connection1.rollback;
Connection2.rollback;
}
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import com.mysql.jdbc.jdbc2.optional.MysqlXid;
import javax.sql.XAConnection;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.Connection;
import java.sql.Statement;
public class XADemo {
public static MysqlXADataSource getDataSource(String connStr, String user, String pwd) {
try {
MysqlXADataSource ds = new MysqlXADataSource();
ds.setUrl(connStr);
ds.setUser(user);
ds.setPassword(pwd);
return ds;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] arg) {
String connStr1 = "jdbc:mysql://118.89.107.162:3306/wjq";
String connStr2 = "jdbc:mysql://118.89.107.162:3307/wjq";
try {
//从不同数据库获取数据库数据源
MysqlXADataSource ds1 = getDataSource(connStr1, "root", "XXXXXXXX");
MysqlXADataSource ds2 = getDataSource(connStr2, "root", "XXXXXXXX");
//数据库1获取连接
XAConnection xaConnection1 = ds1.getXAConnection();
XAResource xaResource1 = xaConnection1.getXAResource();
Connection connection1 = xaConnection1.getConnection();
Statement statement1 = connection1.createStatement();
//数据库2获取连接
XAConnection xaConnection2 = ds2.getXAConnection();
XAResource xaResource2 = xaConnection2.getXAResource();
Connection connection2 = xaConnection2.getConnection();
Statement statement2 = connection2.createStatement();
//创建事务分支的xid
Xid xid1 = new MysqlXid(new byte[]{0x01}, new byte[]{0x02}, 100);
Xid xid2 = new MysqlXid(new byte[]{0x011}, new byte[]{0x012}, 100);
try {
//事务分支1关联分支事务sql语句
xaResource1.start(xid1, XAResource.TMNOFLAGS);
int update1Result = statement1.executeUpdate("UPDATE accounts SET BALANCE = CAST('9700.00' AS DECIMAL) WHERE CUSTOMER_NO = '001'");
xaResource1.end(xid1, XAResource.TMSUCCESS);
//事务分支2关联分支事务sql语句
xaResource2.start(xid2, XAResource.TMNOFLAGS);
int update2Result = statement2.executeUpdate("INSERT INTO user_purchase_his(CUSTOMER_NO, SERIAL_NO, AMOUNT, CURRENCY, REMARK) "
+ " VALUES ('001', '20190303204700000001', 200, 'CNY', '购物消费')");
xaResource2.end(xid2, XAResource.TMSUCCESS);
// 两阶段提交协议第一阶段
int ret1 = xaResource1.prepare(xid1);
int ret2 = xaResource2.prepare(xid2);
// 两阶段提交协议第二阶段
if (XAResource.XA_OK == ret1 && XAResource.XA_OK == ret2) {
//引擎级别提交 mysql 5.6 之前没有补偿提交 5.7 repare会写 binlog日志,可以通过程序补偿提交
xaResource1.commit(xid1, false);
xaResource2.commit(xid2, false);
System.out.println("reslut1:" + update1Result + ", result2:" + update2Result);
} else {
xaResource1.rollback(xid1);
xaResource2.rollback(xid2);
}
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}