一.线程保活和销毁

创建线程

  // Create a Worker that wraps the first task, then start the thread owned by that Worker.
  1. Worker w = null;
  2. w = new Worker(firstTask);
  3. final Thread t = w.thread;
  4. // Starting the thread executes the Worker's run method
  5. t.start();
  // Worker constructor: remembers the first task and asks the thread factory for a thread
  // whose Runnable is this Worker itself.
  6. Worker(Runnable firstTask) {
  7. this.firstTask = firstTask;
  8. this.thread = getThreadFactory().newThread(this);
  9. }
  // Thread factory method: builds a Thread in `group` that runs r; the thread name is
  // namePrefix followed by an incrementing counter value.
  10. public Thread newThread(Runnable r) {
  11. Thread t = new Thread(group, r,
  12. namePrefix + threadNumber.getAndIncrement(),
  13. 0); // last argument is the Thread constructor's stackSize; 0 means "ignore this parameter"
  14. return t;
  15. }

Worker的run方法

  // Entry point executed by the worker's thread; delegates to runWorker with this Worker.
  1. public void run() {
  2. runWorker(this);
  3. }
  // Worker main loop (abridged): run the first task, then keep running tasks returned by
  // getTask() until it returns null, at which point the method (and the thread) ends.
  4. final void runWorker(Worker w) {
  5. // firstTask is the argument that was passed to execute
  6. Runnable task = w.firstTask;
  7. w.firstTask = null;
  8. while (task != null || (task = getTask()) != null) {
  9. try {
  10. task.run();
  11. } finally {
  12. task = null; // cleared so the next loop iteration fetches a new task via getTask()
  13. }
  14. }
  15. }
  // Fetch the next task for a worker (abridged). Returning null ends the runWorker loop,
  // so the calling worker thread finishes.
  16. private Runnable getTask() {
  17. boolean timedOut = false;
  18. for (;;) {
  19. int c = ctl.get();
  20. // Current number of worker threads
  21. int wc = workerCountOf(c);
  22. // When allowCoreThreadTimeOut is true, core threads may be destroyed as well
  23. // wc > corePoolSize: there are workers beyond the core pool size
  24. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  25. // Idle for too long: return null so that the thread is destroyed
  26. if (timed && timedOut) {
  27. if (compareAndDecrementWorkerCount(c))
  28. return null;
  29. continue;
  30. }
  31. try {
  32. Runnable r = timed ?
  33. // Returns null if no task becomes available within keepAliveTime
  34. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  35. // Blocks until a task is available
  36. workQueue.take();
  37. if (r != null)
  38. return r;
  39. timedOut = true;
  40. } catch (InterruptedException retry) {
  41. timedOut = false;
  42. }
  43. }
  44. }

二.DiscardOldestPolicy

丢弃队列最前面(最早入队)的任务,然后重新提交被拒绝的任务;如果线程池已经关闭,则直接丢弃被拒绝的任务。

  // Hands a rejected task to the configured rejection handler together with this executor.
  1. final void reject(Runnable command) {
  2. handler.rejectedExecution(command, this);
  3. }
  // Rejection policy: unless the executor is shut down, drop the task at the head of the
  // queue and submit the rejected task again; if it is shut down, the task is silently dropped.
  4. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  5. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  6. if (!e.isShutdown()) {
  7. // Remove the element at the head of the queue
  8. e.getQueue().poll();
  9. e.execute(r);
  10. }
  11. }
  12. }
  // execute (abridged): try to start a core worker, otherwise enqueue the task, otherwise
  // try to start a non-core worker, and reject the task if that fails too.
  13. public void execute(Runnable command) {
  14. int c = ctl.get();
  15. if (workerCountOf(c) < corePoolSize) {
  16. if (addWorker(command, true))
  17. return;
  18. c = ctl.get();
  19. }
  20. // A re-submission from DiscardOldestPolicy may reach this point: the head element was removed, so the offer can now succeed
  21. if (isRunning(c) && workQueue.offer(command)) {
  22. }
  23. else if (!addWorker(command, false))
  24. reject(command);
  25. }

三.自带线程池

1.newSingleThreadExecutor
单线程的线程池,所有任务按照提交顺序依次执行。

  // Demo: submits 10 tasks to a single-thread executor; each task prints its index and
  // then sleeps for 2 seconds.
  1. public class Test {
  2. public static void main(String[] args) {
  3. ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
  4. for (int i = 0; i < 10; i++) {
  5. final int index = i; // copy of i that the anonymous class can capture
  6. singleThreadExecutor.execute(new Runnable() {
  7. @SneakyThrows
  8. @Override
  9. public void run() {
  10. System.out.println(index);
  11. Thread.sleep(2000);
  12. }
  13. });
  14. }
  15. }
  16. }
  17. 输出:
  18. 0
  19. 1
  20. 2
  21. 3
  22. 4
  23. 5
  24. 6
  25. 7
  26. 8
  27. 9
  // Builds a ThreadPoolExecutor with corePoolSize = maximumPoolSize = 1, a keep-alive of
  // 0 ms and a LinkedBlockingQueue, wrapped in a FinalizableDelegatedExecutorService.
  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

2.newFixedThreadPool
定长线程池:核心线程数与最大线程数相同,超出线程数的任务在队列中等待。

  // Demo: submits 10 tasks to a fixed pool of 5 threads; each task prints its index with
  // the current time in milliseconds and then sleeps for 3 seconds.
  1. public class Test {
  2. public static void main(String[] args) {
  3. ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
  4. for (int i = 0; i < 10; i++) {
  5. final int index = i; // copy of i that the anonymous class can capture
  6. fixedThreadPool.execute(new Runnable() {
  7. @SneakyThrows
  8. @Override
  9. public void run() {
  10. System.out.println(index + ":" + System.currentTimeMillis());
  11. Thread.sleep(3000);
  12. }
  13. });
  14. }
  15. }
  16. }
  17. 输出:
  18. 0:1643030738793
  19. 3:1643030738793
  20. 1:1643030738793
  21. 2:1643030738793
  22. 4:1643030738793
  23. 5:1643030741795
  24. 6:1643030741795
  25. 9:1643030741795
  26. 8:1643030741795
  27. 7:1643030741795
  // Builds a ThreadPoolExecutor with corePoolSize = maximumPoolSize = nThreads, a
  // keep-alive of 0 ms and a LinkedBlockingQueue as the work queue.
  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

3.newCachedThreadPool
可缓存的线程池,线程可用时重用线程,否则创建新的线程。

  // Demo: submits 5 tasks to a cached thread pool, sleeping 100 ms before each submit.
  // Each task prints the current thread name and an incremented counter.
  1. public class Test {
  2. private static AtomicInteger atomicInteger = new AtomicInteger();
  3. public static void main(String[] args) throws InterruptedException {
  4. ExecutorService executorService = Executors.newCachedThreadPool();
  5. for (int i = 0; i < 5; i++) {
  6. Thread.sleep(100L);
  7. executorService.submit(Test::print);
  8. }
  9. }
  10. public static void print() {
  11. System.out.println(Thread.currentThread().getName());
  12. System.out.println(atomicInteger.incrementAndGet());
  13. }
  14. }
  15. 输出:
  16. pool-1-thread-1
  17. 1
  18. pool-1-thread-1
  19. 2
  20. pool-1-thread-1
  21. 3
  22. pool-1-thread-1
  23. 4
  24. pool-1-thread-1
  25. 5
  // Builds a ThreadPoolExecutor with corePoolSize 0, maximumPoolSize Integer.MAX_VALUE,
  // a 60-second keep-alive and a SynchronousQueue as the work queue.
  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }
  // execute: start a core worker if below corePoolSize; otherwise offer the task to the
  // queue; if the offer fails, start a non-core worker; reject if that also fails.
  6. public void execute(Runnable command) {
  7. int c = ctl.get();
  8. if (workerCountOf(c) < corePoolSize) {
  9. if (addWorker(command, true))
  10. return;
  11. c = ctl.get();
  12. }
  13. // Offer succeeded: an existing thread is reused for the task
  14. if (isRunning(c) && workQueue.offer(command)) {
  15. int recheck = ctl.get();
  16. if (! isRunning(recheck) && remove(command))
  17. reject(command);
  18. else if (workerCountOf(recheck) == 0)
  19. addWorker(null, false);
  20. }
  21. // Offer failed: create a new thread
  22. else if (!addWorker(command, false))
  23. reject(command);
  24. }
  // Non-blocking offer: rejects null, then calls transfer with timed = true and a timeout
  // of 0; reports success only if transfer returned a non-null value.
  25. public boolean offer(E e) {
  26. if (e == null) throw new NullPointerException();
  27. return transferer.transfer(e, true, 0) != null;
  28. }
  // transfer (abridged): a null e means this call is a REQUEST, a non-null e means DATA.
  // The call inspects the head node to decide whether a complementary waiter exists.
  29. E transfer(E e, boolean timed, long nanos) {
  30. SNode s = null;
  31. int mode = (e == null) ? REQUEST : DATA;
  32. for (;;) {
  33. SNode h = head;
  34. if (h == null || h.mode == mode) {
  35. if (timed && nanos <= 0) {
  36. // No thread is available (no complementary waiter and the caller will not wait)
  37. return null;
  38. }
  39. } else if (!isFulfilling(h.mode)) {
  40. // A thread is available: install a FULFILLING node as head and match it with the next node
  41. if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
  42. for (;;) {
  43. SNode m = s.next;
  44. SNode mn = m.next;
  45. if (m.tryMatch(s)) {
  46. casHead(s, mn); // move head past both s and m
  47. return (E) ((mode == REQUEST) ? m.item : s.item);
  48. }
  49. }
  50. }
  51. }
  52. }
  53. }

4.newScheduledThreadPool
支持定时及周期性任务执行。

  1. // Run once after a 3-second delay
  2. public class Test {
  3. public static void main(String[] args) throws InterruptedException {
  4. ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
  5. System.out.println(System.currentTimeMillis()); // time at which the task is scheduled
  6. scheduledThreadPool.schedule(new Runnable() {
  7. @Override
  8. public void run() {
  9. System.out.println("********************");
  10. System.out.println(System.currentTimeMillis()); // time at which the task actually runs
  11. }
  12. }, 3, TimeUnit.SECONDS);
  13. }
  14. }
  15. 输出:
  16. 1643101162540
  17. ********************
  18. 1643101165544
  // Demo: schedules a periodic task with an initial delay of 1 second and a period of
  // 3 seconds; each run prints a marker line and the current time in milliseconds.
  1. public class Test {
  2. public static void main(String[] args) throws InterruptedException {
  3. ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
  4. System.out.println(System.currentTimeMillis()); // time at which the task is scheduled
  5. scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
  6. @Override
  7. public void run() {
  8. System.out.println("**************");
  9. System.out.println(System.currentTimeMillis());
  10. }
  11. }, 1, 3, TimeUnit.SECONDS);
  12. }
  13. }
  14. 输出:
  15. 1643101766597
  16. **************
  17. 1643101767603
  18. **************
  19. 1643101770600
  20. **************
  21. 1643101773602
  22. **************
  23. 1643101776600
  // Factory method: returns a new ScheduledThreadPoolExecutor with the given core pool size.
  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  2. return new ScheduledThreadPoolExecutor(corePoolSize);
  3. }
  // Constructor: calls the superclass constructor with maximumPoolSize Integer.MAX_VALUE,
  // a keep-alive of 0 nanoseconds and a DelayedWorkQueue as the work queue.
  4. public ScheduledThreadPoolExecutor(int corePoolSize) {
  5. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  6. new DelayedWorkQueue());
  7. }
  // scheduleAtFixedRate (abridged): wraps command in a ScheduledFutureTask carrying its
  // first trigger time and its period in nanoseconds, then queues it via delayedExecute.
  1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  2. long initialDelay,
  3. long period,
  4. TimeUnit unit) {
  5. // triggerTime(initialDelay, unit): current time + initialDelay, in nanoseconds
  6. ScheduledFutureTask<Void> sft =
  7. new ScheduledFutureTask<Void>(command,
  8. null,
  9. triggerTime(initialDelay, unit),
  10. unit.toNanos(period));
  11. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  12. sft.outerTask = t; // the task that will be re-queued after each run (see reExecutePeriodic(outerTask))
  13. delayedExecute(t);
  14. return t;
  15. }
  // Constructor: passes the runnable and result to the superclass and stores the trigger
  // time (ns) and the period.
  16. ScheduledFutureTask(Runnable r, V result, long ns, long period) {
  17. super(r, result);
  18. this.time = ns;
  19. this.period = period;
  20. }
  // Returns the given task unchanged; being protected, it can be overridden by subclasses.
  21. protected <V> RunnableScheduledFuture<V> decorateTask(
  22. Runnable runnable, RunnableScheduledFuture<V> task) {
  23. return task;
  24. }
  // delayedExecute (abridged): adds the task to the work queue, then makes sure a worker
  // thread exists to pick it up.
  25. private void delayedExecute(RunnableScheduledFuture<?> task) {
  26. super.getQueue().add(task);
  27. ensurePrestart();
  28. }
  // Starts a worker without a first task (it will fetch work from the queue): a core
  // worker when below corePoolSize, otherwise a non-core worker if there are no workers.
  29. void ensurePrestart() {
  30. int wc = workerCountOf(ctl.get());
  31. if (wc < corePoolSize)
  32. addWorker(null, true);
  33. else if (wc == 0)
  34. addWorker(null, false);
  35. }
  36. // private static final int INITIAL_CAPACITY = 16;
  37. // private RunnableScheduledFuture<?>[] queue =
  38. // new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
  39. // Min-heap: offer inserts x into the array-backed heap under the lock and always returns true
  40. public boolean offer(Runnable x) {
  41. if (x == null)
  42. throw new NullPointerException();
  43. RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
  44. final ReentrantLock lock = this.lock;
  45. lock.lock();
  46. try {
  47. int i = size;
  48. if (i >= queue.length)
  49. grow(); // array is full; grow() presumably enlarges it (not shown here)
  50. size = i + 1;
  51. if (i == 0) {
  52. queue[0] = e; // empty heap: the new element becomes the root
  53. setIndex(e, 0);
  54. } else {
  55. siftUp(i, e);
  56. }
  57. if (queue[0] == e) {
  58. leader = null; // the new element is now the head: clear the leader and wake one waiting thread
  59. available.signal();
  60. }
  61. } finally {
  62. lock.unlock();
  63. }
  64. return true;
  65. }
  // Blocks until the head task's delay has expired, then removes and returns it.
  // Only one thread at a time (the "leader") does a timed wait for the head task;
  // the other threads wait without a timeout until they are signalled.
  1. public RunnableScheduledFuture<?> take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. for (;;) {
  6. RunnableScheduledFuture<?> first = queue[0];
  7. if (first == null)
  8. available.await(); // queue is empty: wait until signalled
  9. else {
  10. long delay = first.getDelay(NANOSECONDS);
  11. if (delay <= 0)
  12. return finishPoll(first); // head task is due
  13. first = null; // do not hold a reference to the task while waiting
  14. if (leader != null)
  15. available.await(); // another thread is already the leader
  16. else {
  17. Thread thisThread = Thread.currentThread();
  18. leader = thisThread;
  19. try {
  20. available.awaitNanos(delay); // wait at most until the head task is due
  21. } finally {
  22. if (leader == thisThread)
  23. leader = null;
  24. }
  25. }
  26. }
  27. }
  28. } finally {
  29. if (leader == null && queue[0] != null)
  30. available.signal(); // no leader and tasks remain: wake another waiting thread
  31. lock.unlock();
  32. }
  33. }
  // Removes f (the head task passed in by take) from the heap and returns it.
  34. private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
  35. int s = --size;
  36. RunnableScheduledFuture<?> x = queue[s]; // last element of the array
  37. queue[s] = null;
  38. if (s != 0)
  39. siftDown(0, x); // presumably re-inserts x starting from the root to restore heap order
  40. setIndex(f, -1); // f is marked with index -1 now that it has left the queue
  41. return f;
  42. }

ScheduledFutureTask.run

  // run (abridged, periodic case): execute the task and, if runAndReset() returns true,
  // compute the next trigger time and put the task back into the queue.
  1. public void run() {
  2. // Execute the task's run method (in the demo, the Runnable that prints **************)
  3. if (ScheduledFutureTask.super.runAndReset()) {
  4. setNextRunTime();
  5. reExecutePeriodic(outerTask);
  6. }
  7. }
  // Computes the next trigger time from the stored period.
  8. private void setNextRunTime() {
  9. long p = period;
  10. if (p > 0)
  11. time += p; // positive period: advance from the previous trigger time
  12. else
  13. time = triggerTime(-p); // NOTE(review): presumably the fixed-delay case, based on the current time; confirm against the JDK source
  14. }
  // reExecutePeriodic (abridged): if the pool's current run state allows it, re-queue the
  // periodic task and make sure a worker thread exists to run it.
  15. void reExecutePeriodic(RunnableScheduledFuture<?> task) {
  16. if (canRunInCurrentRunState(true)) {
  17. super.getQueue().add(task);
  18. ensurePrestart();
  19. }
  20. }