自旋:
- 缺点:耗费cpu资源,没有竞争到锁的线程会一直占用cpu资源进行cas操作,假如一个线程获得锁后要花费n秒处理业务逻辑,娜另外一个线程就会白白花费n秒的cpu资源。
思路:让得不到锁的线程让出cpu
Object类中的wait和notify方法
- wait和notify方法必须在同步代码块中,即必须先拿到对象锁才能wait和notify。底层是线程wait之后进入了waitSet队列,然后等待别人唤醒去和entryList中阻塞的队列抢锁(非公平)
- notify方法在wait方法之前执行不能唤醒之后的wait线程
- juc包中的condition接口的await方法和notify方法
- 效果同上
- lockSupport可以阻塞当前线程和唤醒指定被阻塞的线程:park与unpark
}
<a name="Tyl2K"></a>## 1.1 常量字段```javastatic final Node SHARED = new Node(); // 标识共享锁节点static final Node EXCLUSIVE = null; // 标识排它锁节点// 以下常量,都是waitStatus字段可能的值static final Node SHARED = new Node(); // 标识共享锁节点static final Node EXCLUSIVE = null; // 标识排它锁节点// 以下常量,都是waitStatus字段可能的值static final int CANCELLED = 1; // 取消状态,也就是不想申请或释放了static final int SIGNAL = -1; // 等待通知状态,当节点变为这个状态,唤醒后继节点static final int CONDITION = -2; // 条件状态,await/signal 场景下使用static final int PROPAGATE = -3; // 传播状态,传播共享状态
1.2 变量字段
volatile int waitStatus; // 用于标识每一个等待节点状态volatile Node prev; // 前置节点指针volatile Node next; // 后继节点指针volatile Thread thread; // 持有的线程对象,可以理解为每个node代表一个线程Node nextWaiter; // 等待队列下一个节点指针(Condition中使用)
1.3 方法
// 判断是否为申请共享锁的节点;final boolean isShared() {return nextWaiter == SHARED;}// 获取前置节点,如果为null抛出异常final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}// 构造方法Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}
2.AQS(abstract queuee synchronizer)
2.1 AQS成员变量
private transient volatile Node head;private transient volatile Node tail;private volatile int state; // 同步器状态,它的值决定了是否持有锁,通过CAS自旋完成对state值的修改// 0表示没人,自由状态;>=1表示有人占用,进入队列等着
2.2 抽象方法
AQS中包含5个未实现方法,也就是等待子类来实现个性化逻辑的,他们是:
protected boolean tryAcquire(int arg); // 尝试获取排他锁protected boolean tryRelease(int arg); // 尝试释放排他锁protected int tryAcquireShared(int arg); // 尝试获取共享锁protected boolean tryReleaseShared(int arg); // 尝试释放共享锁protected boolean isHeldExclusively(); // 判断当前线程是否持有排他锁
由这5个方法可知,AQS中涉及了排它锁和共享锁两类的操作,每种锁又包含申请和释放两类操作流程,每个流程中又涉及到多个实例方法和抽象方法调用。
在AQS中,方法分为可中断的、不可中断的、同步的、带超时时间的同步的,目前整理的流程是“不可中断的同步的”。
2.3 结构
虚拟双向队列。将请求资源的线程封装成队列的节点,通过CAS、自旋以及LockSupport.park()方式,维护state变量的状态,使并发达到同步的控制效果。state+CLH
3. ReentrantLock
3.1 加锁
3.1.1 公平锁
final void lock() {acquire(1);}// AQS的方法,public final void acquire(int arg) {// tryAcquire 进入1 ,addWaiter进入2, acquireQueued进入3// tryAcquire拿到了锁就不会执行下面的// 没拿到锁;进入addWaiter(加入等待队列)方法(标识排他锁)(把当前线程封装成为node)if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 如果是被打断的,acquireQueued方法会返回true,设置打断标记,让自己知道是被打断唤醒的selfInterrupt();}// 1.尝试获取锁protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();// 拿到线程当前的状态cint c = getState();// c=0,没人占用锁,我要去上锁.(上一个人恰好处理完解锁,我恰好来上锁)if (c == 0) {// hasQueuedPredecessors 进入1.1/*** 通过对比源码发现,公平锁比非公平锁多了这块代码: !hasQueuedPredecessors()* hasQueuedPredecessors() 是做什么呢?就是判断当前同步队列中是否存在节点,如果存在节点呢,* 就返回true,由于前面有个 !,那么就是false,再根据 && 逻辑运算符的特性,不会继续执行了;** tryAcquire()方法直接返回false,后面的逻辑就和非公平锁的一致了,就是创建Node节点,并将* 节点加入到同步队列尾; 公平锁:发现当前同步队列中存在节点,有线程在自己前面已经申请可锁,那* 自己就得乖乖的向后面排队去。** 友情提示:在生活中,我们也需要按照先来后到去排队,保证素质; 还有就是怕你们不排队被别人打了。*/// 我前面的等待队列中没人排队(我是第一个),有人排队我也要排队if (!hasQueuedPredecessors() &&// 设置值为1compareAndSetState(0, acquires)) {// 将该线程赋给当前锁对象(独占锁)setExclusiveOwnerThread(current);return true;}}// 线程必须是当前存入的,重入;如果不是当前线程,不进。else if (current == getExclusiveOwnerThread()) {// 重入,把state的值+1int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}// AQS方法 1.1 判断队列中有没有节点在排队,没人排队返回true,有人排队返回falsepublic final boolean hasQueuedPredecessors() {// The correctness of this depends on head being initialized// before tail and on head.next being accurate if the current// thread is first in queue.Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());}// AQS 方法 2.加入等待队列(把当前线程封装成为node)// 第一次添加节点的时候会new一个双向链表等待队列,返回当前节点private Node addWaiter(Node mode) {// 把当前线程封装成为node,mode为独占锁Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;// 队列为空,需要初始化队列,不会进(第一次队列还未初始化);if (pred != null) {// 队列不为空,进入队列中,将自己设置为tail节点node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 2.1 加入队列enq(node);return node;}// AQS方法 2.1 进入队列尾部private Node enq(final Node node) {for (;;) { // 自旋Node t = tail;// 如果尾结点是空的,先初始化;new一个哑结点,放在队头if (t == null) { // Must initialize// new哑结点,并将头尾指针指向这个哑结点if (compareAndSetHead(new Node()))tail = head;} else {// 再添加一个当前的节点(自己)node.prev = t;// 把自己设置为队尾节点if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}// AQS方法3.// 判断我之前的节点是park,那么我也park;如果上一个节点不是park,竞争锁// 自旋过程,每个节点来的时候每次进入自旋都会判断我能不能拿到锁,如果不能拿到锁,就先把上一个节点// 的waitStatus改成-1,自己进入阻塞状态final boolean acquireQueued(final Node node, int arg) {// 异常1:这个结点自己不想等了boolean failed = true;try {// 异常2:排队的过程被打断了boolean interrupted = false;// 自旋过程,每一次自旋都会去尝试获得一次锁,获得失败才会走下面的流程自旋for (;;) {// 拿到上一个节点pfinal Node p = node.predecessor();// 如果上一个节点p是head(即自己是进入等待队列自己是第二个节点,排队中的第一个节点)// unpark拿到锁之后会进入这个方法if (p == head && tryAcquire(arg)) {// 获取到锁,就把自己变成head(哑结点),并清空当前存的线程,这个时候如果不是队尾,waitStatus就是-1setHead(node);// 把前面的p节点gcp.next = null; // help GC// 正常结束,没有被取消failed = false;// 如果被打断,返回true;不被打断,返回falsereturn interrupted;}// 上一个节点不是head(自己不是第二个节点)或者锁还是被占用着,进入3.1 shouldParkAfterFailedAcquire:// 判断上一个节点p是否需要唤醒当前节点// 如果是-1,进入3.2 parkAndCheckInterrupt 则将自己park住,等待上一个节点唤醒// 如果不是-1,设置成-1,并进行下一次自旋// 整个流程:第一趟:将哨兵节点的值变成-1,第二趟:返回true,然后将自己park住(暂停执行,等待恢复执行)// 如果是被打断的,parkAndCheckInterrupt方法会返回true,interrupted就置为true表示自己是被打断的if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}//AQS 方法 3.1 判断前一个节点的waitStatus值,如果是-1,返回true;如果是1,取消;如果是其他,将值改成-1,返回false进行下次自旋private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;// 如果是-1,需要唤醒下个节点,返回trueif (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;// 1表示线程取消判断if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/// 否则,修改这个值为-1compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}// AQS方法 park住自己private final boolean parkAndCheckInterrupt() {// 线程挂起,程序不会向下执行LockSupport.park(this);// 根据park的api描述,程序会在以下三种情况向下执行:// 1.被unpark// 2.被中断(interrupt)// 3.其他不合逻辑的返回// 因上述三种情况程序执行到此处,返回当前线程的中断状态,并清空中断状态// 如果由于被中断,该方法会返回truereturn Thread.interrupted();}
3.1.2 非公平锁的不同点
final void lock() {// 先竞争,竞争失败再排队if (compareAndSetState(0, 1))// 设置占用线程为当前线程,独占锁setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 不用在乎自己是不是队首。if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
3.2 解锁
没有公平不公平之分
public void unlock() {sync.release(1);}//AQS方法public final boolean release(int arg) {// 尝试释放锁 进入1.1if (tryRelease(arg)) {Node h = head;// 队列不为空if (h != null &&// 且waitStatus == Node.SIGNAL 才需要 unpark(如果是0,那么他是队尾,就不用唤醒了)h.waitStatus != 0)// 唤醒下一个线程 进入1.2unparkSuccessor(h);return true;}return false;}//AQS方法 1.1protected final boolean tryRelease(int releases) {// 更改c的状态值,getState表示当前重入锁的数量值int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 如果c为0,表示没有人占用锁了(无重入),只有 state 减为 0, 才释放成功if (c == 0) {free = true;// 将独占该锁的线程置为nullsetExclusiveOwnerThread(null);}setState(c);return free;}//AQS方法 1.2 唤醒线程private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;// 更改waitStatus状态为0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/// 拿到头结点的下个节点sNode s = node.next;// 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 下个节点s不为null,唤醒下一个节点s// unpark后,从之前park的地方执行,即parkAndCheckInterrupt()方法if (s != null)LockSupport.unpark(s.thread);}
3.3 可打断原理
- 不可打断模式:
在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了(阻塞状态被打断会清除打断标记)。
// Sync 继承自 AQSstatic final class NonfairSync extends Sync {// ...public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {// 如果打断状态为 trueselfInterrupt();}}static void selfInterrupt() {// 重新产生一次中断,(让线程知道自己是被打断阻塞而唤醒的,因为interrupt打断阻塞线程后会清除打断标记)这时候线程是如果正常运行的状态,那么不是处于sleep等状态,interrupt方法就不会报错Thread.currentThread().interrupt();}final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();// 再次循环拿到了锁if (p == head && tryAcquire(arg)) {setHead(node);p.next = null;// 更改failed为失败状态failed = false;// 获得锁后, 返回打断状态return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) {// 如果是因为 interrupt 被唤醒, 返回打断状态为 trueinterrupted = true;}}} finally {// 只有获得锁出现异常,才会执行该方法(自己实现的AQS类可能会出异常)if (failed)// 取消加锁cancelAcquire(node);}}private final boolean parkAndCheckInterrupt() {// 如果打断标记已经是 true, 则 park 会失效LockSupport.park(this);// unpark之后执行,判断阻塞是否被别人打断// interrupted 会清除打断标记return Thread.interrupted();}}
可打断模式: ```java static final class NonfairSync extends Sync {
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())throw new InterruptedException();// 如果没有获得到锁, 进入 ㈠if (!tryAcquire(arg))doAcquireInterruptibly(arg);
}
// ㈠ 可打断的获取锁流程 private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) {// 在 park 过程中如果被 interrupt 会进入此// 这时候抛出异常, 而不会再次进入 for (;;)throw new InterruptedException();}}} finally {if (failed)cancelAcquire(node);}
} }
``` 区别在于不可打断模式是将打断标记置为true,再次进入for循环(在AQS队列中);可打断模式直接抛出异常,不会进入for循环。
