由于 list.parallelStream()
使用默认的公共的线程池ForkJoinPool._commonPool_
,所以没法单独配置一些属性,比如并行线程数量、异常处理器、线程工厂等,但是可以设置全局的,下面一个一个来说明。先来看看源码,构建 公共线程池的源码如下
// java.util.concurrent.ForkJoinPool#makeCommonPool
private static ForkJoinPool makeCommonPool() {
int parallelism = -1;
ForkJoinWorkerThreadFactory factory = null;
UncaughtExceptionHandler handler = null;
try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
// 设置并行度
("java.util.concurrent.ForkJoinPool.common.parallelism");
String fp = System.getProperty
// 设置线程工厂
("java.util.concurrent.ForkJoinPool.common.threadFactory");
String hp = System.getProperty
// 设置一次处理器
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
if (pp != null)
parallelism = Integer.parseInt(pp);
if (fp != null)
factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
handler = ((UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
} catch (Exception ignore) {
}
if (factory == null) {
if (System.getSecurityManager() == null)
factory = defaultForkJoinWorkerThreadFactory;
else // use security-managed default
factory = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
并行度设置
// 方式在 java.util.concurrent.ForkJoinPool 的类文档上
默认情况下,公共池是使用默认参数构造的,但可以通过设置三个系统属性来控制这些参数:
java.util.concurrent.ForkJoinPool.common.parallelism - 并行度,非负整数
java.util.concurrent.ForkJoinPool.common.threadFactory - ForkJoinPool.ForkJoinWorkerThreadFactory 的类名
java.util.concurrent.ForkJoinPool.common.exceptionHandler - Thread.UncaughtExceptionHandler 的类名
可以参考下面的的代码:
private static void setForkJoinPoolCommonPolParallelism() {
// 获取当前虚拟机可用 CPU 核数
int availableProcessors = Runtime.getRuntime().availableProcessors();
// 默认使用 10 倍的 CPU 线程数量
int ap = availableProcessors * 10;
// 最小 100 个线程
int min = 100;
int finalCpu = Integer.max(ap, min);
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", finalCpu + "");
}
:::warning
在 Spring Boot 中设置的话,需要在 SpringApplication.run(Application.class, args); 代码之前
public static void main(String[] args) {
TimeZone.setDefault(TimeZone.getTimeZone(“Asia/Shanghai”));
比如这里设置
SpringApplication.run(Application.class, args);
}
:::
异常处理器设置
异常处理器的详细说明在 前面笔记中有 说明,简单说:
- 在开发环境如果没有异常处理器,业务处理出现异常的话(如下所示),你能在控制台看到出现异常的地方
- 在线上环境:一般都会不收集控制台的信息,而是使用日志框架的日志信息,所以这个时候你从日志文件里面是看不到出错信息的;(这个结论是错的, 前面笔记中 的例子由于在自定义线程池中使用了并行流,被自定义的线程池没有设置异常处理器,所以异常被打印到了 System.err 上,导致看不到异常)=
所以:我们必须要提前设置一次处理器的异常信息用日志框架打印。设置方式如下:list.parallelStream().forEach(item -> {
处理出现异常的话
VocAttrs record = new VocAttrs();
record.setId(item.getId());
record.setReviewDay(DateUtils.dateToDayInt(item.getReviewDate()));
record.setReviewMonth(DateUtils.dateToMonthInt(item.getReviewDate()));
record.setReviewYearweek(DateUtils.dateToWeekInt(item.getReviewDate()));
vocAttrsService.update(record);
});
第一步:先实现一个自定义的异常处理器
package cn.mrcode;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
/**
* ForkJoinPool 公共线程池异常处理器
*/
@Slf4j
public class ForkJoinPoolCommonPolExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.error(StrUtil.format("ForkJoinPool 公共线程异常,threadName={}", t.getName()), e);
}
}
第二步:设置系统属性
// 设置异常处理器
System.setProperty("java.util.concurrent.ForkJoinPool.common.exceptionHandler",
ForkJoinPoolCommonPolExceptionHandler.class.getName());
但是这样设置之后,你在使用并行流的时候还是会发现看不到日志,好像并没有设置生效一样,你可以通过 demo 程序来 debug ,最终你会发现:公共池中的线程上确实是有我们设置的异常处理器,但是:异常传递到了父线程(你使用并行流的时候,当前的线程是不是还是等待结果的返回,这个就是父线程),但是这个 父线程上的异常处理器是空的,最后就使用了线程组上的异常处理器,使用了 System.error 打印了日志,在生产环境的日志信息中看不到这些异常消息。
下面是这个例子
package cn.mrcode;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
/**
* @author mrcode
*/
public class TimeDemo {
public static void main(String[] args) throws InterruptedException {
System.setProperty("java.util.concurrent.ForkJoinPool.common.exceptionHandler",
ForkJoinPoolCommonPolExceptionHandler.class.getName());
// 获取并行度
int parallelism = ForkJoinPool.getCommonPoolParallelism();
// 列表比并行度大1,以启用并行度
Stream<Integer> stream = IntStream.range(1, parallelism + 1).mapToObj(Integer::valueOf).collect(Collectors.toList())
.parallelStream();
stream.forEach(item -> {
System.out.println(item + ":" + Thread.currentThread() + " " + Thread.currentThread().getThreadGroup());
int a = 1 / 0;
System.out.println("item:" + item);
});
TimeUnit.SECONDS.sleep(10);
}
运行后的控制台输出,和异常信息
7:Thread[main,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
8:Thread[ForkJoinPool.commonPool-worker-11,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
6:Thread[ForkJoinPool.commonPool-worker-2,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
3:Thread[ForkJoinPool.commonPool-worker-9,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
1:Thread[ForkJoinPool.commonPool-worker-4,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
4:Thread[ForkJoinPool.commonPool-worker-9,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
2:Thread[ForkJoinPool.commonPool-worker-11,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
5:Thread[ForkJoinPool.commonPool-worker-2,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
Exception in thread "main" java.lang.ArithmeticException: / by zero
at cr.mrcode.TimeDemocr.lambda$main$0(TimeDemo.java:25)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
at cn.mrcode.TimeDemo.main(TimeDemo.java:23)
从输出结果来看:所有的 worker 线程都是 main 线程 fork 出来的,也都有一个共同的 线程组。还有一个 work 线程是 main 线程。由于是等待结果的 fork/join 操作(这里我很多线程知识已经忘记了,所以不是很清楚原理了),所以 worker 线程报错之后就被 main 线程捕获了,而 man 线程也是一个 workder 线程,它也会走异常处理这一流程,从而导致在异常调度的时候,获取不到 main 线程的异常处理器。
明白了原因,那么我们有几种方式:
- 自己在使用并行流的时候,每个操作都手动 try 下:这种方法,代码也太不优雅了,但是由于异常会传递到当前线程上来,也相当于是被处理过了,所以这个还好
- 在使用并行流前,手动设置下当前线程的异常处理器
在使用前,手动设置当前线程的异常处理器
```java package cn.mrcode;
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream;
/**
@author mrcode */ public class TimeDemo { public static void main(String[] args) throws InterruptedException {
// 这里获取始终都有值,有可能是 线程组的
Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.currentThread().getUncaughtExceptionHandler();
Thread.currentThread().setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("自定义程处理器:" + t + " ; 异常:" + e);
// 这里我还不清楚是否有其他功能会针对这种异常处理设置特别的线程组异常处理器
// 所以转调下
uncaughtExceptionHandler.uncaughtException(t,e);
}
});
System.setProperty("java.util.concurrent.ForkJoinPool.common.exceptionHandler",
ForkJoinPoolCommonPolExceptionHandler.class.getName());
// 获取并行度
int parallelism = ForkJoinPool.getCommonPoolParallelism();
// 列表比并行度大1,以启用并行度
Stream<Integer> stream = IntStream.range(1, parallelism + 1).mapToObj(Integer::valueOf).collect(Collectors.toList())
.parallelStream();
stream.forEach(item -> {
System.out.println(item + ":" + Thread.currentThread() + " " + Thread.currentThread().getThreadGroup());
int a = 1 / 0;
System.out.println("item:" + item);
});
TimeUnit.SECONDS.sleep(10);
} }
控制台输出如下
```bash
7:Thread[main,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
8:Thread[ForkJoinPool.commonPool-worker-11,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
6:Thread[ForkJoinPool.commonPool-worker-2,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
2:Thread[ForkJoinPool.commonPool-worker-4,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
1:Thread[ForkJoinPool.commonPool-worker-11,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
3:Thread[ForkJoinPool.commonPool-worker-9,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
5:Thread[ForkJoinPool.commonPool-worker-2,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
4:Thread[ForkJoinPool.commonPool-worker-13,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
自定义程处理器:Thread[main,5,main] ; 异常:java.lang.ArithmeticException: / by zero
Exception in thread "main" java.lang.ArithmeticException: / by zero
at cn.mrcode.TimeDemo.lambda$main$0(TimeDemo.java:37)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
at cn.mrcode.TimeDemo.main(TimeDemo.java:35)
:::danger 可以看到:在并行流中还存在其他的知识点,从结果来看,执行了 8 条数据,但是只有一条数据的异常被捕获到了。所以从这里来看这种多线程出错都是一个通病,一个地方出错,都停止处理,还是都处理的一个决策。 这里不去讨论。 ::: 如果你在主线程上加上 try
try {
stream.forEach(item -> {
System.out.println(item + ":" + Thread.currentThread() + " " + Thread.currentThread().getThreadGroup());
int a = 1 / 0;
System.out.println("item:" + item);
});
} catch (Exception e) {
System.out.println(e);
}
你会发现,异常的时候走了 try 的异常处理,所以这一块,我感觉我的基础知识已经完全混乱了; :::danger 但是也明白了并不是 stream 偶尔会出现异常被吞的情况,而是自己在自定义的多线程中使用了并行流,然而调用处又没有 try。 :::
7:Thread[main,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
6:Thread[ForkJoinPool.commonPool-worker-11,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
java.lang.ArithmeticException: / by zero
9:Thread[ForkJoinPool.commonPool-worker-4,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
4:Thread[ForkJoinPool.commonPool-worker-4,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
2:Thread[ForkJoinPool.commonPool-worker-11,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
1:Thread[ForkJoinPool.commonPool-worker-6,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
5:Thread[ForkJoinPool.commonPool-worker-13,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
3:Thread[ForkJoinPool.commonPool-worker-9,5,main] java.lang.ThreadGroup[name=main,maxpri=10]
10:Thread[ForkJoinPool.commonPool-worker-2,5,main] java.lang.ThreadGroup[name=main,maxpri=10]