一.线程保活和销毁
创建线程
Worker w = null;w = new Worker(firstTask);final Thread t = w.thread;// 执行Worker的run方法t.start();Worker(Runnable firstTask) {this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);return t;}
Worker的run方法
public void run() {runWorker(this);}final void runWorker(Worker w) {// firstTask是execute方法的入参Runnable task = w.firstTask;w.firstTask = null;while (task != null || (task = getTask()) != null) {try {task.run();} finally {task = null;}}}private Runnable getTask() {boolean timedOut = false;for (;;) {int c = ctl.get();// 线程数int wc = workerCountOf(c);// allowCoreThreadTimeOut设置为true,核心线程可销毁// wc > corePoolSize,超出核心线程数的部分boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 长时间空闲返回null,线程销毁if (timed && timedOut) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?// 没有数据返回nullworkQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// 阻塞直到有数据workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
二.DiscardOldestPolicy
丢弃队列最前面的任务,然后重新提交被拒绝的任务。
final void reject(Runnable command) {handler.rejectedExecution(command, this);}public static class DiscardOldestPolicy implements RejectedExecutionHandler {public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {// 删除队首元素e.getQueue().poll();e.execute(r);}}}public void execute(Runnable command) {int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 可能执行到这,队首元素被删了,可以入队了if (isRunning(c) && workQueue.offer(command)) {}else if (!addWorker(command, false))reject(command);}
�三.自带线程池
1.newSingleThreadExecutor
单线程,所有任务的执行顺序按照任务的提交顺序执行。
public class Test {public static void main(String[] args) {ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();for (int i = 0; i < 10; i++) {final int index = i;singleThreadExecutor.execute(new Runnable() {@SneakyThrows@Overridepublic void run() {System.out.println(index);Thread.sleep(2000);}});}}}输出:0123456789
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
2.newFixedThreadPool
定长线程池
public class Test {public static void main(String[] args) {ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);for (int i = 0; i < 10; i++) {final int index = i;fixedThreadPool.execute(new Runnable() {@SneakyThrows@Overridepublic void run() {System.out.println(index + ":" + System.currentTimeMillis());Thread.sleep(3000);}});}}}输出:0:16430307387933:16430307387931:16430307387932:16430307387934:16430307387935:16430307417956:16430307417959:16430307417958:16430307417957:1643030741795
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
3.newCachedThreadPool
可缓存的线程池,线程可用时重用线程,否则创建新的线程。
public class Test {private static AtomicInteger atomicInteger = new AtomicInteger();public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < 5; i++) {Thread.sleep(100L);executorService.submit(Test::print);}}public static void print() {System.out.println(Thread.currentThread().getName());System.out.println(atomicInteger.incrementAndGet());}}输出:pool-1-thread-11pool-1-thread-12pool-1-thread-13pool-1-thread-14pool-1-thread-15
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}public void execute(Runnable command) {int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 入队成功,复用线程if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 入队不成功,新建线程else if (!addWorker(command, false))reject(command);}public boolean offer(E e) {if (e == null) throw new NullPointerException();return transferer.transfer(e, true, 0) != null;}E transfer(E e, boolean timed, long nanos) {SNode s = null;int mode = (e == null) ? REQUEST : DATA;for (;;) {SNode h = head;if (h == null || h.mode == mode) {if (timed && nanos <= 0) {// 没有线程可用return null;}} else if (!isFulfilling(h.mode)) {// 有线程可用if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {for (;;) {SNode m = s.next;SNode mn = m.next;if (m.tryMatch(s)) {casHead(s, mn);return (E) ((mode == REQUEST) ? m.item : s.item);}}}}}}
4.newScheduledThreadPool
支持定时及周期性任务执行。
// 延迟3秒执行public class Test {public static void main(String[] args) throws InterruptedException {ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);System.out.println(System.currentTimeMillis());scheduledThreadPool.schedule(new Runnable() {@Overridepublic void run() {System.out.println("********************");System.out.println(System.currentTimeMillis());}}, 3, TimeUnit.SECONDS);}}输出:1643101162540********************1643101165544
public class Test {public static void main(String[] args) throws InterruptedException {ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);System.out.println(System.currentTimeMillis());scheduledThreadPool.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {System.out.println("**************");System.out.println(System.currentTimeMillis());}}, 1, 3, TimeUnit.SECONDS);}}输出:1643101766597**************1643101767603**************1643101770600**************1643101773602**************1643101776600
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {//triggerTime(initialDelay, unit):当前时间+initialDelay,纳秒ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;}protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {return task;}private void delayedExecute(RunnableScheduledFuture<?> task) {super.getQueue().add(task);ensurePrestart();}void ensurePrestart() {int wc = workerCountOf(ctl.get());if (wc < corePoolSize)addWorker(null, true);else if (wc == 0)addWorker(null, false);}// private static final int INITIAL_CAPACITY = 16;// private RunnableScheduledFuture<?>[] queue =// new RunnableScheduledFuture<?>[INITIAL_CAPACITY];// 小根堆public boolean offer(Runnable x) {if (x == null)throw new NullPointerException();RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;final ReentrantLock lock = this.lock;lock.lock();try {int i = size;if (i >= queue.length)grow();size = i + 1;if (i == 0) {queue[0] = e;setIndex(e, 0);} else {siftUp(i, e);}if (queue[0] == e) {leader = null;available.signal();}} finally {lock.unlock();}return true;}
public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {RunnableScheduledFuture<?> first = queue[0];if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return finishPoll(first);first = null;if (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal();lock.unlock();}}private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {int s = --size;RunnableScheduledFuture<?> x = queue[s];queue[s] = null;if (s != 0)siftDown(0, x);setIndex(f, -1);return f;}
ScheduledFutureTask.run
public void run() {// 执行run方法**************if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}private void setNextRunTime() {long p = period;if (p > 0)time += p;elsetime = triggerTime(-p);}void reExecutePeriodic(RunnableScheduledFuture<?> task) {if (canRunInCurrentRunState(true)) {super.getQueue().add(task);ensurePrestart();}}
