一、AQS
1.概述
AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
特点:
- 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取 锁和释放锁
- getState - 获取 state 状态
- setState - 设置 state 状态
- compareAndSetState - cas 机制设置 state 状态
- 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
- 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
AQS是一个抽象类,所以不能直接实例化,当我们需要实现一个自定义锁的时候可以去继承AQS然后重写以下一些方法(默认抛出 UnsupportedOperationException)
tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
2.实现不可重入锁
自定义不可重入锁
```java package panw.AQS;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @Slf4j //自定义不可重入锁 public class MyLock implements Lock{ public static void main(String[] args) { MyLock myLock = new MyLock(); Thread t1 = new Thread(() -> { myLock.lock(); try { while (true){ log.debug(“t1”); Thread.sleep(1000000000000L); }
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
myLock.unlock();
}
}, "t1");
Thread t2 = new Thread(() -> {
myLock.lock();
try {
log.debug("t1");
Thread.sleep(1000000000000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
myLock.unlock();
}
}, "t2");
t1.start();
t2.start();
}
static class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int acquires) {
if (acquires==1){
if (compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
}
return false;
}
@Override
protected boolean tryRelease(int acquires) {
if (acquires==1){
if (getState()==0){
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
return false;
}
protected Condition newCondition() {
return new ConditionObject();
}
@Override
//是否持有独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
protected MySync mySync = new MySync();
@Override
public void lock() {
mySync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
mySync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return mySync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return mySync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
mySync.release(0);
}
@Override
public Condition newCondition() {
return mySync.newCondition();
}
}
<a name="Y6bQC"></a>
## 二、AQS源码解析
<a name="KU4ge"></a>
### 总结
这部分是我debug源码完来进行的总结,可以先让大家了解一下整个AQS的结构,方便大家理解。<br />AQS类中有state属性来表示锁是否被占,还有两个重要属性:head、tail。这两个节点类型是内部类的Node节点,Node类有一个waitState来作为节点状态的标识,Node节点则是用来构建一个**双向链表的阻塞队列**和一个**单向链表的等待队列**。head和tail是用来构建**阻塞队列的,即下图:**<br />![](https://cdn.nlark.com/yuque/0/2022/jpeg/28810082/1655179484404-7ba1a1dd-216b-424b-a983-34621c23a158.jpeg)<br />还有一个内部类是ConditionObject,该内部类复用了Node类,并构造了**单向链表的等待队列。**<br />![](https://cdn.nlark.com/yuque/0/2022/jpeg/28810082/1655179768730-a9432a10-a14b-4c6b-af11-f10a1a445b3d.jpeg)
<a name="XMJCa"></a>
### WaitStatus
| SIGNAL | 值为-1,后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,那么就会通知后继节点,让后继节点的线程能够运行 |
| --- | --- |
| CONDITION | 值为-2,节点在等待队列中,节点线程等待在Condition上,不过当其他的线程对Condition调用了signal()方法后,该节点就会从等待队列转移到同步队列中,然后开始尝试对同步状态的获取 |
| PROPAGATE | 值为-3,表示下一次的共享式同步状态获取将会无条件的被传播下去 |
| CANCELLED | 值为1,由于超时或中断,该节点被取消。 节点进入该状态将不再变化。特别是具有取消节点的线程永远不会再次阻塞 |
| INITIAL | 值为0,初始状态 |
<a name="LRiuP"></a>
### 通过分析ReentrantLock来分析
<a name="T1fM7"></a>
#### 构造
```java
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
当在构造参数中添加一个true的布尔值就使用公平锁,否则就是非公平锁。那么公平锁和非公平锁是如何实现的呢?
我们来具体分析一下ReentrantLock的结构
从图中可以看出:
ReentrantLock有三个静态内部类 Sync、NonfairSync、FairSync。其中Sync继承自AbstractQueuedSynchronizer,NonfairSync、FairSync则继承Sync。
当调用lock方法时:
//ReentrantLock调用了Sync中的抽象方法lock
public void lock() {
sync.lock();
}
//Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
...
}
在NonfairSync、FairSync分别实现了该方法:
// FairSync
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
}
// NonfairSync
static final class NonfairSync extends Sync {
final void lock() {
//首先对state进行原子操作,如果成功就将owner线程设置为当前线程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
其中都调用了acquire(1)
,这是AbstractQueuedSynchronizer中的方法:
public final void acquire(int arg) {
//如果没获取到锁则将其加入队列中,并打断该线程
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
而tryAcquire()
方法在AbstractQueuedSynchronizer中并未实现,需要子类去进行重写:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
}
tryAcquire
公平锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); //获取当前state值
if (c == 0) { //锁没被其他线程获取
if (!hasQueuedPredecessors() && //判断是否队列中是否有其他线程Node
compareAndSetState(0, acquires)) { //原子操作将state置为1
setExclusiveOwnerThread(current); //将owner线程设置为当前线程
return true;
}
}
//锁已被其他线程获取
else if (current == getExclusiveOwnerThread()) { //owner线程与当前线程一致
int nextc = c + acquires; //state++
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); //将新state写入,用于可重入锁计数
return true;
}
return false;
}
//这是AbstractQueuedSynchronizer中hasQueuedPredecessors方法
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
//判断是否队列中是否有其他线程Node
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
hasQueuedPredecessors方法:
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t && //判断是不是头尾为一个Node节点,是一个节点直接返回false
((s = h.next) == null || s.thread != Thread.currentThread());
//如果不是一个节点再看头节点的下一个节点是不是null,是null直接返回true
//如果下一个节点不是null,判断该节点的Thread是不是当前线程
}
非公平锁
protected final boolean tryAcquire(int acquires) {
//调用Sync中的方法
return nonfairTryAcquire(acquires);
}
//Sync中的方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
从上可以看出,公平锁与非公平锁的acquire方法实现基本一样,只是公平锁在判断时调用了hasQueuedPredecessors方法,判断是否队列中是否有其他线程Node。
这里将上面的AbstractQueuedSynchronizer中的方法再次拿下来好看一些
public final void acquire(int arg) {
//如果没获取到锁则将其加入队列中,并打断该线程
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
此时acquire方法如果执行成功则acquire方法结束,逐层返回,否则则执行 &&
后的语句,先来看看addWaiter。
addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//第一次执行方法需要进行执行enq进行初始化,之后则不需要
if (pred != null) {
node.prev = pred; //在链表尾节点添加元素
if (compareAndSetTail(pred, node)) { //用cas将尾节点置为新节点
pred.next = node; //将新节点放在队列尾部
return node;
}
}
enq(node);
return node;
}
enq
该方法是第一次进入进行队列初始化
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize 必须初始化
if (compareAndSetHead(new Node())) //第一次循环,创建一个NULL节点为头尾节点
tail = head;
} else {
//第二次循环,将该节点放在队列尾部,并且返回
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //得到前置节点
if (p == head && tryAcquire(arg)) { //先判断前节点是不是头节点,是就再进行一次抢锁
setHead(node); //枪锁成功就将该节点设置为头节点
p.next = null; // help GC 将头节点删除
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //第一次执行将waitStatus置为SIGNAL即 -1,然后再进行循环
parkAndCheckInterrupt()) //第二次执行 shouldParkAfterFailedAcquire返回true,
//执行parkAndCheckInterrupt
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); //默认操作失败,进行处理,下面具体来看
}
}
shouldParkAfterFailedAcquire
该方法来判断是否需要park
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //第一次仅需该方法 waitStatus的默认值为0所以执行该代码块的12行
if (ws == Node.SIGNAL) //第二次进入,条件成立,返回true
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 第一次将waitStatus设置为-1
}
return false;
}
parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
cancelAcquire
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null; //将当前节点线程置为null
Node pred = node.prev; //获取前节点
while (pred.waitStatus > 0) //如果前节点的waitStatus是 CANCELLED 即 1
node.prev = pred = pred.prev; //跳过前面已经为CANCELLED的节点
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED; //将该节点waitStatus置为 1
// 如果是尾节点,直接删除
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null); //将前节点的下一个节点置为null
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next); //将要删除节点的后节点链接到该节点的前节点
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
当调用unlock方法时:
//ReentrantLock调用了Sync中的方法release
public void unlock() {
sync.release(1);
}
release
//Sync中的release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //如果头节点不为null,status不为0
return true;
}
return false;
}
tryRelease
//ReentrantLock中的方法tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //state - 1
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null); //如果state为0设置owner为null,free为true
}
setState(c); //写回state,如果不为0则未解锁
return free;
}
unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); //把头结点的waitStatus置为0
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) //如果头节点的下一个节点为null或者waitStatus为CANCELLED 即1
//那么就一直往后找到waitStatus<0的节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); //找到节点,将线程unpark
}
当调用await方法时:
这时会执行AbstractQueuedSynchronizer
中内部类ConditionObject
中的await
方法
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); //创建等待队列
int savedState = fullyRelease(node); //将同步队列中的锁释放
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //进入阻塞队列
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter
private Node addConditionWaiter() {
Node t = lastWaiter; //获取尾节点
if (t != null && t.waitStatus != Node.CONDITION) { //如果不为空且waitStatus不为-2
unlinkCancelledWaiters(); //删除已取消的节点
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION); // 创建新节点
if (t == null)
firstWaiter = node; //如果第一次创建则将节点加在头
else
t.nextWaiter = node; //之后则加在尾节点后
lastWaiter = node; //将该节点置为尾节点
return node;
}
当调用signal方法时:
public final void signal() {
if (!isHeldExclusively()) //判断owner线程是不是当前线程
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
doSignal
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null; //上面三行是将首节点拿出等待队列
} while (!transferForSignal(first) && //将节点从等待队列转移到同步队列
(first = firstWaiter) != null);
}
transferForSignal
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node); //enq上面已经分析过,用来对阻塞队列进行初始化,返回前节点
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //如果前节点未被删除,把前节点waitStatus置为-1,让该节点线程unpark
LockSupport.unpark(node.thread);
return true;
}
我们就对AQS源码讨论这么多,相信你已经把握大部分了,其他的就由自己研究把!
三、读写锁(待写)
3.1ReentrantReadWriteLock
当读操作远远高于写操作时候,这时候使用读写锁可以让读-读
并发,提高性能。