入口
public static void main(String[] args) {ReentrantLock lock =new ReentrantLock();lock.lock();System.out.println("Test.main");lock.unlock();}
public ReentrantLock() {sync = new NonfairSync();}
ReentrantLock 无参构造器默认使用非公平同步器 NonfairSync 进行锁的相关操作, NonfairSync 是 ReentrantLock 的静态内部类,继承自抽象静态内部类 Sync ,而Sync又继承 AbstractQueuedSynchronizer ,
这个类是java并发包中关于Lock,CountdownLatch,semaphore等实现的基础。
static final class NonfairSync extends Syncabstract static class Sync extends AbstractQueuedSynchronizer
下面看下ReentrantLock的lock方法的实现
//ReenTrantLockpublic void lock() {sync.lock(); //委托给syn,调用sync的lock方法,默认sync = NonfairSync()}//下面是非公平和公平两种模式获取锁的异同// NonfairSync 一上来就直接去cas抢锁,抢到了,就设置排他模式,// 抢不到再调acquire(1),跟FairSyn一样//NonfairSyncfinal void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}//FairSynfinal void lock() {acquire(1);}
使用
compareAndSetState方法用CAS去设置状态,当获取锁成功后,调用setExclusiveOwnerThread方式设置当前线程独占此锁下面看下
compareAndSetState方法,这个比较简单,就是用UnSafe类型做原子的比较替换操作,这个方法是 属于AbstractQueuedSynchronizer// AbstractQueuedSynchronizerprotected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
当设置状态失败,表明有其他线程在争抢此锁,并且争抢成功了,此时将走else分支,调用
acquire(1)方法下面看acquire()方法的代码,这个方法中的操作是AQS的重点代码
aqs.acquire(int arg)
以独占(排他)的模式去尝试获取锁,如果成功获取锁,则返回;获取失败,则会线程阻塞,进入等待锁的队列,等待获取到锁的线程释放锁后通知
//以独占的方式取public final void acquire(int arg) {//tryAcquire是个钩子方法,由子类去实现 ,//NonfairSync会尝试获取锁,FairSync看有前置节点没,有排队if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
tryAcquire(int arg)
接下来看一下ReentrantLock中静态类Syn默认子类NonFairSyn的实现方式
//NonfairSyncprotected final boolean tryAcquire(int acquires) {//调用父类Sync中的nonfairTryAcquire方法获取锁return nonfairTryAcquire(acquires);}
//Syncfinal boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();//获取父类AQS的state属性值,它是volatile修饰,这个属性是所有同步器的核心,就是由它控制锁//排他锁,共享锁,信号量等实现全部靠它int c = getState();//stat == 0 标识没有线程获取到锁if (c == 0) {//cas去获取锁,获取成功后,设置当前线程独占if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}//stat != 0 标识有线程已经获取到锁了,然后获取霸占锁的线程是否是当前线程else if (current == getExclusiveOwnerThread()) {//如果是,则stat加1,同一个线程,可以多次获取锁int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}//如果是其他线程则返回falsereturn false;}
此时
acquire(int arg)的组合条件第一项tryAcquire(int arg)尝试获取一次锁执行完毕,如果成功获取到锁,就已经返回了,假定没有获取到锁,看下acquireQueued(addWaiter(Node.EXCLUSIVE), arg)中的逻辑addWaiter(Node.EXCLUSIVE)
此方法就是把线程封装成一个Node实例,往等待锁的队列中添加此实例对象(当前线程)
private Node addWaiter(Node mode) {//创建一个新的Node节点,并传递当前线程以及独占模式两个参数Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {//如果等待锁的队列队尾不为空,则设置当前节点的前置节点为原队尾节点node.prev = pred;//cas方式设置刚创建的节点为队尾节点(入队),如果入队成功,直接返回if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//如果等待锁队列为空或者cas设置队尾失败,会调用enq()入队方法enq(node);return node;}
enq(Node node)入队
private Node enq(final Node node) {//死循环直到入队成功for (;;) {Node t = tail;//队列为空,需要初始化队列if (t == null) { // Must initialize//注意:初始化的队列首位Node节点是一个空节点,它没有封装任何线程if (compareAndSetHead(new Node()))tail = head;} else {//CAS直到入队node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
acquireQueued(Node node,int arg)
此方法用于
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {//获取到前一个节点pfinal Node p = node.predecessor();//如果前置节点是头节点,则尝试获取锁,因为头节点可能是初始化的空节点if (p == head && tryAcquire(arg)) {//获取锁成功后,将当前节点设置为头节点,被将node的thread,prev设置为空//即头节点需是空节点,也就是持有锁的节点setHead(node);//原来的头节点的next节点设置null,因为头节点已经换成抢到锁的node节点了//原来的头节点p就是垃圾了,等待gc回收p.next = null; // help GCfailed = false;//获取锁成功,那么当前线程就不需要中断阻塞了return interrupted;}//如果前一个节点不是头节点或者获取锁失败,调用shouldParkAfterFailedAcquirefangfa//设置前置节点为可唤醒状态(waitStatus=Node.SIGNAL),并剔除前置为waitStatus==Node//.CANCAELLED的节点if (shouldParkAfterFailedAcquire(p, node) &&//阻塞当前线程//当线程被唤醒后,如果线程是被打断的,则设置interrutted为true//又开始for循环去获取锁,根据if条件,所以只能是第二节点(pre==head)才可能调用//tryAcquire()获取锁,所以,jvm唤醒线程应该是唤醒队首的线程parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)//这只可能异常的情况下才会执行到,获取到锁failed = true//没有获取锁,则线程阻塞,所以只可能没有获取到锁,但是过程异常了cancelAcquire(node);}}
shouldParkAfterFailedAcquire(Node pre,Node node)
此方法的作用是设置当前节点的前置节点的waitStatus为Node.SIGNAL状态,因为只有持有锁的线程释放锁后,通知等待锁的线程抢锁,被通知到的线程对应的Node的waitStatus必须为<=0,
- SIGNAL(-1):标识有后继节点马上进入阻塞,需要当前节点唤醒它,它标识的不是本节点的状态,而是后继节点的状态
- CANCELLED(1):当线程阻塞的timeout到期,或者线程被interrupt后,节点的状态变更为CANCELLED,当线程是此状态后,当再次调用阻塞API阻塞此线程时,此线程不会被阻塞
- CONDITION(-2):标识当前线程处于一个条件队列(condition queue)中,条件队列中的节点(线程)在转移到获取锁的同步队列前不会唤醒,当它转移到同步队列中等待锁时,waitStatus从-2变更为0
- PROPAGATE(-3):共享锁实现时用到,当共享锁释放锁,用来设置head节点,然后传递给其他节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 检查前置节点的waitStatus状态int ws = pred.waitStatus;//如果状态waitStatus是-1 标识前置节点的waitStatus已经被设置为需要唤醒,也就是//锁持有者释放锁后,需要唤醒一个等待锁的线程,这个被唤醒的线程的waitStatus需要是<=0//如果有后置节点进来,前置节点的waitstatus==-1 ,否则一直是0if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;//waitStatus == 1(Node.CANCELLED)标识此线程取消抢锁if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {//将队列前置节点waitStatus == 1的Node节点剔除队列node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*///将前置节点设置为Node.SIGNAL,设置成功后还需要调用者再调一次确保线程被阻塞前没有成为对首//因为队首节点可以抢锁了,避免不必要的阻塞线程compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
parkAndCheckInterrupt()
这个方法很简单,就是阻塞当前线程private final boolean parkAndCheckInterrupt() {//用到了LockSupport工具类,将线程阻塞//此线程要想唤醒,除非1,有其他线程调用LockSupport.unPark()方法,2 有线程将当前线程中断,//3,虚假唤醒LockSupport.park(this);return Thread.interrupted();}
cancelAcquire(Node node)
private void cancelAcquire(Node node) {// Ignore if node doesn't existif (node == null)return;//设置node节点的线程为空node.thread = null;// Skip cancelled predecessors//取消的node清除出队列Node pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;// predNext is the apparent node to unsplice. CASes below will// fail if not, in which case, we lost race vs another cancel// or signal, so no further action is necessary.Node predNext = pred.next;// Can use unconditional write instead of CAS here.// After this atomic step, other Nodes can skip past us.// Before, we are free of interference from other threads.//设置node的waitstatus为取消状态node.waitStatus = Node.CANCELLED;// If we are the tail, remove ourselves.//如果是队尾元素,则移出队列if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);} else {// If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.//如果不是队尾元素,并且不是队首节点,且前置节点为SIGNAL(如果不是,设置为SIGNAL)//然后将前后节点连接起来int ws;if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {Node next = node.next;if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);} else {//如果是头节点,负责唤醒下一个节点unparkSuccessor(node);}//非队尾的node取消后,设置后继节点为自己node.next = node; // help GC}}
unparkSuccessor(Node node)
唤醒后续等待队列中队首的线程(如果没取消的话Node.waitStatus == Node.CANCENLLED),如果队首节点取消的话,则从队尾遍历最靠前的没有取消的线程,唤醒它。
注(细节):
如果后继节点更新完head节点的waitStatus为SIGNAL,然后再一次tryAcquire(1)没有获取到锁,然后在挂起线程前head节点释放锁,然后调用此方法唤醒后继节点,那就是LockSupport.unpark(t)先于LockSupport.park(this)执行,那么后继节点不会再阻塞。
这个方法也是lock.release(1)最终调用的方法
//通过上文分析可知node节点为head节点private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)//如果ws < 0 ,把head节点的waitststus更新为0compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;//如果下一个节点为空,或者处于取消状态,则从后往前遍历//找到最前面的等待唤醒的线程//为啥从后遍历??//cancelAcquire()方法中当取消的node为不是tail时,为了help gc 设置了node.next = node//所以遍历时不能使用next引用,得用prev引用,所以就得从后往前遍历if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}//将之唤醒if (s != null)LockSupport.unpark(s.thread);}
在并发环境下,加锁和解锁需要以下三个部件的协调:
- 锁状态。我们要知道锁是不是被别的线程占有了,这个就是 state 的作用,它为 0 的时候代表没有线程占有锁,可以去争抢这个锁,用 CAS 将 state 设为 1,如果 CAS 成功,说明抢到了锁,这样其他线程就抢不到了,如果锁重入的话,state进行 +1 就可以,解锁就是减 1,直到 state 又变为 0,代表释放锁,所以 lock() 和 unlock() 必须要配对啊。然后唤醒等待队列中的第一个线程,让其来占有锁。
- 线程的阻塞和解除阻塞。AQS 中采用了 LockSupport.park(thread) 来挂起线程,用 unpark 来唤醒线程
- 阻塞队列。因为争抢锁的线程可能很多,但是只能有一个线程拿到锁,其他的线程都必须等待,这个时候就需要一个 queue 来管理这些线程,AQS 用的是一个 FIFO 的队列,就是一个链表,每个 node 都持有后继节点的引用
入队
线程1,调用lock(),获取到锁,此时队列尚未初始化,head和tail都还是null
线程2调用lock(),tryAcquire(1)失败,addWaiter(Node.EXCLUSIVE),接着调用enq(final Node node) 初始化队列
并将当前线程入队 ,见enq(Node node),接着在shouldParkAfterFailedAcquire方法中更新head.waitStatus=-1
线程3再进来
带有过期时间的lock()
tryLock(long timeout, TimeUnit unit)
//ReenTrantLockpublic boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));}AQSpublic final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//上来就先获取锁,获取不到才走doAcquireNanosreturn tryAcquire(arg) ||doAcquireNanos(arg, nanosTimeout);}//AQSprivate boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {//setHead操作:// head = node;node.thread = null;node.prev = null;//当node获取锁时将自身设置head节点,此时Node的属性next还没设置为null?// 因为释放锁的时候还需要next引用找到下一个需要唤醒的节点setHead(node);p.next = null; // help GC 新的head节点接棒后,旧head最后一个next引用置空failed = false;return true;}nanosTimeout = deadline - System.nanoTime();if (nanosTimeout <= 0L)return false;if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)//区别是阻塞一定时间,被唤醒有两种方式,1,时间到,2时间未到,head节点唤醒LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
可以打断的lockInterruptily()
//前面过程都差不多,也是上来就tryAcquir(1),失败就走下面的逻辑private void doAcquireInterruptibly(int arg)throws InterruptedException {//加入队列final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())//打断后抛异常throw new InterruptedException();}} finally {if (failed)//如果抛异常然后走这,取消当前节点,设置当前节点CANCELLED,踢出队列或者唤醒后继节点cancelAcquire(node);}}
阻塞在condition上
////ReenTrantLockpublic Condition newCondition() {return sync.newCondition();}//ReenTrantLock.Syncfinal ConditionObject newCondition() {return new ConditionObject();}//AQS//从ConditionObject 可以看出AQS实现的Condition,内部同样维护了一个队列//当多个线程调用condition的await()方法后,都放入此等待队列中public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID = 1173984872572414699L;/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;//其他代码...}
Condition.await()
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//添加节点到条件队列Node node = addConditionWaiter();//释放全部的信号/锁,这里以ReentrantLock.newCondition()为例,//因为lock是可重入的,所以stat可能不止是1,所以全部释放//并将释放前state的值保存savedState中int savedState = fullyRelease(node);int interruptMode = 0;// 这里退出循环有两种情况,之后再仔细分析// 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了// 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断while (!isOnSyncQueue(node)) {LockSupport.park(this);//假设当前线程被signal()唤醒,继续执行,且没有被打断(打断细节暂不考虑)//又进入while循环判断,此时跳出循环if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}//进入获取锁,阻塞的死循环中了,获取锁,使用释放前锁持有的数量savedStateif (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}private Node addConditionWaiter() {Node t = lastWaiter;// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {// 如果conditon queue的最后一个节点不是CONDITION状态,则把非CONDITION状态的节点// 全部踢出condition队列unlinkCancelledWaiters();t = lastWaiter;}//创建新CONDITION节点,新创建的Node节点的pre和next都是null,即不在等待锁的同步队列//它本身也是持有锁后才走到这的Node node = new Node(Thread.currentThread(), Node.CONDITION);//为啥这里往队列加元素没有用cas?//因为此方法要想执行,首先得获取到condition附着的锁,所以不需要casif (t == null)firstWaiter = node;//condition queue为空elset.nextWaiter = node; //放到队列尾部lastWaiter = node;return node;}//condition queue使用了Node节点的nextWaiter属性构成一个单向链表//踢出队列的动作就是将非CONDITION的的node的nextWaiter属性置空,将前一个CONDITION状态的node的//nextWaiter指向node的下一个节点(如果也需要踢出,则更换指向下下一个节点,依次类推)private void unlinkCancelledWaiters() {Node t = firstWaiter;//trail是遍历到最新到waitStatus==CONDITION(-2)的node,Node trail = null;//t初始为第一个节点,循环逻辑执行完毕后将t指向下一个节点,只要队列不为空,就一直循环while (t != null) {//把下一个节点先取出来Node next = t.nextWaiter;//比较当前节点t的waitStatus是否是-2if (t.waitStatus != Node.CONDITION) {//如果走到这,说明当前节点t不再需要等待condition条件,将他的nextWaiter置空t.nextWaiter = null;if (trail == null)//如果遍历到的节点都是需要踢出condition的队列的,会走到这,直到碰到第一个//真正需要等待condition的节点,将队列头引用firstWaiter指向它//这里是提前将队首引用指向下一个节点(当前节点肯定不是)firstWaiter = next;elsetrail.nextWaiter = next;if (next == null)//如果队列到尾了,就将队尾引用指向trail,它是最后一个等待的节点lastWaiter = trail;}else//走到这,说明当前节点是需要等待的节点,会赋给trailtrail = t;//t当前节点 这里把下一个节点赋给当前节点,然后继续循环判断t = next;}}// 在节点入条件队列的时候,初始化时设置了 waitStatus = Node.CONDITION// 前面我提到,signal 的时候需要将节点从条件队列移到阻塞队列,// 这个方法就是判断 node 是否已经移动到阻塞队列了final boolean isOnSyncQueue(Node node) {// 移动过去的时候,node 的 waitStatus 会置为 0,这个之后在说 signal 方法的时候会说到// 如果 waitStatus 还是 Node.CONDITION,也就是 -2,那肯定就是还在条件队列中// 如果 node 的前驱 prev 指向还是 null,说明肯定没有在 阻塞队列(prev是阻塞队列链表中使用的)if (node.waitStatus == Node.CONDITION || node.prev == null)return false;// 如果 node 已经有后继节点 next 的时候,那肯定是在阻塞队列了if (node.next != null)return true;// 下面这个方法从阻塞队列的队尾开始从后往前遍历找,如果找到相等的,说明在阻塞队列,//否则就是不在阻塞队列// 可以通过判断 node.prev() != null 来推断出 node 在阻塞队列吗?答案是:不能。// AQS 的入队方法,首先设置的是 node.prev 指向 tail,// 然后是 CAS 操作将自己设置为新的 tail,可是这次的 CAS 是可能失败的。return findNodeFromTail(node);}
condition.signal()
public final void signal() {//是否持有排他锁if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)//唤醒condition队列的第一个节点doSignal(first);}private void doSignal(Node first) {do {//如果队列中还有等待的node,则把下一个节点设置为头节点,如果没有,则把尾引用置空if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;//一上来就把队列头的节点给干掉了,因为它马上就被唤醒了,所以要踢出conndition的队列first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);//如果队列头唤醒失败,且队列中还有其他节点//则继续唤醒后继节点}final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*///CAS设置waitStatus,设置失败说明节点取消了if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*///熟悉的代码来了,将节点放入获取锁的同步队列,返回前一个节点Node p = enq(node);int ws = p.waitStatus;//如果前一个节点的waitSatus > 0 就是CANCELLED 或者设置前置节点SIGNAL失败//则将当前节点唤醒if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}

Condition总结:
1,在使用 condition 时,必须先持有相应的锁
2,ReentrantLock 实例可以通过调用多次 newCondition 产生多个Condition类型的 ConditionObject 的实例,每一个实例维护了一个自己的condition队列
3,调用condition的await()方法时,把当前线程包装成Node对象放入condition 队列,释放锁,等待其他线程signal()/signalAll()
4,当处于其他线程调用signal()/signalAll()唤醒 condition队列的node时,将其放入获取锁的同步队列
5,被唤醒的线程从阻塞处开始执行,尝试去抢锁,就进入抢不到锁,阻塞;抢到了锁,执行代码,释放锁,通知后置节点的逻辑了。
