一、锁的可重入性
因为在Concurrent包中的锁都是“可重入锁”,所以一般都命名为ReentrantX,因为所有的锁。“可重入锁”是指当一个线程调用object.lock()拿到锁,进入互斥区后,再次调用object.lock(),仍然可以拿到该锁。很显然,通常的锁都要设计成可重入的,否则就会发生死锁。
synchronized关键字,同样是可重入锁
二、类的继承层次
2.1、接口Lock
public interface Lock {
// 尝试去获得锁
// 如果锁不可用,当前线程会变得不可用,直到获得锁为止。(中途会忽略中断)
void lock();
// 尝试去获取锁,如果锁获取不到,线程将不可用
// 直到获取锁,或者被其他线程中断
// 线程在获取锁操作中,被其他线程中断,则会抛出InterruptedException异常,并且将中断标识清除。
void lockInterruptibly() throws InterruptedException;
// 锁空闲时返回true,锁不空闲是返回false
// 该方法不会引起当前线程阻塞
boolean tryLock();
// 在unit时间内成功获取锁,返回true
// 在unit时间内未成功获取锁,返回false
// 如果当前线程在获取锁操作中,被其他线程中断,则会抛出InterruptedException异常,并且将中断标识清除。
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// 获取一个绑定到当前Lock对象的Condition对象
// 获取Condition对象的前提是当前线程持有Lock对象
Condition newCondition();
}
}
2.2、lock()和lockInterruptibly()方法的区别
lock()方法类似于使用synchronized关键字加锁,如果锁不可用,出于线程调度目的,将禁用当前线程,并且在获得锁之前,该线程将一直处于休眠状态。
lockInterruptibly()方法顾名思义,就是如果锁不可用,那么当前正在等待的线程是可以被中断的,这比synchronized关键字更加灵活。
2.3、实现ReentrantLock
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
...
abstract void lock();
}
...
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}
//默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
/**
*公平锁与非公平锁
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
}
三、锁的公平性与非公平性
Sync是一个抽象类,它有两个子类FairSync与NonfairSync,分别对应公平锁和非公平锁。
公平锁:是指线程在抢占锁失败后会进入一个等待队列,先进入队列的线程会先获得锁。公平性体现在先来先得。
非公平锁:是指线程抢占锁失败后会进入一个等待队列,但是这些等待线程谁能先获得锁不是按照先来先得的规则,而是随机的。不公平性体现在后来的线程可能先得到锁。
如果有很多线程竞争一把公平锁,系统的总体吞吐量(即速度很慢,常常极其慢)比较低,因为此时在线程调度上面的开销比较大。
原因是采用公平策略时,当一个线程释放锁时,需要先将等待队列中的线程唤醒。这个唤醒的调度过程是比较耗费时间的。如果使用非公平锁的话,当一个线程释放锁之后,可用的线程能立马获得锁,效率较高。默认设置的是非公平锁,其实是为了提高效率,减少线程切换。
四、锁实现的基本原理
4.1、实现锁的基本要素
1、需要一个state变量,标记该锁的状态
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* The synchronization state.
*/
private volatile int state;//记录锁的状态,通过CAS修改状态的值
}
state取值不仅可以是0、1,还可以大于1,就是为了支持锁的可重入性。
例如,同样一个线程,调用5次lock,state会变成5;然后调用5次unlock,state减为0。
当state=0时,没有线程持有锁,exclusiveOwnerThread=null;
当state=1时,有一个线程持有锁,exclusiveOwnerThread=该线程;
当state>1时,说明该线程重入了该锁。
2、需要记录当前是哪个线程持有锁。
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
protected AbstractOwnableSynchronizer() { }
private transient Thread exclusiveOwnerThread;//记录锁被那个线程持有
}
3、需要底层支持对一个线程进行阻塞或唤醒操作。
在Unsafe类中,提供了阻塞或唤醒线程的一对操作原语,也就是park/unpark。
有一个LockSupport的工具类,对这一对原语做了简单封装:
在当前线程中调用park(),该线程就会被阻塞;在另外一个线程中,调用unpark(Thread t),传入一个被阻塞的线程,就可以唤醒阻塞在park()地方的线程。尤其是unpark(Thread t),它实现了一个线程对另外一个线程的“精准唤醒”。前面讲到的wait()/notify(),notify也只是唤醒某一个线程,但无法指定具体唤醒哪个线程。
4、需要有一个队列维护所有阻塞的线程。这个队列也必须是线程安全的无锁队列,也需要用到CAS。
在AQS中利用双向链表和CAS实现了一个阻塞队列。如下所示。
阻塞队列是整个AQS核心中的核心,下面做进一步的阐述。如图3-2所示,head指向双向链表头部,tail指向双向链表尾部。入队就是把新的Node加到tail后面,然后对tail进行CAS操作;出队就是对head进行CAS操作,把head向后移一个位置。
初始的时候,head=tail=NULL;然后,在往队列中加入阻塞的线程时,会新建一个空的Node,让head和tail都指向这个空Node;之后,在后面加入被阻塞的线程对象。所以,当head=tail的时候,说明队列为空。
4.2、公平锁和非公平锁的实现差异
1、ReentrantLock在公平性和非公平性上的实现差异。
非公平锁NonfairSync
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1))//非公平锁,一上来就争抢锁 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
公平锁FairSync
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() {//没有一上来就争抢锁 acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
2、acquire()方法
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
3、tryAcquire(..)虚函数
tryAcquire(..)是一个虚函数,也就是再次尝试拿锁,被NonfairSync与FairSync分别实现。
非公平锁实现tryAcquire
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
....
}
公平锁实现tryAcquire
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
这两段代码非常相似,唯一的区别是公平锁实现的代码多了一个if (!hasQueuedPredecessors())。什么意思呢?就是只有当c==0(没有线程持有锁),并且排在队列的第1个时(即当队列中没有其他线程的时候),才去抢锁,否则继续排队,这才叫“公平”!
4、acquireQueued(..)函数
目的是把线程放入阻塞队列,然后阻塞该线程。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter函数
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) {//尝试加入到队列尾部,如果不成功则执行下面的enq方法 pred.next = node; return node; } } enq(node);//enq内部会进行队列的初始化,建立一个新的空Node,然后不断尝试自选,直到把该Node加入队列尾部。 return node; }
就是为当前线程生成一个Node,然后把Node放入双向链表的尾部。要注意的是,这只是把Thread对象放入了一个队列中而已,线程本身并未阻塞。
addWaiter(..)
在addWaiter(..)函数把Thread对象加入阻塞队列之后的工作就要靠acquireQueued(..)函数完成。线程一旦进入acquireQueued(..)就会被无限期阻塞,即使有其他线程调用interrupt()函数也不能将其唤醒,除非有其他线程释放了锁,并且该线程拿到了锁,才会从accquireQueued(..)返回。
注意:进入acquireQueued(..),该线程被阻塞。在该函数返回的一刻,就是拿到锁的那一刻,也就是被唤醒的那一刻,此时会删除队列的第一个元素(head指针前移1个节点)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//被唤醒,如果自己在队列头部(自己的前一个节点是head指向的空节点),则尝试拿锁
setHead(node);//拿锁成功,出队列(即head指针前移一个节点),同时会把node的thread变量置为null,所以head还是指向一个空节点
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//自己调用park阻塞自己
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
首先,acquireQueued(..)函数有一个返回值,表示什么意思呢?虽然该函数不会中断响应,但它会记录被阻塞期间有没有其他线程向它发送过中断信号。如果有,则该函数会返回true;否则,返回false。基于这个返回值,才有了下面的代码:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
当acquireQueued(..)返回true 时,会调用selfInterrupt(),自己给自己发送中断信号,也就是自己把自己的中断标志位设为true。之所以要这么做,是因为自己在阻塞期间,收到其他线程中断信号没有及时响应,现在要进行补偿。这样一来,如果该线程在lock代码块内部有调用sleep()之类的阻塞方法,就可以抛出异常,响应该中断信号。
阻塞就发生在下面这个函数中:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
线程调用park()函数,自己把自己阻塞起来,直到被其他线程唤醒,该函数返回。park()函数返回有两种情况。
情况1:其他线程调用了unpark(Thread t)。
情况2:其他线程调用了t.interrupt()。这里要注意的是,lock()不能响应中断,但LockSupport.park()会响应中断。
也正因为LockSupport.park()可能被中断唤醒,acquireQueued(..)函数才写了一个for死循环。唤醒之后,如果发现自己排在队列头部,就去拿锁;如果拿不到锁,则再次自己阻塞自己。不断重复此过程,直到拿到锁。
被唤醒之后,通过Thread.interrupted()来判断是否被中断唤醒。如果是情况1,会返回false;如果是情况2,则返回true。
4.3、unlock()实现分析
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
release()里面做了两件事:tryRelease(..)函数释放锁;unparkSuccessor(..)函数唤醒队列中的后继者。
在上面的代码中有一个关键点要说明:因为是排他锁,只有已经持有锁的线程才有资格调用release(..),这意味着没有其他线程与它争抢。所以,在上面的tryRelease(..)函数中,对state值的修改,不需要CAS操作,直接减1即可。
但对于读写锁中的读锁,也就是releaseShared(..),就不一样了,见后续分析。
4.4、lockInterruptibly()实现分析
lock 不能被中断,lockInterruptibly()可以被中断,
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
acquireInterruptibly(..)也是AQS 的模板方法,里面的tryAcquire(..)分别被FairSync和NonfairSync实现,主要注意doAcquireInterruptibly(..)函数。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
当parkAndCheckInterrupt()返回true的时候,说明有其他线程发送中断信号,直接抛出InterruptedException,跳出for循环,整个函数返回。
4.5、tryLock()实现分析
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
tryLock()实现基于调用非公平锁的tryAcquire(..),对state进行CAS操作,如果操作成功就拿到锁;如果操作不成功则直接返回false,也不阻塞。
五、syncchronized和lock的区别
- syncchronized是一个内置的关键字,lock是一个java类
- syncchronized无法判断获取锁的状态,lock可以判断是否获取到了锁
- syncchronized会自动释放锁,lock必须要手动释放锁,如果不释放,会造成死锁
- syncchronized中如果线程1获取锁,其他线程会等待,lock就不一定会等待下去。
- syncchronized可重入锁,不可以中断,非公平。lock,可重入锁,可以中断,也可以调整公平或者非公平。