一.线程保活和销毁
创建线程
// Wrap the first task in a Worker; the Worker constructor also obtains the backing thread.
Worker w = new Worker(firstTask);
// Starting that thread runs the Worker's run method.
w.thread.start();
// Builds a worker around its first task and asks the thread factory for a thread
// whose Runnable is this worker itself.
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/**
 * Creates a thread in {@code group} that runs {@code r}.
 *
 * @param r the runnable the new thread will execute
 * @return the newly created (not yet started) thread
 */
public Thread newThread(Runnable r) {
    // Name = factory prefix followed by the next sequence number.
    final String threadName = namePrefix + threadNumber.getAndIncrement();
    // The trailing 0 is the fourth Thread constructor argument (requested stack size).
    return new Thread(group, r, threadName, 0);
}
Worker的run方法
// Entry point of the worker's thread: hands this worker to runWorker.
public void run() {
runWorker(this);
}
// Main loop of a worker thread (simplified excerpt): runs the worker's first task,
// then keeps running tasks obtained from getTask() until getTask() returns null.
final void runWorker(Worker w) {
// firstTask is the task that was passed to execute
Runnable task = w.firstTask;
w.firstTask = null;
// Use the initial task if there is one; otherwise fetch the next one from getTask().
while (task != null || (task = getTask()) != null) {
try {
task.run();
} finally {
// Clear the reference so the next iteration goes back to getTask().
task = null;
}
}
}
// Fetches the next task for a worker from workQueue (simplified excerpt).
// Returns null when the worker is subject to the idle timeout and its previous
// poll timed out; runWorker's loop ends on null, so the worker thread finishes.
private Runnable getTask() {
// Whether the most recent timed poll returned without a task.
boolean timedOut = false;
for (;;) {
int c = ctl.get();
// Current number of worker threads
int wc = workerCountOf(c);
// With allowCoreThreadTimeOut set to true, core threads may be torn down as well.
// wc > corePoolSize: there are threads in excess of the core pool size.
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// Idle for too long: return null so that the thread is destroyed
if (timed && timedOut) {
// Exit only if the CAS decrement of the worker count succeeds; otherwise re-read ctl and retry.
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
// Returns null if no task arrives within keepAliveTime
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// Blocks until a task is available
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
// Interrupted while waiting: not treated as a timeout, loop again.
timedOut = false;
}
}
}
二.DiscardOldestPolicy
丢弃队列最前面的任务,然后重新提交被拒绝的任务。
// Hands the rejected command to the configured handler, passing this executor along.
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
/**
 * Rejection handler that makes room for a rejected task by removing the task at the
 * head of the executor's queue and then submitting the rejected task again.
 * Does nothing when the executor has been shut down.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * @param r the task that was rejected
     * @param e the executor that rejected it
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (e.isShutdown()) {
            // Executor is shut down: the rejected task is dropped.
            return;
        }
        // Remove the head of the queue (if any), then retry the rejected task.
        e.getQueue().poll();
        e.execute(r);
    }
}
// Simplified excerpt of execute: try a core worker first, then the queue, then a
// non-core worker, and reject the command if all of those fail.
public void execute(Runnable command) {
int c = ctl.get();
// Fewer workers than corePoolSize: try to start a core worker with this command.
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// addWorker failed: re-read the control state before continuing.
c = ctl.get();
}
// Execution may reach here: the head of the queue was removed, so the command can now be enqueued
if (isRunning(c) && workQueue.offer(command)) {
// Enqueued; the post-enqueue recheck is omitted in this simplified excerpt.
}
else if (!addWorker(command, false))
reject(command);
}
三.自带线程池
1.newSingleThreadExecutor
单线程,所有任务按照提交顺序依次执行。
/**
 * Demo: a single-thread executor runs the submitted tasks one at a time, in
 * submission order. Each task prints its index and then sleeps for two seconds.
 */
public class Test {
    public static void main(String[] args) {
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            singleThreadExecutor.execute(() -> {
                System.out.println(index);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of hiding the exception.
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Stop accepting new tasks. The ten queued tasks still run, and the JVM can
        // exit once they finish instead of being kept alive by the worker thread.
        singleThreadExecutor.shutdown();
    }
}
输出:
0
1
2
3
4
5
6
7
8
9
// Builds a ThreadPoolExecutor with core size 1, maximum size 1, a keep-alive of
// 0 ms and a LinkedBlockingQueue created without a capacity argument, wrapped in
// a FinalizableDelegatedExecutorService.
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
2.newFixedThreadPool
定长线程池
/**
 * Demo: a fixed pool of five threads runs ten tasks. Each task prints its index
 * with a timestamp and then sleeps for three seconds, so the tasks run in two
 * batches of five.
 */
public class Test {
    public static void main(String[] args) {
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            fixedThreadPool.execute(() -> {
                System.out.println(index + ":" + System.currentTimeMillis());
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of hiding the exception.
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Stop accepting new tasks. Queued tasks still run; without this call the
        // idle core threads would keep the JVM alive after the work is done.
        fixedThreadPool.shutdown();
    }
}
输出:
0:1643030738793
3:1643030738793
1:1643030738793
2:1643030738793
4:1643030738793
5:1643030741795
6:1643030741795
9:1643030741795
8:1643030741795
7:1643030741795
// Builds a ThreadPoolExecutor whose core and maximum sizes are both nThreads, with
// a keep-alive of 0 ms and a LinkedBlockingQueue created without a capacity argument.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
3.newCachedThreadPool
可缓存的线程池:有空闲线程时复用空闲线程,否则创建新线程;空闲超过60秒的线程会被回收。
/**
 * Demo: a cached thread pool reuses an idle thread when one is available. Tasks
 * are submitted 100 ms apart, and each prints the executing thread's name and a
 * running counter.
 */
public class Test {
    /** Counts how many times {@link #print()} has run. */
    private static final AtomicInteger atomicInteger = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            // Give the previous task time to finish so its thread is idle again.
            Thread.sleep(100L);
            executorService.submit(Test::print);
        }
        // Stop accepting new tasks. The last submitted task still runs, and the JVM
        // can exit right away instead of waiting for idle workers to time out.
        executorService.shutdown();
    }

    /** Prints the current thread's name followed by the incremented counter. */
    public static void print() {
        System.out.println(Thread.currentThread().getName());
        System.out.println(atomicInteger.incrementAndGet());
    }
}
输出:
pool-1-thread-1
1
pool-1-thread-1
2
pool-1-thread-1
3
pool-1-thread-1
4
pool-1-thread-1
5
// Builds a ThreadPoolExecutor with core size 0, maximum size Integer.MAX_VALUE,
// a keep-alive of 60 seconds and a SynchronousQueue as the work queue.
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// Submits a command: try a core worker first, then the work queue, then a
// non-core worker, and reject the command if all of those fail.
public void execute(Runnable command) {
int c = ctl.get();
// Fewer workers than corePoolSize: try to start a core worker with this command.
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// addWorker failed: re-read the control state before continuing.
c = ctl.get();
}
// Enqueue succeeded: an existing thread is reused to run the command
if (isRunning(c) && workQueue.offer(command)) {
// Re-read the state after enqueueing.
int recheck = ctl.get();
// No longer running and the command could be removed again: reject it.
if (! isRunning(recheck) && remove(command))
reject(command);
// No workers at all: start one without an initial task.
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// Enqueue failed: create a new thread
else if (!addWorker(command, false))
reject(command);
}
// Offers e through a timed transfer with a timeout of 0; returns true only if the
// transfer returned a non-null result. Throws NullPointerException when e is null.
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
// Simplified excerpt of the transfer routine: a non-null e means the caller is in
// DATA mode, a null e means the caller is in REQUEST mode.
E transfer(E e, boolean timed, long nanos) {
SNode s = null;
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
// No head node, or the head is in the same mode as this caller: nothing to pair with.
if (h == null || h.mode == mode) {
if (timed && nanos <= 0) {
// No thread is available and the caller will not wait
return null;
}
// NOTE(review): the waiting path for this case is not shown in this excerpt.
} else if (!isFulfilling(h.mode)) {
// A thread is available
// Install a FULFILLING node as the new head (presumably linked above h), then match the node after it.
if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) {
SNode m = s.next;
SNode mn = m.next;
if (m.tryMatch(s)) {
// Matched: move head past both s and m.
casHead(s, mn);
// A REQUEST caller receives the matched node's item; a DATA caller gets its own item back.
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
}
}
}
}
4.newScheduledThreadPool
支持定时及周期性任务执行。
/**
 * Demo: runs a one-shot task after a delay of 3 seconds and prints the time
 * before scheduling and again when the task runs.
 */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        System.out.println(System.currentTimeMillis());
        scheduledThreadPool.schedule(() -> {
            System.out.println("********************");
            System.out.println(System.currentTimeMillis());
        }, 3, TimeUnit.SECONDS);
        // Stop accepting new tasks. By default an already-scheduled delayed task
        // still runs after shutdown(), and the JVM can exit once it has finished.
        scheduledThreadPool.shutdown();
    }
}
输出:
1643101162540
********************
1643101165544
/**
 * Demo: schedules a periodic task with an initial delay of 1 second and a period
 * of 3 seconds. The pool is left running so the task keeps repeating.
 */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        System.out.println(System.currentTimeMillis());
        Runnable printTimestamp = () -> {
            System.out.println("**************");
            System.out.println(System.currentTimeMillis());
        };
        scheduledThreadPool.scheduleAtFixedRate(printTimestamp, 1, 3, TimeUnit.SECONDS);
    }
}
输出:
1643101766597
**************
1643101767603
**************
1643101770600
**************
1643101773602
**************
1643101776600
// Returns a new ScheduledThreadPoolExecutor with the given core pool size.
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// Passes the given core size to the superclass constructor together with a maximum
// size of Integer.MAX_VALUE, a keep-alive of 0 ns and a DelayedWorkQueue as the work queue.
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
// Simplified excerpt: wraps command in a ScheduledFutureTask carrying its first
// trigger time and its period in nanoseconds, queues it via delayedExecute and
// returns the (possibly decorated) task.
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
// triggerTime(initialDelay, unit): current time + initialDelay, in nanoseconds
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// Remember the decorated task; ScheduledFutureTask.run passes outerTask to reExecutePeriodic.
sft.outerTask = t;
delayedExecute(t);
return t;
}
// Stores the trigger time (ns) and the period on top of the superclass state built from r and result.
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
}
// Protected hook that receives the original runnable and the task built for it;
// this implementation returns the task unchanged.
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
// Simplified excerpt: adds the task to the work queue, then calls ensurePrestart
// so that a worker thread exists to take it.
private void delayedExecute(RunnableScheduledFuture<?> task) {
super.getQueue().add(task);
ensurePrestart();
}
/**
 * Starts a worker without an initial task when the pool has fewer workers than
 * corePoolSize, or when it has no workers at all.
 */
void ensurePrestart() {
    final int workers = workerCountOf(ctl.get());
    if (workers < corePoolSize) {
        // Below the core size: add a worker using the core flag.
        addWorker(null, true);
        return;
    }
    if (workers == 0) {
        // Not below the core size yet no workers exist: add one without the core flag.
        addWorker(null, false);
    }
}
// private static final int INITIAL_CAPACITY = 16;
// private RunnableScheduledFuture<?>[] queue =
// new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
// The array above is used as a min-heap.
// Inserts x into the heap while holding the queue lock; always returns true.
// Throws NullPointerException when x is null.
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// Backing array is full: call grow() before inserting.
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
// Queue was empty: e goes straight into slot 0.
queue[0] = e;
setIndex(e, 0);
} else {
// Otherwise insert starting from position i via siftUp.
siftUp(i, e);
}
// e ended up at the head: clear the leader and signal one waiting thread.
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
// Waits until the task at the head of the queue has a delay <= 0, then removes
// and returns it via finishPoll. Throws InterruptedException if interrupted while
// acquiring the lock or waiting.
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// Queue is empty: wait until signalled.
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
// Head is due: remove and return it.
if (delay <= 0)
return finishPoll(first);
// Do not hold the reference while waiting.
first = null;
// Another thread is already the leader: wait without a timeout.
if (leader != null)
available.await();
else {
// Become the leader and wait at most for the head's remaining delay.
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
// Give up leadership if this thread still holds it.
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// On the way out, signal another waiter if there is no leader and the queue is not empty.
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
// Completes the removal of f (take() passes queue[0]): decrements size, clears the
// last occupied slot and, unless the queue is now empty, re-inserts that last
// element starting from position 0 via siftDown. Sets f's index to -1 and returns f.
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
ScheduledFutureTask.run
// Simplified excerpt of the periodic path: run the task once and, if runAndReset
// returns true, compute the next trigger time and queue the task again.
public void run() {
// Runs the wrapped task's run method (in the demo, the println of the asterisks)
if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
/**
 * Updates {@code time} to the next trigger time of a periodic task.
 */
private void setNextRunTime() {
    final long interval = period;
    // Positive period: advance from the previous trigger time.
    // Otherwise: take the new time from triggerTime with the negated period
    // (presumably relative to the current time - confirm against triggerTime).
    time = (interval > 0) ? time + interval : triggerTime(-interval);
}
/**
 * Puts a periodic task back on the work queue, provided the pool may still run
 * periodic tasks in its current run state, and makes sure a worker exists.
 *
 * @param task the periodic task to queue again
 */
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (!canRunInCurrentRunState(true)) {
        // Current run state does not allow it: the task is not queued again.
        return;
    }
    super.getQueue().add(task);
    ensurePrestart();
}