image.png

newCachedThreadPool()

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

newFixedThreadPool()

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

newSingleThreadExecutor()

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

自定义线程池

  1. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 2 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10));

提交优先级:核心线程 > 任务队列 > 非核心线程(对应下面 execute() 中的三个步骤)
执行优先级:核心线程 > 非核心线程 > 任务队列(队列中的任务要等到有线程空闲后才会被取出执行)

拒绝策略

(1)、CallerRunsPolicy,由调用execute方法提交任务的线程来执行这个任务;
(2)、AbortPolicy,抛出异常RejectedExecutionException拒绝提交任务;
(3)、DiscardPolicy,直接抛弃任务,不做任何处理;
(4)、DiscardOldestPolicy,丢弃任务队列队首(最旧)的任务,然后重新提交当前被拒绝的任务;

  /**
   * Submits {@code command} to the pool. As the block comment below describes,
   * it tries in order: start a core worker with the task, enqueue the task,
   * start a non-core worker with the task; if all fail the task is rejected.
   *
   * @param command the task to run
   * @throws NullPointerException if {@code command} is null
   */
  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. /*
  5. * Proceed in 3 steps:
  6. *
  7. * 1. If fewer than corePoolSize threads are running, try to
  8. * start a new thread with the given command as its first
  9. * task. The call to addWorker atomically checks runState and
  10. * workerCount, and so prevents false alarms that would add
  11. * threads when it shouldn't, by returning false.
  12. *
  13. * 2. If a task can be successfully queued, then we still need
  14. * to double-check whether we should have added a thread
  15. * (because existing ones died since last checking) or that
  16. * the pool shut down since entry into this method. So we
  17. * recheck state and if necessary roll back the enqueuing if
  18. * stopped, or start a new thread if there are none.
  19. *
  20. * 3. If we cannot queue task, then we try to add a new
  21. * thread. If it fails, we know we are shut down or saturated
  22. * and so reject the task.
  23. */
  24. int c = ctl.get();
  25. if (workerCountOf(c) < corePoolSize) { // step 1: below core size, try a core worker
  26. if (addWorker(command, true))
  27. return;
  28. c = ctl.get(); // addWorker failed: re-read ctl before falling through to step 2
  29. }
  30. if (isRunning(c) && workQueue.offer(command)) { // step 2: running and the queue accepted the task
  31. int recheck = ctl.get();
  32. if (! isRunning(recheck) && remove(command)) // no longer running: roll back the enqueue
  33. reject(command);
  34. else if (workerCountOf(recheck) == 0) // no workers at all: add one without a first task
  35. addWorker(null, false);
  36. }
  37. else if (!addWorker(command, false)) // step 3: could not queue, try a non-core worker
  38. reject(command);
  39. }
  /**
   * Tries to add one worker. First reserves a slot by incrementing the worker
   * count in {@code ctl} with a CAS retry loop; then creates a {@code Worker},
   * registers it in {@code workers} while holding {@code mainLock}, and starts
   * its thread. If the thread was not started, {@code addWorkerFailed} is called.
   *
   * @param firstTask task passed to the new {@code Worker}; may be null
   *                  ({@code execute} calls this method with null)
   * @param core      if true the count is bounded by {@code corePoolSize},
   *                  otherwise by {@code maximumPoolSize}
   * @return true if the worker thread was started
   */
  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // Check if queue empty only if necessary.
  7. if (rs >= SHUTDOWN &&
  8. ! (rs == SHUTDOWN &&
  9. firstTask == null &&
  10. ! workQueue.isEmpty()))
  11. return false; // at/after SHUTDOWN, only allowed when: exactly SHUTDOWN, no first task, queue non-empty
  12. for (;;) {
  13. int wc = workerCountOf(c);
  14. if (wc >= CAPACITY ||
  15. wc >= (core ? corePoolSize : maximumPoolSize))
  16. return false; // worker count already at the applicable limit
  17. if (compareAndIncrementWorkerCount(c))
  18. break retry; // slot reserved: leave both loops
  19. c = ctl.get(); // Re-read ctl
  20. if (runStateOf(c) != rs)
  21. continue retry; // run state changed: redo the outer state check
  22. // else CAS failed due to workerCount change; retry inner loop
  23. }
  24. }
  25. boolean workerStarted = false;
  26. boolean workerAdded = false;
  27. Worker w = null;
  28. try {
  29. w = new Worker(firstTask);
  30. final Thread t = w.thread;
  31. if (t != null) {
  32. final ReentrantLock mainLock = this.mainLock;
  33. mainLock.lock();
  34. try {
  35. // Recheck while holding lock.
  36. // Back out on ThreadFactory failure or if
  37. // shut down before lock acquired.
  38. int rs = runStateOf(ctl.get());
  39. if (rs < SHUTDOWN ||
  40. (rs == SHUTDOWN && firstTask == null)) {
  41. if (t.isAlive()) // precheck that t is startable
  42. throw new IllegalThreadStateException();
  43. workers.add(w);
  44. int s = workers.size();
  45. if (s > largestPoolSize)
  46. largestPoolSize = s; // track the largest size the workers set has reached
  47. workerAdded = true;
  48. }
  49. } finally {
  50. mainLock.unlock();
  51. }
  52. if (workerAdded) {
  53. t.start(); // started only after mainLock has been released
  54. workerStarted = true;
  55. }
  56. }
  57. } finally {
  58. if (! workerStarted)
  59. addWorkerFailed(w); // w is still null here if new Worker(...) threw
  60. }
  61. return workerStarted;
  62. }
  /**
   * Task loop executed for worker {@code w}: runs {@code w.firstTask} (if any)
   * and then keeps taking tasks from {@code getTask()} until it returns null.
   * Each task runs between {@code w.lock()} and {@code w.unlock()}, surrounded
   * by the {@code beforeExecute} / {@code afterExecute} hooks. On exit,
   * normal or exceptional, {@code processWorkerExit} is called.
   *
   * @param w the worker whose tasks are run on the current thread
   */
  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); // allow interrupts
  6. boolean completedAbruptly = true; // stays true unless the loop below ends without an exception
  7. try {
  8. while (task != null || (task = getTask()) != null) { // first task, then tasks from getTask()
  9. w.lock();
  10. // If pool is stopping, ensure thread is interrupted;
  11. // if not, ensure thread is not interrupted. This
  12. // requires a recheck in second case to deal with
  13. // shutdownNow race while clearing interrupt
  14. if ((runStateAtLeast(ctl.get(), STOP) ||
  15. (Thread.interrupted() &&
  16. runStateAtLeast(ctl.get(), STOP))) &&
  17. !wt.isInterrupted())
  18. wt.interrupt();
  19. try {
  20. beforeExecute(wt, task);
  21. Throwable thrown = null; // remembered so afterExecute receives what the task threw
  22. try {
  23. task.run();
  24. } catch (RuntimeException x) {
  25. thrown = x; throw x;
  26. } catch (Error x) {
  27. thrown = x; throw x;
  28. } catch (Throwable x) {
  29. thrown = x; throw new Error(x); // any other Throwable is rethrown wrapped in an Error
  30. } finally {
  31. afterExecute(task, thrown);
  32. }
  33. } finally {
  34. task = null; // cleared so the next iteration fetches from getTask()
  35. w.completedTasks++; // incremented even when the task threw
  36. w.unlock();
  37. }
  38. }
  39. completedAbruptly = false; // reached only when getTask() returned null
  40. } finally {
  41. processWorkerExit(w, completedAbruptly);
  42. }
  43. }

ScheduledThreadPoolExecutor

schedule:延迟多长时间之后只执行一次;
scheduleAtFixedRate(固定频率):延迟指定时间后执行第一次,之后以固定周期执行,各次执行的计划开始时间依次为 initialDelay、initialDelay + period、initialDelay + 2 × period……;
scheduleWithFixedDelay(固定延迟):延迟指定时间后执行第一次,之后每次都在上一次任务执行结束后再等待 delay 时长才开始下一次执行,即相邻两次开始时间的间隔 = 上一次任务执行时长 + delay;

  1. ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
  2. scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
  3. System.out.println("time:" + System.currentTimeMillis());
  4. }, 1,3, TimeUnit.SECONDS);
  /**
   * Queues a scheduled task. If the pool is already shut down the task is
   * rejected. Otherwise the task is added to the queue and the shutdown state
   * is checked again: if the pool shut down in the meantime, the task may not
   * run in the current state, and it can still be removed, it is cancelled.
   * In every other case {@code ensurePrestart()} is called.
   *
   * @param task the task to queue
   */
  1. private void delayedExecute(RunnableScheduledFuture<?> task) {
  2. if (isShutdown())
  3. reject(task);
  4. else {
  5. super.getQueue().add(task); // add the task to the queue
  6. if (isShutdown() && // recheck: shutdown may have happened after the first check
  7. !canRunInCurrentRunState(task.isPeriodic()) &&
  8. remove(task))
  9. task.cancel(false); // cancel(false): presumably "do not interrupt" — confirm against the task's cancel()
  10. else
  11. ensurePrestart(); // NOTE(review): presumably makes sure a worker thread exists — helper not shown here
  12. }
  13. }
  /**
   * Inserts {@code x} into the array-backed heap while holding {@code lock}.
   * The array is grown first when it is full. If the inserted element ends up
   * at index 0, {@code leader} is cleared and one waiter on {@code available}
   * is signalled.
   *
   * @param x the task to insert; cast to {@code RunnableScheduledFuture<?>}
   * @return always true
   * @throws NullPointerException if {@code x} is null
   */
  1. public boolean offer(Runnable x) {
  2. if (x == null)
  3. throw new NullPointerException();
  4. RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
  5. final ReentrantLock lock = this.lock;
  6. lock.lock();
  7. try {
  8. int i = size;
  9. if (i >= queue.length)
  10. grow();// array is full: enlarge it (by 50% per the original note; grow() is not shown here)
  11. size = i + 1;
  12. if (i == 0) {
  13. queue[0] = e;
  14. setIndex(e, 0);
  15. } else {
  16. siftUp(i, e);// insert at the heap tail (index i) and sift up
  17. }
  18. if (queue[0] == e) {
  19. leader = null;// the new element became the heap top, so the previous leader is no longer valid
  20. available.signal();// the previous leader was cleared, so wake one waiting thread (not necessarily the former leader) to take the head task
  21. }
  22. } finally {
  23. lock.unlock();
  24. }
  25. return true;
  26. }
  1. private void siftUp(int k, RunnableScheduledFuture<?> key) {
  2. while (k > 0) {
  3. int parent = (k - 1) >>> 1;
  4. RunnableScheduledFuture<?> e = queue[parent];
  5. if (key.compareTo(e) >= 0)
  6. break;
  7. queue[k] = e;
  8. setIndex(e, k);
  9. k = parent;
  10. }
  11. queue[k] = key;
  12. setIndex(key, k);
  13. }
  1. public void run() {
  2. boolean periodic = isPeriodic();
  3. if (!canRunInCurrentRunState(periodic))
  4. cancel(false);
  5. else if (!periodic)
  6. ScheduledFutureTask.super.run();
  7. else if (ScheduledFutureTask.super.runAndReset()) {
  8. setNextRunTime();
  9. reExecutePeriodic(outerTask);
  10. }
  11. }