3)newFixedThreadPool

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

特点

  • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
  • 阻塞队列是无界的,可以放任意数量的任务

    评价 适用于任务量已知,相对耗时的任务

4)newCachedThreadPool

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

特点

  • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
    • 全部都是救急线程(60s 后可以回收)
    • 救急线程可以无限创建
  • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货) ```java SynchronousQueue integers = new SynchronousQueue<>(); new Thread(() -> { try {

    1. log.debug("putting {} ", 1);
    2. integers.put(1);
    3. log.debug("{} putted...", 1);
    4. log.debug("putting...{} ", 2);
    5. integers.put(2);
    6. log.debug("{} putted...", 2);

    } catch (InterruptedException e) {

    1. e.printStackTrace();

    } },”t1”).start();

sleep(1);

new Thread(() -> { try { log.debug(“taking {}”, 1); integers.take(); } catch (InterruptedException e) { e.printStackTrace(); } },”t2”).start();

sleep(1);

new Thread(() -> { try { log.debug(“taking {}”, 2); integers.take(); } catch (InterruptedException e) { e.printStackTrace(); } },”t3”).start();

  1. 输出
  2. ```java
  3. 11:48:15.500 c.TestSynchronousQueue [t1] - putting 1
  4. 11:48:16.500 c.TestSynchronousQueue [t2] - taking 1
  5. 11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted...
  6. 11:48:16.500 c.TestSynchronousQueue [t1] - putting...2
  7. 11:48:17.502 c.TestSynchronousQueue [t3] - taking 2
  8. 11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted...

评价 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况

5)newSingleTthreadPool

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

使用场景:
希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
区别:

  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一
    个线程,保证池的正常工作
  • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
    • FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因
      此不能调用 ThreadPoolExecutor 中特有的方法
  • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改
    • 对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

      6)提交任务

      ```java // 执行任务 void execute(Runnable command);

// 提交任务 task,用返回值 Future 获得任务执行结果

Future submit(Callable task);

// 提交 tasks 中所有任务

List> invokeAll(Collection<? extends Callable> tasks) throws InterruptedException;

// 提交 tasks 中所有任务,带超时时间

List> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消

T invokeAny(Collection<? extends Callable> tasks) throws InterruptedException, ExecutionException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间

T invokeAny(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

  1. <a name="sghOp"></a>
  2. #### 7)关闭线程池
  3. **shutdown**
  4. ```java
  5. /*
  6. 线程池状态变为 SHUTDOWN
  7. - 不会接收新任务
  8. - 但已提交任务会执行完
  9. - 此方法不会阻塞调用线程的执行
  10. */
  11. void shutdown();
  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. checkShutdownAccess();
  6. // 修改线程池状态
  7. advanceRunState(SHUTDOWN);
  8. // 仅会打断空闲线程
  9. interruptIdleWorkers();
  10. onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
  11. } finally {
  12. mainLock.unlock();
  13. }
  14. // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
  15. tryTerminate();
  16. }

shutdownNow

  1. /*
  2. 线程池状态变为 STOP
  3. - 不会接收新任务
  4. - 会将队列中的任务返回
  5. - 并用 interrupt 的方式中断正在执行的任务
  6. */
  7. List<Runnable> shutdownNow();
  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. checkShutdownAccess();
  7. // 修改线程池状态
  8. advanceRunState(STOP);
  9. // 打断所有线程
  10. interruptWorkers();
  11. // 获取队列中剩余任务
  12. tasks = drainQueue();
  13. } finally {
  14. mainLock.unlock();
  15. }
  16. // 尝试终结
  17. tryTerminate();
  18. return tasks;
  19. }

其它方法

  1. // 不在 RUNNING 状态的线程池,此方法就返回 true
  2. boolean isShutdown();
  3. // 线程池状态是否是 TERMINATED
  4. boolean isTerminated();
  5. // 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事
  6. 情,可以利用此方法等待
  7. boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

模式之Worker Thread

8)任务调度线程池

在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。

  1. public static void main(String[] args) {
  2. Timer timer = new Timer();
  3. TimerTask task1 = new TimerTask() {
  4. @Override
  5. public void run() {
  6. log.debug("task 1");
  7. sleep(2);
  8. }
  9. };
  10. TimerTask task2 = new TimerTask() {
  11. @Override
  12. public void run() {
  13. log.debug("task 2");
  14. }
  15. };
  16. // 使用 timer 添加两个任务,希望它们都在 1s 后执行
  17. // 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行
  18. timer.schedule(task1, 1000);
  19. timer.schedule(task2, 1000);
  20. }

输出

  1. 20:46:09.444 c.TestTimer [main] - start...
  2. 20:46:10.447 c.TestTimer [Timer-0] - task 1
  3. 20:46:12.448 c.TestTimer [Timer-0] - task 2

使用 ScheduledExecutorService 改写:

  1. ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
  2. // 添加两个任务,希望它们都在 1s 后执行
  3. executor.schedule(() -> {
  4. System.out.println("任务1,执行时间:" + new Date());
  5. try { Thread.sleep(2000); } catch (InterruptedException e) { }
  6. }, 1000, TimeUnit.MILLISECONDS);
  7. executor.schedule(() -> {
  8. System.out.println("任务2,执行时间:" + new Date());
  9. }, 1000, TimeUnit.MILLISECONDS);

输出

  1. 任务1,执行时间:Thu Jan 03 12:45:17 CST 2019
  2. 任务2,执行时间:Thu Jan 03 12:45:17 CST 2019

scheduleAtFixedRate 例子:

  1. ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
  2. log.debug("start...");
  3. pool.scheduleAtFixedRate(() -> {
  4. log.debug("running...");
  5. }, 1, 1, TimeUnit.SECONDS);

输出

  1. 21:45:43.167 c.TestTimer [main] - start...
  2. 21:45:44.215 c.TestTimer [pool-1-thread-1] - running...
  3. 21:45:45.215 c.TestTimer [pool-1-thread-1] - running...
  4. 21:45:46.215 c.TestTimer [pool-1-thread-1] - running...
  5. 21:45:47.215 c.TestTimer [pool-1-thread-1] - running...

scheduleAtFixedRate 例子(任务执行时间超过了间隔时间):

  1. ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
  2. log.debug("start...");
  3. pool.scheduleAtFixedRate(() -> {
  4. log.debug("running...");
  5. sleep(2);
  6. }, 1, 1, TimeUnit.SECONDS);

输出分析:一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s

  1. 21:44:30.311 c.TestTimer [main] - start...
  2. 21:44:31.360 c.TestTimer [pool-1-thread-1] - running...
  3. 21:44:33.361 c.TestTimer [pool-1-thread-1] - running...
  4. 21:44:35.362 c.TestTimer [pool-1-thread-1] - running...
  5. 21:44:37.362 c.TestTimer [pool-1-thread-1] - running...

scheduleWithFixedDelay 例子:

  1. ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
  2. log.debug("start...");
  3. pool.scheduleWithFixedDelay(()-> {
  4. log.debug("running...");
  5. sleep(2);
  6. }, 1, 1, TimeUnit.SECONDS);

输出分析:一开始,延时 1s,scheduleWithFixedDelay 的间隔是 上一个任务结束 <-> 延时 <-> 下一个任务开始 所以间隔都是 3s

  1. 21:40:55.078 c.TestTimer [main] - start...
  2. 21:40:56.140 c.TestTimer [pool-1-thread-1] - running...
  3. 21:40:59.143 c.TestTimer [pool-1-thread-1] - running...
  4. 21:41:02.145 c.TestTimer [pool-1-thread-1] - running...
  5. 21:41:05.147 c.TestTimer [pool-1-thread-1] - running...

评价 整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线
程也不会被释放。用来执行延迟或反复执行的任务

9)正确处理执行任务异常

方法1:主动捉异常

  1. ExecutorService pool = Executors.newFixedThreadPool(1);
  2. pool.submit(() -> {
  3. try {
  4. log.debug("task1");
  5. int i = 1 / 0;
  6. } catch (Exception e) {
  7. log.error("error:", e);
  8. }
  9. });

输出

  1. 21:59:04.558 c.TestTimer [pool-1-thread-1] - task1
  2. 21:59:04.562 c.TestTimer [pool-1-thread-1] - error:
  3. java.lang.ArithmeticException: / by zero
  4. at cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28)
  5. at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  6. at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  7. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  8. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  9. at java.lang.Thread.run(Thread.java:748)

方法2:使用 Future

  1. ExecutorService pool = Executors.newFixedThreadPool(1);
  2. Future<Boolean> f = pool.submit(() -> {
  3. log.debug("task1");
  4. int i = 1 / 0;
  5. return true;
  6. });
  7. log.debug("result:{}", f.get());

输出

  1. 21:54:58.208 c.TestTimer [pool-1-thread-1] - task1
  2. Exception in thread "main" java.util.concurrent.ExecutionException:
  3. java.lang.ArithmeticException: / by zero
  4. at java.util.concurrent.FutureTask.report(FutureTask.java:122)
  5. at java.util.concurrent.FutureTask.get(FutureTask.java:192)
  6. at cn.itcast.n8.TestTimer.main(TestTimer.java:31)
  7. Caused by: java.lang.ArithmeticException: / by zero
  8. at cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28)
  9. at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  10. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  11. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  12. at java.lang.Thread.run(Thread.java:748)

应用之定时任务

10)Tomcat线程池

Tomcat 在哪里用到了线程池呢

  • LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
  • Acceptor 只负责【接收新的 socket 连接】
  • Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
  • 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
  • Executor 线程池中的工作线程最终负责【处理请求】

Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同

  • 如果总线程数达到 maximumPoolSize
    • 这时不会立刻抛 RejectedExecutionException 异常
    • 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常

源码 tomcat-7.0.42

  1. public void execute(Runnable command, long timeout, TimeUnit unit) {
  2. submittedCount.incrementAndGet();
  3. try {
  4. super.execute(command);
  5. } catch (RejectedExecutionException rx) {
  6. if (super.getQueue() instanceof TaskQueue) {
  7. final TaskQueue queue = (TaskQueue)super.getQueue();
  8. try {
  9. if (!queue.force(command, timeout, unit)) {
  10. submittedCount.decrementAndGet();
  11. throw new RejectedExecutionException("Queue capacity is full.");
  12. }
  13. } catch (InterruptedException x) {
  14. submittedCount.decrementAndGet();
  15. Thread.interrupted();
  16. throw new RejectedExecutionException(x);
  17. }
  18. } else {
  19. submittedCount.decrementAndGet();
  20. throw rx;
  21. }
  22. }
  23. }

TaskQueue.java

  1. public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
  2. if ( parent.isShutdown() )
  3. throw new RejectedExecutionException(
  4. "Executor not running, can't force a command into the queue"
  5. );
  6. return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task
  7. is rejected
  8. }

Connector 配置
image.png
Executor 线程配置
image.png
image.png

3. Fork/Join

1)概念

Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池

2)使用

提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下面定义了一个对 1~n 之间的整数求和的任务

  1. @Slf4j(topic = "c.AddTask")
  2. class AddTask1 extends RecursiveTask<Integer> {
  3. int n;
  4. public AddTask1(int n) {
  5. this.n = n;
  6. }
  7. @Override
  8. public String toString() {
  9. return "{" + n + '}';
  10. }
  11. @Override
  12. protected Integer compute() {
  13. // 如果 n 已经为 1,可以求得结果了
  14. if (n == 1) {
  15. log.debug("join() {}", n);
  16. return n;
  17. }
  18. // 将任务进行拆分(fork)
  19. AddTask1 t1 = new AddTask1(n - 1);
  20. t1.fork();
  21. log.debug("fork() {} + {}", n, t1);
  22. // 合并(join)结果
  23. int result = n + t1.join();
  24. log.debug("join() {} + {} = {}", n, t1, result);
  25. return result;
  26. }
  27. }

然后提交给 ForkJoinPool 来执行

  1. public static void main(String[] args) {
  2. ForkJoinPool pool = new ForkJoinPool(4);
  3. System.out.println(pool.invoke(new AddTask1(5)));
  4. }

结果

  1. [ForkJoinPool-1-worker-0] - fork() 2 + {1}
  2. [ForkJoinPool-1-worker-1] - fork() 5 + {4}
  3. [ForkJoinPool-1-worker-0] - join() 1
  4. [ForkJoinPool-1-worker-0] - join() 2 + {1} = 3
  5. [ForkJoinPool-1-worker-2] - fork() 4 + {3}
  6. [ForkJoinPool-1-worker-3] - fork() 3 + {2}
  7. [ForkJoinPool-1-worker-3] - join() 3 + {2} = 6
  8. [ForkJoinPool-1-worker-2] - join() 4 + {3} = 10
  9. [ForkJoinPool-1-worker-1] - join() 5 + {4} = 15
  10. 15

用图来表示
image.png
改进

  1. class AddTask3 extends RecursiveTask<Integer> {
  2. int begin;
  3. int end;
  4. public AddTask3(int begin, int end) {
  5. this.begin = begin;
  6. this.end = end;
  7. }
  8. @Override
  9. public String toString() {
  10. return "{" + begin + "," + end + '}';
  11. }
  12. @Override
  13. protected Integer compute() {
  14. // 5, 5
  15. if (begin == end) {
  16. log.debug("join() {}", begin);
  17. return begin;
  18. }
  19. // 4, 5
  20. if (end - begin == 1) {
  21. log.debug("join() {} + {} = {}", begin, end, end + begin);
  22. return end + begin;
  23. }
  24. // 1 5
  25. int mid = (end + begin) / 2; // 3
  26. AddTask3 t1 = new AddTask3(begin, mid); // 1,3
  27. t1.fork();
  28. AddTask3 t2 = new AddTask3(mid + 1, end); // 4,5
  29. t2.fork();
  30. log.debug("fork() {} + {} = ?", t1, t2);
  31. int result = t1.join() + t2.join();
  32. log.debug("join() {} + {} = {}", t1, t2, result);
  33. return result;
  34. }
  35. }

然后提交给 ForkJoinPool 来执行

  1. public static void main(String[] args) {
  2. ForkJoinPool pool = new ForkJoinPool(4);
  3. System.out.println(pool.invoke(new AddTask3(1, 10)));
  4. }

结果

  1. [ForkJoinPool-1-worker-0] - join() 1 + 2 = 3
  2. [ForkJoinPool-1-worker-3] - join() 4 + 5 = 9
  3. [ForkJoinPool-1-worker-0] - join() 3
  4. [ForkJoinPool-1-worker-1] - fork() {1,3} + {4,5} = ?
  5. [ForkJoinPool-1-worker-2] - fork() {1,2} + {3,3} = ?
  6. [ForkJoinPool-1-worker-2] - join() {1,2} + {3,3} = 6
  7. [ForkJoinPool-1-worker-1] - join() {1,3} + {4,5} = 15
  8. 15

用图来表示
image.png

8.3 J.U.C

1. AQS原理