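As is well known, Java 8 introduced the Stream API, which lets developers process data conveniently. It offers both stream and parallelStream. Be aware that parallelStream, used carelessly, can cause some strange problems.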

Discovering the problem

Our production environment threw an exception while processing data.

Investigating the problem

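After going through the logs, we found that the backend code had thrown a NullPointerException. We then reviewed the code and found this block: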

    List<com.amazonaws.services.ec2.model.Image> images = new ArrayList<>();
    DescribeImagesResult imagesResponse = contextThreadLocal.get().getEC2Client()
            .describeImages(imagesRequest);
    imagesResponse.getImages().parallelStream().forEach(image -> {
        if (Constants.OS_TYPE.WINDOWS.equalsIgnoreCase(image.getPlatform())) {
            boolean isValid = StrUtil.containsIgnoreCase(image.getName(), "Base") &&
                    StrUtil.containsIgnoreCase(image.getName(), "English") ||
                    StrUtil.containsIgnoreCase(image.getName(), "Chinese");
            if (isValid) {
                images.add(image);
            }
        } else {
            images.add(image);
        }
    });

Cause of the problem

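Java 8's parallelStream uses the fork/join framework to run work concurrently. Put plainly, list.parallelStream().forEach(...) here executes the lambda on several threads at once, while images is a plain ArrayList, which is not thread-safe. Calling add on it concurrently has unpredictable results: it may throw an ArrayIndexOutOfBoundsException, it may lose elements, or it may leave null at some indexes.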
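To make the "null at some indexes" case concrete, here is a minimal sketch that replays one possible interleaving of two threads by hand, on a single thread. It assumes a simplified model of ArrayList (a backing array plus a size counter, where add is a capacity check followed by a non-atomic store at index size++). The class ArrayListRaceSketch, its variables, and the thread labels A and B are hypothetical names used only for illustration; this is not JDK code.

```java
import java.util.Arrays;

class ArrayListRaceSketch {

    // Replays, step by step on one thread, an interleaving that two threads
    // (labelled A and B here) could produce when both call add() on the same list.
    static String replay() {
        // Simplified stand-in for ArrayList's internals: a backing array and a size.
        Object[] elementData = new Object[2];
        int size = 0;
        elementData[size++] = "x";            // one element stored, one free slot left

        // Thread A begins add("a"): the capacity check passes, A reads the
        // array reference and claims index 1, then is paused before storing.
        Object[] arraySeenByA = elementData;
        int indexForA = size++;               // size is now 2

        // Thread B runs add("b") to completion: the array looks full, so it grows.
        elementData = Arrays.copyOf(elementData, 4); // slot 1 is copied while still empty
        elementData[size++] = "b";            // B stores at index 2, size is now 3

        // Thread A resumes and stores into the old array, which nobody reads any more.
        arraySeenByA[indexForA] = "a";

        // The list now reports three elements, but index 1 was never filled.
        return Arrays.toString(Arrays.copyOf(elementData, size));
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints [x, null, b]
    }
}
```

In this replay the element "a" is lost and a null is left at index 1. Other interleavings can instead overwrite a slot, or store past the end of the array.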

A simple verification

    for (int i = 0; i < 100; i++) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        List<Integer> list2 = new ArrayList<>();
        list.parallelStream().forEach(list2::add);
        // Insertion order is nondeterministic across threads, so sort the result
        // to make the differences between runs easy to see
        list2.sort(Comparator.comparingInt(o -> o));
        System.out.println(list2);
    }

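As you can see, I wrapped the code in an outer loop that runs 100 times. Each iteration copies the contents of list into list2. What we expect, of course, is that no matter how many times it runs, every element of list ends up in list2. Here is the actual output: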

  1. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  2. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  3. [2, 3, 4, 5, 6, 7, 8, 9, 10]
  4. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  5. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  6. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  7. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  8. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  9. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  10. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  11. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  12. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  13. [1, 2, 3, 4, 5, 7, 8, 9, 10]
  14. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  15. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  16. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  17. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  18. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  19. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  20. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  21. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  22. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  23. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  24. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  25. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  26. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  27. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  28. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  29. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  30. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  31. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  32. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  33. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  34. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  35. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  36. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  37. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  38. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  39. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  40. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  41. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  42. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  43. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  44. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  45. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  46. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  47. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  48. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  49. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  50. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  51. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  52. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  53. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  54. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  55. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  56. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  57. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  58. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  59. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  60. [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  61. Exception in thread "main" java.lang.NullPointerException
  62. at cn.com.concurrency.example.test.main.ParallelStream.lambda$main$0(ParallelStream.java:23)
  63. at cn.com.concurrency.example.test.main.ParallelStream$$Lambda$2/806353501.applyAsInt(Unknown Source)
  64. at java.util.Comparator.lambda$comparingInt$7b0bb60$1(Comparator.java:490)
  65. at java.util.Comparator$$Lambda$3/935044096.compare(Unknown Source)
  66. at java.util.TimSort.countRunAndMakeAscending(TimSort.java:351)
  67. at java.util.TimSort.sort(TimSort.java:216)
  68. at java.util.Arrays.sort(Arrays.java:1512)
  69. at java.util.ArrayList.sort(ArrayList.java:1454)
  70. at cn.com.concurrency.example.test.main.ParallelStream.main(ParallelStream.java:23)

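From this output we can see that:

  1. On lines 3 and 13, data went missing (1 is absent on line 3, and 6 on line 13), so list2 did not receive the full result
  2. Worse, the program did not exit normally in the end but threw a NullPointerException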

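Conclusion: this is exactly what we analyzed earlier. Because the code is not thread-safe, its outcome is nondeterministic and it produces unexpected results.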
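The stack trace shows the NullPointerException being thrown inside the comparingInt lambda during the sort. That is what one would expect if a null ended up in list2: the lambda o -> o has to unbox each Integer to an int, and unboxing null fails. The following sketch reproduces just that step deterministically. The class and method names (NullSortSketch, sortThrowsNpe) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class NullSortSketch {

    // Sorts the list with the same comparator as the verification code and
    // reports whether the sort failed with a NullPointerException.
    static boolean sortThrowsNpe(List<Integer> values) {
        try {
            values.sort(Comparator.comparingInt(o -> o)); // unboxes every element it compares
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A list with a null "hole", like the one an unsafe concurrent add can leave behind.
        List<Integer> withHole = new ArrayList<>(Arrays.asList(3, null, 1));
        System.out.println(sortThrowsNpe(withHole)); // prints true

        List<Integer> intact = new ArrayList<>(Arrays.asList(3, 2, 1));
        System.out.println(sortThrowsNpe(intact));   // prints false
    }
}
```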

Fixing the problem

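  1. Consider whether concurrency is really needed. If it is not, switch to a sequential stream and the problem goes away
  2. Change the images container to a thread-safe one, such as Collections.synchronizedList(new ArrayList<>())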
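The two options above can be sketched as follows, using the integer list from the verification code in place of the EC2 images. The sketch also includes a third option that the list above does not mention: letting the stream build the result itself with collect(Collectors.toList()), so the lambda never touches a shared container. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

class ParallelStreamFixes {

    // Option 1: no concurrency, so a plain ArrayList is safe.
    static List<Integer> copySequentially(List<Integer> source) {
        List<Integer> target = new ArrayList<>();
        source.stream().forEach(target::add);
        return target;
    }

    // Option 2: keep the parallel stream, but add into a synchronized wrapper.
    // Nothing is lost, although the insertion order is still nondeterministic.
    static List<Integer> copyIntoSynchronizedList(List<Integer> source) {
        List<Integer> target = Collections.synchronizedList(new ArrayList<>());
        source.parallelStream().forEach(target::add);
        return target;
    }

    // Additional option (not from the list above): avoid shared mutable state
    // and let the stream assemble the result list.
    static List<Integer> copyWithCollect(List<Integer> source) {
        return source.parallelStream().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        for (int i = 0; i < 100; i++) {
            List<Integer> synced = new ArrayList<>(copyIntoSynchronizedList(list));
            Collections.sort(synced); // sort only to compare contents
            if (!synced.equals(list)
                    || !copySequentially(list).equals(list)
                    || !copyWithCollect(list).equals(list)) {
                throw new IllegalStateException("lost or corrupted elements in run " + i);
            }
        }
        System.out.println("all 100 runs produced the complete list");
    }
}
```

For the production code, the collect variant would amount to expressing the platform and name checks as a filter and collecting the matching images, instead of calling images.add inside forEach.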