Java 多线程
业务中遇到一个需求,需要同时修改最多约5万条数据,而且还不支持批量或异步修改操作。于是只能写个for循环操作,但操作耗时太长,只能一步一步寻找其他解决方案。
具体操作如下:

1、循环操作的代码

先写一个最简单的for循环代码,看看耗时情况怎么样。

  1. /***
  2. * 一条一条依次对50000条数据进行更新操作
  3. * 耗时:2m27s,1m54s
  4. */
  5. @Test
  6. void updateStudent() {
  7. List<Student> allStudents = studentMapper.getAll();
  8. allStudents.forEach(s -> {
  9. //更新教师信息
  10. String teacher = s.getTeacher();
  11. String newTeacher = "TNO_" + new Random().nextInt(100);
  12. s.setTeacher(newTeacher);
  13. studentMapper.update(s);
  14. });
  15. }

循环修改整体耗时约 1分54秒,且代码中没有手动事务控制应该是自动事务提交,所以每次操作事务都会提交所以操作比较慢,先对代码中添加手动事务控制,看查询效率怎样。

2、使用手动事务的操作代码

修改后的代码如下:

  1. @Autowired
  2. private DataSourceTransactionManager dataSourceTransactionManager;
  3. @Autowired
  4. private TransactionDefinition transactionDefinition;
  5. /**
  6. * 由于希望更新操作 一次性完成,需要手动控制添加事务
  7. * 耗时:24s
  8. * 从测试结果可以看出,添加事务后插入数据的效率有明显的提升
  9. */
  10. @Test
  11. void updateStudentWithTrans() {
  12. List<Student> allStudents = studentMapper.getAll();
  13. TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
  14. try {
  15. allStudents.forEach(s -> {
  16. //更新教师信息
  17. String teacher = s.getTeacher();
  18. String newTeacher = "TNO_" + new Random().nextInt(100);
  19. s.setTeacher(newTeacher);
  20. studentMapper.update(s);
  21. });
  22. dataSourceTransactionManager.commit(transactionStatus);
  23. } catch (Throwable e) {
  24. dataSourceTransactionManager.rollback(transactionStatus);
  25. throw e;
  26. }
  27. }

添加手动事务操控制后,整体耗时约 24秒,这相对于自动事务提交的代码,快了约5倍,对于大量循环数据库提交操作,添加手动事务可以有效提高操作效率。

3、尝试多线程进行数据修改

添加数据库手动事务后操作效率有明细提高,但还是比较长,接下来尝试多线程提交看是不是能够再快一些。
先添加一个Service将批量修改操作整合一下,具体代码如下:

StudentServiceImpl.java

  1. @Service
  2. public class StudentServiceImpl implements StudentService {
  3. @Autowired
  4. private StudentMapper studentMapper;
  5. @Autowired
  6. private DataSourceTransactionManager dataSourceTransactionManager;
  7. @Autowired
  8. private TransactionDefinition transactionDefinition;
  9. @Override
  10. public void updateStudents(List<Student> students, CountDownLatch threadLatch) {
  11. TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
  12. System.out.println("子线程:" + Thread.currentThread().getName());
  13. try {
  14. students.forEach(s -> {
  15. // 更新教师信息
  16. // String teacher = s.getTeacher();
  17. String newTeacher = "TNO_" + new Random().nextInt(100);
  18. s.setTeacher(newTeacher);
  19. studentMapper.update(s);
  20. });
  21. dataSourceTransactionManager.commit(transactionStatus);
  22. threadLatch.countDown();
  23. } catch (Throwable e) {
  24. e.printStackTrace();
  25. dataSourceTransactionManager.rollback(transactionStatus);
  26. }
  27. }
  28. }

批量测试代码,采用了多线程进行提交,修改后测试代码如下:

  1. @Autowired
  2. private DataSourceTransactionManager dataSourceTransactionManager;
  3. @Autowired
  4. private TransactionDefinition transactionDefinition;
  5. @Autowired
  6. private StudentService studentService;
  7. /**
  8. * 对用户而言,27s 任是一个较长的时间,尝试用多线程的方式来经行修改操作看能否加快处理速度
  9. * 预计创建10个线程,每个线程进行5000条数据修改操作
  10. * 耗时统计
  11. * 1 线程数:1 耗时:25s
  12. * 2 线程数:2 耗时:14s
  13. * 3 线程数:5 耗时:15s
  14. * 4 线程数:10 耗时:15s
  15. * 5 线程数:100 耗时:15s
  16. * 6 线程数:200 耗时:15s
  17. * 7 线程数:500 耗时:17s
  18. * 8 线程数:1000 耗时:19s
  19. * 8 线程数:2000 耗时:23s
  20. * 8 线程数:5000 耗时:29s
  21. */
  22. @Test
  23. void updateStudentWithThreads() {
  24. //查询总数据
  25. List<Student> allStudents = studentMapper.getAll();
  26. // 线程数量
  27. final Integer threadCount = 100;
  28. //每个线程处理的数据量
  29. final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
  30. // 创建多线程处理任务
  31. ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
  32. CountDownLatch threadLatchs = new CountDownLatch(threadCount);
  33. for (int i = 0; i < threadCount; i++) {
  34. // 每个线程处理的数据
  35. List<Student> threadDatas = allStudents.stream()
  36. .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
  37. studentThreadPool.execute(() -> {
  38. studentService.updateStudents(threadDatas, threadLatchs);
  39. });
  40. }
  41. try {
  42. // 倒计时锁设置超时时间 30s
  43. threadLatchs.await(30, TimeUnit.SECONDS);
  44. } catch (Throwable e) {
  45. e.printStackTrace();
  46. }
  47. System.out.println("主线程完成");
  48. }

多线程提交修改时,尝试了不同线程数对提交速度的影响,具体可以看下面表格,

线程数 1 2 5 10 100 500 1000 2000 5000
耗时/s 25 14 15 15 15 17 19 23 29

根据表格,线程数增大提交速度并非一直增大,在当前情况下约在2-5个线程数时,提交速度最快(实际线程数还是需要根据服务器配置实际测试)。

4、基于两个CountDownLatch控制多线程事务提交

由于多线程提交时,每个线程事务时单独的,无法保证一致性,尝试给多线程添加事务控制,来保证每个线程都是在插入数据完成后在提交事务,
这里使用两个 CountDownLatch 来控制主线程与子线程事务提交,并设置了超时时间为 30 秒。对代码进行了一点修改:

  1. @Override
  2. public void updateStudentsThread(List<Student> students, CountDownLatch threadLatch, CountDownLatch mainLatch, StudentTaskError taskStatus) {
  3. TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
  4. System.out.println("子线程:" + Thread.currentThread().getName());
  5. try {
  6. students.forEach(s -> {
  7. // 更新教师信息
  8. // String teacher = s.getTeacher();
  9. String newTeacher = "TNO_" + new Random().nextInt(100);
  10. s.setTeacher(newTeacher);
  11. studentMapper.update(s);
  12. });
  13. } catch (Throwable e) {
  14. taskStatus.setIsError();
  15. } finally {
  16. threadLatch.countDown(); // 切换到主线程执行
  17. }
  18. try {
  19. mainLatch.await(); //等待主线程执行
  20. } catch (Throwable e) {
  21. taskStatus.setIsError();
  22. }
  23. // 判断是否有错误,如有错误 就回滚事务
  24. if (taskStatus.getIsError()) {
  25. dataSourceTransactionManager.rollback(transactionStatus);
  26. } else {
  27. dataSourceTransactionManager.commit(transactionStatus);
  28. }
  29. }
  30. /**
  31. * 由于每个线程都是单独的事务,需要添加对线程事务的统一控制
  32. * 这边使用两个 CountDownLatch 对子线程的事务进行控制
  33. */
  34. @Test
  35. void updateStudentWithThreadsAndTrans() {
  36. //查询总数据
  37. List<Student> allStudents = studentMapper.getAll();
  38. // 线程数量
  39. final Integer threadCount = 4;
  40. //每个线程处理的数据量
  41. final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
  42. // 创建多线程处理任务
  43. ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
  44. CountDownLatch threadLatchs = new CountDownLatch(threadCount); // 用于计算子线程提交数量
  45. CountDownLatch mainLatch = new CountDownLatch(1); // 用于判断主线程是否提交
  46. StudentTaskError taskStatus = new StudentTaskError(); // 用于判断子线程任务是否有错误
  47. for (int i = 0; i < threadCount; i++) {
  48. // 每个线程处理的数据
  49. List<Student> threadDatas = allStudents.stream()
  50. .skip(i * dataPartionLength).limit(dataPartionLength)
  51. .collect(Collectors.toList());
  52. studentThreadPool.execute(() -> {
  53. studentService.updateStudentsThread(threadDatas, threadLatchs, mainLatch, taskStatus);
  54. });
  55. }
  56. try {
  57. // 倒计时锁设置超时时间 30s
  58. boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
  59. if (!await) { // 等待超时,事务回滚
  60. taskStatus.setIsError();
  61. }
  62. } catch (Throwable e) {
  63. e.printStackTrace();
  64. taskStatus.setIsError();
  65. }
  66. mainLatch.countDown(); // 切换到子线程执行
  67. studentThreadPool.shutdown(); //关闭线程池
  68. System.out.println("主线程完成");
  69. }

本想再次测试一下不同线程数对执行效率的影响时,发现当线程数超过10个时,执行时就报错。具体错误内容如下:

  1. Exception in thread "pool-1-thread-2" org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
  2. at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)
  3. at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
  4. at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
  5. at com.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)
  6. at com.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)
  7. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  8. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  9. at java.lang.Thread.run(Thread.java:748)
  10. Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
  11. at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
  12. at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
  13. at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
  14. at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)
  15. at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)
  16. ... 7 more

错误的大致意思时,不能为数据库事务打开 jdbc Connection,连接在30s的时候超时了。由于前面启动的十个线程需要等待主线程完成后才能提交,所以一直占用连接未释放,造成后面的进程创建连接超时。
看错误日志中错误的来源是 HikariPool ,来重新配置一下这个连接池的参数,将最大连接数修改为100,具体配置如下:

  1. # 连接池中允许的最小连接数。缺省值:10
  2. spring.datasource.hikari.minimum-idle=10
  3. # 连接池中允许的最大连接数。缺省值:10
  4. spring.datasource.hikari.maximum-pool-size=100
  5. # 自动提交
  6. spring.datasource.hikari.auto-commit=true
  7. # 一个连接idle状态的最大时长(毫秒),超时则被释放(retired),缺省:10分钟
  8. spring.datasource.hikari.idle-timeout=30000
  9. # 一个连接的生命时长(毫秒),超时而且没被使用则被释放(retired),缺省:30分钟,建议设置比数据库超时时长少30秒
  10. spring.datasource.hikari.max-lifetime=1800000
  11. # 等待连接池分配连接的最大时长(毫秒),超过这个时长还没可用的连接则发生SQLException, 缺省:30秒

再次执行测试发现没有报错,修改线程数为20又执行了一下,同样执行成功了。

5、基于TransactionStatus集合来控制多线程事务提交

使用事务集合来进行多线程事务控制,主要代码如下

  1. @Service
  2. public class StudentsTransactionThread {
  3. @Autowired
  4. private StudentMapper studentMapper;
  5. @Autowired
  6. private StudentService studentService;
  7. @Autowired
  8. private PlatformTransactionManager transactionManager;
  9. List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());
  10. @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
  11. public void updateStudentWithThreadsAndTrans() throws InterruptedException {
  12. //查询总数据
  13. List<Student> allStudents = studentMapper.getAll();
  14. // 线程数量
  15. final Integer threadCount = 2;
  16. //每个线程处理的数据量
  17. final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
  18. // 创建多线程处理任务
  19. ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
  20. CountDownLatch threadLatchs = new CountDownLatch(threadCount);
  21. AtomicBoolean isError = new AtomicBoolean(false);
  22. try {
  23. for (int i = 0; i < threadCount; i++) {
  24. // 每个线程处理的数据
  25. List<Student> threadDatas = allStudents.stream()
  26. .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
  27. studentThreadPool.execute(() -> {
  28. try {
  29. try {
  30. studentService.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);
  31. } catch (Throwable e) {
  32. e.printStackTrace();
  33. isError.set(true);
  34. }finally {
  35. threadLatchs.countDown();
  36. }
  37. } catch (Exception e) {
  38. e.printStackTrace();
  39. isError.set(true);
  40. }
  41. });
  42. }
  43. // 倒计时锁设置超时时间 30s
  44. boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
  45. // 判断是否超时
  46. if (!await) {
  47. isError.set(true);
  48. }
  49. } catch (Throwable e) {
  50. e.printStackTrace();
  51. isError.set(true);
  52. }
  53. if (!transactionStatuses.isEmpty()) {
  54. if (isError.get()) {
  55. transactionStatuses.forEach(s -> transactionManager.rollback(s));
  56. } else {
  57. transactionStatuses.forEach(s -> transactionManager.commit(s));
  58. }
  59. }
  60. System.out.println("主线程完成");
  61. }
  62. }
  63. @Override
  64. @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
  65. public void updateStudentsTransaction(PlatformTransactionManager transactionManager, List<TransactionStatus> transactionStatuses, List<Student> students) {
  66. // 使用这种方式将事务状态都放在同一个事务里面
  67. DefaultTransactionDefinition def = new DefaultTransactionDefinition();
  68. def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比较安全些。
  69. TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态
  70. transactionStatuses.add(status);
  71. students.forEach(s -> {
  72. // 更新教师信息
  73. // String teacher = s.getTeacher();
  74. String newTeacher = "TNO_" + new Random().nextInt(100);
  75. s.setTeacher(newTeacher);
  76. studentMapper.update(s);
  77. });
  78. System.out.println("子线程:" + Thread.currentThread().getName());
  79. }

由于这个中方式去前面方式相同,需要等待线程执行完成后才会提交事务,所有任会占用Jdbc连接池,如果线程数量超过连接池最大数量会产生连接超时。所以在使用过程中任要控制线程数量,

6、使用union连接多个select实现批量update

有些情况写不支持,批量update,但支持insert 多条数据,这个时候可尝试将需要更新的数据拼接成多条select 语句,然后使用union 连接起来,再使用update 关联这个数据进行update,具体代码演示如下:

  1. update student,(
  2. (select 1 as id,'teacher_A' as teacher) union
  3. (select 2 as id,'teacher_A' as teacher) union
  4. (select 3 as id,'teacher_A' as teacher) union
  5. (select 4 as id,'teacher_A' as teacher)
  6. /* ....more data ... */
  7. ) as new_teacher
  8. set
  9. student.teacher=new_teacher.teacher
  10. where
  11. student.id=new_teacher.id

这种方式在MySQL数据库没有配置 allowMultiQueries=true 也可以实现批量更新。

总结

  • 对于大批量数据库操作,使用手动事务提交可以很多程度上提高操作效率
  • 多线程对数据库进行操作时,并非线程数越多操作时间越快,按上述示例大约在2-5个线程时操作时间最快。
  • 对于多线程阻塞事务提交时,线程数量不能过多。
  • 如果能有办法实现批量更新那是最好