概况

spring对多线程无法进行事务控制,是因为多线程底层连接数据库的时候,是使用的线程变量(TheadLocal),线程之间事务隔离,每个线程有自己的连接,事务肯定不是同一个。

思路解析

  • spring自动提交事务修改为手动提交事务
  • 子线程执行完了再去通知主线程检查执行结果
  • 主线程去控制子线程是否提交或者回滚

    解决办法

    原理:使用两个CountDownLatch实现子线程的二阶段提交
    注:对CountDownLatch不熟悉的可以看:CountDownLatch解析
    步骤:
  1. 主线程将任务分发给子线程,然后使用childMonitor.await();阻塞主线程,等待所有子线程处理向数据库中插入的业务,并使用BlockingDeque存储线程的返回结果。
  2. 使用childMonitor.countDown()释放子线程锁定,同时使用mainMonitor.await();阻塞子线程,将程序的控制权交还给主线程。
  3. 主线程检查子线程执行任务的结果,若有失败结果出现,主线程标记状态告知子线程回滚,然后使用mainMonitor.countDown();将程序控制权再次交给子线程,子线程检测回滚标志,判断是否回滚。

    代码实现

    思路说到这里,代码实现已经很简单了,代码如下,暂未考虑超时以及撑爆线程池的问题,后续有空再补充。

    ThreadPoolExecutor.java

    线程池工具类主要在之前的线程池工具类的基础上,冲在了一个可以支持多线程情况下事务的execute方法。主要初始化了两个重要的门闩以及结果集回滚等,这里由主线程控制,唤醒(暂且叫唤醒吧,和唤醒效果差不多,实际上是计数器来实现的)子线程或者被子线程唤醒。 ```java import cn.hutool.core.thread.ThreadFactoryBuilder; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import java.lang.reflect.Constructor; import java.util.Queue; import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit;

/**

  • 线程池工具类 *
  • @author anin */ @Slf4j public class ThreadPoolExecutor {

    private static java.util.concurrent.ThreadPoolExecutor executor;

    private ThreadPoolExecutor() {

    1. final int cpuNum = Runtime.getRuntime().availableProcessors();
    2. // 线程池中所保存的核心线程数
    3. final int corePoolSize = cpuNum + 1;
    4. // 池中允许的最大线程数
    5. final int maxPoolSize = corePoolSize * 2;
    6. // 线程池中的空闲线程所能持续的最长时间
    7. final long keepAliveTime = 0L;
    8. // 时间单位
    9. final TimeUnit unit = TimeUnit.MILLISECONDS;
    10. // 线程队列
    11. final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(corePoolSize * 64);
    12. // 线程工厂
    13. final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNamePrefix("test-").build();
    14. executor = new java.util.concurrent.ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, namedThreadFactory, new java.util.concurrent.ThreadPoolExecutor.AbortPolicy());
    15. log.info(" >>>>>>>>>>>>>>> 线程池开始初始化 <<<<<<<<<<<<<<< ");
    16. log.info(" >>>>>>>>>>>>>>> 核心线程数: {} <<<<<<<<<<<<<<<", corePoolSize);
    17. log.info(" >>>>>>>>>>>>>>> 最大线程数: {} <<<<<<<<<<<<<<<", maxPoolSize);
    18. log.info(" >>>>>>>>>>>>>>> 线程队列长度: {} <<<<<<<<<<<<<<<", (corePoolSize * 8));
    19. log.info(" >>>>>>>>>>>>>>> 线程池结束初始化 <<<<<<<<<<<<<<< ");
    20. Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    21. log.info("线程池释放成功");
    22. executor.shutdownNow();
    23. }));

    }

    private static class ThreadPoolExecutorHolder {

    1. private static final ThreadPoolExecutor<?> THREAD_POOL_EXECUTOR = new ThreadPoolExecutor<>();

    }

    public static ThreadPoolExecutor<?> getThreadPoolExecutor() {

    1. return ThreadPoolExecutor.ThreadPoolExecutorHolder.THREAD_POOL_EXECUTOR;

    }

    /**

    • 执行线程 *
    • @param runnable 任务 */ public void execute(Runnable runnable) { executor.execute(runnable); }

      /**

    • 多线程任务 *
    • @param transactionManager 数据库事务管理 */ @SneakyThrows public void execute(DataSourceTransactionManager transactionManager, TaskData taskData) { //监控子线程的任务执行 CountDownLatch childMonitor = new CountDownLatch(taskData.getTaskCount()); //监控主线程,是否需要回滚 CountDownLatch mainMonitor = new CountDownLatch(1); //存储任务的返回结果,返回true表示不需要回滚,反之,则回滚 BlockingDeque results = new LinkedBlockingDeque<>(executor.getCorePoolSize()); RollBack rollback = new RollBack(false);

      Queue> queue = list2Queue(taskData);

      while (queue.peek() != null) {

      1. TaskData.TaskDataInfo<T> taskDataInfo = queue.poll();
      2. Constructor<?> constructor = taskDataInfo.getClazz().getConstructor(
      3. CountDownLatch.class, CountDownLatch.class, BlockingDeque.class, RollBack.class, DataSourceTransactionManager.class, TaskData.TaskDataInfo.class);
      4. AbstractThreadTask<T> task = (AbstractThreadTask<T>) constructor.newInstance(childMonitor, mainMonitor, results, rollback, transactionManager, taskDataInfo);
      5. // TODO 这里需要处理任务提交数量 防止线程池被撑爆
      6. executor.execute(task);

      }

      // 阻塞主线程,等待所有子线程处理向数据库中插入的业务。 // TODO 需要考虑超时 childMonitor.await(); log.info(“ >>>>>>>>>>>>>>> 主线程开始执行任务”);

      // 根据返回结果来确定是否回滚 for (int i = 0; i < taskData.getTaskCount(); i++) {

      1. boolean result = results.take();
      2. if (!result) {
      3. log.info(" >>>>>>>>>>>>>>> 主线程检测到有任务执行失败,通知子线程进行回滚");
      4. //有线程执行异常,需要回滚子线程
      5. rollback.setNeedRollBack(true);
      6. break;
      7. }

      } // 唤醒子线程,子线程检测回滚标志,判断是否回滚。 mainMonitor.countDown(); }

      private Queue> list2Queue(TaskData taskData) { Queue> queue = new LinkedBlockingQueue<>(taskData.getTaskCount()); queue.addAll(taskData.getData()); return queue; }

      /**

    • 提交任务 *
    • @param runnable 任务
    • @return Future */ public Future submit(Runnable runnable, T result) { return executor.submit(runnable, result); }

      /**

    • 提交任务 *
    • @param callable 任务
    • @return Future */ public Future submit(Callable callable) { return executor.submit(callable); }

}

  1. <a name="Lx5FT"></a>
  2. #### AbstractThreadTask.java
  3. 子线程任务执行类是一个抽象方法,主要使用了模板方法模式,定义了子线程执行任务的流程以及在执行完毕子线程任务之后阻塞自己,并且唤醒主线程去校验自己的结果。
  4. ```java
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  7. import org.springframework.transaction.TransactionDefinition;
  8. import org.springframework.transaction.TransactionStatus;
  9. import org.springframework.transaction.support.DefaultTransactionDefinition;
  10. import java.util.concurrent.BlockingDeque;
  11. import java.util.concurrent.CountDownLatch;
  12. /***
  13. * 子线程任务执行类
  14. *
  15. * @author anin
  16. */
  17. @Slf4j
  18. public abstract class AbstractThreadTask<T> implements Runnable {
  19. /**
  20. * 监控子任务的执行
  21. */
  22. private CountDownLatch childMonitor;
  23. /**
  24. * 监控主线程
  25. */
  26. private CountDownLatch mainMonitor;
  27. /**
  28. * 存储线程的返回结果
  29. */
  30. private BlockingDeque<Boolean> resultList;
  31. /**
  32. * 回滚类
  33. */
  34. private RollBack rollback;
  35. private TaskData.TaskDataInfo<T> taskDataInfo;
  36. protected DataSourceTransactionManager transactionManager;
  37. protected TransactionStatus status;
  38. public AbstractThreadTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque<Boolean> result, RollBack rollback, DataSourceTransactionManager transactionManager, TaskData.TaskDataInfo<T> taskDataInfo) {
  39. this.childMonitor = childCountDown;
  40. this.mainMonitor = mainCountDown;
  41. this.resultList = result;
  42. this.rollback = rollback;
  43. this.transactionManager = transactionManager;
  44. this.taskDataInfo = taskDataInfo;
  45. initParam();
  46. }
  47. /**
  48. * 事务回滚
  49. */
  50. private void rollBack() {
  51. log.info(" >>>>>>>>>>>>>>> 子线程:{} 开始回滚事务", Thread.currentThread().getName());
  52. transactionManager.rollback(status);
  53. }
  54. /**
  55. * 事务提交
  56. */
  57. private void submit() {
  58. log.info(" >>>>>>>>>>>>>>> 子线程:{} 开始提交事务", Thread.currentThread().getName());
  59. transactionManager.commit(status);
  60. }
  61. protected TaskData.TaskDataInfo<T> getDataInfo() {
  62. return taskDataInfo;
  63. }
  64. /**
  65. * 初始化方法:作用是把线程池工具任务执行类所需的外部资源通过 ThreadTask.class的构造方法中 Map<String,Obejct> params参数进行初始化传递进来
  66. */
  67. public abstract void initParam();
  68. /**
  69. * 执行任务,返回false表示任务执行错误,需要回滚
  70. */
  71. public abstract boolean processTask();
  72. @Override
  73. public void run() {
  74. log.info(" >>>>>>>>>>>>>>> 子线程:{} 开始执行任务", Thread.currentThread().getName());
  75. DefaultTransactionDefinition def = new DefaultTransactionDefinition();
  76. def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
  77. status = transactionManager.getTransaction(def);
  78. Boolean result = processTask();
  79. // 向队列中添加处理结果
  80. resultList.add(result);
  81. // 告诉主线程我执行完了
  82. childMonitor.countDown();
  83. try {
  84. // 等待主线程的判断逻辑执行完,执行下面的是否回滚逻辑
  85. mainMonitor.await();
  86. } catch (Exception e) {
  87. log.error(e.getMessage());
  88. }
  89. log.info(" >>>>>>>>>>>>>>> 子线程:{} 执行剩下的任务", Thread.currentThread().getName());
  90. // 判断是否回滚
  91. if (rollback.isNeedRollBack()) {
  92. rollBack();
  93. } else {
  94. submit();
  95. }
  96. }
  97. }

RollBack.java

定义了一个是否需要回滚的标识

  1. import lombok.Data;
  2. /**
  3. * 回滚标记类
  4. * @author anin
  5. */
  6. @Data
  7. public class RollBack {
  8. /**
  9. * 是否需要回滚
  10. */
  11. private boolean needRollBack;
  12. public RollBack(boolean needRollBack) {
  13. this.needRollBack = needRollBack;
  14. }
  15. }

TaskData.java

考虑到实际业务场景下,多线程执行的总是不通的业务逻辑,所以封装了这个任务参数。

  1. import lombok.Data;
  2. import java.util.List;
  3. /***
  4. * 任务参数
  5. *
  6. * @author anin
  7. */
  8. @Data
  9. public class TaskData<T> {
  10. /**
  11. * 实现类:数据
  12. */
  13. private List<TaskDataInfo<T>> data;
  14. public Integer getTaskCount() {
  15. return data.size();
  16. }
  17. @Data
  18. public static class TaskDataInfo<T> {
  19. /**
  20. * 任务名称
  21. */
  22. private String name;
  23. /**
  24. * 任务实现服务
  25. */
  26. private TaskService taskService;
  27. /**
  28. * task class
  29. */
  30. private Class<?> clazz;
  31. /**
  32. * 任务数据
  33. */
  34. private T data;
  35. }
  36. }

TaskService.java

任务执行接口,需要实现该方法。

  1. /***
  2. * 任务执行接口
  3. *
  4. * @author anin
  5. */
  6. public interface TaskService {
  7. /**
  8. * 业务执行
  9. */
  10. <T> Boolean execute(T param);
  11. }

UserServiceImpl.java

一个示例实现方法。

  1. import com.example.threadinsertdemo.executor.TaskData;
  2. import com.example.threadinsertdemo.executor.TaskService;
  3. import com.example.threadinsertdemo.executor.ThreadPoolExecutor;
  4. import com.example.threadinsertdemo.mapper.UserMapper;
  5. import com.example.threadinsertdemo.pojo.User;
  6. import com.example.threadinsertdemo.pojo.UserData;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  10. import org.springframework.stereotype.Service;
  11. import java.util.ArrayList;
  12. import java.util.List;
  13. /***
  14. * 任务执行接口
  15. *
  16. * @author anin
  17. */
  18. @Slf4j
  19. @Service
  20. public class UserServiceImpl implements TaskService {
  21. @Autowired
  22. private UserMapper userMapper;
  23. @Autowired
  24. private DataSourceTransactionManager dataSourceTransactionManager;
  25. public void insert2(UserData userData){
  26. TaskData<User> taskData = new TaskData<>();
  27. List<TaskData.TaskDataInfo<User>> users = new ArrayList<>();
  28. TaskData.TaskDataInfo<User> taskDataInfo1 = new TaskData.TaskDataInfo<>();
  29. taskDataInfo1.setName("user1");
  30. taskDataInfo1.setTaskService(this);
  31. taskDataInfo1.setClazz(UserTask1.class);
  32. taskDataInfo1.setData(userData.getUser1());
  33. TaskData.TaskDataInfo<User> taskDataInfo2 = new TaskData.TaskDataInfo<>();
  34. taskDataInfo2.setName("user2");
  35. taskDataInfo2.setTaskService(this);
  36. taskDataInfo2.setClazz(UserTask2.class);
  37. taskDataInfo2.setData(userData.getUser2());
  38. TaskData.TaskDataInfo<User> taskDataInfo3 = new TaskData.TaskDataInfo<>();
  39. taskDataInfo3.setName("user3");
  40. taskDataInfo3.setTaskService(this);
  41. taskDataInfo3.setClazz(UserTask3.class);
  42. taskDataInfo3.setData(userData.getUser3());
  43. users.add(taskDataInfo1);
  44. users.add(taskDataInfo2);
  45. users.add(taskDataInfo3);
  46. taskData.setData(users);
  47. //调用多线程工具方法
  48. ThreadPoolExecutor.getThreadPoolExecutor().execute(dataSourceTransactionManager,taskData);
  49. }
  50. @Override
  51. public Boolean execute(Object param) {
  52. return userMapper.insert((User) param) == 1;
  53. }
  54. }

UserTask1.java

UserTask1、UserTask2、UserTask3类一致

  1. import com.example.threadinsertdemo.executor.RollBack;
  2. import com.example.threadinsertdemo.executor.TaskData;
  3. import com.example.threadinsertdemo.executor.TaskService;
  4. import com.example.threadinsertdemo.executor.AbstractThreadTask;
  5. import com.example.threadinsertdemo.pojo.User;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  8. import java.util.concurrent.BlockingDeque;
  9. import java.util.concurrent.CountDownLatch;
  10. /***
  11. * 测试Task类
  12. *
  13. * @author anin
  14. */
  15. @Slf4j
  16. public class UserTask1 extends AbstractThreadTask<User> {
  17. /**
  18. * 分批处理的数据
  19. */
  20. private User data;
  21. /**
  22. * 可能需要注入的某些服务
  23. */
  24. private TaskService taskService;
  25. public UserTask1(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque<Boolean> result, RollBack rollback, DataSourceTransactionManager transactionManager, TaskData.TaskDataInfo<User> taskDataInfo) {
  26. super(childCountDown, mainCountDown, result, rollback, transactionManager, taskDataInfo);
  27. }
  28. @Override
  29. public void initParam() {
  30. TaskData.TaskDataInfo<User> dataInfo = getDataInfo();
  31. this.data = dataInfo.getData();
  32. this.taskService = dataInfo.getTaskService();
  33. }
  34. /**
  35. * 执行任务
  36. *
  37. * @return 返回false表示任务执行错误,需要回滚
  38. */
  39. @Override
  40. public boolean processTask() {
  41. log.info(" >>>>>>>>>>>>>>> 执行task1多线程任务逻辑");
  42. try {
  43. taskService.execute(data);
  44. return true;
  45. } catch (Exception e) {
  46. return false;
  47. }
  48. }
  49. }

正常提交测试

  1. @RestController
  2. public class WebController {
  3. @Autowired
  4. private UserServiceImpl userService;
  5. @GetMapping("/test2")
  6. public String test2(){
  7. User user1 = new User();
  8. user1.setId(1);
  9. user1.setName("wangwu1");
  10. User user2 = new User();
  11. user2.setId(2);
  12. user2.setName("wangwu2");
  13. User user3 = new User();
  14. user3.setId(3);
  15. user3.setName("wangwu3");
  16. UserData userData = new UserData();
  17. userData.setUser1(user1);
  18. userData.setUser2(user2);
  19. userData.setUser3(user3);
  20. userService.insert2(userData);
  21. return "suc";
  22. }
  23. }

终端输出
  1. 2021-06-01 15:15:53.764 INFO 17604 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
  2. 2021-06-01 15:15:53.764 INFO 17604 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
  3. 2021-06-01 15:15:53.765 INFO 17604 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
  4. 2021-06-01 15:15:53.810 INFO 17604 --- [nio-8080-exec-1] c.e.t.executor.ThreadPoolExecutor : >>>>>>>>>>>>>>> 线程池开始初始化 <<<<<<<<<<<<<<<
  5. 2021-06-01 15:15:53.811 INFO 17604 --- [nio-8080-exec-1] c.e.t.executor.ThreadPoolExecutor : >>>>>>>>>>>>>>> 核心线程数: 9 <<<<<<<<<<<<<<<
  6. 2021-06-01 15:15:53.818 INFO 17604 --- [nio-8080-exec-1] c.e.t.executor.ThreadPoolExecutor : >>>>>>>>>>>>>>> 最大线程数: 18 <<<<<<<<<<<<<<<
  7. 2021-06-01 15:15:53.818 INFO 17604 --- [nio-8080-exec-1] c.e.t.executor.ThreadPoolExecutor : >>>>>>>>>>>>>>> 线程队列长度: 72 <<<<<<<<<<<<<<<
  8. 2021-06-01 15:15:53.818 INFO 17604 --- [nio-8080-exec-1] c.e.t.executor.ThreadPoolExecutor : >>>>>>>>>>>>>>> 线程池结束初始化 <<<<<<<<<<<<<<<
  9. 2021-06-01 15:15:53.824 INFO 17604 --- [ test-2] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-2 开始执行任务
  10. 2021-06-01 15:15:53.824 INFO 17604 --- [ test-1] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-1 开始执行任务
  11. 2021-06-01 15:15:53.824 INFO 17604 --- [ test-0] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-0 开始执行任务
  12. 2021-06-01 15:15:53.845 INFO 17604 --- [ test-0] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
  13. 2021-06-01 15:15:54.624 INFO 17604 --- [ test-0] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
  14. 2021-06-01 15:15:54.635 INFO 17604 --- [ test-0] c.e.threadinsertdemo.service.UserTask1 : >>>>>>>>>>>>>>> 执行task1多线程任务逻辑
  15. 2021-06-01 15:15:54.667 DEBUG 17604 --- [ test-0] c.e.t.mapper.UserMapper.insert : ==> Preparing: insert into`user` (`name`) values (?)
  16. 2021-06-01 15:15:54.675 INFO 17604 --- [ test-2] c.e.threadinsertdemo.service.UserTask3 : >>>>>>>>>>>>>>> 执行task3多线程任务逻辑
  17. 2021-06-01 15:15:54.675 DEBUG 17604 --- [ test-2] c.e.t.mapper.UserMapper.insert : ==> Preparing: insert into`user` (`name`) values (?)
  18. 2021-06-01 15:15:54.697 INFO 17604 --- [ test-1] c.e.threadinsertdemo.service.UserTask2 : >>>>>>>>>>>>>>> 执行task2多线程任务逻辑
  19. 2021-06-01 15:15:54.697 DEBUG 17604 --- [ test-1] c.e.t.mapper.UserMapper.insert : ==> Preparing: insert into`user` (`name`) values (?)
  20. 2021-06-01 15:15:54.697 DEBUG 17604 --- [ test-0] c.e.t.mapper.UserMapper.insert : ==> Parameters: wangwu1(String)
  21. 2021-06-01 15:15:54.697 DEBUG 17604 --- [ test-2] c.e.t.mapper.UserMapper.insert : ==> Parameters: wangwu3(String)
  22. 2021-06-01 15:15:54.697 DEBUG 17604 --- [ test-1] c.e.t.mapper.UserMapper.insert : ==> Parameters: wangwu2(String)
  23. 2021-06-01 15:15:54.699 DEBUG 17604 --- [ test-0] c.e.t.mapper.UserMapper.insert : <== Updates: 1
  24. 2021-06-01 15:15:54.699 DEBUG 17604 --- [ test-1] c.e.t.mapper.UserMapper.insert : <== Updates: 1
  25. 2021-06-01 15:15:54.701 DEBUG 17604 --- [ test-2] c.e.t.mapper.UserMapper.insert : <== Updates: 1
  26. 2021-06-01 15:15:54.701 INFO 17604 --- [nio-8080-exec-1] c.e.t.executor.ThreadPoolExecutor : >>>>>>>>>>>>>>> 主线程开始执行任务
  27. 2021-06-01 15:15:54.701 INFO 17604 --- [ test-1] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-1 执行剩下的任务
  28. 2021-06-01 15:15:54.701 INFO 17604 --- [ test-2] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-2 执行剩下的任务
  29. 2021-06-01 15:15:54.701 INFO 17604 --- [ test-0] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-0 执行剩下的任务
  30. 2021-06-01 15:15:54.701 INFO 17604 --- [ test-1] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-1 开始提交事务
  31. 2021-06-01 15:15:54.701 INFO 17604 --- [ test-2] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-2 开始提交事务
  32. 2021-06-01 15:15:54.701 INFO 17604 --- [ test-0] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-0 开始提交事务

查看数据库
  1. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/12699730/1622532814409-b04da942-ed7d-4509-978a-3b7129b6e61a.png#align=left&display=inline&height=130&margin=%5Bobject%20Object%5D&name=image.png&originHeight=130&originWidth=193&size=5971&status=done&style=none&width=193)<br />成功插入数据。

异常回滚测试

将任意一个task的processTask方法返回改为false,如下

  1. /**
  2. * 执行任务
  3. *
  4. * @return 返回false表示任务执行错误,需要回滚
  5. */
  6. @Override
  7. public boolean processTask() {
  8. log.info(" >>>>>>>>>>>>>>> 执行task3多线程任务逻辑");
  9. return false;
  10. }

终端输出
  1. 2021-06-01 15:31:14.410 INFO 18044 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
  2. 2021-06-01 15:31:14.410 INFO 18044 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
  3. 2021-06-01 15:31:14.411 INFO 18044 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
  4. 2021-06-01 15:31:14.431 INFO 18044 --- [nio-8080-exec-1] c.e.t.executor.ThreadPoolExecutor : >>>>>>>>>>>>>>> 线程池开始初始化 <<<<<<<<<<<<<<<
  5. 2021-06-01 15:31:14.431 INFO 18044 --- [nio-8080-exec-1] c.e.t.executor.ThreadPoolExecutor : >>>>>>>>>>>>>>> 核心线程数: 9 <<<<<<<<<<<<<<<
  6. 2021-06-01 15:31:14.432 INFO 18044 --- [nio-8080-exec-1] c.e.t.executor.ThreadPoolExecutor : >>>>>>>>>>>>>>> 最大线程数: 18 <<<<<<<<<<<<<<<
  7. 2021-06-01 15:31:14.432 INFO 18044 --- [nio-8080-exec-1] c.e.t.executor.ThreadPoolExecutor : >>>>>>>>>>>>>>> 线程队列长度: 72 <<<<<<<<<<<<<<<
  8. 2021-06-01 15:31:14.432 INFO 18044 --- [nio-8080-exec-1] c.e.t.executor.ThreadPoolExecutor : >>>>>>>>>>>>>>> 线程池结束初始化 <<<<<<<<<<<<<<<
  9. 2021-06-01 15:31:14.434 INFO 18044 --- [ test-0] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-0 开始执行任务
  10. 2021-06-01 15:31:14.434 INFO 18044 --- [ test-1] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-1 开始执行任务
  11. 2021-06-01 15:31:14.434 INFO 18044 --- [ test-2] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-2 开始执行任务
  12. 2021-06-01 15:31:14.438 INFO 18044 --- [ test-2] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
  13. 2021-06-01 15:31:14.701 INFO 18044 --- [ test-2] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
  14. 2021-06-01 15:31:14.712 INFO 18044 --- [ test-2] c.e.threadinsertdemo.service.UserTask3 : >>>>>>>>>>>>>>> 执行task3多线程任务逻辑
  15. 2021-06-01 15:31:14.728 INFO 18044 --- [ test-1] c.e.threadinsertdemo.service.UserTask2 : >>>>>>>>>>>>>>> 执行task2多线程任务逻辑
  16. 2021-06-01 15:31:14.748 DEBUG 18044 --- [ test-1] c.e.t.mapper.UserMapper.insert : ==> Preparing: insert into`user` (`name`) values (?)
  17. 2021-06-01 15:31:14.761 INFO 18044 --- [ test-0] c.e.threadinsertdemo.service.UserTask1 : >>>>>>>>>>>>>>> 执行task1多线程任务逻辑
  18. 2021-06-01 15:31:14.761 DEBUG 18044 --- [ test-0] c.e.t.mapper.UserMapper.insert : ==> Preparing: insert into`user` (`name`) values (?)
  19. 2021-06-01 15:31:14.772 DEBUG 18044 --- [ test-1] c.e.t.mapper.UserMapper.insert : ==> Parameters: wangwu2(String)
  20. 2021-06-01 15:31:14.772 DEBUG 18044 --- [ test-0] c.e.t.mapper.UserMapper.insert : ==> Parameters: wangwu1(String)
  21. 2021-06-01 15:31:14.774 DEBUG 18044 --- [ test-1] c.e.t.mapper.UserMapper.insert : <== Updates: 1
  22. 2021-06-01 15:31:14.775 DEBUG 18044 --- [ test-0] c.e.t.mapper.UserMapper.insert : <== Updates: 1
  23. 2021-06-01 15:31:14.775 INFO 18044 --- [nio-8080-exec-1] c.e.t.executor.ThreadPoolExecutor : >>>>>>>>>>>>>>> 主线程开始执行任务
  24. 2021-06-01 15:31:14.775 INFO 18044 --- [nio-8080-exec-1] c.e.t.executor.ThreadPoolExecutor : >>>>>>>>>>>>>>> 主线程检测到有任务执行失败,通知子线程进行回滚
  25. 2021-06-01 15:31:14.775 INFO 18044 --- [ test-0] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-0 执行剩下的任务
  26. 2021-06-01 15:31:14.775 INFO 18044 --- [ test-1] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-1 执行剩下的任务
  27. 2021-06-01 15:31:14.775 INFO 18044 --- [ test-2] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-2 执行剩下的任务
  28. 2021-06-01 15:31:14.776 INFO 18044 --- [ test-0] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-0 开始回滚事务
  29. 2021-06-01 15:31:14.776 INFO 18044 --- [ test-1] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-1 开始回滚事务
  30. 2021-06-01 15:31:14.776 INFO 18044 --- [ test-2] c.e.t.executor.AbstractThreadTask : >>>>>>>>>>>>>>> 子线程:test-2 开始回滚事务

查看数据库
  1. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/12699730/1622532708184-a1c24713-dafc-48da-bd19-e55472481baf.png#align=left&display=inline&height=71&margin=%5Bobject%20Object%5D&name=image.png&originHeight=71&originWidth=201&size=3507&status=done&style=none&width=201)<br />插入数据成功回滚。

源码地址

https://gitee.com/aninn/thread-insert-demo