一、AQS
1.概述
AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
特点:
- 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取 锁和释放锁
- getState - 获取 state 状态
- setState - 设置 state 状态
- compareAndSetState - cas 机制设置 state 状态
- 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
- 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
AQS是一个抽象类,所以不能直接实例化,当我们需要实现一个自定义锁的时候可以去继承AQS然后重写以下一些方法(默认抛出 UnsupportedOperationException)
tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
2.实现不可重入锁
自定义不可重入锁
```java package panw.AQS;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @Slf4j //自定义不可重入锁 public class MyLock implements Lock{ public static void main(String[] args) { MyLock myLock = new MyLock(); Thread t1 = new Thread(() -> { myLock.lock(); try { while (true){ log.debug(“t1”); Thread.sleep(1000000000000L); }
} catch (InterruptedException e) {throw new RuntimeException(e);} finally {myLock.unlock();}}, "t1");Thread t2 = new Thread(() -> {myLock.lock();try {log.debug("t1");Thread.sleep(1000000000000L);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {myLock.unlock();}}, "t2");t1.start();t2.start();}static class MySync extends AbstractQueuedSynchronizer {@Overrideprotected boolean tryAcquire(int acquires) {if (acquires==1){if (compareAndSetState(0,1)){setExclusiveOwnerThread(Thread.currentThread());return true;}}return false;}@Overrideprotected boolean tryRelease(int acquires) {if (acquires==1){if (getState()==0){throw new IllegalMonitorStateException();}setExclusiveOwnerThread(null);setState(0);return true;}return false;}protected Condition newCondition() {return new ConditionObject();}@Override//是否持有独占锁protected boolean isHeldExclusively() {return getState() == 1;}}protected MySync mySync = new MySync();@Overridepublic void lock() {mySync.acquire(1);}@Overridepublic void lockInterruptibly() throws InterruptedException {mySync.acquireInterruptibly(1);}@Overridepublic boolean tryLock() {return mySync.tryAcquire(1);}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return mySync.tryAcquireNanos(1,unit.toNanos(time));}@Overridepublic void unlock() {mySync.release(0);}@Overridepublic Condition newCondition() {return mySync.newCondition();}
}
<a name="Y6bQC"></a>## 二、AQS源码解析<a name="KU4ge"></a>### 总结这部分是我debug源码完来进行的总结,可以先让大家了解一下整个AQS的结构,方便大家理解。<br />AQS类中有state属性来表示锁是否被占,还有两个重要属性:head、tail。这两个节点类型是内部类的Node节点,Node类有一个waitState来作为节点状态的标识,Node节点则是用来构建一个**双向链表的阻塞队列**和一个**单向链表的等待队列**。head和tail是用来构建**阻塞队列的,即下图:**<br /><br />还有一个内部类是ConditionObject,该内部类复用了Node类,并构造了**单向链表的等待队列。**<br /><a name="XMJCa"></a>### WaitStatus| SIGNAL | 值为-1,后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,那么就会通知后继节点,让后继节点的线程能够运行 || --- | --- || CONDITION | 值为-2,节点在等待队列中,节点线程等待在Condition上,不过当其他的线程对Condition调用了signal()方法后,该节点就会从等待队列转移到同步队列中,然后开始尝试对同步状态的获取 || PROPAGATE | 值为-3,表示下一次的共享式同步状态获取将会无条件的被传播下去 || CANCELLED | 值为1,由于超时或中断,该节点被取消。 节点进入该状态将不再变化。特别是具有取消节点的线程永远不会再次阻塞 || INITIAL | 值为0,初始状态 |<a name="LRiuP"></a>### 通过分析ReentrantLock来分析<a name="T1fM7"></a>#### 构造```javapublic ReentrantLock() {sync = new NonfairSync();}public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
当在构造参数中添加一个true的布尔值就使用公平锁,否则就是非公平锁。那么公平锁和非公平锁是如何实现的呢?
我们来具体分析一下ReentrantLock的结构
从图中可以看出:
ReentrantLock有三个静态内部类 Sync、NonfairSync、FairSync。其中Sync继承自AbstractQueuedSynchronizer,NonfairSync、FairSync则继承Sync。
当调用lock方法时:
//ReentrantLock调用了Sync中的抽象方法lockpublic void lock() {sync.lock();}//Syncabstract static class Sync extends AbstractQueuedSynchronizer {abstract void lock();...}
在NonfairSync、FairSync分别实现了该方法:
// FairSyncstatic final class FairSync extends Sync {final void lock() {acquire(1);}}// NonfairSyncstatic final class NonfairSync extends Sync {final void lock() {//首先对state进行原子操作,如果成功就将owner线程设置为当前线程if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}}
其中都调用了acquire(1),这是AbstractQueuedSynchronizer中的方法:
public final void acquire(int arg) {//如果没获取到锁则将其加入队列中,并打断该线程if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
而tryAcquire()方法在AbstractQueuedSynchronizer中并未实现,需要子类去进行重写:
public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizerimplements java.io.Serializable {protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}}
tryAcquire
公平锁
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState(); //获取当前state值if (c == 0) { //锁没被其他线程获取if (!hasQueuedPredecessors() && //判断是否队列中是否有其他线程NodecompareAndSetState(0, acquires)) { //原子操作将state置为1setExclusiveOwnerThread(current); //将owner线程设置为当前线程return true;}}//锁已被其他线程获取else if (current == getExclusiveOwnerThread()) { //owner线程与当前线程一致int nextc = c + acquires; //state++if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc); //将新state写入,用于可重入锁计数return true;}return false;}//这是AbstractQueuedSynchronizer中hasQueuedPredecessors方法public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;//判断是否队列中是否有其他线程Nodereturn h != t &&((s = h.next) == null || s.thread != Thread.currentThread());}
hasQueuedPredecessors方法:
public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;return h != t && //判断是不是头尾为一个Node节点,是一个节点直接返回false((s = h.next) == null || s.thread != Thread.currentThread());//如果不是一个节点再看头节点的下一个节点是不是null,是null直接返回true//如果下一个节点不是null,判断该节点的Thread是不是当前线程}
非公平锁
protected final boolean tryAcquire(int acquires) {//调用Sync中的方法return nonfairTryAcquire(acquires);}//Sync中的方法final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
从上可以看出,公平锁与非公平锁的acquire方法实现基本一样,只是公平锁在判断时调用了hasQueuedPredecessors方法,判断是否队列中是否有其他线程Node。
这里将上面的AbstractQueuedSynchronizer中的方法再次拿下来好看一些
public final void acquire(int arg) {//如果没获取到锁则将其加入队列中,并打断该线程if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
此时acquire方法如果执行成功则acquire方法结束,逐层返回,否则则执行 &&后的语句,先来看看addWaiter。
addWaiter
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);Node pred = tail;//第一次执行方法需要进行执行enq进行初始化,之后则不需要if (pred != null) {node.prev = pred; //在链表尾节点添加元素if (compareAndSetTail(pred, node)) { //用cas将尾节点置为新节点pred.next = node; //将新节点放在队列尾部return node;}}enq(node);return node;}
enq
该方法是第一次进入进行队列初始化
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initialize 必须初始化if (compareAndSetHead(new Node())) //第一次循环,创建一个NULL节点为头尾节点tail = head;} else {//第二次循环,将该节点放在队列尾部,并且返回node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
acquireQueued
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor(); //得到前置节点if (p == head && tryAcquire(arg)) { //先判断前节点是不是头节点,是就再进行一次抢锁setHead(node); //枪锁成功就将该节点设置为头节点p.next = null; // help GC 将头节点删除failed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) && //第一次执行将waitStatus置为SIGNAL即 -1,然后再进行循环parkAndCheckInterrupt()) //第二次执行 shouldParkAfterFailedAcquire返回true,//执行parkAndCheckInterruptinterrupted = true;}} finally {if (failed)cancelAcquire(node); //默认操作失败,进行处理,下面具体来看}}
shouldParkAfterFailedAcquire
该方法来判断是否需要park
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus; //第一次仅需该方法 waitStatus的默认值为0所以执行该代码块的12行if (ws == Node.SIGNAL) //第二次进入,条件成立,返回truereturn true;if (ws > 0) {do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 第一次将waitStatus设置为-1}return false;}
parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
cancelAcquire
private void cancelAcquire(Node node) {if (node == null)return;node.thread = null; //将当前节点线程置为nullNode pred = node.prev; //获取前节点while (pred.waitStatus > 0) //如果前节点的waitStatus是 CANCELLED 即 1node.prev = pred = pred.prev; //跳过前面已经为CANCELLED的节点Node predNext = pred.next;node.waitStatus = Node.CANCELLED; //将该节点waitStatus置为 1// 如果是尾节点,直接删除if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null); //将前节点的下一个节点置为null} else {int ws;if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {Node next = node.next;if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next); //将要删除节点的后节点链接到该节点的前节点} else {unparkSuccessor(node);}node.next = node; // help GC}}
当调用unlock方法时:
//ReentrantLock调用了Sync中的方法releasepublic void unlock() {sync.release(1);}
release
//Sync中的releasepublic final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h); //如果头节点不为null,status不为0return true;}return false;}
tryRelease
//ReentrantLock中的方法tryReleaseprotected final boolean tryRelease(int releases) {int c = getState() - releases; //state - 1if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null); //如果state为0设置owner为null,free为true}setState(c); //写回state,如果不为0则未解锁return free;}
unparkSuccessor
private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0); //把头结点的waitStatus置为0Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev) //如果头节点的下一个节点为null或者waitStatus为CANCELLED 即1//那么就一直往后找到waitStatus<0的节点if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread); //找到节点,将线程unpark}
当调用await方法时:
这时会执行AbstractQueuedSynchronizer中内部类ConditionObject中的await方法
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter(); //创建等待队列int savedState = fullyRelease(node); //将同步队列中的锁释放int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //进入阻塞队列interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
addConditionWaiter
private Node addConditionWaiter() {Node t = lastWaiter; //获取尾节点if (t != null && t.waitStatus != Node.CONDITION) { //如果不为空且waitStatus不为-2unlinkCancelledWaiters(); //删除已取消的节点t = lastWaiter;}Node node = new Node(Thread.currentThread(), Node.CONDITION); // 创建新节点if (t == null)firstWaiter = node; //如果第一次创建则将节点加在头elset.nextWaiter = node; //之后则加在尾节点后lastWaiter = node; //将该节点置为尾节点return node;}
当调用signal方法时:
public final void signal() {if (!isHeldExclusively()) //判断owner线程是不是当前线程throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);}
doSignal
private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null; //上面三行是将首节点拿出等待队列} while (!transferForSignal(first) && //将节点从等待队列转移到同步队列(first = firstWaiter) != null);}
transferForSignal
final boolean transferForSignal(Node node) {if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;Node p = enq(node); //enq上面已经分析过,用来对阻塞队列进行初始化,返回前节点int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //如果前节点未被删除,把前节点waitStatus置为-1,让该节点线程unparkLockSupport.unpark(node.thread);return true;}
我们就对AQS源码讨论这么多,相信你已经把握大部分了,其他的就由自己研究把!
三、读写锁(待写)
3.1ReentrantReadWriteLock
当读操作远远高于写操作时候,这时候使用读写锁可以让读-读并发,提高性能。
