TransactionOperator 遵循与其他反应式操作符相似的操作符设计。它使用回调方法(使应用程序代码不必做模板式的获取和释放交易资源),导致代码是意图驱动的,即你的代码只关注你想做的事情。

:::info 正如下面的例子所示,使用 TransactionOperator 绝对能让你与 Spring 的事务基础架构和 API 结合起来。编程式事务管理是否适合你的开发需求,你必须自己做出决定。 :::

必须在事务性上下文中运行并明确使用 TransactionOperator 的应用程序代码类似于下一个例子:

  1. public class SimpleService implements Service {
  2. // single TransactionOperator shared amongst all methods in this instance
  3. private final TransactionalOperator transactionalOperator;
  4. // use constructor-injection to supply the ReactiveTransactionManager
  5. public SimpleService(ReactiveTransactionManager transactionManager) {
  6. this.transactionOperator = TransactionalOperator.create(transactionManager);
  7. }
  8. public Mono<Object> someServiceMethod() {
  9. // 该方法中的代码在事务性环境中运行
  10. Mono<Object> update = updateOperation1();
  11. return update.then(resultOfUpdateOperation2).as(transactionalOperator::transactional);
  12. }
  13. }

TransactionalOperator 可以以两种方式使用:

  • 使用项目反应器类型的操作员式(mono.as(transactionalOperator::transactional)

  • 回调式用于其他各种情况(transactionalOperator.execute(TransactionCallback<T>))

回调中的代码可以通过调用所提供的 ReactiveTransaction 对象的 setRollbackOnly()方法来回滚事务,如下所示:

  1. transactionalOperator.execute(new TransactionCallback<>() {
  2. public Mono<Object> doInTransaction(ReactiveTransaction status) {
  3. return updateOperation1().then(updateOperation2)
  4. .doOnError(SomeBusinessException.class, e -> status.setRollbackOnly());
  5. }
  6. }
  7. });

取消信号 / Cancel Signals

在 Reactive Streams 中,订阅者可以取消其订阅并停止其发布者。Project Reactor 以及其他库中的操作符,如 next()take(long)timeout(Duration)等,都可以发出取消指令。我们无法知道取消的原因,是由于错误还是仅仅是对进一步消费缺乏兴趣。从 5.3 版本开始,取消信号会导致回滚。因此,考虑事务发布者下游使用的操作者是很重要的。特别是在 Flux 或其他多值 Publisher 的情况下,必须消耗全部输出,以使事务完成。

指定事务设置

你可以为 TransactionalOperator 指定事务设置(如传播模式、隔离级别、超时等等)。默认情况下,TransactionalOperator 实例有 默认的事务设置。下面的例子显示了对一个特定的 TransactionalOperator 的事务性设置的定制:

  1. public class SimpleService implements Service {
  2. private final TransactionalOperator transactionalOperator;
  3. public SimpleService(ReactiveTransactionManager transactionManager) {
  4. DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
  5. // the transaction settings can be set here explicitly if so desired
  6. definition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_UNCOMMITTED);
  7. definition.setTimeout(30); // 30 seconds
  8. // and so forth...
  9. this.transactionalOperator = TransactionalOperator.create(transactionManager, definition);
  10. }
  11. }