Executor
ExecutorService
Executors-线程池的工厂
SingleThreadExecutor(ThreadPoolExecutor)
单线程的线程池-一个线程跑完,才到下一个线程
public class T07_SingleThreadPool {public static void main(String[] args) {ExecutorService service = Executors.newSingleThreadExecutor();for(int i=0; i<5; i++) {final int j = i;service.execute(()->{System.out.println(j + " " + Thread.currentThread().getName());});}}}// 结果/**0 pool-1-thread-11 pool-1-thread-12 pool-1-thread-13 pool-1-thread-14 pool-1-thread-1*/
CachedThreadPool(ThreadPoolExecutor)
来一个线程,如果线程池里线程都在忙,则增加一个线程,且线程不会自动销毁(一直存在于线程池)
Executors.newCachedThreadPool();public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
FixedThreadPool(ThreadPoolExecutor)
固定线程池中线程数量
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
并行计算
public class T09_FixedThreadPool {public static void main(String[] args) throws InterruptedException, ExecutionException {long start = System.currentTimeMillis();getPrime(1, 200000);long end = System.currentTimeMillis();System.out.println(end - start);final int cpuCoreNum = 4;ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20MyTask t2 = new MyTask(80001, 130000);MyTask t3 = new MyTask(130001, 170000);MyTask t4 = new MyTask(170001, 200000);Future<List<Integer>> f1 = service.submit(t1);Future<List<Integer>> f2 = service.submit(t2);Future<List<Integer>> f3 = service.submit(t3);Future<List<Integer>> f4 = service.submit(t4);start = System.currentTimeMillis();f1.get();f2.get();f3.get();f4.get();end = System.currentTimeMillis();System.out.println(end - start);}static class MyTask implements Callable<List<Integer>> {int startPos, endPos;MyTask(int s, int e) {this.startPos = s;this.endPos = e;}@Overridepublic List<Integer> call() throws Exception {List<Integer> r = getPrime(startPos, endPos);return r;}}static boolean isPrime(int num) {for(int i=2; i<=num/2; i++) {if(num % i == 0) return false;}return true;}static List<Integer> getPrime(int start, int end) {List<Integer> results = new ArrayList<>();for(int i=start; i<=end; i++) {if(isPrime(i)) results.add(i);}return results;}}// 结果/**用一个线程执行时间:3670并行执行时间:1412*/
ScheduledThreadPool(ThreadPoolExecutor)
定时任务线程池
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}
WorkStealingPool(ForkJoinPool)
每个线程维护一个单独的任务队列,当某一线程执行完时,回去其它线程’偷’任务。
push和pop不用加锁(因为都是一个线程在操作),pop(偷任务)需要加锁
Callable
类似Runnale,但是有返回值
public class T03_Callable {public static void main(String[] args) throws ExecutionException, InterruptedException {Callable<String> c = new Callable() {@Overridepublic String call() throws Exception {return "Hello Callable";}};ExecutorService service = Executors.newCachedThreadPool();Future<String> future = service.submit(c); //异步System.out.println(future.get());//阻塞service.shutdown();}}
Future
FutureTask -> Runnable + Future
CompletableFuture
ThreadPoolExecutor
构造参数的各个用处:开始创建的线程池中线程为0,来了一个任务才启动一个线程,如果核心线程 满了,则任务加入阻塞队列 ,如果阻塞队列也满了,则扩展线程 ,如果扩展线程数也到限制数了,则执行拒绝策略
| 参数 |
含义 |
|---|---|
| corePoolSize | 核心线程数,不会结束的 |
| MaximumPoolSize | 当线程不够用时,线程数最大扩展到这个值 |
| keepAliveTime | 扩展线程存活时间 |
| TimeUnit unit | 存活时间单位 |
| BlockingQueue |
任务队列 |
| ThreadFactory threadFactory | 线程工厂 |
| RejectedExecutionHandler handler | 拒绝策略 |
RejectedExecutionHandler handler
拒绝策略
JDK默认提供四种拒绝策略
AbortPolicy 异常
DiscardPolicy 扔掉,不抛异常
DiscardOldestPolicy 扔掉排队时间最旧的
CallerRunsPolicy 当前调用者处理任务
ForkJoinPool
原理图解


分解汇总的任务
可以用很少的线程执行很多任务(子任务),ThreadPoolExecutor做不到
CPU密集型
