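Lock interface
Main features that the Lock interface provides and the synchronized keyword lacks
| Feature | Description |
|---|---|
| Non-blocking attempt to acquire the lock | The current thread tries to acquire the lock; if no other thread holds it at that moment, the attempt succeeds and the thread holds the lock. Otherwise the call returns immediately instead of blocking (tryLock()) |
| Interruptible lock acquisition | Unlike synchronized, a thread that is waiting to acquire the lock can respond to interruption: when it is interrupted, an InterruptedException is thrown and the acquisition attempt is abandoned (lockInterruptibly()) |
| Timed lock acquisition | Tries to acquire the lock before a specified deadline; if the deadline passes and the lock still cannot be acquired, the call returns false (tryLock(long, TimeUnit)) |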
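The three features in the table can be sketched with ReentrantLock, the standard Lock implementation. This is a minimal illustration, not part of the original notes: the class LockFeaturesDemo and its helper methods are hypothetical names, and only Lock, ReentrantLock, FutureTask and their methods are real JDK API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, for illustration only.
public class LockFeaturesDemo {

    /** Non-blocking attempt: tryLock() returns at once instead of waiting. */
    public static boolean tryOnce(Lock lock) {
        if (!lock.tryLock()) {
            return false;
        }
        try {
            return true; // the critical section would go here
        } finally {
            lock.unlock();
        }
    }

    /** Timed attempt: gives up and returns false once the timeout elapses. */
    public static boolean tryFor(Lock lock, long millis) throws InterruptedException {
        if (!lock.tryLock(millis, TimeUnit.MILLISECONDS)) {
            return false;
        }
        try {
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Runs a task on a fresh thread and waits for its result. */
    public static boolean inNewThread(Callable<Boolean> task) throws Exception {
        FutureTask<Boolean> future = new FutureTask<>(task);
        new Thread(future).start();
        return future.get();
    }

    /**
     * Interruptible attempt. The caller must already hold the lock, so the
     * waiter cannot acquire it. Returns true if the waiter got InterruptedException.
     */
    public static boolean interruptWaiter(Lock heldLock) throws Exception {
        FutureTask<Boolean> future = new FutureTask<>(() -> {
            try {
                heldLock.lockInterruptibly();
                heldLock.unlock();
                return false;
            } catch (InterruptedException e) {
                return true;
            }
        });
        Thread waiter = new Thread(future);
        waiter.start();
        waiter.interrupt(); // either wakes the blocked waiter or is seen on entry
        return future.get();
    }

    public static void main(String[] args) throws Exception {
        Lock lock = new ReentrantLock();
        System.out.println("free lock, tryLock: " + tryOnce(lock));
        lock.lock();
        try {
            System.out.println("held lock, tryLock: " + inNewThread(() -> tryOnce(lock)));
            System.out.println("held lock, timed tryLock: " + inNewThread(() -> tryFor(lock, 50)));
            System.out.println("held lock, interrupted waiter: " + interruptWaiter(lock));
        } finally {
            lock.unlock();
        }
    }
}
```

The contended attempts run on a separate thread because ReentrantLock is reentrant: a tryLock() from the thread that already holds the lock would succeed.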
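Queued synchronizer (AQS)
The queued synchronizer AbstractQueuedSynchronizer (hereafter "the synchronizer") is the base framework for building locks and other synchronization components. It uses an int member variable to represent the synchronization state and a built-in FIFO queue to line up the threads that are competing for the resource. The author of the concurrency package, Doug Lea, intended it to serve as the basis for most synchronization needs. The internal state (state) and the wait queue's head node (head) and tail node (tail) are all declared volatile, which guarantees their visibility across threads. The JDK 8 source, with annotations, follows.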
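To make the "base framework" idea concrete, here is a minimal sketch of a non-reentrant mutex built on the synchronizer, modelled on the pattern shown in the AbstractQueuedSynchronizer class documentation. SimpleMutex, Sync and countWithMutex are hypothetical names chosen for this illustration. The subclass only decides what state 0 and 1 mean; queuing and blocking are left to the synchronizer.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical class name, for illustration only.
public class SimpleMutex {

    /** state == 0 means unlocked, state == 1 means locked. */
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // CAS 0 -> 1; only one thread can win.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write publishes the release
            return true;
        }

        boolean isLocked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    public void lock()        { sync.acquire(1); }      // queues and parks on contention
    public boolean tryLock()  { return sync.tryAcquire(1); }
    public void unlock()      { sync.release(1); }      // wakes the first queued thread
    public boolean isLocked() { return sync.isLocked(); }

    /** Increments a shared counter from several threads under the mutex. */
    public static int countWithMutex(int threads, int perThread) throws InterruptedException {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counter[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithMutex(4, 10_000));
    }
}
```

Only tryAcquire and tryRelease are overridden here; acquire and release are the template methods of the synchronizer that call them.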
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import sun.misc.Unsafe;

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    /**
     * Creates a new instance with an initial synchronization state of zero.
     */
    protected AbstractQueuedSynchronizer() { }

    /**
     * Wait queue node class.
     *
     * The wait queue is a variant of a "CLH" (Craig, Landin, and
     * Hagersten) lock queue. CLH locks are normally used for spinlocks.
     * We instead use them for blocking synchronizers, but use the same
     * basic tactic of holding some of the control information about a
     * thread in the predecessor of its node. A "status" field in each
     * node keeps track of whether a thread should block.
     */
    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /*
         * Notes on the waitStatus values:
         *   a. CANCELLED (1): the thread of this node was cancelled due to
         *      timeout or interrupt. Nodes never leave this state. In
         *      particular, a thread with a cancelled node never blocks again.
         *   b. SIGNAL (-1): when the thread of this node releases the
         *      synchronization state or is cancelled, it must notify its
         *      successor node.
         *   c. CONDITION (-2): this node is waiting on a Condition. When
         *      another thread calls signal() on that Condition, the node
         *      joins the contention for the synchronization state.
         *   d. PROPAGATE (-3): this node is in shared mode, and the next
         *      shared acquisition of the synchronization state will be
         *      propagated unconditionally.
         */
        volatile int waitStatus;

        /**
         * Predecessor node.
         */
        volatile Node prev;

        /**
         * Successor node.
         */
        volatile Node next;

        /**
         * The thread that this node belongs to.
         */
        volatile Thread thread;

        /**
         * The SHARED constant if this node is in shared mode; if this node
         * is in exclusive mode, the next node waiting on the condition.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null. The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    /**
     * Head of the wait queue, lazily initialized. Except for
     * initialization, it is modified only via method setHead.
     * Note: if head exists, its waitStatus is guaranteed not to be CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized. Modified only via
     * method enq to add a new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a volatile read.
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a volatile write.
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a volatile read and write.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    // Queuing utilities

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices
     * to improve responsiveness with very short timeouts.
     */
    static final long spinForTimeoutThreshold = 1000L;

    /**
     * Inserts node into queue, initializing if necessary.
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail =
                        head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods. Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling. It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.
         * But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    /**
     * Release action for shared mode: signals successor and ensures
     * propagation. (Note: for exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

    /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

    // Utilities for various versions of acquire

    /**
     * Cancels an ongoing attempt to acquire.
     *
     * @param node the node
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;

        node.waitStatus = Node.CANCELLED;

        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block.
     * This is the main signal control in all acquire loops.
     * Requires that pred == node.prev.
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    /**
     * Convenience method to interrupt current thread.
     */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    /**
     * Convenience method to park and then check if interrupted.
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in exclusive interruptible mode.
     */
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in exclusive timed mode.
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout >
                        spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in shared interruptible mode.
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in shared timed mode.
     */
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // Main exported methods

    /**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire. If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread. This can be used
     * to implement method {@link Lock#tryLock()}.
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to set the state to reflect a release in exclusive mode.
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to acquire in shared mode. This method should query if
     * the state of the object permits it to be acquired in the shared
     * mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire. If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread.
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait. The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return a negative value on failure; zero if acquisition in shared
     *         mode succeeded but no subsequent shared-mode acquire can
     *         succeed; and a positive value if acquisition in shared
     *         mode succeeded and subsequent shared-mode acquires might
     *         also succeed, in which case a subsequent waiting thread
     *         must check availability. (Support for three different
     *         return values enables this method to be used in contexts
     *         where acquires only sometimes act exclusively.) Upon
     *         success, this object has been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state.
This exception must be* thrown in a consistent fashion for synchronization to work* correctly.* @throws UnsupportedOperationException if shared mode is not supported*/protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}/*** Attempts to set the state to reflect a release in shared mode.** <p>This method is always invoked by the thread performing release.** <p>The default implementation throws* {@link UnsupportedOperationException}.** @param arg the release argument. This value is always the one* passed to a release method, or the current state value upon* entry to a condition wait. The value is otherwise* uninterpreted and can represent anything you like.* @return {@code true} if this release of shared mode may permit a* waiting acquire (shared or exclusive) to succeed; and* {@code false} otherwise* @throws IllegalMonitorStateException if releasing would place this* synchronizer in an illegal state. This exception must be* thrown in a consistent fashion for synchronization to work* correctly.* @throws UnsupportedOperationException if shared mode is not supported*/protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}/*** Returns {@code true} if synchronization is held exclusively with* respect to the current (calling) thread. This method is invoked* upon each call to a non-waiting {@link ConditionObject} method.* (Waiting methods instead invoke {@link #release}.)** <p>The default implementation throws {@link* UnsupportedOperationException}. This method is invoked* internally only within {@link ConditionObject} methods, so need* not be defined if conditions are not used.** @return {@code true} if synchronization is held exclusively;* {@code false} otherwise* @throws UnsupportedOperationException if conditions are not supported*/protected boolean isHeldExclusively() {throw new UnsupportedOperationException();}/*** Acquires in exclusive mode, ignoring interrupts. 
Implemented* by invoking at least once {@link #tryAcquire},* returning on success. Otherwise the thread is queued, possibly* repeatedly blocking and unblocking, invoking {@link* #tryAcquire} until success. This method can be used* to implement method {@link Lock#lock}.** @param arg the acquire argument. This value is conveyed to* {@link #tryAcquire} but is otherwise uninterpreted and* can represent anything you like.*/public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}/*** Acquires in exclusive mode, aborting if interrupted.* Implemented by first checking interrupt status, then invoking* at least once {@link #tryAcquire}, returning on* success. Otherwise the thread is queued, possibly repeatedly* blocking and unblocking, invoking {@link #tryAcquire}* until success or the thread is interrupted. This method can be* used to implement method {@link Lock#lockInterruptibly}.** @param arg the acquire argument. This value is conveyed to* {@link #tryAcquire} but is otherwise uninterpreted and* can represent anything you like.* @throws InterruptedException if the current thread is interrupted*/public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))doAcquireInterruptibly(arg);}/*** Attempts to acquire in exclusive mode, aborting if interrupted,* and failing if the given timeout elapses. Implemented by first* checking interrupt status, then invoking at least once {@link* #tryAcquire}, returning on success. Otherwise, the thread is* queued, possibly repeatedly blocking and unblocking, invoking* {@link #tryAcquire} until success or the thread is interrupted* or the timeout elapses. This method can be used to implement* method {@link Lock#tryLock(long, TimeUnit)}.** @param arg the acquire argument. 
This value is conveyed to* {@link #tryAcquire} but is otherwise uninterpreted and* can represent anything you like.* @param nanosTimeout the maximum number of nanoseconds to wait* @return {@code true} if acquired; {@code false} if timed out* @throws InterruptedException if the current thread is interrupted*/public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquire(arg) ||doAcquireNanos(arg, nanosTimeout);}/*** Releases in exclusive mode. Implemented by unblocking one or* more threads if {@link #tryRelease} returns true.* This method can be used to implement method {@link Lock#unlock}.** @param arg the release argument. This value is conveyed to* {@link #tryRelease} but is otherwise uninterpreted and* can represent anything you like.* @return the value returned from {@link #tryRelease}*/public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}/*** Acquires in shared mode, ignoring interrupts. Implemented by* first invoking at least once {@link #tryAcquireShared},* returning on success. Otherwise the thread is queued, possibly* repeatedly blocking and unblocking, invoking {@link* #tryAcquireShared} until success.** @param arg the acquire argument. This value is conveyed to* {@link #tryAcquireShared} but is otherwise uninterpreted* and can represent anything you like.*/public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}/*** Acquires in shared mode, aborting if interrupted. Implemented* by first checking interrupt status, then invoking at least once* {@link #tryAcquireShared}, returning on success. 
Otherwise the* thread is queued, possibly repeatedly blocking and unblocking,* invoking {@link #tryAcquireShared} until success or the thread* is interrupted.* @param arg the acquire argument.* This value is conveyed to {@link #tryAcquireShared} but is* otherwise uninterpreted and can represent anything* you like.* @throws InterruptedException if the current thread is interrupted*/public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}/*** Attempts to acquire in shared mode, aborting if interrupted, and* failing if the given timeout elapses. Implemented by first* checking interrupt status, then invoking at least once {@link* #tryAcquireShared}, returning on success. Otherwise, the* thread is queued, possibly repeatedly blocking and unblocking,* invoking {@link #tryAcquireShared} until success or the thread* is interrupted or the timeout elapses.** @param arg the acquire argument. This value is conveyed to* {@link #tryAcquireShared} but is otherwise uninterpreted* and can represent anything you like.* @param nanosTimeout the maximum number of nanoseconds to wait* @return {@code true} if acquired; {@code false} if timed out* @throws InterruptedException if the current thread is interrupted*/public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout);}/*** Releases in shared mode. Implemented by unblocking one or more* threads if {@link #tryReleaseShared} returns true.** @param arg the release argument. 
This value is conveyed to* {@link #tryReleaseShared} but is otherwise uninterpreted* and can represent anything you like.* @return the value returned from {@link #tryReleaseShared}*/public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}// Queue inspection methods/*** Queries whether any threads are waiting to acquire. Note that* because cancellations due to interrupts and timeouts may occur* at any time, a {@code true} return does not guarantee that any* other thread will ever acquire.** <p>In this implementation, this operation returns in* constant time.** @return {@code true} if there may be other threads waiting to acquire*/public final boolean hasQueuedThreads() {return head != tail;}/*** Queries whether any threads have ever contended to acquire this* synchronizer; that is if an acquire method has ever blocked.** <p>In this implementation, this operation returns in* constant time.** @return {@code true} if there has ever been contention*/public final boolean hasContended() {return head != null;}/*** Returns the first (longest-waiting) thread in the queue, or* {@code null} if no threads are currently queued.** <p>In this implementation, this operation normally returns in* constant time, but may iterate upon contention if other threads are* concurrently modifying the queue.** @return the first (longest-waiting) thread in the queue, or* {@code null} if no threads are currently queued*/public final Thread getFirstQueuedThread() {// handle only fast path, else relayreturn (head == tail) ? null : fullGetFirstQueuedThread();}/*** Version of getFirstQueuedThread called when fastpath fails*/private Thread fullGetFirstQueuedThread() {/** The first node is normally head.next. Try to get its* thread field, ensuring consistent reads: If thread* field is nulled out or s.prev is no longer head, then* some other thread(s) concurrently performed setHead in* between some of our reads. 
We try this twice before* resorting to traversal.*/Node h, s;Thread st;if (((h = head) != null && (s = h.next) != null &&s.prev == head && (st = s.thread) != null) ||((h = head) != null && (s = h.next) != null &&s.prev == head && (st = s.thread) != null))return st;/** Head's next field might not have been set yet, or may have* been unset after setHead. So we must check to see if tail* is actually first node. If not, we continue on, safely* traversing from tail back to head to find first,* guaranteeing termination.*/Node t = tail;Thread firstThread = null;while (t != null && t != head) {Thread tt = t.thread;if (tt != null)firstThread = tt;t = t.prev;}return firstThread;}/*** Returns true if the given thread is currently queued.** <p>This implementation traverses the queue to determine* presence of the given thread.** @param thread the thread* @return {@code true} if the given thread is on the queue* @throws NullPointerException if the thread is null*/public final boolean isQueued(Thread thread) {if (thread == null)throw new NullPointerException();for (Node p = tail; p != null; p = p.prev)if (p.thread == thread)return true;return false;}/*** Returns {@code true} if the apparent first queued thread, if one* exists, is waiting in exclusive mode. If this method returns* {@code true}, and the current thread is attempting to acquire in* shared mode (that is, this method is invoked from {@link* #tryAcquireShared}) then it is guaranteed that the current thread* is not the first queued thread. 
Used only as a heuristic in* ReentrantReadWriteLock.*/final boolean apparentlyFirstQueuedIsExclusive() {Node h, s;return (h = head) != null &&(s = h.next) != null &&!s.isShared() &&s.thread != null;}/*** Queries whether any threads have been waiting to acquire longer* than the current thread.** <p>An invocation of this method is equivalent to (but may be* more efficient than):* <pre> {@code* getFirstQueuedThread() != Thread.currentThread() &&* hasQueuedThreads()}</pre>** <p>Note that because cancellations due to interrupts and* timeouts may occur at any time, a {@code true} return does not* guarantee that some other thread will acquire before the current* thread. Likewise, it is possible for another thread to win a* race to enqueue after this method has returned {@code false},* due to the queue being empty.** <p>This method is designed to be used by a fair synchronizer to* avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.* Such a synchronizer's {@link #tryAcquire} method should return* {@code false}, and its {@link #tryAcquireShared} method should* return a negative value, if this method returns {@code true}* (unless this is a reentrant acquire). 
For example, the {@code* tryAcquire} method for a fair, reentrant, exclusive mode* synchronizer might look like this:** <pre> {@code* protected boolean tryAcquire(int arg) {* if (isHeldExclusively()) {* // A reentrant acquire; increment hold count* return true;* } else if (hasQueuedPredecessors()) {* return false;* } else {* // try to acquire normally* }* }}</pre>** @return {@code true} if there is a queued thread preceding the* current thread, and {@code false} if the current thread* is at the head of the queue or the queue is empty* @since 1.7*/public final boolean hasQueuedPredecessors() {// The correctness of this depends on head being initialized// before tail and on head.next being accurate if the current// thread is first in queue.Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());}// Instrumentation and monitoring methods/*** Returns an estimate of the number of threads waiting to* acquire. The value is only an estimate because the number of* threads may change dynamically while this method traverses* internal data structures. This method is designed for use in* monitoring system state, not for synchronization* control.** @return the estimated number of threads waiting to acquire*/public final int getQueueLength() {int n = 0;for (Node p = tail; p != null; p = p.prev) {if (p.thread != null)++n;}return n;}/*** Returns a collection containing threads that may be waiting to* acquire. Because the actual set of threads may change* dynamically while constructing this result, the returned* collection is only a best-effort estimate. The elements of the* returned collection are in no particular order. 
This method is* designed to facilitate construction of subclasses that provide* more extensive monitoring facilities.** @return the collection of threads*/public final Collection<Thread> getQueuedThreads() {ArrayList<Thread> list = new ArrayList<Thread>();for (Node p = tail; p != null; p = p.prev) {Thread t = p.thread;if (t != null)list.add(t);}return list;}/*** Returns a collection containing threads that may be waiting to* acquire in exclusive mode. This has the same properties* as {@link #getQueuedThreads} except that it only returns* those threads waiting due to an exclusive acquire.** @return the collection of threads*/public final Collection<Thread> getExclusiveQueuedThreads() {ArrayList<Thread> list = new ArrayList<Thread>();for (Node p = tail; p != null; p = p.prev) {if (!p.isShared()) {Thread t = p.thread;if (t != null)list.add(t);}}return list;}/*** Returns a collection containing threads that may be waiting to* acquire in shared mode. This has the same properties* as {@link #getQueuedThreads} except that it only returns* those threads waiting due to a shared acquire.** @return the collection of threads*/public final Collection<Thread> getSharedQueuedThreads() {ArrayList<Thread> list = new ArrayList<Thread>();for (Node p = tail; p != null; p = p.prev) {if (p.isShared()) {Thread t = p.thread;if (t != null)list.add(t);}}return list;}/*** Returns a string identifying this synchronizer, as well as its state.* The state, in brackets, includes the String {@code "State ="}* followed by the current value of {@link #getState}, and either* {@code "nonempty"} or {@code "empty"} depending on whether the* queue is empty.** @return a string identifying this synchronizer, as well as its state*/public String toString() {int s = getState();String q = hasQueuedThreads() ? 
"non" : "";return super.toString() +"[State = " + s + ", " + q + "empty queue]";}// Internal support methods for Conditions/*** Returns true if a node, always one that was initially placed on* a condition queue, is now waiting to reacquire on sync queue.* @param node the node* @return true if is reacquiring*/final boolean isOnSyncQueue(Node node) {if (node.waitStatus == Node.CONDITION || node.prev == null)return false;if (node.next != null) // If has successor, it must be on queuereturn true;/** node.prev can be non-null, but not yet on queue because* the CAS to place it on queue can fail. So we have to* traverse from tail to make sure it actually made it. It* will always be near the tail in calls to this method, and* unless the CAS failed (which is unlikely), it will be* there, so we hardly ever traverse much.*/return findNodeFromTail(node);}/*** Returns true if node is on sync queue by searching backwards from tail.* Called only when needed by isOnSyncQueue.* @return true if present*/private boolean findNodeFromTail(Node node) {Node t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;}}/*** Transfers a node from a condition queue onto sync queue.* Returns true if successful.* @param node the node* @return true if successfully transferred (else the node was* cancelled before signal)*/final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. 
If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}/*** Transfers node, if necessary, to sync queue after a cancelled wait.* Returns true if thread was cancelled before being signalled.** @param node the node* @return true if cancelled before the node was signalled*/final boolean transferAfterCancelledWait(Node node) {if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {enq(node);return true;}/** If we lost out to a signal(), then we can't proceed* until it finishes its enq(). Cancelling during an* incomplete transfer is both rare and transient, so just* spin.*/while (!isOnSyncQueue(node))Thread.yield();return false;}/*** Invokes release with current state value; returns saved state.* Cancels node and throws exception on failure.* @param node the condition node for this wait* @return previous sync state*/final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}}// Instrumentation methods for conditions/*** Queries whether the given ConditionObject* uses this synchronizer as its lock.** @param condition the condition* @return {@code true} if owned* @throws NullPointerException if the condition is null*/public final boolean owns(ConditionObject condition) {return condition.isOwnedBy(this);}/*** Queries whether any threads are waiting on the given condition* associated with this synchronizer. Note that because timeouts* and interrupts may occur at any time, a {@code true} return* does not guarantee that a future {@code signal} will awaken* any threads. 
This method is designed primarily for use in* monitoring of the system state.** @param condition the condition* @return {@code true} if there are any waiting threads* @throws IllegalMonitorStateException if exclusive synchronization* is not held* @throws IllegalArgumentException if the given condition is* not associated with this synchronizer* @throws NullPointerException if the condition is null*/public final boolean hasWaiters(ConditionObject condition) {if (!owns(condition))throw new IllegalArgumentException("Not owner");return condition.hasWaiters();}/*** Returns an estimate of the number of threads waiting on the* given condition associated with this synchronizer. Note that* because timeouts and interrupts may occur at any time, the* estimate serves only as an upper bound on the actual number of* waiters. This method is designed for use in monitoring of the* system state, not for synchronization control.** @param condition the condition* @return the estimated number of waiting threads* @throws IllegalMonitorStateException if exclusive synchronization* is not held* @throws IllegalArgumentException if the given condition is* not associated with this synchronizer* @throws NullPointerException if the condition is null*/public final int getWaitQueueLength(ConditionObject condition) {if (!owns(condition))throw new IllegalArgumentException("Not owner");return condition.getWaitQueueLength();}/*** Returns a collection containing those threads that may be* waiting on the given condition associated with this* synchronizer. Because the actual set of threads may change* dynamically while constructing this result, the returned* collection is only a best-effort estimate. 
The elements of the* returned collection are in no particular order.** @param condition the condition* @return the collection of threads* @throws IllegalMonitorStateException if exclusive synchronization* is not held* @throws IllegalArgumentException if the given condition is* not associated with this synchronizer* @throws NullPointerException if the condition is null*/public final Collection<Thread> getWaitingThreads(ConditionObject condition) {if (!owns(condition))throw new IllegalArgumentException("Not owner");return condition.getWaitingThreads();}/*** Condition implementation for a {@link* AbstractQueuedSynchronizer} serving as the basis of a {@link* Lock} implementation.** <p>Method documentation for this class describes mechanics,* not behavioral specifications from the point of view of Lock* and Condition users. Exported versions of this class will in* general need to be accompanied by documentation describing* condition semantics that rely on those of the associated* {@code AbstractQueuedSynchronizer}.** <p>This class is Serializable, but all fields are transient,* so deserialized conditions have no waiters.*/public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID = 1173984872572414699L;/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;/*** Creates a new {@code ConditionObject} instance.*/public ConditionObject() { }// Internal methods/*** Adds a new waiter to wait queue.* @return its new wait node*/private Node addConditionWaiter() {Node t = lastWaiter;// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}/*** Removes and transfers nodes until hit non-cancelled one or* null. 
Split out from signal in part to encourage compilers* to inline the case of no waiters.* @param first (non-null) the first node on condition queue*/private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);}/*** Removes and transfers all nodes.* @param first (non-null) the first node on condition queue*/private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);}/*** Unlinks cancelled waiter nodes from condition queue.* Called only while holding lock. This is called when* cancellation occurred during condition wait, and upon* insertion of a new waiter when lastWaiter is seen to have* been cancelled. This method is needed to avoid garbage* retention in the absence of signals. So even though it may* require a full traversal, it comes into play only when* timeouts or cancellations occur in the absence of* signals. 
It traverses all nodes rather than stopping at a* particular target to unlink all pointers to garbage nodes* without requiring many re-traversals during cancellation* storms.*/private void unlinkCancelledWaiters() {Node t = firstWaiter;Node trail = null;while (t != null) {Node next = t.nextWaiter;if (t.waitStatus != Node.CONDITION) {t.nextWaiter = null;if (trail == null)firstWaiter = next;elsetrail.nextWaiter = next;if (next == null)lastWaiter = trail;}elsetrail = t;t = next;}}// public methods/*** Moves the longest-waiting thread, if one exists, from the* wait queue for this condition to the wait queue for the* owning lock.** @throws IllegalMonitorStateException if {@link #isHeldExclusively}* returns {@code false}*/public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);}/*** Moves all threads from the wait queue for this condition to* the wait queue for the owning lock.** @throws IllegalMonitorStateException if {@link #isHeldExclusively}* returns {@code false}*/public final void signalAll() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);}/*** Implements uninterruptible condition wait.* <ol>* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,* throwing IllegalMonitorStateException if it fails.* <li> Block until signalled.* <li> Reacquire by invoking specialized version of* {@link #acquire} with saved state as argument.* </ol>*/public final void awaitUninterruptibly() {Node node = addConditionWaiter();int savedState = fullyRelease(node);boolean interrupted = false;while (!isOnSyncQueue(node)) {LockSupport.park(this);if (Thread.interrupted())interrupted = true;}if (acquireQueued(node, savedState) || interrupted)selfInterrupt();}/** For interruptible waits, we need to track whether to throw* InterruptedException, if interrupted 
while blocked on* condition, versus reinterrupt current thread, if* interrupted while blocked waiting to re-acquire.*//** Mode meaning to reinterrupt on exit from wait */private static final int REINTERRUPT = 1;/** Mode meaning to throw InterruptedException on exit from wait */private static final int THROW_IE = -1;/*** Checks for interrupt, returning THROW_IE if interrupted* before signalled, REINTERRUPT if after signalled, or* 0 if not interrupted.*/private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;}/*** Throws InterruptedException, reinterrupts current thread, or* does nothing, depending on mode.*/private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode == THROW_IE)throw new InterruptedException();else if (interruptMode == REINTERRUPT)selfInterrupt();}/*** Implements interruptible condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,* throwing IllegalMonitorStateException if it fails.* <li> Block until signalled or interrupted.* <li> Reacquire by invoking specialized version of* {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* </ol>*/public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}/*** Implements timed 
condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,* throwing IllegalMonitorStateException if it fails.* <li> Block until signalled, interrupted, or timed out.* <li> Reacquire by invoking specialized version of* {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* </ol>*/public final long awaitNanos(long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);final long deadline = System.nanoTime() + nanosTimeout;int interruptMode = 0;while (!isOnSyncQueue(node)) {if (nanosTimeout <= 0L) {transferAfterCancelledWait(node);break;}if (nanosTimeout >= spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;nanosTimeout = deadline - System.nanoTime();}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null)unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);return deadline - System.nanoTime();}/*** Implements absolute timed condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,* throwing IllegalMonitorStateException if it fails.* <li> Block until signalled, interrupted, or timed out.* <li> Reacquire by invoking specialized version of* {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* <li> If timed out while blocked in step 4, return false, else true.* </ol>*/public final boolean awaitUntil(Date deadline)throws InterruptedException {long abstime = 
deadline.getTime();if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);boolean timedout = false;int interruptMode = 0;while (!isOnSyncQueue(node)) {if (System.currentTimeMillis() > abstime) {timedout = transferAfterCancelledWait(node);break;}LockSupport.parkUntil(this, abstime);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null)unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);return !timedout;}/*** Implements timed condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,* throwing IllegalMonitorStateException if it fails.* <li> Block until signalled, interrupted, or timed out.* <li> Reacquire by invoking specialized version of* {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* <li> If timed out while blocked in step 4, return false, else true.* </ol>*/public final boolean await(long time, TimeUnit unit)throws InterruptedException {long nanosTimeout = unit.toNanos(time);if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);final long deadline = System.nanoTime() + nanosTimeout;boolean timedout = false;int interruptMode = 0;while (!isOnSyncQueue(node)) {if (nanosTimeout <= 0L) {timedout = transferAfterCancelledWait(node);break;}if (nanosTimeout >= spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;nanosTimeout = deadline - System.nanoTime();}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != 
null)unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);return !timedout;}// support for instrumentation/*** Returns true if this condition was created by the given* synchronization object.** @return {@code true} if owned*/final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {return sync == AbstractQueuedSynchronizer.this;}/*** Queries whether any threads are waiting on this condition.* Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.** @return {@code true} if there are any waiting threads* @throws IllegalMonitorStateException if {@link #isHeldExclusively}* returns {@code false}*/protected final boolean hasWaiters() {if (!isHeldExclusively())throw new IllegalMonitorStateException();for (Node w = firstWaiter; w != null; w = w.nextWaiter) {if (w.waitStatus == Node.CONDITION)return true;}return false;}/*** Returns an estimate of the number of threads waiting on* this condition.* Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.** @return the estimated number of waiting threads* @throws IllegalMonitorStateException if {@link #isHeldExclusively}* returns {@code false}*/protected final int getWaitQueueLength() {if (!isHeldExclusively())throw new IllegalMonitorStateException();int n = 0;for (Node w = firstWaiter; w != null; w = w.nextWaiter) {if (w.waitStatus == Node.CONDITION)++n;}return n;}/*** Returns a collection containing those threads that may be* waiting on this Condition.* Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.** @return the collection of threads* @throws IllegalMonitorStateException if {@link #isHeldExclusively}* returns {@code false}*/protected final Collection<Thread> getWaitingThreads() {if (!isHeldExclusively())throw new IllegalMonitorStateException();ArrayList<Thread> list = new ArrayList<Thread>();for (Node w = firstWaiter; w != null; w = w.nextWaiter) {if (w.waitStatus == Node.CONDITION) {Thread t = w.thread;if (t 
!= null)list.add(t);}}return list;}}/*** Setup to support compareAndSet. We need to natively implement* this here: For the sake of permitting future enhancements, we* cannot explicitly subclass AtomicInteger, which would be* efficient and useful otherwise. So, as the lesser of evils, we* natively implement using hotspot intrinsics API. And while we* are at it, we do the same for other CASable fields (which could* otherwise be done with atomic field updaters).*/private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long stateOffset;private static final long headOffset;private static final long tailOffset;private static final long waitStatusOffset;private static final long nextOffset;static {try {stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));} catch (Exception ex) { throw new Error(ex); }}/*** CAS head field. Used only by enq.*/private final boolean compareAndSetHead(Node update) {return unsafe.compareAndSwapObject(this, headOffset, null, update);}/*** CAS tail field. Used only by enq.*/private final boolean compareAndSetTail(Node expect, Node update) {return unsafe.compareAndSwapObject(this, tailOffset, expect, update);}/*** CAS waitStatus field of a node.*/private static final boolean compareAndSetWaitStatus(Node node,int expect,int update) {return unsafe.compareAndSwapInt(node, waitStatusOffset,expect, update);}/*** CAS next field of a node.*/private static final boolean compareAndSetNext(Node node,Node expect,Node update) {return unsafe.compareAndSwapObject(node, nextOffset, expect, update);}}

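Exclusive acquisition and release of synchronization state

To summarize the exclusive acquire and release paths: the synchronizer maintains a sync queue, and every thread that fails to acquire the synchronization state is appended to that queue, where it loops (parking between attempts) until it can acquire. A node leaves the queue, and its thread stops looping, once its predecessor is the head node and its own acquire attempt succeeds. To release, the synchronizer calls tryRelease(int arg) to release the synchronization state and then wakes the successor of the head node.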
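
The exclusive acquire/release flow summarized above can be tried out with a small custom lock. The following is a minimal sketch, assuming a non-reentrant mutex whose state is 0 (free) or 1 (held); the class name SimpleMutex and its method names are illustrative and do not come from the JDK.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Illustrative non-reentrant mutex built on AQS (hypothetical class name).
class SimpleMutex {
    // state 0 = unlocked, state 1 = locked
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Only the thread that wins the CAS from 0 to 1 owns the lock.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // acquire() will enqueue the caller and park it
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // release() then wakes the head node's successor
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }
    boolean tryLock()  { return sync.tryAcquire(1); }
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.isHeldExclusively(); }
}
```

Only tryAcquire and tryRelease are written by hand; queueing, parking, and waking are inherited from the template methods acquire(int) and release(int).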
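Shared acquisition and release of synchronization state
The main difference between shared and exclusive acquisition is whether more than one thread can hold the synchronization state at the same moment. Take file access as an example: while one program is reading a file, all writes to that file are blocked, but other reads can proceed concurrently. Writing requires exclusive access to the resource, whereas reading can be shared. In other words, shared access admits other shared accessors and blocks exclusive ones, while exclusive access blocks every other accessor.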
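
To make the shared mode concrete, here is a minimal sketch of a synchronizer that lets a fixed number of threads hold the state at once. It assumes the state stores the remaining permits; the class name SharedGate and its methods are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Illustrative shared-mode synchronizer (hypothetical class name):
// at most `permits` threads may be inside at the same time.
class SharedGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);
        }

        // A negative return value tells AQS that the acquire failed.
        @Override
        protected int tryAcquireShared(int n) {
            for (;;) {
                int available = getState();
                int remaining = available - n;
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        // Several threads may release concurrently, so retry the CAS.
        @Override
        protected boolean tryReleaseShared(int n) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + n)) {
                    return true;
                }
            }
        }

        int available() {
            return getState();
        }
    }

    private final Sync sync;

    SharedGate(int permits) {
        sync = new Sync(permits);
    }

    void enter()       { sync.acquireShared(1); }
    boolean tryEnter() { return sync.tryAcquireShared(1) >= 0; }
    void leave()       { sync.releaseShared(1); }
    int available()    { return sync.available(); }
}
```

Unlike the exclusive case, release uses a CAS loop, because more than one holder can release at the same time.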
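Reentrant lock
ReentrantLock, as the name suggests, is a lock that supports re-entry: a thread may lock a resource it has already locked. The lock also lets you choose between fair and nonfair acquisition. Unlike the synchronized keyword, ReentrantLock does not re-enter implicitly, but a thread that already holds the lock can call lock() again and acquire it without blocking. This raises the question of fairness: if, in absolute time order, the request that was made first is always satisfied first, the lock is fair; otherwise it is nonfair. Fair acquisition means the thread that has waited longest gets the lock first, so the lock is acquired in order. ReentrantLock provides a constructor that controls whether the lock is fair.
Implementing re-entry: re-entry means that any thread that has acquired the lock can acquire it again without being blocked by the lock. Two problems have to be solved:
- Re-acquisition by the same thread. The lock must check whether the acquiring thread is the thread that currently holds it; if so, the acquisition succeeds again.
- Final release of the lock. If a thread has acquired the lock n times, other threads can acquire the lock only after it has released the lock for the nth time.
The synchronizer implementation inside ReentrantLock: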
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();

        /**
         * Decides whether the acquire succeeds by checking whether the current
         * thread is the one that already holds the lock. If the holder requests
         * the lock again, the synchronization state is incremented and true is
         * returned to indicate that the acquire succeeded.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        /**
         * A lock acquired n times must be released n times: the first n-1 calls
         * to tryRelease must return false. When the synchronization state
         * reaches 0, the owner thread is set to null and true is returned to
         * indicate that the lock has been released.
         */
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    }
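
The counting behaviour described in the two comments above can be observed from the outside through ReentrantLock.getHoldCount(). The sketch below is only an illustration; the class ReentrancyDemo and its nested method are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustrative demo of re-entry (hypothetical class name).
class ReentrancyDemo {
    private final ReentrantLock lock = new ReentrantLock();

    // Re-acquires the lock `depth` times on the calling thread and returns
    // the hold count observed at the deepest level.
    int nested(int depth) {
        lock.lock(); // never blocks when the caller already holds the lock
        try {
            if (depth <= 1) {
                return lock.getHoldCount();
            }
            return nested(depth - 1);
        } finally {
            lock.unlock(); // each lock() is paired with exactly one unlock()
        }
    }

    boolean isLocked() {
        return lock.isLocked();
    }
}
```

After the outermost unlock() the hold count is back to zero and the lock is free for other threads.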
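Difference between fair and nonfair lock acquisition
Fairness concerns lock acquisition: if a lock is fair, the order in which it is acquired should match the absolute time order of the requests, that is, FIFO. The fair version of tryAcquire: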
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    // The only difference from nonfairTryAcquire is the extra call to
    // hasQueuedPredecessors(), which checks whether the current node has a
    // predecessor in the sync queue. If it returns true, some thread requested
    // the lock earlier than the current thread, so the current thread must wait
    // until that thread has acquired and released the lock.
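In the test, the fair lock took 94.3 times as long in total as the nonfair lock and incurred 133 times as many thread switches. The fair lock guarantees FIFO acquisition, at the cost of a large number of thread switches. The nonfair lock can starve threads, but its very low number of thread switches gives it higher throughput.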
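
Choosing between the two policies is a constructor argument. The following minimal sketch only shows how the choice is made and queried; the class name FairnessDemo and the helper fairnessFlags are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustrative helper (hypothetical class name) showing the fairness switch.
class FairnessDemo {
    // Returns isFair() for a default lock, an explicit nonfair lock,
    // and an explicit fair lock, in that order.
    static boolean[] fairnessFlags() {
        ReentrantLock byDefault = new ReentrantLock();      // nonfair by default
        ReentrantLock nonfair   = new ReentrantLock(false); // explicit nonfair
        ReentrantLock fair      = new ReentrantLock(true);  // FIFO acquisition
        return new boolean[] {
            byDefault.isFair(), nonfair.isFair(), fair.isFair()
        };
    }
}
```

Note that, according to the ReentrantLock Javadoc, the untimed tryLock() does not honor the fairness setting even on a fair lock.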
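Read-write lock
The locks mentioned so far (such as Mutex and ReentrantLock) are essentially exclusive locks: only one thread may hold them at a time. A read-write lock allows multiple reader threads at the same time, but while a writer thread holds the lock, all readers and all other writers are blocked. A read-write lock maintains a pair of locks, a read lock and a write lock; separating the two gives much better concurrency than an ordinary exclusive lock.
A read-write lock usually outperforms an exclusive lock because most workloads read far more often than they write, and under such workloads it offers better concurrency and throughput. The implementation provided by the Java concurrency package is ReentrantReadWriteLock.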
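| Feature | Description |
|---|---|
| Fairness choice | Supports both nonfair (the default) and fair lock acquisition; nonfair still gives better throughput than fair |
| Reentrancy | The lock supports re-entry. Taking reader and writer threads as an example: a reader that holds the read lock can acquire the read lock again; a writer that holds the write lock can acquire the write lock again and can also acquire the read lock |
| Lock downgrading | Following the order acquire write lock, acquire read lock, release write lock, a write lock can be downgraded to a read lock |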
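
The lock downgrading order in the table (acquire write lock, acquire read lock, release write lock) can be sketched as follows. This is a minimal illustration, not code from the JDK; the class DowngradeDemo and its methods are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative lock downgrading (hypothetical class name).
class DowngradeDemo {
    private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private int value;

    // Writes a new value, then downgrades so the same thread can keep
    // reading it without letting another writer slip in between.
    int updateAndRead(int newValue) {
        rw.writeLock().lock();
        try {
            value = newValue;
            rw.readLock().lock();    // acquire read lock while holding write lock
        } finally {
            rw.writeLock().unlock(); // downgrade: only the read lock remains
        }
        try {
            return value;            // still protected by the read lock
        } finally {
            rw.readLock().unlock();
        }
    }

    boolean isIdle() {
        return !rw.isWriteLocked() && rw.getReadLockCount() == 0;
    }
}
```

The opposite order (holding the read lock and then requesting the write lock) is not supported by ReentrantReadWriteLock and would block forever.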
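The implementation of ReentrantReadWriteLock mainly covers the design of the read/write state, acquiring and releasing the write lock, acquiring and releasing the read lock, and lock downgrading:
- Design of the read/write state: a read-write lock also relies on a custom synchronizer, and the read/write state is that synchronizer's synchronization state. Recall that in ReentrantLock's custom synchronizer the synchronization state is the number of times the lock has been acquired by one thread. The read-write lock's synchronizer has to track the state of multiple readers and one writer in that single int, which makes the state layout the key to the implementation. Keeping several pieces of state in one int means splitting it by bits: the read-write lock divides the variable into two parts, with the high 16 bits representing reads and the low 16 bits representing writes.
- Acquiring and releasing the write lock: besides the re-entry condition (the current thread is the one holding the write lock), the write lock's acquire logic adds a check for whether a read lock is held. If a read lock is held, the write lock cannot be acquired. The reason is that a read-write lock must make the writer's operations visible to readers: if the write lock could be acquired while read locks were held, the other running readers would not see what the writer is doing. The write lock can therefore be acquired only after all other readers have released the read lock, and once it is held, all subsequent access by other readers and writers is blocked.
- Acquiring and releasing the read lock: the read lock is a reentrant shared lock that many threads can hold at once. When no other writer thread holds the lock (or the write state is 0), acquiring the read lock always succeeds, and all it does is increment the read state in a thread-safe way. If the current thread already holds the read lock, the read state is incremented again. If the write lock is held by another thread when the current thread tries to acquire the read lock, the current thread waits. The read-lock acquisition code became much more complex from Java 5 to Java 6, mainly because of new features such as getReadHoldCount(), which returns the number of times the current thread has acquired the read lock. The read state is the total of all threads' read acquisitions, so each thread's own count has to be kept in a ThreadLocal maintained by that thread, and that is what complicates the implementation.
- Lock downgrading: lock downgrading means a write lock being downgraded to a read lock. If the current thread holds the write lock, releases it, and only then acquires the read lock, that staged sequence is not lock downgrading. Lock downgrading means holding on to the (currently owned) write lock, acquiring the read lock, and then releasing the (previously owned) write lock.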
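
The "split by bits" layout of the read/write state can be illustrated with a few lines of bit arithmetic. This is a sketch under the assumption stated above (high 16 bits for reads, low 16 bits for writes); the class ReadWriteState and its constant and method names are chosen for illustration only.

```java
// Illustrative bit layout of a combined read/write state
// (hypothetical class and member names).
class ReadWriteState {
    static final int SHARED_SHIFT = 16;
    // Adding this value to the state increments the read count by one.
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;
    // Mask that keeps only the low 16 bits (the write count).
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    // Number of read holds: the high 16 bits.
    static int readCount(int state) {
        return state >>> SHARED_SHIFT;
    }

    // Number of (reentrant) write holds: the low 16 bits.
    static int writeCount(int state) {
        return state & EXCLUSIVE_MASK;
    }
}
```

For example, a state built as 2 * SHARED_UNIT + 3 decodes to two read holds and three write holds, and a state whose low 16 bits are zero but which is itself nonzero means that only read locks are held.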
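The LockSupport utility
LockSupport defines a set of public static methods that provide the most basic thread blocking and waking functionality, which makes LockSupport a basic building block for synchronization components. It defines a group of methods whose names start with park to block the current thread, and the unpark(Thread thread) method to wake a blocked thread. "Park" means to park a car: if a thread is a vehicle, park stops it, and unpark starts it up and drives it away.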
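Blocking and waking methods provided by LockSupport:
| Method | Description |
|---|---|
| void park() | Blocks the current thread; park() returns when unpark(Thread thread) is called for it or the current thread is interrupted (the Javadoc also permits spurious returns) |
| void parkNanos(long nanos) | Blocks the current thread for at most nanos nanoseconds; adds timeout return to the return conditions of park() |
| void parkUntil(long deadline) | Blocks the current thread until the deadline, an absolute time in milliseconds since the epoch |
| void unpark(Thread thread) | Wakes the blocked thread thread |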
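
The park/unpark pair from the table can be exercised with a small two-thread handshake. The sketch below is illustrative; the class ParkDemo and the method parkUntilReleased are hypothetical names, and the five-second join timeout is an arbitrary safety margin.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Illustrative park/unpark handshake (hypothetical class name).
class ParkDemo {
    // Starts a thread that parks until it is released; returns true if the
    // thread resumed after unpark().
    static boolean parkUntilReleased() {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() may return without a matching unpark(), so the
            // condition is re-checked in a loop.
            while (!released.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        waiter.start();

        released.set(true);         // publish the condition first
        LockSupport.unpark(waiter); // then hand the waiter its permit

        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }
}
```

Because unpark() grants a permit that a later park() consumes, the handshake works even if unpark() happens before the waiter reaches park().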
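The Condition interface
Every Java object has a set of monitor methods (defined on java.lang.Object), mainly wait(), wait(long timeout), notify(), and notifyAll(); used together with the synchronized keyword they implement the wait/notify pattern. The Condition interface provides monitor-like methods too and implements the wait/notify pattern together with a Lock, but the two differ in usage and in features.
Condition defines two kinds of methods, waiting and notifying, and the current thread must hold the lock associated with the Condition object before calling them. A Condition object is created by a Lock object (by calling the Lock's newCondition() method); in other words, a Condition depends on a Lock.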
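
A bounded buffer is a common way to see this usage pattern: lock first, wait in a loop, signal the other side. The following is a minimal sketch, assuming two conditions created from one ReentrantLock; the class BoundedBuffer and the helper demoSum are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative bounded buffer using two conditions (hypothetical class name).
class BoundedBuffer<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock(); // the lock must be held before await()/signal()
        try {
            while (items.size() == capacity) {
                notFull.await(); // releases the lock while waiting
            }
            items.addLast(item);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.removeFirst();
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }

    // Self-check: a producer thread pushes 1..5 through a buffer of
    // capacity 2 while the caller consumes and sums the values.
    static int demoSum() {
        BoundedBuffer<Integer> buf = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buf.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < 5; i++) {
                sum += buf.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }
}
```

Using two Condition objects on one lock lets producers and consumers wait in separate queues, which a single Object monitor cannot express.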
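ConditionObject is an inner class of the synchronizer AbstractQueuedSynchronizer. Because Condition operations require the associated lock to be held, making it an inner class of the synchronizer is reasonable. Each Condition object contains a queue (called the wait queue below), which is the key to its wait/notify functionality. The following analysis of the Condition implementation covers the wait queue, waiting, and notification; unless stated otherwise, "Condition" below means ConditionObject.
- Wait queue: the wait queue is a FIFO queue in which every node holds a reference to a thread, namely a thread waiting on the Condition object. When a thread calls Condition.await(), it releases the lock, is wrapped in a node that is added to the wait queue, and enters the waiting state. The node definition is reused from the synchronizer: nodes in both the sync queue and the wait queue are instances of the static nested class AbstractQueuedSynchronizer.Node. A Condition contains one wait queue and holds its first node (firstWaiter) and last node (lastWaiter). When the current thread calls Condition.await(), a node is constructed for it and appended to the tail of the wait queue. In the Object monitor model, an object has one sync queue and one wait queue, whereas a Lock in the concurrency package (more precisely, the synchronizer) has one sync queue and multiple wait queues.
- Waiting: calling a Condition's await() method (or any method whose name starts with await) puts the current thread into the wait queue and releases the lock, and the thread's state changes to waiting. When await() returns, the current thread is guaranteed to hold the lock associated with the Condition. Seen from the queues (the sync queue and the wait queue), calling await() is equivalent to moving the head node of the sync queue (the node that holds the lock) into the Condition's wait queue; in the code, addConditionWaiter() actually creates a new node for the wait queue.
- Notification: calling a Condition's signal() method wakes the node that has waited longest in the wait queue (the first node); before the node is woken, it is moved to the sync queue.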

