并发之父

Doug Lea(小名:李二狗)
image.png
Java并发编程核心在于java.util.concurrent包而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。

ReentrantLock

ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。而且它具有比synchronized更多的特性,比如它支持手动加锁与解锁,支持加锁的公平性。
image.png

在ReentrantLock内部定义了一个Sync的内部类,该类继承AbstractQueuedSynchronized,对该抽象类的部分方法做了实现;并且还定义了两个子类: 1、FairSync 公平锁的实现 2、NonfairSync 非公平锁的实现 这两个类都继承自Sync,也就是间接继承了AbstractQueuedSynchronized,所以这一个ReentrantLock同时具备公平与非公平特性。 上面主要涉及的设计模式:模板模式-子类根据需要做具体业务实现

AQS具备特性

  • 阻塞等待队列
  • 共享/独占
  • 公平/非公平
  • 可重入
  • 允许中断

除了Lock外,Java.util.concurrent当中同步器的实现如Latch,Barrier,BlockingQueue等,都是基于AQS框架实现

  • 一般通过定义内部类Sync继承AQS
  • 将同步器所有调用都映射到Sync对应的方法

AQS内部维护属性volatile int state (32位)

  • state表示资源的可用状态

State三种访问方式
getState()、setState()、compareAndSetState()
AQS定义两种资源共享方式

  • Exclusive-独占,只有一个线程能执行,如ReentrantLock
  • Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch

AQS定义两种队列

  • 同步等待队列
  • 条件等待队列

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

    同步等待队列

    AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
    image.png

    条件等待队列

    Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁image.png

    AQS源码分析

    public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

  1. public abstract class AbstractQueuedSynchronizer
  2. extends AbstractOwnableSynchronizer
  3. implements java.io.Serializable {
  4. private static final long serialVersionUID = 7373984972572414691L;
  5. protected AbstractQueuedSynchronizer() { }
  6. /**
  7. * Wait queue node class.
  8. *
  9. * 不管是条件队列,还是CLH等待队列
  10. * 都是基于Node类
  11. *
  12. * AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人
  13. * 发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的
  14. * CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
  15. */
  16. static final class Node {
  17. /**
  18. * 标记节点未共享模式
  19. * */
  20. static final Node SHARED = new Node();
  21. /**
  22. * 标记节点为独占模式
  23. */
  24. static final Node EXCLUSIVE = null;
  25. /**
  26. * 在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待
  27. * */
  28. static final int CANCELLED = 1;
  29. /**
  30. * 后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消,
  31. * 将会通知后继节点,使后继节点的线程得以运行。
  32. */
  33. static final int SIGNAL = -1;
  34. /**
  35. * 节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condition调用了
  36. signal()方法后,
  37. * 该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中
  38. */
  39. static final int CONDITION = -2;
  40. /**
  41. * 表示下一次共享式同步状态获取将会被无条件地传播下去
  42. */
  43. static final int PROPAGATE = -3;
  44. /**
  45. * 标记当前节点的信号量状态 (1,0,-1,-2,-3)5种状态
  46. * 使用CAS更改状态,volatile保证线程可见性,高并发场景下,
  47. * 即被一个线程修改后,状态会立马让其他线程可见。
  48. */
  49. volatile int waitStatus;
  50. /**
  51. * 前驱节点,当前节点加入到同步队列中被设置
  52. */
  53. volatile Node prev;
  54. /**
  55. * 后继节点
  56. */
  57. volatile Node next;
  58. /**
  59. * 节点同步状态的线程
  60. */
  61. volatile Thread thread;
  62. /**
  63. * 等待队列中的后继节点,如果当前节点是共享的,那么这个字段是一个SHARED常量,
  64. * 也就是说节点类型(独占和共享)和等待队列中的后继节点共用同一个字段。
  65. */
  66. Node nextWaiter;
  67. /**
  68. * Returns true if node is waiting in shared mode.
  69. */
  70. final boolean isShared() {
  71. return nextWaiter == SHARED;
  72. }
  73. /**
  74. * 返回前驱节点
  75. */
  76. final Node predecessor() throws NullPointerException {
  77. Node p = prev;
  78. if (p == null)
  79. throw new NullPointerException();
  80. else
  81. return p;
  82. }
  83. //空节点,用于标记共享模式
  84. Node() { // Used to establish initial head or SHARED marker
  85. }
  86. //用于同步队列CLH
  87. Node(Thread thread, Node mode) { // Used by addWaiter
  88. this.nextWaiter = mode;
  89. this.thread = thread;
  90. }
  91. //用于条件队列
  92. Node(Thread thread, int waitStatus) { // Used by Condition
  93. this.waitStatus = waitStatus;
  94. this.thread = thread;
  95. }
  96. }
  97. /**
  98. * 指向同步等待队列的头节点
  99. */
  100. private transient volatile Node head;
  101. /**
  102. * 指向同步等待队列的尾节点
  103. */
  104. private transient volatile Node tail;
  105. /**
  106. * 同步资源状态
  107. */
  108. private volatile int state;
  109. /**
  110. *
  111. * @return current state value
  112. */
  113. protected final int getState() {
  114. return state;
  115. }
  116. protected final void setState(int newState) {
  117. state = newState;
  118. }
  119. //cas替换
  120. protected final boolean compareAndSetState(int expect, int update) {
  121. // See below for intrinsics setup to support this
  122. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  123. }
  124. // Queuing utilities
  125. static final long spinForTimeoutThreshold = 1000L;
  126. /**
  127. * 节点加入CLH同步队列
  128. */
  129. private Node enq(final Node node) {
  130. for (;;) {
  131. Node t = tail;
  132. if (t == null) { // Must initialize
  133. //队列为空需要初始化,创建空的头节点
  134. if (compareAndSetHead(new Node()))
  135. tail = head;
  136. } else {
  137. node.prev = t;
  138. //set尾部节点
  139. if (compareAndSetTail(t, node)) {//当前节点置为尾部
  140. t.next = node;//前驱节点的next指针指向当前节点
  141. return t;
  142. }
  143. }
  144. }
  145. }
  146. private Node addWaiter(Node mode) {
  147. // 1. 将当前线程构建成Node类型
  148. Node node = new Node(Thread.currentThread(), mode);
  149. // Try the fast path of enq; backup to full enq on failure
  150. Node pred = tail;
  151. // 2. 1当前尾节点是否为null?
  152. if (pred != null) {
  153. // 2.2 将当前节点尾插入的方式
  154. node.prev = pred;
  155. // 2.3 CAS将节点插入同步队列的尾部
  156. if (compareAndSetTail(pred, node)) {
  157. pred.next = node;
  158. return node;
  159. }
  160. }
  161. enq(node);
  162. return node;
  163. }
  164. private void setHead(Node node) {
  165. head = node;
  166. node.thread = null;
  167. node.prev = null;
  168. }
  169. //释放锁
  170. private void unparkSuccessor(Node node) {
  171. //获取wait状态
  172. int ws = node.waitStatus;
  173. if (ws < 0)
  174. compareAndSetWaitStatus(node, ws, 0);// 将等待状态waitStatus设置为初始值0
  175. /**
  176. * 若后继结点为空,或状态为CANCEL(已失效),则从后尾部往前遍历找到最前的一个处于正常阻塞状态的结点
  177. * 进行唤醒
  178. */
  179. Node s = node.next; //head.next = Node1 ,thread = T3
  180. if (s == null || s.waitStatus > 0) {
  181. s = null;
  182. for (Node t = tail; t != null && t != node; t = t.prev)
  183. if (t.waitStatus <= 0)
  184. s = t;
  185. }
  186. if (s != null)
  187. LockSupport.unpark(s.thread);//唤醒线程,T3唤醒
  188. }
  189. /**
  190. * 把当前结点设置为SIGNAL(-1)或者PROPAGATE(-3)
  191. * 唤醒head.next(B节点),B节点唤醒后可以竞争锁,成功后head->B,然后又会唤醒B.next,一直重复直到共享节点都唤醒
  192. * head节点状态为SIGNAL,重置head.waitStatus->0,唤醒head节点线程,唤醒后线程去竞争共享锁
  193. * head节点状态为0,将head.waitStatus->Node.PROPAGATE传播状态,表示需要将状态向后继节点传播
  194. */
  195. private void doReleaseShared() {
  196. for (;;) {
  197. Node h = head;
  198. if (h != null && h != tail) {
  199. int ws = h.waitStatus;
  200. if (ws == Node.SIGNAL) {//head是SIGNAL状态
  201. /* head状态是SIGNAL,重置head节点waitStatus为0,这里不直接设为Node.PROPAGAT,
  202. * 是因为unparkSuccessor(h)中,如果ws < 0会设置为0,所以ws先设置为0,再设置为PROPAGATE
  203. * 这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
  204. */
  205. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  206. continue; //设置失败,重新循环
  207. /* head状态为SIGNAL,且成功设置为0之后,唤醒head.next节点线程
  208. * 此时head、head.next的线程都唤醒了,head.next会去竞争锁,成功后head会指向获取锁的节点,
  209. * 也就是head发生了变化。看最底下一行代码可知,head发生变化后会重新循环,继续唤醒head的下一个节点
  210. */
  211. unparkSuccessor(h);
  212. /*
  213. * 如果本身头节点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。
  214. * 意味着需要将状态向后一个节点传播
  215. */
  216. }
  217. else if (ws == 0 &&
  218. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  219. continue; // loop on failed CAS
  220. }
  221. if (h == head)//如果head变了,重新循环
  222. break;
  223. }
  224. }
  225. /**
  226. * 把node节点设置成head节点,且Node.waitStatus->Node.PROPAGATE
  227. */
  228. private void setHeadAndPropagate(Node node, int propagate) {
  229. Node h = head; //h用来保存旧的head节点
  230. setHead(node);//head引用指向node节点
  231. /* 这里意思有两种情况是需要执行唤醒操作
  232. * 1.propagate > 0 表示调用方指明了后继节点需要被唤醒
  233. * 2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
  234. */
  235. if (propagate > 0 || h == null || h.waitStatus < 0 ||
  236. (h = head) == null || h.waitStatus < 0) {
  237. Node s = node.next;
  238. if (s == null || s.isShared())//node是最后一个节点或者 node的后继节点是共享节点
  239. /* 如果head节点状态为SIGNAL,唤醒head节点线程,重置head.waitStatus->0
  240. * head节点状态为0(第一次添加时是0),设置head.waitStatus->Node.PROPAGATE表示状态需要向后继节点传播
  241. */
  242. doReleaseShared();
  243. }
  244. }
  245. /**
  246. * 终结掉正在尝试去获取锁的节点
  247. * @param node the node
  248. */
  249. private void cancelAcquire(Node node) {
  250. // Ignore if node doesn't exist
  251. if (node == null)
  252. return;
  253. node.thread = null;
  254. // 剔除掉一件被cancel掉的节点
  255. Node pred = node.prev;
  256. while (pred.waitStatus > 0)
  257. node.prev = pred = pred.prev;
  258. // predNext is the apparent node to unsplice. CASes below will
  259. // fail if not, in which case, we lost race vs another cancel
  260. // or signal, so no further action is necessary.
  261. Node predNext = pred.next;
  262. // Can use unconditional write instead of CAS here.
  263. // After this atomic step, other Nodes can skip past us.
  264. // Before, we are free of interference from other threads.
  265. node.waitStatus = Node.CANCELLED;
  266. // If we are the tail, remove ourselves.
  267. if (node == tail && compareAndSetTail(node, pred)) {
  268. compareAndSetNext(pred, predNext, null);
  269. } else {
  270. // If successor needs signal, try to set pred's next-link
  271. // so it will get one. Otherwise wake it up to propagate.
  272. int ws;
  273. if (pred != head &&
  274. ((ws = pred.waitStatus) == Node.SIGNAL ||
  275. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  276. pred.thread != null) {
  277. Node next = node.next;
  278. if (next != null && next.waitStatus <= 0)
  279. compareAndSetNext(pred, predNext, next);
  280. } else {
  281. unparkSuccessor(node);
  282. }
  283. node.next = node; // help GC
  284. }
  285. }
  286. //设置waitStatus 状态
  287. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  288. int ws = pred.waitStatus;
  289. if (ws == Node.SIGNAL)
  290. /*
  291. * 若前驱结点的状态是SIGNAL,意味着当前结点可以被安全地park
  292. */
  293. return true;
  294. if (ws > 0) {
  295. /*
  296. * 前驱节点状态如果被取消状态,将被移除出队列
  297. */
  298. do {
  299. node.prev = pred = pred.prev;
  300. } while (pred.waitStatus > 0);
  301. pred.next = node;
  302. } else {
  303. /*
  304. * 当前驱节点waitStatus为 0 or PROPAGATE状态时
  305. * 将其设置为SIGNAL状态,然后当前结点才可以可以被安全地park
  306. */
  307. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  308. }
  309. return false;
  310. }
  311. /**
  312. * 中断当前线程
  313. */
  314. static void selfInterrupt() {
  315. Thread.currentThread().interrupt();
  316. }
  317. /**
  318. * 阻塞当前节点,返回当前Thread的中断状态
  319. * LockSupport.park 底层实现逻辑调用系统内核功能 pthread_mutex_lock 阻塞线程
  320. */
  321. private final boolean parkAndCheckInterrupt() {
  322. LockSupport.park(this);
  323. return Thread.interrupted();
  324. }
  325. /**
  326. * 已经在队列当中的Thread节点,准备阻塞等待获取锁
  327. */
  328. final boolean acquireQueued(final Node node, int arg) {
  329. boolean failed = true;
  330. try {
  331. boolean interrupted = false;
  332. for (;;) {//死循环
  333. final Node p = node.predecessor();/找到当前结点的前驱结点
  334. if (p == head && tryAcquire(arg)) {//如果前驱结点是头结点,才tryAcquire,其他结点是没有机会tryAcquire的。
  335. setHead(node);//获取同步状态成功,将当前结点设置为头结点。
  336. p.next = null; // help GC
  337. failed = false;
  338. return interrupted;
  339. }
  340. /**
  341. * 如果前驱节点不是Head,通过shouldParkAfterFailedAcquire判断是否应该阻塞
  342. * 前驱节点信号量为-1,当前线程可以安全被parkAndCheckInterrupt用来阻塞线程
  343. */
  344. if (shouldParkAfterFailedAcquire(p, node) &&
  345. parkAndCheckInterrupt())
  346. interrupted = true;
  347. }
  348. } finally {
  349. if (failed)
  350. cancelAcquire(node);
  351. }
  352. }
  353. /**
  354. * 与acquireQueued逻辑相似,唯一区别节点还不在队列当中需要先进行入队操作
  355. */
  356. private void doAcquireInterruptibly(int arg)
  357. throws InterruptedException {
  358. final Node node = addWaiter(Node.EXCLUSIVE);//以独占模式放入队列尾部
  359. boolean failed = true;
  360. try {
  361. for (;;) {
  362. final Node p = node.predecessor();
  363. if (p == head && tryAcquire(arg)) {
  364. setHead(node);
  365. p.next = null; // help GC
  366. failed = false;
  367. return;
  368. }
  369. if (shouldParkAfterFailedAcquire(p, node) &&
  370. parkAndCheckInterrupt())
  371. throw new InterruptedException();
  372. }
  373. } finally {
  374. if (failed)
  375. cancelAcquire(node);
  376. }
  377. }
  378. /**
  379. * 独占模式定时获取
  380. */
  381. private boolean doAcquireNanos(int arg, long nanosTimeout)
  382. throws InterruptedException {
  383. if (nanosTimeout <= 0L)
  384. return false;
  385. final long deadline = System.nanoTime() + nanosTimeout;
  386. final Node node = addWaiter(Node.EXCLUSIVE);//加入队列
  387. boolean failed = true;
  388. try {
  389. for (;;) {
  390. final Node p = node.predecessor();
  391. if (p == head && tryAcquire(arg)) {
  392. setHead(node);
  393. p.next = null; // help GC
  394. failed = false;
  395. return true;
  396. }
  397. nanosTimeout = deadline - System.nanoTime();
  398. if (nanosTimeout <= 0L)
  399. return false;//超时直接返回获取失败
  400. if (shouldParkAfterFailedAcquire(p, node) &&
  401. nanosTimeout > spinForTimeoutThreshold)
  402. //阻塞指定时长,超时则线程自动被唤醒
  403. LockSupport.parkNanos(this, nanosTimeout);
  404. if (Thread.interrupted())//当前线程中断状态
  405. throw new InterruptedException();
  406. }
  407. } finally {
  408. if (failed)
  409. cancelAcquire(node);
  410. }
  411. }
  412. /**
  413. * 尝试获取共享锁
  414. */
  415. private void doAcquireShared(int arg) {
  416. final Node node = addWaiter(Node.SHARED);//入队
  417. boolean failed = true;
  418. try {
  419. boolean interrupted = false;
  420. for (;;) {
  421. final Node p = node.predecessor();//前驱节点
  422. if (p == head) {
  423. int r = tryAcquireShared(arg);//非公平锁实现,再尝试获取锁
  424. //state==0时tryAcquireShared会返回>=0(CountDownLatch中返回的是1)。
  425. // state为0说明共享次数已经到了,可以获取锁了
  426. if (r >= 0) {//r>0表示state==0,前继节点已经释放锁,锁的状态为可被获取
  427. //这一步设置node为head节点设置node.waitStatus->Node.PROPAGATE,然后唤醒node.thread
  428. setHeadAndPropagate(node, r);
  429. p.next = null; // help GC
  430. if (interrupted)
  431. selfInterrupt();
  432. failed = false;
  433. return;
  434. }
  435. }
  436. //前继节点非head节点,将前继节点状态设置为SIGNAL,通过park挂起node节点的线程
  437. if (shouldParkAfterFailedAcquire(p, node) &&
  438. parkAndCheckInterrupt())
  439. interrupted = true;
  440. }
  441. } finally {
  442. if (failed)
  443. cancelAcquire(node);
  444. }
  445. }
  446. /**
  447. * Acquires in shared interruptible mode.
  448. * @param arg the acquire argument
  449. */
  450. private void doAcquireSharedInterruptibly(int arg)
  451. throws InterruptedException {
  452. final Node node = addWaiter(Node.SHARED);
  453. boolean failed = true;
  454. try {
  455. for (;;) {
  456. final Node p = node.predecessor();
  457. if (p == head) {
  458. int r = tryAcquireShared(arg);
  459. if (r >= 0) {
  460. setHeadAndPropagate(node, r);
  461. p.next = null; // help GC
  462. failed = false;
  463. return;
  464. }
  465. }
  466. if (shouldParkAfterFailedAcquire(p, node) &&
  467. parkAndCheckInterrupt())
  468. throw new InterruptedException();
  469. }
  470. } finally {
  471. if (failed)
  472. cancelAcquire(node);
  473. }
  474. }
  475. /**
  476. * Acquires in shared timed mode.
  477. *
  478. * @param arg the acquire argument
  479. * @param nanosTimeout max wait time
  480. * @return {@code true} if acquired
  481. */
  482. private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
  483. throws InterruptedException {
  484. if (nanosTimeout <= 0L)
  485. return false;
  486. final long deadline = System.nanoTime() + nanosTimeout;
  487. final Node node = addWaiter(Node.SHARED);
  488. boolean failed = true;
  489. try {
  490. for (;;) {
  491. final Node p = node.predecessor();
  492. if (p == head) {
  493. int r = tryAcquireShared(arg);
  494. if (r >= 0) {
  495. setHeadAndPropagate(node, r);
  496. p.next = null; // help GC
  497. failed = false;
  498. return true;
  499. }
  500. }
  501. nanosTimeout = deadline - System.nanoTime();
  502. if (nanosTimeout <= 0L)
  503. return false;
  504. if (shouldParkAfterFailedAcquire(p, node) &&
  505. nanosTimeout > spinForTimeoutThreshold)
  506. LockSupport.parkNanos(this, nanosTimeout);
  507. if (Thread.interrupted())
  508. throw new InterruptedException();
  509. }
  510. } finally {
  511. if (failed)
  512. cancelAcquire(node);
  513. }
  514. }
  515. /**
  516. * 尝试获取独占锁,可指定锁的获取数量
  517. */
  518. protected boolean tryAcquire(int arg) {
  519. throw new UnsupportedOperationException();
  520. }
  521. /**
  522. * 尝试释放独占锁,在子类当中实现
  523. */
  524. protected boolean tryRelease(int arg) {
  525. throw new UnsupportedOperationException();
  526. }
  527. /**
  528. * 共享式:共享式地获取同步状态。对于独占式同步组件来讲,同一时刻只有一个线程能获取到同步状态,
  529. * 其他线程都得去排队等待,其待重写的尝试获取同步状态的方法tryAcquire返回值为boolean,这很容易理解;
  530. * 对于共享式同步组件来讲,同一时刻可以有多个线程同时获取到同步状态,这也是“共享”的意义所在。
  531. * 本方法待被之类覆盖实现具体逻辑
  532. * 1.当返回值大于0时,表示获取同步状态成功,同时还有剩余同步状态可供其他线程获取;
  533. *
  534. * 2.当返回值等于0时,表示获取同步状态成功,但没有可用同步状态了;
  535. * 3.当返回值小于0时,表示获取同步状态失败。
  536. */
  537. protected int tryAcquireShared(int arg) {
  538. throw new UnsupportedOperationException();
  539. }
  540. /**
  541. * 释放共享锁,具体实现在子类当中实现
  542. */
  543. protected boolean tryReleaseShared(int arg) {
  544. throw new UnsupportedOperationException();
  545. }
  546. /**
  547. * 当前线程是否持有独占锁
  548. */
  549. protected boolean isHeldExclusively() {
  550. throw new UnsupportedOperationException();
  551. }
  552. /**
  553. * 获取独占锁
  554. */
  555. public final void acquire(int arg) {
  556. //尝试获取锁
  557. if (!tryAcquire(arg) &&
  558. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//独占模式
  559. selfInterrupt();
  560. }
  561. /**
  562. *
  563. */
  564. public final void acquireInterruptibly(int arg)
  565. throws InterruptedException {
  566. if (Thread.interrupted())
  567. throw new InterruptedException();
  568. if (!tryAcquire(arg))
  569. doAcquireInterruptibly(arg);
  570. }
  571. /**
  572. * 获取独占锁,设置最大等待时间
  573. */
  574. public final boolean tryAcquireNanos(int arg, long nanosTimeout)
  575. throws InterruptedException {
  576. if (Thread.interrupted())
  577. throw new InterruptedException();
  578. return tryAcquire(arg) ||
  579. doAcquireNanos(arg, nanosTimeout);
  580. }
  581. /**
  582. * 释放独占模式持有的锁
  583. */
  584. public final boolean release(int arg) {
  585. if (tryRelease(arg)) {//释放一次锁
  586. Node h = head;
  587. if (h != null && h.waitStatus != 0)
  588. unparkSuccessor(h);//唤醒后继结点
  589. return true;
  590. }
  591. return false;
  592. }
  593. /**
  594. * 请求获取共享锁
  595. */
  596. public final void acquireShared(int arg) {
  597. if (tryAcquireShared(arg) < 0)//返回值小于0,获取同步状态失败,排队去;获取同步状态成功,直接返回去干自己的事儿。
  598. doAcquireShared(arg);
  599. }
  600. /**
  601. * Releases in shared mode. Implemented by unblocking one or more
  602. * threads if {@link #tryReleaseShared} returns true.
  603. *
  604. * @param arg the release argument. This value is conveyed to
  605. * {@link #tryReleaseShared} but is otherwise uninterpreted
  606. * and can represent anything you like.
  607. * @return the value returned from {@link #tryReleaseShared}
  608. */
  609. public final boolean releaseShared(int arg) {
  610. if (tryReleaseShared(arg)) {
  611. doReleaseShared();
  612. return true;
  613. }
  614. return false;
  615. }
  616. // Queue inspection methods
  617. public final boolean hasQueuedThreads() {
  618. return head != tail;
  619. }
  620. public final boolean hasContended() {
  621. return head != null;
  622. }
  623. public final Thread getFirstQueuedThread() {
  624. // handle only fast path, else relay
  625. return (head == tail) ? null : fullGetFirstQueuedThread();
  626. }
  627. /**
  628. * Version of getFirstQueuedThread called when fastpath fails
  629. */
  630. private Thread fullGetFirstQueuedThread() {
  631. Node h, s;
  632. Thread st;
  633. if (((h = head) != null && (s = h.next) != null &&
  634. s.prev == head && (st = s.thread) != null) ||
  635. ((h = head) != null && (s = h.next) != null &&
  636. s.prev == head && (st = s.thread) != null))
  637. return st;
  638. Node t = tail;
  639. Thread firstThread = null;
  640. while (t != null && t != head) {
  641. Thread tt = t.thread;
  642. if (tt != null)
  643. firstThread = tt;
  644. t = t.prev;
  645. }
  646. return firstThread;
  647. }
  648. /**
  649. * 判断当前线程是否在队列当中
  650. */
  651. public final boolean isQueued(Thread thread) {
  652. if (thread == null)
  653. throw new NullPointerException();
  654. for (Node p = tail; p != null; p = p.prev)
  655. if (p.thread == thread)
  656. return true;
  657. return false;
  658. }
  659. final boolean apparentlyFirstQueuedIsExclusive() {
  660. Node h, s;
  661. return (h = head) != null &&
  662. (s = h.next) != null &&
  663. !s.isShared() &&
  664. s.thread != null;
  665. }
  666. /**
  667. * 判断当前节点是否有前驱节点
  668. */
  669. public final boolean hasQueuedPredecessors() {
  670. Node t = tail; // Read fields in reverse initialization order
  671. Node h = head;
  672. Node s;
  673. return h != t &&
  674. ((s = h.next) == null || s.thread != Thread.currentThread());
  675. }
  676. /**
  677. * 同步队列长度
  678. */
  679. public final int getQueueLength() {
  680. int n = 0;
  681. for (Node p = tail; p != null; p = p.prev) {
  682. if (p.thread != null)
  683. ++n;
  684. }
  685. return n;
  686. }
  687. /**
  688. * 获取队列等待thread集合
  689. */
  690. public final Collection<Thread> getQueuedThreads() {
  691. ArrayList<Thread> list = new ArrayList<Thread>();
  692. for (Node p = tail; p != null; p = p.prev) {
  693. Thread t = p.thread;
  694. if (t != null)
  695. list.add(t);
  696. }
  697. return list;
  698. }
  699. /**
  700. * 获取独占模式等待thread线程集合
  701. */
  702. public final Collection<Thread> getExclusiveQueuedThreads() {
  703. ArrayList<Thread> list = new ArrayList<Thread>();
  704. for (Node p = tail; p != null; p = p.prev) {
  705. if (!p.isShared()) {
  706. Thread t = p.thread;
  707. if (t != null)
  708. list.add(t);
  709. }
  710. }
  711. return list;
  712. }
  713. /**
  714. * 获取共享模式等待thread集合
  715. */
  716. public final Collection<Thread> getSharedQueuedThreads() {
  717. ArrayList<Thread> list = new ArrayList<Thread>();
  718. for (Node p = tail; p != null; p = p.prev) {
  719. if (p.isShared()) {
  720. Thread t = p.thread;
  721. if (t != null)
  722. list.add(t);
  723. }
  724. }
  725. return list;
  726. }
  727. /**
  728. * 判断节点是否在同步队列中
  729. */
  730. final boolean isOnSyncQueue(Node node) {
  731. //快速判断1:节点状态或者节点没有前置节点
  732. //注:同步队列是有头节点的,而条件队列没有
  733. if (node.waitStatus == Node.CONDITION || node.prev == null)
  734. return false;
  735. //快速判断2:next字段只有同步队列才会使用,条件队列中使用的是nextWaiter字段
  736. if (node.next != null) // If has successor, it must be on queue
  737. return true;
  738. //上面如果无法判断则进入复杂判断
  739. return findNodeFromTail(node);
  740. }
  741. private boolean findNodeFromTail(Node node) {
  742. Node t = tail;
  743. for (;;) {
  744. if (t == node)
  745. return true;
  746. if (t == null)
  747. return false;
  748. t = t.prev;
  749. }
  750. }
  751. /**
  752. * 将节点从条件队列当中移动到同步队列当中,等待获取锁
  753. */
  754. final boolean transferForSignal(Node node) {
  755. /*
  756. * 修改节点信号量状态为0,失败直接返回false
  757. */
  758. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  759. return false;
  760. /*
  761. * 加入同步队列尾部当中,返回前驱节点
  762. */
  763. Node p = enq(node);
  764. int ws = p.waitStatus;
  765. //前驱节点不可用 或者 修改信号量状态失败
  766. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  767. LockSupport.unpark(node.thread); //唤醒当前节点
  768. return true;
  769. }
  770. final boolean transferAfterCancelledWait(Node node) {
  771. if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  772. enq(node);
  773. return true;
  774. }
  775. /*
  776. * If we lost out to a signal(), then we can't proceed
  777. * until it finishes its enq(). Cancelling during an
  778. * incomplete transfer is both rare and transient, so just
  779. * spin.
  780. */
  781. while (!isOnSyncQueue(node))
  782. Thread.yield();
  783. return false;
  784. }
  785. /**
  786. * 入参就是新创建的节点,即当前节点
  787. */
  788. final int fullyRelease(Node node) {
  789. boolean failed = true;
  790. try {
  791. //这里这个取值要注意,获取当前的state并释放,这从另一个角度说明必须是独占锁
  792. //可以考虑下这个逻辑放在共享锁下面会发生什么?
  793. int savedState = getState();
  794. if (release(savedState)) {
  795. failed = false;
  796. return savedState;
  797. } else {
  798. //如果这里释放失败,则抛出异常
  799. throw new IllegalMonitorStateException();
  800. }
  801. } finally {
  802. /**
  803. * 如果释放锁失败,则把节点取消,由这里就能看出来上面添加节点的逻辑中
  804. * 只需要判断最后一个节点是否被取消就可以了
  805. */
  806. if (failed)
  807. node.waitStatus = Node.CANCELLED;
  808. }
  809. }
  810. public final boolean hasWaiters(ConditionObject condition) {
  811. if (!owns(condition))
  812. throw new IllegalArgumentException("Not owner");
  813. return condition.hasWaiters();
  814. }
  815. /**
  816. * 获取条件队列长度
  817. */
  818. public final int getWaitQueueLength(ConditionObject condition) {
  819. if (!owns(condition))
  820. throw new IllegalArgumentException("Not owner");
  821. return condition.getWaitQueueLength();
  822. }
  823. /**
  824. * 获取条件队列当中所有等待的thread集合
  825. */
  826. public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
  827. if (!owns(condition))
  828. throw new IllegalArgumentException("Not owner");
  829. return condition.getWaitingThreads();
  830. }
  831. /**
  832. * 条件对象,实现基于条件的具体行为
  833. */
  834. public class ConditionObject implements Condition, java.io.Serializable {
  835. private static final long serialVersionUID = 1173984872572414699L;
  836. /** First node of condition queue. */
  837. private transient Node firstWaiter;
  838. /** Last node of condition queue. */
  839. private transient Node lastWaiter;
  840. /**
  841. * Creates a new {@code ConditionObject} instance.
  842. */
  843. public ConditionObject() { }
  844. /**
  845. * 1.与同步队列不同,条件队列头尾指针是firstWaiter跟lastWaiter
  846. * 2.条件队列是在获取锁之后,也就是临界区进行操作,因此很多地方不用考虑并发
  847. */
  848. private Node addConditionWaiter() {
  849. Node t = lastWaiter;
  850. //如果最后一个节点被取消,则删除队列中被取消的节点
  851. //至于为啥是最后一个节点后面会分析
  852. if (t != null && t.waitStatus != Node.CONDITION) {
  853. //删除所有被取消的节点
  854. unlinkCancelledWaiters();
  855. t = lastWaiter;
  856. }
  857. //创建一个类型为CONDITION的节点并加入队列,由于在临界区,所以这里不用并发控制
  858. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  859. if (t == null)
  860. firstWaiter = node;
  861. else
  862. t.nextWaiter = node;
  863. lastWaiter = node;
  864. return node;
  865. }
  866. /**
  867. * 发信号,通知遍历条件队列当中的节点转移到同步队列当中,准备排队获取锁
  868. */
  869. private void doSignal(Node first) {
  870. do {
  871. if ( (firstWaiter = first.nextWaiter) == null)
  872. lastWaiter = null;
  873. first.nextWaiter = null;
  874. } while (!transferForSignal(first) && //转移节点
  875. (first = firstWaiter) != null);
  876. }
  877. /**
  878. * 通知所有节点移动到同步队列当中,并将节点从条件队列删除
  879. */
  880. private void doSignalAll(Node first) {
  881. lastWaiter = firstWaiter = null;
  882. do {
  883. Node next = first.nextWaiter;
  884. first.nextWaiter = null;
  885. transferForSignal(first);
  886. first = next;
  887. } while (first != null);
  888. }
  889. /**
  890. * 删除条件队列当中被取消的节点
  891. */
  892. private void unlinkCancelledWaiters() {
  893. Node t = firstWaiter;
  894. Node trail = null;
  895. while (t != null) {
  896. Node next = t.nextWaiter;
  897. if (t.waitStatus != Node.CONDITION) {
  898. t.nextWaiter = null;
  899. if (trail == null)
  900. firstWaiter = next;
  901. else
  902. trail.nextWaiter = next;
  903. if (next == null)
  904. lastWaiter = trail;
  905. }
  906. else
  907. trail = t;
  908. t = next;
  909. }
  910. }
  911. /**
  912. * 发新号,通知条件队列当中节点到同步队列当中去排队
  913. */
  914. public final void signal() {
  915. if (!isHeldExclusively())//节点不能已经持有独占锁
  916. throw new IllegalMonitorStateException();
  917. Node first = firstWaiter;
  918. if (first != null)
  919. /**
  920. * 发信号通知条件队列的节点准备到同步队列当中去排队
  921. */
  922. doSignal(first);
  923. }
  924. /**
  925. * 唤醒所有条件队列的节点转移到同步队列当中
  926. */
  927. public final void signalAll() {
  928. if (!isHeldExclusively())
  929. throw new IllegalMonitorStateException();
  930. Node first = firstWaiter;
  931. if (first != null)
  932. doSignalAll(first);
  933. }
  934. /**
  935. * Implements uninterruptible condition wait.
  936. * <ol>
  937. * <li> Save lock state returned by {@link #getState}.
  938. * <li> Invoke {@link #release} with saved state as argument,
  939. * throwing IllegalMonitorStateException if it fails.
  940. * <li> Block until signalled.
  941. * <li> Reacquire by invoking specialized version of
  942. * {@link #acquire} with saved state as argument.
  943. * </ol>
  944. */
  945. public final void awaitUninterruptibly() {
  946. Node node = addConditionWaiter();
  947. int savedState = fullyRelease(node);
  948. boolean interrupted = false;
  949. while (!isOnSyncQueue(node)) {
  950. LockSupport.park(this);
  951. if (Thread.interrupted())
  952. interrupted = true;
  953. }
  954. if (acquireQueued(node, savedState) || interrupted)
  955. selfInterrupt();
  956. }
  957. /** 该模式表示在退出等待时重新中断 */
  958. private static final int REINTERRUPT = 1;
  959. /** 异常中断 */
  960. private static final int THROW_IE = -1;
  961. /**
  962. * 这里的判断逻辑是:
  963. * 1.如果现在不是中断的,即正常被signal唤醒则返回0
  964. * 2.如果节点由中断加入同步队列则返回THROW_IE,由signal加入同步队列则返回REINTERRUPT
  965. */
  966. private int checkInterruptWhileWaiting(Node node) {
  967. return Thread.interrupted() ?
  968. (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  969. 0;
  970. }
  971. /**
  972. * 根据中断时机选择抛出异常或者设置线程中断状态
  973. */
  974. private void reportInterruptAfterWait(int interruptMode)
  975. throws InterruptedException {
  976. if (interruptMode == THROW_IE)
  977. throw new InterruptedException();
  978. else if (interruptMode == REINTERRUPT)
  979. selfInterrupt();
  980. }
  981. /**
  982. * 加入条件队列等待,条件队列入口
  983. */
  984. public final void await() throws InterruptedException {
  985. //T2进来
  986. //如果当前线程被中断则直接抛出异常
  987. if (Thread.interrupted())
  988. throw new InterruptedException();
  989. //把当前节点加入条件队列
  990. Node node = addConditionWaiter();
  991. //释放掉已经获取的独占锁资源
  992. int savedState = fullyRelease(node);//T2释放锁
  993. int interruptMode = 0;
  994. //如果不在同步队列中则不断挂起
  995. while (!isOnSyncQueue(node)) {
  996. LockSupport.park(this);//T1被阻塞
  997. //这里被唤醒可能是正常的signal操作也可能是中断
  998. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  999. break;
  1000. }
  1001. /**
  1002. * 走到这里说明节点已经条件满足被加入到了同步队列中或者中断了
  1003. * 这个方法很熟悉吧?就跟独占锁调用同样的获取锁方法,从这里可以看出条件队列只能用于独占锁
  1004. * 在处理中断之前首先要做的是从同步队列中成功获取锁资源
  1005. */
  1006. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  1007. interruptMode = REINTERRUPT;
  1008. //走到这里说明已经成功获取到了独占锁,接下来就做些收尾工作
  1009. //删除条件队列中被取消的节点
  1010. if (node.nextWaiter != null) // clean up if cancelled
  1011. unlinkCancelledWaiters();
  1012. //根据不同模式处理中断
  1013. if (interruptMode != 0)
  1014. reportInterruptAfterWait(interruptMode);
  1015. }
  1016. /**
  1017. * Implements timed condition wait.
  1018. * <ol>
  1019. * <li> If current thread is interrupted, throw InterruptedException.
  1020. * <li> Save lock state returned by {@link #getState}.
  1021. * <li> Invoke {@link #release} with saved state as argument,
  1022. * throwing IllegalMonitorStateException if it fails.
  1023. * <li> Block until signalled, interrupted, or timed out.
  1024. * <li> Reacquire by invoking specialized version of
  1025. * {@link #acquire} with saved state as argument.
  1026. * <li> If interrupted while blocked in step 4, throw InterruptedException.
  1027. * <li> If timed out while blocked in step 4, return false, else true.
  1028. * </ol>
  1029. */
  1030. public final boolean await(long time, TimeUnit unit)
  1031. throws InterruptedException {
  1032. long nanosTimeout = unit.toNanos(time);
  1033. if (Thread.interrupted())
  1034. throw new InterruptedException();
  1035. Node node = addConditionWaiter();
  1036. int savedState = fullyRelease(node);
  1037. final long deadline = System.nanoTime() + nanosTimeout;
  1038. boolean timedout = false;
  1039. int interruptMode = 0;
  1040. while (!isOnSyncQueue(node)) {
  1041. if (nanosTimeout <= 0L) {
  1042. timedout = transferAfterCancelledWait(node);
  1043. break;
  1044. }
  1045. if (nanosTimeout >= spinForTimeoutThreshold)
  1046. LockSupport.parkNanos(this, nanosTimeout);
  1047. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  1048. break;
  1049. nanosTimeout = deadline - System.nanoTime();
  1050. }
  1051. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  1052. interruptMode = REINTERRUPT;
  1053. if (node.nextWaiter != null)
  1054. unlinkCancelledWaiters();
  1055. if (interruptMode != 0)
  1056. reportInterruptAfterWait(interruptMode);
  1057. return !timedout;
  1058. }
  1059. final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
  1060. return sync == AbstractQueuedSynchronizer.this;
  1061. }
  1062. /**
  1063. * Queries whether any threads are waiting on this condition.
  1064. * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
  1065. *
  1066. * @return {@code true} if there are any waiting threads
  1067. * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  1068. * returns {@code false}
  1069. */
  1070. protected final boolean hasWaiters() {
  1071. if (!isHeldExclusively())
  1072. throw new IllegalMonitorStateException();
  1073. for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
  1074. if (w.waitStatus == Node.CONDITION)
  1075. return true;
  1076. }
  1077. return false;
  1078. }
  1079. /**
  1080. * Returns an estimate of the number of threads waiting on
  1081. * this condition.
  1082. * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
  1083. *
  1084. * @return the estimated number of waiting threads
  1085. * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  1086. * returns {@code false}
  1087. */
  1088. protected final int getWaitQueueLength() {
  1089. if (!isHeldExclusively())
  1090. throw new IllegalMonitorStateException();
  1091. int n = 0;
  1092. for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
  1093. if (w.waitStatus == Node.CONDITION)
  1094. ++n;
  1095. }
  1096. return n;
  1097. }
  1098. /**
  1099. * 得到同步队列当中所有在等待的Thread集合
  1100. */
  1101. protected final Collection<Thread> getWaitingThreads() {
  1102. if (!isHeldExclusively())
  1103. throw new IllegalMonitorStateException();
  1104. ArrayList<Thread> list = new ArrayList<Thread>();
  1105. for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
  1106. if (w.waitStatus == Node.CONDITION) {
  1107. Thread t = w.thread;
  1108. if (t != null)
  1109. list.add(t);
  1110. }
  1111. }
  1112. return list;
  1113. }
  1114. }
  1115. /**
  1116. * Setup to support compareAndSet. We need to natively implement
  1117. * this here: For the sake of permitting future enhancements, we
  1118. * cannot explicitly subclass AtomicInteger, which would be
  1119. * efficient and useful otherwise. So, as the lesser of evils, we
  1120. * natively implement using hotspot intrinsics API. And while we
  1121. * are at it, we do the same for other CASable fields (which could
  1122. * otherwise be done with atomic field updaters).
  1123. * unsafe魔法类,直接绕过虚拟机内存管理机制,修改内存
  1124. */
  1125. private static final Unsafe unsafe = Unsafe.getUnsafe();
  1126. //偏移量
  1127. private static final long stateOffset;
  1128. private static final long headOffset;
  1129. private static final long tailOffset;
  1130. private static final long waitStatusOffset;
  1131. private static final long nextOffset;
  1132. static {
  1133. try {
  1134. //状态偏移量
  1135. stateOffset = unsafe.objectFieldOffset
  1136. (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
  1137. //head指针偏移量,head指向CLH队列的头部
  1138. headOffset = unsafe.objectFieldOffset
  1139. (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
  1140. tailOffset = unsafe.objectFieldOffset
  1141. (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
  1142. waitStatusOffset = unsafe.objectFieldOffset
  1143. (Node.class.getDeclaredField("waitStatus"));
  1144. nextOffset = unsafe.objectFieldOffset
  1145. (Node.class.getDeclaredField("next"));
  1146. } catch (Exception ex) { throw new Error(ex); }
  1147. }
  1148. /**
  1149. * CAS 修改头部节点指向. 并发入队时使用.
  1150. */
  1151. private final boolean compareAndSetHead(Node update) {
  1152. return unsafe.compareAndSwapObject(this, headOffset, null, update);
  1153. }
  1154. /**
  1155. * CAS 修改尾部节点指向. 并发入队时使用.
  1156. */
  1157. private final boolean compareAndSetTail(Node expect, Node update) {
  1158. return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
  1159. }
  1160. /**
  1161. * CAS 修改信号量状态.
  1162. */
  1163. private static final boolean compareAndSetWaitStatus(Node node,
  1164. int expect,
  1165. int update) {
  1166. return unsafe.compareAndSwapInt(node, waitStatusOffset,
  1167. expect, update);
  1168. }
  1169. /**
  1170. * 修改节点的后继指针.
  1171. */
  1172. private static final boolean compareAndSetNext(Node node,
  1173. Node expect,
  1174. Node update) {
  1175. return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
  1176. }
  1177. }

AQS框架具体实现-独占锁实现ReentrantLock

  1. public class ReentrantLock implements Lock, java.io.Serializable {
  2. private static final long serialVersionUID = 7373984872572414699L;
  3. /**
  4. * 内部调用AQS的动作,都基于该成员属性实现
  5. */
  6. private final Sync sync;
  7. /**
  8. * ReentrantLock锁同步操作的基础类,继承自AQS框架.
  9. * 该类有两个继承类,1、NonfairSync 非公平锁,2、FairSync公平锁
  10. */
  11. abstract static class Sync extends AbstractQueuedSynchronizer {
  12. private static final long serialVersionUID = -5179523762034025860L;
  13. /**
  14. * 加锁的具体行为由子类实现
  15. */
  16. abstract void lock();
  17. /**
  18. * 尝试获取非公平锁
  19. */
  20. final boolean nonfairTryAcquire(int acquires) {
  21. //acquires = 1
  22. final Thread current = Thread.currentThread();
  23. int c = getState();
  24. /**
  25. * 不需要判断同步队列(CLH)中是否有排队等待线程
  26. * 判断state状态是否为0,不为0可以加锁
  27. */
  28. if (c == 0) {
  29. //unsafe操作,cas修改state状态
  30. if (compareAndSetState(0, acquires)) {
  31. //独占状态锁持有者指向当前线程
  32. setExclusiveOwnerThread(current);
  33. return true;
  34. }
  35. }
  36. /**
  37. * state状态不为0,判断锁持有者是否是当前线程,
  38. * 如果是当前线程持有 则state+1
  39. */
  40. else if (current == getExclusiveOwnerThread()) {
  41. int nextc = c + acquires;
  42. if (nextc < 0) // overflow
  43. throw new Error("Maximum lock count exceeded");
  44. setState(nextc);
  45. return true;
  46. }
  47. //加锁失败
  48. return false;
  49. }
  50. /**
  51. * 释放锁
  52. */
  53. protected final boolean tryRelease(int releases) {
  54. int c = getState() - releases;
  55. if (Thread.currentThread() != getExclusiveOwnerThread())
  56. throw new IllegalMonitorStateException();
  57. boolean free = false;
  58. if (c == 0) {
  59. free = true;
  60. setExclusiveOwnerThread(null);
  61. }
  62. setState(c);
  63. return free;
  64. }
  65. /**
  66. * 判断持有独占锁的线程是否是当前线程
  67. */
  68. protected final boolean isHeldExclusively() {
  69. return getExclusiveOwnerThread() == Thread.currentThread();
  70. }
  71. //返回条件对象
  72. final ConditionObject newCondition() {
  73. return new ConditionObject();
  74. }
  75. final Thread getOwner() {
  76. return getState() == 0 ? null : getExclusiveOwnerThread();
  77. }
  78. final int getHoldCount() {
  79. return isHeldExclusively() ? getState() : 0;
  80. }
  81. final boolean isLocked() {
  82. return getState() != 0;
  83. }
  84. /**
  85. * Reconstitutes the instance from a stream (that is, deserializes it).
  86. */
  87. private void readObject(java.io.ObjectInputStream s)
  88. throws java.io.IOException, ClassNotFoundException {
  89. s.defaultReadObject();
  90. setState(0); // reset to unlocked state
  91. }
  92. }
  93. static final class NonfairSync extends Sync {
  94. private static final long serialVersionUID = 7316153563782823691L;
  95. /**
  96. * Performs lock. Try immediate barge, backing up to normal
  97. * acquire on failure.
  98. */
  99. @ReservedStackAccess
  100. final void lock() {
  101. /**
  102. * 第一步:直接尝试加锁
  103. * 与公平锁实现的加锁行为一个最大的区别在于,此处不会去判断同步队列(CLH队列)中
  104. * 是否有排队等待加锁的节点,上来直接加锁(判断state是否为0,CAS修改state为1)
  105. * ,并将独占锁持有者 exclusiveOwnerThread 属性指向当前线程
  106. * 如果当前有人占用锁,再尝试去加一次锁
  107. */
  108. if (compareAndSetState(0, 1))
  109. setExclusiveOwnerThread(Thread.currentThread());
  110. else
  111. //AQS定义的方法,加锁
  112. acquire(1);
  113. }
  114. /**
  115. * 父类AbstractQueuedSynchronizer.acquire()中调用本方法
  116. */
  117. protected final boolean tryAcquire(int acquires) {
  118. return nonfairTryAcquire(acquires);
  119. }
  120. }
  121. /**
  122. * 公平锁
  123. */
  124. static final class FairSync extends Sync {
  125. private static final long serialVersionUID = -3000897897090466540L;
  126. final void lock() {
  127. acquire(1);
  128. }
  129. /**
  130. * 重写aqs中的方法逻辑
  131. * 尝试加锁,被AQS的acquire()方法调用
  132. */
  133. protected final boolean tryAcquire(int acquires) {
  134. final Thread current = Thread.currentThread();
  135. int c = getState();
  136. if (c == 0) {
  137. /**
  138. * 与非公平锁中的区别,需要先判断队列当中是否有等待的节点
  139. * 如果没有则可以尝试CAS获取锁
  140. */
  141. if (!hasQueuedPredecessors() &&
  142. compareAndSetState(0, acquires)) {
  143. //独占线程指向当前线程
  144. setExclusiveOwnerThread(current);
  145. return true;
  146. }
  147. }
  148. else if (current == getExclusiveOwnerThread()) {
  149. int nextc = c + acquires;
  150. if (nextc < 0)
  151. throw new Error("Maximum lock count exceeded");
  152. setState(nextc);
  153. return true;
  154. }
  155. return false;
  156. }
  157. }
  158. /**
  159. * 默认构造函数,创建非公平锁对象
  160. */
  161. public ReentrantLock() {
  162. sync = new NonfairSync();
  163. }
  164. /**
  165. * 根据要求创建公平锁或非公平锁
  166. */
  167. public ReentrantLock(boolean fair) {
  168. sync = fair ? new FairSync() : new NonfairSync();
  169. }
  170. /**
  171. * 加锁
  172. */
  173. public void lock() {
  174. sync.lock();
  175. }
  176. /**
  177. * 尝试获去取锁,获取失败被阻塞,线程被中断直接抛出异常
  178. */
  179. public void lockInterruptibly() throws InterruptedException {
  180. sync.acquireInterruptibly(1);
  181. }
  182. /**
  183. * 尝试加锁
  184. */
  185. public boolean tryLock() {
  186. return sync.nonfairTryAcquire(1);
  187. }
  188. /**
  189. * 指定等待时间内尝试加锁
  190. */
  191. public boolean tryLock(long timeout, TimeUnit unit)
  192. throws InterruptedException {
  193. return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  194. }
  195. /**
  196. * 尝试去释放锁
  197. */
  198. public void unlock() {
  199. sync.release(1);
  200. }
  201. /**
  202. * 返回条件对象
  203. */
  204. public Condition newCondition() {
  205. return sync.newCondition();
  206. }
  207. /**
  208. * 返回当前线程持有的state状态数量
  209. */
  210. public int getHoldCount() {
  211. return sync.getHoldCount();
  212. }
  213. /**
  214. * 查询当前线程是否持有锁
  215. */
  216. public boolean isHeldByCurrentThread() {
  217. return sync.isHeldExclusively();
  218. }
  219. /**
  220. * 状态表示是否被Thread加锁持有
  221. */
  222. public boolean isLocked() {
  223. return sync.isLocked();
  224. }
  225. /**
  226. * 是否公平锁?是返回true 否则返回 false
  227. */
  228. public final boolean isFair() {
  229. return sync instanceof FairSync;
  230. }
  231. /**
  232. * 获取持有锁的当前线程
  233. */
  234. protected Thread getOwner() {
  235. return sync.getOwner();
  236. }
  237. /**
  238. * 判断队列当中是否有在等待获取锁的Thread节点
  239. */
  240. public final boolean hasQueuedThreads() {
  241. return sync.hasQueuedThreads();
  242. }
  243. /**
  244. * 当前线程是否在同步队列中等待
  245. */
  246. public final boolean hasQueuedThread(Thread thread) {
  247. return sync.isQueued(thread);
  248. }
  249. /**
  250. * 获取同步队列长度
  251. */
  252. public final int getQueueLength() {
  253. return sync.getQueueLength();
  254. }
  255. /**
  256. * 返回Thread集合,排队中的所有节点Thread会被返回
  257. */
  258. protected Collection<Thread> getQueuedThreads() {
  259. return sync.getQueuedThreads();
  260. }
  261. /**
  262. * 条件队列当中是否有正在等待的节点
  263. */
  264. public boolean hasWaiters(Condition condition) {
  265. if (condition == null)
  266. throw new NullPointerException();
  267. if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
  268. throw new IllegalArgumentException("not owner");
  269. return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
  270. }
  271. }

ReentrantLock中的AbstractQueuedSynchronized的流程.png