java5之后,并发包提供了Lock接口来实现锁的功能,他提供了与synchronized类似的的功能,只是在使用时需要显示获取释放锁。
lock接口提供synchronized不具备的特性:
- 尝试非阻塞获取锁
- 能被中断获取锁
-
接口
public interface Lock {void lock();//获取锁void lockInterruptibly() throws InterruptedException;//会响应中断的获取锁boolean tryLock();//尝试获取锁boolean tryLock(long time, TimeUnit unit) throws InterruptedException; //超时内获取锁。可被中断void unlock();//释放锁Condition newCondition();//获取等待通知组件}
构造器
public ReentrantLock() {sync = new NonfairSync(); //默认时非公平锁}public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
获取锁
final void lock() {if (compareAndSetState(0, 1)) //抢占互斥资源setExclusiveOwnerThread(Thread.currentThread()); //抢到锁,保存当前线程elseacquire(1); //没有抢到锁}
compareAndSetState 是由AQS实现的:
protected final boolean compareAndSetState(int expect, int update) { //乐观锁// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update); //stateOffset 内存中的偏移量 与 expect(预期值比较) 相同就更新 为 update}
这里使用的是调用unfafe类的CAS乐观锁,stateOffset 是锁状态的偏移量。state为0 时,没有线程获取锁。与预期值 expect比较。如果相等说明无锁状态,将state该为1.当前线程获取锁成功,返回true。否则不改变state的值。返回false
如果获取锁失败,则线进入acquire()方法。public final void acquire(int arg) {if (!tryAcquire(arg) && //重入判断acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
tryAcquire(arg)重入判断
这里使用了策略模式,公平锁和非公平锁的实现不一致。默认是使用公平锁(详见构造器)
NonfairSync非公平锁
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();//当前线程int c = getState();//当锁的状态if (c == 0) {//等于0,说明锁已经释放,if (compareAndSetState(0, acquires)) {//cas抢占锁 。这是非公平锁的特性setExclusiveOwnerThread(current);//抢锁成功return true;}}else if (current == getExclusiveOwnerThread()) {//如果持有锁的线程就是当前线程,int nextc = c + acquires; //state+1 增加重入次数if (nextc < 0) // overflow 溢出throw new Error("Maximum lock count exceeded");setState(nextc); //设置重入次数return true;}return false;}
FairSync公平锁
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();//当前线程int c = getState(); //stateif (c == 0) {//无锁if (!hasQueuedPredecessors() && //是否有等等队列compareAndSetState(0, acquires)) {//cas占有锁setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {//判断重入int nextc = c + acquires;//重入次数+1if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
公平锁和非公平锁最大差异在于判断是否是重入前,发现持有锁的线程已经释放锁。非公平锁会cas去插队回去锁,公平锁者会判断有没有等待获取锁的队列,如果没有才去cas抢占锁。
addWaiter(Node.EXCLUSIVE)加入等待队列
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode); //将线程包装成一个node// Try the fast path of enq; backup to full enq on failure 试试enq的快速路径;失败时备份到完整的enqNode pred = tail; //tail尾部节点if (pred != null) { //enq的快速路径node.prev = pred;//将当前节点的pre指向tailif (compareAndSetTail(pred, node)) { //cas 将tail节点指向 当前节点pred.next = node;//如果cas成功,将原本原来最后一个节点的next指向当前节点。return node;}}enq(node);//完整enqreturn node;}
先尝试快enq,失败的话,完整enq:
private Node enq(final Node node) {for (;;) { //自旋Node t = tail;if (t == null) { // Must initialize 必须先初始化,初始化结束进行下个自旋if (compareAndSetHead(new Node()))//cas为head创建一个新的节点tail = head; //tail也指向这个新节点} else {node.prev = t;//当前节点的prev指向tailif (compareAndSetTail(t, node)) {//cas让tail指向单前节点t.next = node;//cas成功,让原本指向tail的节点指向当前节点,否则下次自旋尝试return t;}}}}
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))自旋检查直到挂起来
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;//中断标识for (;;) {//自旋final Node p = node.predecessor();//获取pre节点,没找到抛出异常if (p == head && tryAcquire(arg)) { //p == head ,说明当前节点是排在队列中排第一个 检查锁已经被释放,并去cas获取锁setHead(node);//重新设置headp.next = null; // help GC 方便垃圾回收failed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&//是否需要挂起线程判断parkAndCheckInterrupt()) //挂起 当前线程阻塞在这里,等待被唤醒interrupted = true;//中断标识}} finally {if (failed) //线程被取消cancelAcquire(node);//回收节点}}
是否挂起判断逻辑
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;//前个节点的状态if (ws == Node.SIGNAL) //signal值为-1 运行被唤醒的状态/** This node has already set status asking a release* to signal it, so it can safely park. 这个节点已经设置了状态,要求释放信号给它,这样它就可以安全挂起了。*/return true;if (ws > 0) { //CANCELLED 取消的=1 移除掉/** Predecessor was cancelled. Skip over predecessors and* indicate retry. 前任被取消了。跳过前辈和显示重试。*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {//小于0/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//cas将当前节点变成SIGNAL}return false;}private final boolean parkAndCheckInterrupt() {LockSupport.park(this); //线程会阻塞在这里return Thread.interrupted(); //唤醒后。重置线程中断标识}
如果线程被取消执行取消操作
private void cancelAcquire(Node node) {// Ignore if node doesn't existif (node == null) //节点已经可以被回收return;node.thread = null; //方便GC回收线程// Skip cancelled predecessorsNode pred = node.prev;while (pred.waitStatus > 0)//前面节点也被取消node.prev = pred = pred.prev;//移除前面节点// predNext is the apparent node to unsplice. CASes below will// fail if not, in which case, we lost race vs another cancel// or signal, so no further action is necessary.Node predNext = pred.next;//前节点的后节点。(可能已经不是当前节点了)// Can use unconditional write instead of CAS here.// After this atomic step, other Nodes can skip past us.// Before, we are free of interference from other threads.node.waitStatus = Node.CANCELLED;//将当前节点置为可回收状态// If we are the tail, remove ourselves.if (node == tail && compareAndSetTail(node, pred)) {//如果tail指向当前节点,cas让tail指向前节点compareAndSetNext(pred, predNext, null);//cas当前阶段的next为空} else {// If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.int ws;if (pred != head &&//前节点非头节点((ws = pred.waitStatus) == Node.SIGNAL ||//前节点是可被挂起状态(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&//将前节点的wait状态设为signalpred.thread != null) {//前节点有线程Node next = node.next;if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);//cas前节点的next指向当前节点的next} else {unparkSuccessor(node);//如果是头节点的话,唤醒unpark该节点}node.next = node; // help GC}}
selfInterrupt();
static void selfInterrupt() {Thread.currentThread().interrupt(); //再次中断}
释放锁
public void unlock() {sync.release(1);//同步器释放锁}
public final boolean release(int arg) {if (tryRelease(arg)) { // 释放锁Node h = head;if (h != null && h.waitStatus != 0) //有等待的线程unparkSuccessor(h);//唤醒头节点的线程return true;}return false;}
释放锁:
protected final boolean tryRelease(int releases) {int c = getState() - releases; //一层一层减少重入次数if (Thread.currentThread() != getExclusiveOwnerThread())//当前线程不是抢占锁线程。抛出异常throw new IllegalMonitorStateException();boolean free = false;//释放释放成功标识if (c == 0) {//stete ==0 说明当前线程已经释放了资源free = true;setExclusiveOwnerThread(null);}setState(c); //释放锁的最后,写volatile变量state。利用了volatile的内存屏障特性return free;}
唤醒等待队列中的线程
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)//SIGNAL -1 CONDITION -2 PROPAGATE -3compareAndSetWaitStatus(node, ws, 0); //cas waitStatus 设置成0/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;//真正要被唤醒的线程if (s == null || s.waitStatus > 0) { //没有线程,或者线程被取消s = null;for (Node t = tail; t != null && t != node; t = t.prev)//从tail开始往前遍历,直到为空,获取当前节点if (t.waitStatus <= 0)//且线程没有被取消s = t; //赋给要被唤醒的节点}if (s != null)LockSupport.unpark(s.thread);//唤醒该节点}
