AQS是jdk并发包java.util.concurrent下绝大部分工具类实现的基础,比如条件队列,阻塞队列,独占锁,共享锁等。
AQS具备特性
- 独占/共享
- 可重入
- 允许中断
- 公平/非公平
- 阻塞等待
ReentrantLock使用方法非常简单 只需要在需要加锁的逻辑前调用lock() unlock() 即可进行加减锁。
ReentrantLock lock = new ReentrantLock();
new Thread(() -> {
lock.lock(); // 加锁
try {
todo();
}catch (Exception e) {
log.error();
}finally {
lock.unlock(); // 解锁
}
}).start();
new Thread(() -> {
lock.lock();
try {
todo();
}catch (Exception e) {
log.error();
}finally {
lock.unlock(); // 解锁
}
}).start();
ReentrantLock对比synchronized
- �synchronized是JVM层面实现的,ReentrantLock是JDK层面的锁。
- synchronized只有非公平,ReentrantLock有公平/非公平两种。
- synchronized不可被中断,ReentrantLock#lockInterruptibly()是可中断的。
- synchronized不可获取加锁状态,ReentrantLock#isLocked()是可以获取锁状态的。
- 发生异常时synchronized会自动释放锁,ReentrantLock需通过finally来显示释放。
- synchronized加锁方式单一,ReentrantLock可通过tryLock()快速获取锁加锁结构,以及设置可等待时长等更为灵活。
既然ReentrantLock这么强大 那我们来看看他是怎么实现的吧
看源码呢 带着问题去读 会更有效果
ReentrantLock构造方法有两种,通过构造方法我们可以获取公平/非公平锁,默认为非公平。
AbstractQueuedSynchronizer(简称AQS)实现的,AQS是一个抽象同步框架,可以用来实现一个依赖状态的同步器。公平锁与非公平锁都是基于AQS实现的。
�
图1
加锁时首先会调用一个acquire()
通过tryAcquire()我们会找到父类AQS的实现方法nonfairTryAcquire()
图2
�首先会获取我们的线程与state。
这个state是我们需要关注的点。它也是可ReentrantLock实现可重入、共享锁的关键。
�
�state初始化为0 所以上边的逻辑我们只需要先关注 == 0的逻辑即可
-》图2。compareAndSetState即为我们常见的CAS逻辑。CAS可以通过JVM层面来保证我们操作的原子性。这个方法可以把acquires赋值给state并返回boolean,即如果state当前状态为0(没有被其他线程所修改的情况)则当前线程可将其更改为1。
此时我们假设当前线程为thread1且修改state状态成功。
则可通过setExclusiveOwnerThread()设置当前锁拥有者为此线程。
-》图2。所示此时thread1 获取锁并返回true。持有锁并加锁成功。
如果thread1再次进入另一个加锁逻辑会怎样呢。此时state会进行+1来记录thread1 进行了两次加锁。通过state
的状态我们可以看出state与ReentrantLock的可重入特性紧密相关。
-》图2。如果此时thread2进入加锁逻辑,由于之前thread1先对state进行了状态的更改。此时只能返回false。
-》图1。此时会执行addWaiter(Node.EXCLUSIVE)方法
在看后边逻辑之前,我们先了解下AQS
AQS有两种资源模式
- Exclusive-独占,只有一个线程能执行,如ReentrantLock
- Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
基于MESA管程模型AQS还有两种队列
- 同步等待队列:主要用于维护获取锁失败的线程。同步队列为双向链表。
- 条件等待队列:用于维护调研await()释放锁后的线程,当调用signal()唤醒后会把条件队列的线程节点全部移动到同步队列中,等待再次获取锁。条件队列为单向链表。
此Node为AQS实现的同步队列
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
// 共享
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
// 独占
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled. */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking. */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition. */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate.
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
}
�创建节点后 会通过循环来初始化同步等待队列,并使thread2入队返回当前node节点
此时生成的队列状态为
�之后会调用acquireQueued()首先进行判断当前thread2节点的prev是否为Head节点。如果是Head节点,则会再次进行获取锁逻辑。因为此时刚入队的thread2还未被park。可以再次尝试来降低线程切换带来的性能损耗。
�图3
此时如果获取锁成功则会把头节点制空,然后当前thread2节点变为新的Head节点
如果加锁失败则shouldParkAfterFailedAcquire(p, node)使Head节点的waitStatus变为-1,表示后边的下一个节点为可唤醒状态。并通过parkAndCheckInterrupt()对thread2进行park。
至此加锁逻辑分析完毕。