Worker:基于AQS实现,维护一个线程对象

  1. private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ // AQS-based exclusive lock that is also the Runnable executed by its own thread
  2. private static final long serialVersionUID = 6138294804551838833L;
  3. final Thread thread; // thread obtained from the thread factory in the constructor
  4. Runnable firstTask; // initial task; runWorker reads it and then sets it to null
  5. volatile long completedTasks; // incremented by runWorker after each task, whether or not the task threw
  6. Worker(Runnable firstTask) {
  7. setState(-1); // inhibit interrupts until runWorker: interruptIfStarted skips workers whose state is < 0
  8. this.firstTask = firstTask;
  9. this.thread = getThreadFactory().newThread(this); // this worker is passed as the new thread's Runnable
  10. }
  11. public void run() { // thread entry point: hands this worker to runWorker
  12. runWorker(this);
  13. }
  14. protected boolean isHeldExclusively() { // any non-zero state counts as held, including the initial -1
  15. return getState() != 0;
  16. }
  17. protected boolean tryAcquire(int unused) { // argument is ignored
  18. if (compareAndSetState(0, 1)) { // succeeds only from the unlocked state 0; no owner check, so the lock is not reentrant
  19. setExclusiveOwnerThread(Thread.currentThread());
  20. return true;
  21. }
  22. return false;
  23. }
  24. protected boolean tryRelease(int unused) { // unconditionally clears the owner and resets state to 0
  25. setExclusiveOwnerThread(null);
  26. setState(0); // also moves a freshly constructed worker from -1 to 0
  27. return true;
  28. }
  29. public void lock() { acquire(1); }
  30. public boolean tryLock() { return tryAcquire(1); } // single non-blocking attempt
  31. public void unlock() { release(1); }
  32. public boolean isLocked() { return isHeldExclusively(); }
  33. void interruptIfStarted() { // interrupts the worker thread unless the worker is still in its initial state (-1)
  34. Thread t;
  35. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // skip if not yet started, no thread, or already interrupted
  36. try {
  37. t.interrupt();
  38. } catch (SecurityException ignore) { // best-effort: failure to interrupt is deliberately ignored
  39. }
  40. }
  41. }
  42. }

runWorker()

  1. final void runWorker(Worker w) { // worker main loop: runs firstTask, then tasks from getTask(), until getTask() returns null or a task throws
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null; // drop the worker's reference to the initial task
  5. w.unlock(); // allow interrupts (state leaves the constructor's -1 and becomes 0)
  6. boolean completedAbruptly = true; // set to false only if the loop below ends normally
  7. try {
  8. while (task != null // the worker's first task is non-null: run it
  9. || (task = getTask()) != null)// otherwise call getTask() to fetch the next task from the blocking queue and run it
  10. {
  11. w.lock(); // held while the task runs; released in the finally block below
  12. // If pool is stopping, ensure thread is interrupted;
  13. // if not, ensure thread is not interrupted. This
  14. // requires a recheck in second case to deal with
  15. // shutdownNow race while clearing interrupt
  16. if ((runStateAtLeast(ctl.get(), STOP) ||
  17. (Thread.interrupted() &&
  18. runStateAtLeast(ctl.get(), STOP))) &&
  19. !wt.isInterrupted())
  20. wt.interrupt();
  21. try {
  22. beforeExecute(wt, task); // pre-task hook; if it throws, task.run() is skipped but the finally below still runs
  23. Throwable thrown = null;
  24. try {
  25. task.run();
  26. } catch (RuntimeException x) {
  27. thrown = x; throw x; // remember for afterExecute, then propagate unchanged
  28. } catch (Error x) {
  29. thrown = x; throw x;
  30. } catch (Throwable x) {
  31. thrown = x; throw new Error(x); // any other Throwable is wrapped in an Error before propagating
  32. } finally {
  33. afterExecute(task, thrown); // post-task hook; thrown is null when the task completed normally
  34. }
  35. } finally {
  36. task = null; // forces the next loop iteration to call getTask()
  37. w.completedTasks++;
  38. w.unlock();
  39. }
  40. }
  41. completedAbruptly = false; // reached only when getTask() returned null
  42. } finally {
  43. processWorkerExit(w, completedAbruptly); // handle worker exit
  44. }
  45. }

getTask()

image.png