一、线程池工具类Executors
Executors是juc包中用来快速创建线程池的工具类,可以调用这个类的静态方法来创建线程池,不过用这种方法创建的线程池的一些线程已经被配置好了,建议在逻辑比较简单的情况下使用。
创建的线程池对于不同的业务场景和逻辑复杂度,处理的速度是不一样的。
ExecutorService executorService1 = Executors.newCachedThreadPool();ExecutorService executorService2 = Executors.newFixedThreadPool(10);ExecutorService executorService3 = Executors.newSingleThreadExecutor();
1、newCachedThreadPool()
使用线程工具类创建的newCachedThreadPool线程池,指定的核心线程数是0,最大线程数是Integer的最大值(即全部是非核心线程),非核心线程的生命周期是60秒,阻塞队列使用SynchronousQueue,这个队列的容量为0,每个生产者都需要一个消费者线程来消费。
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
2、newFixedThreadPool()
需要指定线程数量,核心线程数是指定的数量,也是最大的线程数量,所有没有非核心线程。
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
3、newSingleThreadExecutor()
只有一个线程来处理任务。
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
自带的工具类对于线程池的配置非常有限,例如LinkedBlockingQueue阻塞队列的容量等,所以大多数情况都不去使用工具类来创建线程池,除非是非常简单的任务逻辑。
通常使用new来创建线程池,可以根据业务场景来自行配置。
线程池的构造方法
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);}
corePoolSize:核心线程数
maximumPoolSize:最大线程数(非核心线程数 = 最大线程数 - 核心线程数)
keepAliveTime:非核心线程空闲保活时间
unit:时间的单位
workQueue:阻塞队列,用来保存任务
handler:拒绝策略,当没有线程可执行任务且阻塞队列放满的情况下执行的逻辑
ThreadPoolExecutor threadPoolExecutor =new ThreadPoolExecutor(10, 20, 0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(10),new MonkeyRejectedExecutionHandler());threadPoolExecutor.execute(() -> {System.out.println("111");});
二、线程池源码
在线程池提交任务时,首先分配给核心线程来执行任务,如果核心线程没有空闲,那么会先进入阻塞队列,当阻塞队列容量已满,还是没有核心线程来处理时,新提交的任务由非核心线程来处理,如果核心线程和非核心线程的总数到达最大线程数时,执行拒绝策略。
线程池的顶层接口是Executor,这个接口只定义了一个execute()方法,这个方法是没有返回值的。
public interface Executor {void execute(Runnable command);}
通常在线程池提交任务时,还有会使用submit()方法,是在其子接口ExecutorService中定义的,具体的实现实在其实现类中。<br /><br />在ThreadPoolExecutor的父类AbstractExecutorService类中实现的submit方法,其中也是调用了execute()方法。
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}
线程池定义了一些状态。
RUNNING 状态是正常处理和接受任务的
SHUTDOWN 状态,不接受新的任务,已经接受的任务会继续处理
STOP 状态,不接受新的任务,也不会处理已经接受的任务
private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
1、ThreadPoolExecutor.execute()
在addWorker()方法中,创建的worker对象是继承自AQS,实现了Runnable的类。
2、ScheduledThreadPoolExecutor
常用的定时线程池,周期性执行的线程器,底层使用的是优先级队列来保存,提交的任务先会保存到队列中。
ScheduledThreadPoolExecutor executor =(ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5)// 延迟2秒,只会执行一次executor.schedule(task, 2, TimeUnit.SECONDS);// 周期性的执行,在每个任务执行完成后,再延迟2秒执行下一个任务executor.scheduleWithFixedDelay(task, 0, 2, TimeUnit.SECONDS);// 周期性的执行,在每个任务执行完成后,直接执行下一个任务executor.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
scheduleAtFixedRate源码
使用延迟线程池需要注意,在run方法中要使用try-catch捕获异常,因为在线程池中,如果抛出异常,在调用任务的run方法的上一层就把任务吃掉了,并没有再抛出,所以在run方法中需要把异常处理掉。
// 在myTask封装的类ScheduledFutureTask中的run方法中,调用了其父类的方法// ScheduledFutureTask.super.runAndReset(),在父类中封装了callable对象// 其中封装了myTask,在调用callable的call()方法时,即myTask的run方法,如果// 抛出异常,则直接被catch后保存到了set中,所以需要直接在myTask的run方法中处理掉异常try {c.call(); // don't set resultran = true;} catch (Throwable ex) {setException(ex);}
三、拒绝策略
在线程池的拒绝策略一共有4中实现
CallerRunsPolicy,由调用execute方法提交任务的线程来执行这个任务。
AbortPolicy,抛出异常RejectedExecutionException拒绝提交任务。
DiscardPolicy,直接抛弃任务,不做任何处理。
DiscardOldestPolicy,去除任务队列中的第一个任务(最旧的),重新提交。
在平时的使用中,可能以上四种实现都不能满足需求,所以可以重写拒绝策略,并当做参数传入,重写拒绝策略有两种方法。
直接写一个RejectedExecutionHandler拒绝策略的实现类
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {// 自己实现的拒绝策略}};// 把自定义的拒绝策略传入线程池ThreadPoolExecutor threadPoolExecutor =new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10),rejectedExecutionHandler);
或者直接写一个类,继承RejectedExecutionHandler来实现逻辑,并传入线程池
public class MonkeyRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {}}ThreadPoolExecutor threadPoolExecutor =new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10),new MonkeyRejectedExecutionHandler());
