如何确定线程池线程数

https://zhuanlan.zhihu.com/p/306732526

三种任务类型:

  • cpu 密集型:占用大量 cpu 资源,典型的如从 1 累加到 1000 亿;
  • io 密集型:不太占用 cpu 资源,大部分时间在阻塞等待 io 返回;
  • 混合型:既有大量耗 cpu 操作,又有 io 操作。


cpu 密集型**
对于 cpu 密集型任务,一个 cpu 核心一个线程即可,继续增加线程只会增加上下文切换,不会带来任何性能提升。

io 密集型
对于 io 密集型任务,大部分时间耗在阻塞等待 io 操作返回,这段时间不占用 cpu,所以可以增加线程数,把这部分 cpu 利用起来。

对于 io 密集型任务,最大 cpu 利用率的线程数与 io 操作耗时有关。理想情况,io 操作阻塞期间,所有线程可以切换一遍。如,io 操作耗时 2 ms,线程上下文切换耗时 2 us,那么设置线程数为 cpu 核心数 * 1000(实际上可能会小很多,如果考虑非 io 操作耗时和其它线程的影响,以及进程间切换耗时长得多等因素),可以获得最大 cpu 利用率。如果少了,所有线程轮一遍,io 还没有完成,cpu 空闲;如果多了,cpu 忙不过来,没有意义。

混合型
对于混合型任务,如:

  1. private static class MixedTask implements Callable<Integer> {
  2. private int taskNo;
  3. public MixedTask(int taskNo) {
  4. this.taskNo = taskNo;
  5. }
  6. @Override
  7. public Integer call() {
  8. //cpu 操作
  9. int result = cpuOperation(taskNo);
  10. //io 操作
  11. mockIoOperation(100, TimeUnit.MILLISECONDS);
  12. return result;
  13. }
  14. }

如果 cpu 操作和 io 操作耗时相差不多,可以考虑把任务拆成两部分,io 操作和 cpu 操作分别提交到不同的线程池执行。实际上相当于异步 io 操作。
如果两者耗时相差很大,如 cpu 操作耗时极短,io 操作相对耗时很长,拆分任务并不能获得明显的效率提升,就显得费力不讨好,这种情况可以把整个任务当作 io 操作处理。

ListenableFuture

Java中有几种实现异步的方式(FutureTask/ListenableFuture/CompletableFuture)
这篇介绍的是ListenableFuture,相比FutureTask,本质上只是增加了任务的回调函数,这个是google框架里面的一个东西。

详见:

  1. public class ListenableFutureTest {
  2. public static void main(String[] args) {
  3. testListenFuture();
  4. }
  5. public static void testListenFuture() {
  6. System.out.println("主线程start");
  7. ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
  8. Task task1 = new Task();
  9. task1.args = "task1";
  10. Task task2 = new Task();
  11. task2.args = "task2";
  12. ListenableFuture<String> future = pool.submit(task1);
  13. ListenableFuture<String> future2 = pool.submit(task2);
  14. future2.addListener(() -> System.out.println("addListener 不能带返回值"), pool);
  15. /**
  16. * FutureCallBack接口可以对每个任务的成功或失败单独做出响应
  17. */
  18. FutureCallback<String> futureCallback = new FutureCallback<String>() {
  19. @Override
  20. public void onSuccess(String result) {
  21. System.out.println("Futures.addCallback 能带返回值:" + result);
  22. }
  23. @Override
  24. public void onFailure(Throwable t) {
  25. System.out.println("出错,业务回滚或补偿");
  26. }
  27. };
  28. //为任务绑定回调接口
  29. Futures.addCallback(future, futureCallback, pool);
  30. System.out.println("主线程end");
  31. }
  32. }
  33. class Task implements Callable<String> {
  34. String args;
  35. @Override
  36. public String call() throws Exception {
  37. Thread.sleep(1000);
  38. System.out.println("任务:" + args);
  39. return "dong";
  40. }
  41. }
  42. ————————————————
  43. 版权声明:本文为CSDN博主「帅东」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
  44. 原文链接:https://blog.csdn.net/PROGRAM_anywhere/java/article/details/83552126

Futures.transformAsync

个人理解:当上一个Future执行完成后,执行特定的方法,以上一个Future的结果为输入做下一步处理

https://juejin.im/post/5cb48bcd6fb9a0687015c9c7
image.png