Executor

用来线程的执行

ExecutorService

线程的生命周期 列全了

Executors-线程池的工厂

用来产生各种各样的线程池

SingleThreadExecutor(ThreadPoolExecutor)

单线程的线程池-一个线程跑完,才到下一个线程

  1. public class T07_SingleThreadPool {
  2. public static void main(String[] args) {
  3. ExecutorService service = Executors.newSingleThreadExecutor();
  4. for(int i=0; i<5; i++) {
  5. final int j = i;
  6. service.execute(()->{
  7. System.out.println(j + " " + Thread.currentThread().getName());
  8. });
  9. }
  10. }
  11. }
  12. // 结果
  13. /**
  14. 0 pool-1-thread-1
  15. 1 pool-1-thread-1
  16. 2 pool-1-thread-1
  17. 3 pool-1-thread-1
  18. 4 pool-1-thread-1
  19. */

CachedThreadPool(ThreadPoolExecutor)

来一个线程,如果线程池里线程都在忙,则增加一个线程,且线程不会自动销毁(一直存在于线程池)

  1. Executors.newCachedThreadPool();
  2. public static ExecutorService newCachedThreadPool() {
  3. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  4. 60L, TimeUnit.SECONDS,
  5. new SynchronousQueue<Runnable>());
  6. }

FixedThreadPool(ThreadPoolExecutor)

固定线程池中线程数量

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

并行计算

  1. public class T09_FixedThreadPool {
  2. public static void main(String[] args) throws InterruptedException, ExecutionException {
  3. long start = System.currentTimeMillis();
  4. getPrime(1, 200000);
  5. long end = System.currentTimeMillis();
  6. System.out.println(end - start);
  7. final int cpuCoreNum = 4;
  8. ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
  9. MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
  10. MyTask t2 = new MyTask(80001, 130000);
  11. MyTask t3 = new MyTask(130001, 170000);
  12. MyTask t4 = new MyTask(170001, 200000);
  13. Future<List<Integer>> f1 = service.submit(t1);
  14. Future<List<Integer>> f2 = service.submit(t2);
  15. Future<List<Integer>> f3 = service.submit(t3);
  16. Future<List<Integer>> f4 = service.submit(t4);
  17. start = System.currentTimeMillis();
  18. f1.get();
  19. f2.get();
  20. f3.get();
  21. f4.get();
  22. end = System.currentTimeMillis();
  23. System.out.println(end - start);
  24. }
  25. static class MyTask implements Callable<List<Integer>> {
  26. int startPos, endPos;
  27. MyTask(int s, int e) {
  28. this.startPos = s;
  29. this.endPos = e;
  30. }
  31. @Override
  32. public List<Integer> call() throws Exception {
  33. List<Integer> r = getPrime(startPos, endPos);
  34. return r;
  35. }
  36. }
  37. static boolean isPrime(int num) {
  38. for(int i=2; i<=num/2; i++) {
  39. if(num % i == 0) return false;
  40. }
  41. return true;
  42. }
  43. static List<Integer> getPrime(int start, int end) {
  44. List<Integer> results = new ArrayList<>();
  45. for(int i=start; i<=end; i++) {
  46. if(isPrime(i)) results.add(i);
  47. }
  48. return results;
  49. }
  50. }
  51. // 结果
  52. /**
  53. 用一个线程执行时间:3670
  54. 并行执行时间:1412
  55. */

ScheduledThreadPool(ThreadPoolExecutor)

定时任务线程池

  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  3. new DelayedWorkQueue());
  4. }

WorkStealingPool(ForkJoinPool)

每个线程维护一个单独的任务队列,当某一线程执行完时,回去其它线程’偷’任务。
push和pop不用加锁(因为都是一个线程在操作),pop(偷任务)需要加锁

Callable

类似Runnale,但是有返回值

  1. public class T03_Callable {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. Callable<String> c = new Callable() {
  4. @Override
  5. public String call() throws Exception {
  6. return "Hello Callable";
  7. }
  8. };
  9. ExecutorService service = Executors.newCachedThreadPool();
  10. Future<String> future = service.submit(c); //异步
  11. System.out.println(future.get());//阻塞
  12. service.shutdown();
  13. }
  14. }

Future

存储执行的,将来才会有结果

FutureTask -> Runnable + Future

CompletableFuture

类似Promise.all()

ThreadPoolExecutor

构造参数的各个用处:开始创建的线程池中线程为0,来了一个任务才启动一个线程,如果核心线程 满了,则任务加入阻塞队列 ,如果阻塞队列也满了,则扩展线程 ,如果扩展线程数也到限制数了,则执行拒绝策略

参数
含义
corePoolSize 核心线程数,不会结束的
MaximumPoolSize 当线程不够用时,线程数最大扩展到这个值
keepAliveTime 扩展线程存活时间
TimeUnit unit 存活时间单位
BlockingQueue bq 任务队列
ThreadFactory threadFactory 线程工厂
RejectedExecutionHandler handler 拒绝策略

RejectedExecutionHandler handler

拒绝策略
JDK默认提供四种拒绝策略
AbortPolicy 异常
DiscardPolicy 扔掉,不抛异常
DiscardOldestPolicy 扔掉排队时间最旧的
CallerRunsPolicy 当前调用者处理任务

ForkJoinPool

每个线程有自己的队列

原理图解

选区_115.png
图片.png
分解汇总的任务
可以用很少的线程执行很多任务(子任务),ThreadPoolExecutor做不到
CPU密集型

parallelStream