TransactionOperator 遵循与其他反应式操作符相似的操作符设计。它使用回调方法(使应用程序代码不必做模板式的获取和释放交易资源),导致代码是意图驱动的,即你的代码只关注你想做的事情。
:::info 正如下面的例子所示,使用 TransactionOperator 绝对能让你与 Spring 的事务基础架构和 API 结合起来。编程式事务管理是否适合你的开发需求,你必须自己做出决定。 :::
必须在事务性上下文中运行并明确使用 TransactionOperator 的应用程序代码类似于下一个例子:
public class SimpleService implements Service {
// single TransactionOperator shared amongst all methods in this instance
private final TransactionalOperator transactionalOperator;
// use constructor-injection to supply the ReactiveTransactionManager
public SimpleService(ReactiveTransactionManager transactionManager) {
this.transactionOperator = TransactionalOperator.create(transactionManager);
}
public Mono<Object> someServiceMethod() {
// 该方法中的代码在事务性环境中运行
Mono<Object> update = updateOperation1();
return update.then(resultOfUpdateOperation2).as(transactionalOperator::transactional);
}
}
TransactionalOperator 可以以两种方式使用:
使用项目反应器类型的操作员式(
mono.as(transactionalOperator::transactional
)回调式用于其他各种情况(
transactionalOperator.execute(TransactionCallback<T>)
)
回调中的代码可以通过调用所提供的 ReactiveTransaction 对象的 setRollbackOnly()
方法来回滚事务,如下所示:
transactionalOperator.execute(new TransactionCallback<>() {
public Mono<Object> doInTransaction(ReactiveTransaction status) {
return updateOperation1().then(updateOperation2)
.doOnError(SomeBusinessException.class, e -> status.setRollbackOnly());
}
}
});
取消信号 / Cancel Signals
在 Reactive Streams 中,订阅者可以取消其订阅并停止其发布者。Project Reactor 以及其他库中的操作符,如 next()
、take(long)
、timeout(Duration)
等,都可以发出取消指令。我们无法知道取消的原因,是由于错误还是仅仅是对进一步消费缺乏兴趣。从 5.3 版本开始,取消信号会导致回滚。因此,考虑事务发布者下游使用的操作者是很重要的。特别是在 Flux 或其他多值 Publisher 的情况下,必须消耗全部输出,以使事务完成。
指定事务设置
你可以为 TransactionalOperator 指定事务设置(如传播模式、隔离级别、超时等等)。默认情况下,TransactionalOperator 实例有 默认的事务设置。下面的例子显示了对一个特定的 TransactionalOperator 的事务性设置的定制:
public class SimpleService implements Service {
private final TransactionalOperator transactionalOperator;
public SimpleService(ReactiveTransactionManager transactionManager) {
DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
// the transaction settings can be set here explicitly if so desired
definition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_UNCOMMITTED);
definition.setTimeout(30); // 30 seconds
// and so forth...
this.transactionalOperator = TransactionalOperator.create(transactionManager, definition);
}
}