1. AQS队列同步器
1.1. AQS 队列同步器基础
1.1.1 AQS的组成及内部原理
- AQS在
CountDownLatch
、Semaphore
、CountDownLatch
等工具内都有使用,全称是:AbstractQueuedSynchronizer
是一个抽象类,在《Java并发编程艺术》一书中称之为队列同步器,是用来构建锁或者其他同步组件的基础框架,通过内置FIFO队列来完成获取资源线程的派对工作; - 作者是大佬 **Doug lea **
我们可以大致看一下我们锁用到的这些并发控制的工具类和锁的内部实现
Semaphore
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
......
ReentrantLock
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
......
CountDownLatch
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
......
由上源码我们可以看到,里面都有一个内部类,
Sync
继承自AbstractQueuedSynchronizer
- 它的主要作用就是同个state的加减和队列来管理线程的执行。
- 看源码可知:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch中都有继承自AQS的Sync及其子类。
- AQS 同步器主要通过继承的方式,来实现它的抽象方法来实现管理同步器状态。《并发编程的艺术》
- 主要实现中组成就包括状态管理部分(包括
state
以及需要我们实现的tryAcquire
、tryRelease
等相关),阻塞管理部分(包括acquire()
方法、release()
方法、LockSupport
工具、FIFO队列等)
1.1.2 状态管理
state:状态及其设置方法
/**
* The synchronization state.
*/
private volatile int state;
......
// 获取同步状态
protected final int getState() {
return state;
}
// 设置当前同步状态
protected final void setState(int newState) {
state = newState;
}
// CAS更新同步状态
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
我们可以看到里面使用了原子类也使用到了的Unsafe类,使用CAS来更新state。
- 状态在不同的工具中的意思一般不同,如:CountDownLatch中做计数器,Semaphore中做许可数量,ReentrantLock中做重入次数。都与计数相关。并且在AQS中管理时也以state作为阻塞和唤醒相关操作的判断依据。主要通过调用tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared来实现独占式同步状态和共享是同步状态获取和释放。需要工具自行实现。
- AQS源码中是直接抛出错误,所以需要继承然后实现。如下的AQS中的
tryReleaseShared
方法。
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
- 获取同步状态在各类中state的变化是不同的。
- 获取同步状态:
- 在
ReentrantLock
中,就是获取锁。state+1
- 在
Semaphore
中就是acquire
获取许可,state-1
,当state==0
就会阻塞 - 在
CountDownLatch
中就是await
方法,就是等待state==0
- 在
- 释放同步状态:
- 释放操作不会阻塞
- 在
ReentrantLock
中就是unlock
方法调用release(1)
对应state-1
- 在
Semaphore
中就是realease
,也是state-1
CountDownLatch
中就是countDown
方法,也是state-1
- 一般情况下,实现类都会实现
tryAcquire
、tryRelease
、tryAcquireShared
、tryReleaseShared
相关方法,以对应各个类的需求
1.1.2 阻塞管理
- 阻塞管理部分主要是管理线程的阻塞和唤醒,主要通过acquire和release等方法来实现,而这类方法在Java中都是固定的实现。核心方法就是如下两个,其他的拓展时间处理和共享处理都差不多。根本逻辑还是差不都的
- acquire()方法:独占式获取同步状态。tryAcquire就是工具或者我们自定义的工具类实现的逻辑。这个地方需要注意一点,独占式的是返回true或false,而共享式的则是返回数字。具体是为什么后面再说。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 两个判断主要是第一个是尝试获取同步状态,如果失败,那就调用第二个方法addWaiter加入同步队列尾部中等待,Node.EXCLUSIVE是独占式的表明。
- release()方法:独占式释放同步队列,这个方法在尝试释放成果后,利用LockSupport工具去唤醒FIFO队列中的头结点的线程(不为空的情况下)。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- LockSupport工具:它定义了一组公共静态方法,这些方法提供了线程的阻塞和唤醒功能。是构建同步组件的基础工具,主要方法是park()负责阻塞当前线程和unpark(Thread)唤醒传入的线程,此外还有时间相关的阻塞方法。我们可以看下unparkSuccessor()方法中的代码
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
- 可以看到前面是状态操作,后面就是调用LockSupport的方法。
FIFO队列:
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
等待队列就是用来存放没有竞争到锁的等待线程的,AQS会对这个队列进行管理,AQS中定义了头结点和尾节点。
- FIFO队列是一个双向链表;队列头节点是当前拿到锁的线程;在AQS中保存了这个队列的头尾节点。
- 当我们切换执行线程时,head指针会指向头结点的next指针。
1.1.3 AQS的使用点
- 在工具内部写一个Sync类继承同步队列器AQS(可以模仿CountDownLatch)
- 根据需求实现获取和释放方法,也就是acquire和release方法的调用规则
- 根据是独占还是共享来决定重写的方法:独占使用
tryAcquire
/tryRelease
、共享使用tryAcquireShared(int acquires)
/tryReleaseShared(int releases)
;
1.2. AQS在CountDownLatch中的源码剖析
- CountDownLatch使用的是共享式同步状态,所以acquire和release是调用的tryAcquireShared和tryReleaseShared,这两个方式返回的是数字。关于数字是什么意思,下面阅读过源码后就明白了。
1.2.2 构造函数
我们看到内部实现就是初始化一个
Sync
然后把计数值传入public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
我们可以看下面的
CountDownlatch
中Sync
的实现,在构造方法创建的Sync
传入的count
调用了setState
方法传入了AQS
的state
中- 在
CountDownLatch
内部有一个继承AQS的Sync
```java
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) { setState(count); }
int getCount() { return getState(); }
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
- `CountDownLatch`的`getCount()`方法
```java
public long getCount() {
return sync.getCount();
}
- 我们可以看到
getCount
实际也是调用Sync
的getCount()
来获取state
并返回1.2.3
CountDownLatch
的countDown()
方法public void countDown() {
sync.releaseShared(1);
}
我们看一看到它直接调用了
AQS
的releaseShared(1)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
而
releaseShared
则是回去调用CountDownLatch
中实现的tryReleaseShared
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
而在
tryReleaseShared
中则是主要对state的值做-1操作,如果state大于零可以获取到就减一并且用CAS并发更新值,如果最新值为0就返回true- 返回true过后就
doReleaseShared
释放锁,唤醒队列里面的等待线程。也就是调用了await()
方法的线程
1.2.4 CountDownLatch
的await()
方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
而
await
则会调用AQS中的默认实现sync.acquireSharedInterruptibly(1);
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
而里面则是调用
tryAcquireShared(arg) < 0
看是否小于0,如果小于0就代表没有获取到锁,就调用doAcquireSharedInterruptibly(arg);
入队- 而
tryAcquireShared
则是在CountDownLatch
中的Sync
实现的
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
如果当前state为0了(也就是说计数已经到0了)就返回一个1就不会满住上面的
acquireSharedInterruptibly
方法中的条件,就会放行,如果不等于0就会返回-1,此时就会入队。调用doAcquireSharedInterruptibly
方法private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这个方法首先会把当前线程在
addWaiter
中包装成一个Node
节点并添加到队列尾部;而这个Node
节点就是FIFO队列的节点。然后就会进入循环,如果当前节点不是
head
,那么就会进入到后面的判断,其中重要的是parkAndCheckInterrupt
,方法如下:private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
它会调用
LockSupport
的park
并且此park
方法就是封装了Unsafe
的native方法park()
来把线程挂起进入阻塞状态public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
我们只需要知道
doAcquireSharedInterruptibly
方法就是把当前线程放到阻塞队列中,并且把线程阻塞就OK了。- AQS在
CountDownLatch
中使用的一些点:- 调用
CountDownLatch
的await()
时,便会尝试获取共享锁,开始时是获取不到锁的,于是就被阻塞 - 可以获取到的条件就是计数器为0,也就是
state==0
的时候。 - 只有每次调用
countDown
方法才会使得计数器减一,减到0时就回去唤醒阻塞中的线程。
- 调用
1.3. AQS在Semaphore
中的源码剖析
- 由于上面讲得很细了,接下来就简略一些
- 在
Semaphore
中state
就是许可证的数量 - 主要的操作就是acquire和release,也是借用Sync对state的操作来控制线程的阻塞与唤醒
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void release() {
sync.releaseShared(1);
}
先看下
acquire
调用的acquireSharedInterruptibly
此方法在上面已经说过。public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
而在Semaphore中Sync有两个实现:
NonfairSync
、FairSync
- 在FairSync中
tryAcquireShared
就会有hasQueuedPredecessors
判断,如果不是头节点,那就返回-1,在acquireSharedInterruptibly
方法中去调用doAcquireSharedInterruptibly
入队并且阻塞线程
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
而在
NonfairSync
中而是直接调用Sync
的nonfairTryAcquireShared
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
- 可以看到其中并没有对是否阻塞队列的头节点判断,直接去获取值,判断是会否许可足够。
而
release
中则是调用AQS的releaseShared
其也是调用Semaphore
中Sync
的tryReleaseShared
来判断是否需要释放锁,去唤醒阻塞线程public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
我们可以看到此处就是关于
Semaphore
的已获取许可的释放 把state
加回去然后用CAS更新state
1.4. AQS在ReentrantLock
中的应用
- 源码在后续中
- 在
ReentrantLock
中,state
主要是重入的次数,加锁的时候state+1 ,而在释放锁的时候,state-1然后判断当前的state==0
- 在
ReentrantLock
中与AQS相关的有三个类:UnfairSync
,FairSync
,Sync
- 关于加锁和解锁的逻辑也是AQS中的acquire方法的逻辑(获取锁失败就会放入队列中)和release方法(调用子类的tryRelease来去掉头部,并且唤醒线程)
- 而加锁解锁中的逻辑,主要是公平锁和非公平锁的区别,公平锁会去判断是否在队列头部,如果在才会去执行,而非公平锁则会抢锁。不会管你是不是在队列头部。
- 相信在上面的源码分析过后,分析
ReentrantLock
是十分简单的。大家可以自行分析。
2. Lock 锁
2.1. Lock接口
2.1.1 Lock接口间接
Lock
和synchronized
,是Java中最常见的锁,他们都可以达到线程安全的目的,Lock
主要用于丰富加锁的形式,以及处理的方法2.1.2 为什么需要Lock?
主要是因为
synchronized
不够用,有如下问题:
lock()
就是最普通的获取锁,如果锁已被其他线程获取,则等待;Lock
不会像synchronized
一样在异常时自动释放锁,因此我们需要手动释放锁,最佳实践:在finally中释放锁,以保证发生异常时锁一定被释放。此外lock()
方法不能被中断,这会有很大隐患,一旦陷入死锁,lock()
就会陷入永久等待。/**
* Lock最佳实践 Lock不像synchronized主动释放锁,需要调用unlock
* @author yiren
*/
public class LockInterface {
private static Lock lock = new ReentrantLock();
public static void main(String[] args) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " do some work!");
}finally {
lock.unlock();
}
}
}
tryLock()
用来尝试获取锁,如果当前所没有被其他线程占用,则获取成功返回true,锁获取失败返回false;相比于lock()
,这样的方法显然功能更强大了,我们可以根据是否能获取到锁来决定后续的程序行为;且此方法会立即返回;tryLock(long time, TimeUnit unit)
和tryLock()
使用类似,不过它本身可以阻塞等待一段时间锁,超时过后再放弃。- 在我死锁的文章中有个案例,就是利用tryLock来解决死锁问题,代码如下
/**
* 使用tryLock来避免死锁
*
* @author yiren
*/
public class DeadlockTryLock {
private static Lock lock1 = new ReentrantLock();
private static Lock lock2 = new ReentrantLock();
public static void main(String[] args) {
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
if (lock1.tryLock(1, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + " got lock 1");
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(10));
if (lock2.tryLock(1, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + " got lock1 and lock2 successfully.");
lock2.unlock();
lock1.unlock();
break;
} else {
System.out.println(Thread.currentThread().getName() + " fail to get lock2");
lock1.unlock();
}
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(10));
} else {
System.out.println(Thread.currentThread().getName() + " fail to get lock1");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
if (lock2.tryLock(1, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + " got lock 2");
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(10));
if (lock1.tryLock(1, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + " got lock2 and lock1 successfully.");
lock1.unlock();
lock2.unlock();
break;
} else {
System.out.println(Thread.currentThread().getName() + " fail to get lock1");
lock2.unlock();
}
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(10));
} else {
System.out.println(Thread.currentThread().getName() + " fail to get lock2");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread1.start();
thread2.start();
}
}
Thread-0 got lock 1
Thread-1 got lock 2
Thread-1 fail to get lock1
Thread-0 fail to get lock2
Thread-0 got lock 1
Thread-1 got lock 2
Thread-1 fail to get lock1
Thread-0 got lock1 and lock2 successfully.
Thread-1 got lock 2
Thread-1 got lock2 and lock1 successfully.
Process finished with exit code 0
lockInterruptibly()
相当于tryLock(long time, TimeUnit unit)
把超时时间设置为无线。并且在等待锁的过程中,线程可以被中断。
/**
* @author yiren
*/
public class LockInterruptibly {
private static Lock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Runnable runnable = () -> {
try {
System.out.println(Thread.currentThread().getName() + " try to get lock");
lock.lockInterruptibly();
try {
System.out.println(Thread.currentThread().getName() + " got lock");
Thread.sleep(5000);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " sleep ");
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + " unlock");
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " lockInterruptibly ");
}
};
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
Thread.sleep(2000);
thread2.interrupt();
}
}
Thread-0 try to get lock
Thread-0 got lock
Thread-1 try to get lock
Thread-1 lockInterruptibly
Thread-0 unlock
Process finished with exit code 0
2.1.4 可见性保证
- Lock的加解锁和synchronized有同样的内存语义,也就是说下一个线程加锁后可以看到所有前一个线程解锁前发生的所有操作。拥有happens-before。
2.2. 锁的分类
2.2.1 乐观锁和悲观锁
- 悲观锁(互斥同步锁)的劣势
- 阻塞和唤醒带来的性能劣势,悲观锁,锁住过后就是独占的。
- 可能永久阻塞:如果尺有所的线程被永久阻塞,如遇到了死循环、死锁等活跃性问题,这时等待线程释放的锁的线程将永远得不到执行。
- 优先级错乱:如果优先级低的线程获取到锁了,优先级高的也必须等待优先级低的锁释放。
- 什么是乐观锁和悲观锁
- 乐观锁:总认为没人抢资源,所以通常先不加锁,等到出了问题了再处理。如果在更新的时候,去对比在我修改期间数据有没有被其他人修改过,如果没被修改过,那就说明真的只有自己操作,就去更新数据。那么如果被修改过,那就说明被人改了,此时就会选择放弃、报错、重试等策略。
- 典型案例:乐观锁的实现一般都是利用CAS算法来实现,如:Atomic类、并发容器等
- 典型案例:数据库中,可以添加一个version版本号,更新的时候先查询,然后更新的时候用update一条一句对版本进行判断并更新
- 开销:虽然乐观锁一开始的开销比悲观锁校,但是如果自旋的事件很长或者不断重试,那么消费的资源也会越来越多。
- 使用场景:乐观锁适用于:并发写入少,大部分是读取场景,不加锁的能让读取性能大幅度提高
- 悲观锁:认为资源总是在竞争,如果不锁住就会造成数据错误,所以悲观锁为了保证正确性,会在每次获取并修改数据时,把数据锁住,让别人无法访问该数据,这样就可以确保数据内容万无一失;
- 典型案例:Java中悲观锁典型的就是
synchronized
和Lock
相关类 - 典型案例:数据库中select for update就是悲观锁
- 开销:悲观锁的原始开销要高于乐观锁,但是一劳永逸,临界区尺有所时间就算越来越差,也不会对互斥锁的开销造成影响
- 使用场景:悲观锁适用于临界区有IO操作,代码复杂或者循环量大,竞争非常激烈的情况,以避免大量的无用自旋等的性能消耗
- 典型案例:Java中悲观锁典型的就是
2.2.2 可重入锁与非可重入锁
- 以ReentrantLock为例,synchronized也支持
- 什么是可重入锁?
- 可重入就是说某个线程已经获得某个锁,可以再次获取这个锁而不会出现死锁。 ```java /**
- @author yiren
*/
public class ReentrantLockDemo {
public static void main(String[] args) {
} } ```Lock lock = new ReentrantLock();
lock.lock();
try {
System.out.println("in 1");
lock.lock();
try {
System.out.println("in 2");
}finally {
lock.unlock();
System.out.println("out 2");
}
}finally {
lock.unlock();
System.out.println("out 1");
}
in 1
in 2
out 2
out 1
Process finished with exit code 0
- 可重入的好处
- 避免死锁:如果一个方法已经获取到了锁,调用另外一个方法也要使用这个锁,那就会第二次加锁,如果不能成功获取锁,就会发生死锁。
- 代码演示 ```java /**
- @author yiren
*/
public class ReentrantLockDemo {
public static void main(String[] args) {
} } ```ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
System.out.println("HoldCount:" + lock.getHoldCount() + " in 1");
lock.lock();
try {
System.out.println("HoldCount:" + lock.getHoldCount() + " in 2");
lock.lock();
try {
System.out.println("HoldCount:" + lock.getHoldCount() + " in 3");
}finally {
lock.unlock();
System.out.println("out 3");
}
}finally {
lock.unlock();
System.out.println("out 2");
}
}finally {
lock.unlock();
System.out.println("out 1");
}
HoldCount:1 in 1
HoldCount:2 in 2
HoldCount:3 in 3
out 3
out 2
out 1
Process finished with exit code 0
- 源码分析
ReentrantLock
中默认是使用的NonfairSync
,而NonfairSync
继承自Sync
,加锁和释放锁主要涉及里面下面两个方法,另外FairSync里面的关于重入锁部分也差不多。final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
加锁时在
nonfairTryAcquire
中else-if会判断如果当前线程就是已经占有锁的线程,则status就会加一,并返回true。- 释放锁时在
tryRelease
中也是先判断当前线程是否是已经占有锁的线程,然后在判断status
,如果status
等于0了,才真正释放锁。
- ReentrantLock其他方法介绍
isHeldByCurrentThread()
可以查看出锁是否被当前线程锁持有getQueueLength
可以返回当前正在等待这把锁的队列有多长
2.2.3 公平锁与非公平锁
- 什么是公平与非公平锁
- 公平:按照线程请求的顺序来分配锁
- 非公平:不完全按照请求的顺序,在一定情况下可以插队;不过非公平锁,同样不提倡插队,它只在合适的时机插队,而不是盲目乱插队
- 为什么需要非公平锁
- 注意:在ReentrantLock中,如果不指定,默认的实现就是非公平锁。如果在创建
ReentrantLock
是,传入参数true
,此时就会变成公平锁 - 使用非公平锁的原因是为了提高效率,避免唤醒带来的空档期
- 比如:有三个线程,A现在持有锁,按照公平当A释放锁后,B就会唤醒执行,但是当A释放锁的时候,唤醒B,B没有及时响应还在唤醒中,线程C此时就可以立马执行,就会交给线程C执行,以此来避免B唤醒期间的资源浪费。
案例演示
- 模拟打印工作,公平和非公平只需要修改
printQueue
里面ReentrantLock
的参数 ```java /**
@author yiren */ public class FairLock {
public static void main(String[] args) throws InterruptedException { PrintQueue queue = new PrintQueue(); ExecutorService executorService = Executors.newFixedThreadPool(4); for (int i = 0; i < 4; i++) {
executorService.execute(()->{
System.out.println(Thread.currentThread().getName()+ " start to print");
queue.printJob(new Object());
System.out.println(Thread.currentThread().getName()+ " finished print ");
});
TimeUnit.MILLISECONDS.sleep(100);
}
}
private static class PrintQueue { private Lock lock = new ReentrantLock(true);
private void printJob(Object document) {
lock.lock();
try {
Integer duration = (int) (Math.random() * 3 + 1);
System.out.println(Thread.currentThread().getName() + " print 1 need " + duration + " s");
Thread.sleep(duration * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
lock.lock();
try {
Integer duration = (int) (Math.random() * 3 + 1);
System.out.println(Thread.currentThread().getName() + " print 2 need " + duration + " s");
Thread.sleep(duration * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} } } ```
- 模拟打印工作,公平和非公平只需要修改
- 如果参数为true,为公平锁,结果如下
pool-1-thread-1 start to print
pool-1-thread-1 print 1 need 1 s
pool-1-thread-2 start to print
pool-1-thread-3 start to print
pool-1-thread-4 start to print
pool-1-thread-2 print 1 need 3 s
pool-1-thread-3 print 1 need 1 s
pool-1-thread-4 print 1 need 2 s
pool-1-thread-1 print 2 need 3 s
pool-1-thread-1 finished print
pool-1-thread-2 print 2 need 2 s
pool-1-thread-2 finished print
pool-1-thread-3 print 2 need 3 s
pool-1-thread-3 finished print
pool-1-thread-4 print 2 need 3 s
pool-1-thread-4 finished print
- 我们可以通过结果看出,线程按照执行的先后顺序,来打印。不会出现插队的情况,先打印第一次,然后打印第二次,且多个线程依次执行。
- 如果不给参数,就为非公平锁,结果如下:
pool-1-thread-1 start to print
pool-1-thread-1 print 1 need 3 s
pool-1-thread-2 start to print
pool-1-thread-3 start to print
pool-1-thread-4 start to print
pool-1-thread-1 print 2 need 2 s
pool-1-thread-1 finished print
pool-1-thread-2 print 1 need 3 s
pool-1-thread-2 print 2 need 1 s
pool-1-thread-2 finished print
pool-1-thread-3 print 1 need 3 s
pool-1-thread-3 print 2 need 2 s
pool-1-thread-3 finished print
pool-1-thread-4 print 1 need 3 s
pool-1-thread-4 print 2 need 1 s
pool-1-thread-4 finished print
- 非公平状态下,我们可以看到,打印完第一次,如果按照排队顺序应该是线程2,但是打印的实际是线程1的第二次。
- 特例
tryLock()
它不遵守设定的公平规则。也就是说:当有线程执行tryLock
的时候,一旦有线程释放了锁,即使他之前已经有其他在等待队列里的线程,这个正在tryLock的线程依旧能获取到锁。
- 优缺点分析
- 公平锁:
- 优点:各个线程公平,每个线程在等待一段时间后,总有执行机会。
- 缺点:更慢,吞吐量更小
- 非公平锁:
- 优点:更快,吞吐量更大
- 缺点:有可能某些线程会产生饥饿,线程长时间,始终得不到执行
- 源码分析
公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
非公平锁:
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
两者在获取锁的代码中,最主要的区别就是公平锁有一个
!hasQueuedPredecessors()
,它会判断是否有现成在队列前面已经排队了,如果没有才去获取锁。
2.2.4 共享锁和排他锁
- Java中
ReentrantReadWriteLock
为代表
- 什么是共享锁和排他锁
- 共享锁:又称读锁,获取共享锁过后,可以查看但是无法修改和删除,其他线程可以同时获取到共享锁
- 排他锁:又称独占锁、独占锁,获取了排他锁后既可以读又可以写,但是其他线程无法再次获取。
- 读写锁的作用
- 如果我们不适用读写锁,那么我们多个线程读的操作,并不能同时进行,只能排队,虽然没有线程安全问题,但是性能会变差。
- 如果我们在读的地方用读锁,写的地方用写锁,可以提高效率。
- 读写锁的规则
- 多个线程读锁可以重复获取
- 但是如果有线程以及获取了读锁,那么其他线程就不可以获取写锁
- 但是如果有线程以及获取了写锁,那么其他线程就不可以获取写锁
- 总结:读写互斥、写写互斥。
ReentrantReadWriteLock
用法/**
* @author yiren
*/
public class ReadWriteLock {
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
private static ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
private static void read() {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + " start to read, got read lock");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " read finished, release read lock");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
}
private static void write() {
writeLock.lock();
try {
System.out.println(Thread.currentThread().getName() + " start to write, got write lock");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " read finished, release write lock");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 2; i++) {
executorService.execute(ReadWriteLock::write);
}
for (int i = 0; i < 5; i++) {
executorService.execute(ReadWriteLock::read);
}
}
}
pool-1-thread-1 start to write, got write lock
pool-1-thread-1 read finished, release write lock
pool-1-thread-2 start to write, got write lock
pool-1-thread-2 read finished, release write lock
pool-1-thread-3 start to read, got read lock
pool-1-thread-2 start to read, got read lock
pool-1-thread-1 start to read, got read lock
pool-1-thread-3 read finished, release read lock
pool-1-thread-1 read finished, release read lock
pool-1-thread-2 read finished, release read lock
pool-1-thread-1 start to read, got read lock
pool-1-thread-3 start to read, got read lock
pool-1-thread-3 read finished, release read lock
pool-1-thread-1 read finished, release read lock
- 我们可以看到,读可以同时进行,而写的时候则是需要等持有写锁的线程的完成了,再进入到另一个写锁,并且我们可以看到,当写锁持有的时候,读锁也立即获取到,而是等待写锁完成后,再获取到读锁
- 读锁插队策略
- 按照上面所说,如果先进入读任务,那么来了按顺序再来一个写锁,然后再来一个读锁,我们可以试想,读锁,不需要排队,可以直接进入。此时会有一个问题,如果后面继续再来读锁,写锁是不是一直获取不了。就会造成饥饿。
- ReentrantReadWriteLock(非公平锁时,公平情况下都得排队)并不是这样做的,它的策略是,如果读任务正在进行,此时先来一个写锁排在队头部,然后再来一个读锁它发现队列头部是写锁任务,此时进来的读任务就不会插队,会进入队列排在写锁之后,以保证写锁可以得到执行。宁可降低一点性能,也要避免写线程饥饿。
- 看下非公平锁是否插队判断的源码:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
- 上面注释就说明了,写的人总是可以插队
- 但是读者调用了
apparentlyFirstQueuedIsExclusive
队列头结点是不是排他锁(写锁)如果是就不允许插队了。 我们可以对上面读写锁的案例进行修改一下main方法
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.execute(ReadWriteLock::write);
executorService.execute(ReadWriteLock::read);
executorService.execute(ReadWriteLock::read);
executorService.execute(ReadWriteLock::write);
executorService.execute(ReadWriteLock::read);
}
pool-1-thread-1 start to write, got write lock
pool-1-thread-1 read finished, release write lock
pool-1-thread-2 start to read, got read lock
pool-1-thread-3 start to read, got read lock
pool-1-thread-2 read finished, release read lock
pool-1-thread-3 read finished, release read lock
pool-1-thread-4 start to write, got write lock
pool-1-thread-4 read finished, release write lock
pool-1-thread-5 start to read, got read lock
pool-1-thread-5 read finished, release read lock
- 此时我们就可以看到,线程5读线程,并没有插队执行,而是等待了线程4完成了,再执行。
额外提醒:读锁在队列头部不是写锁的时候,是可以插队的。
如现在的队列是这样的:Reader->Reader->Writer->Reader,这最后一个读锁,就有可能和前两个一起执行。我们修改一下上面的代码,把线程数改成4
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
executorService.execute(ReadWriteLock::write);
executorService.execute(ReadWriteLock::read);
executorService.execute(ReadWriteLock::read);
executorService.execute(ReadWriteLock::write);
executorService.execute(ReadWriteLock::read);
}
pool-1-thread-1 start to write, got write lock
pool-1-thread-1 read finished, release write lock
pool-1-thread-2 start to read, got read lock
pool-1-thread-3 start to read, got read lock
pool-1-thread-1 start to read, got read lock
pool-1-thread-2 read finished, release read lock
pool-1-thread-3 read finished, release read lock
pool-1-thread-1 read finished, release read lock
pool-1-thread-4 start to write, got write lock
pool-1-thread-4 read finished, release write lock
可以看此时的执行的就是三个读锁先执行了,然后再执行写锁!
- 读写锁的升降级
- 支持锁的降级,但是不支持升级
代码演示:
/**
* @author yiren
*/
public class ReadWriteLockLevel {
public static void main(String[] args) {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Thread thread = new Thread(() -> {
readWriteLock.writeLock().lock();
try {
System.out.println("writer task!");
Thread.sleep(1000);
readWriteLock.readLock().lock();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
try {
System.out.println("reader task!");
Thread.sleep(1000);
System.out.println("reader task! end");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
});
Thread thread1 = new Thread(() -> {
readWriteLock.readLock().lock();
try {
System.out.println("other reader task!");
}finally {
readWriteLock.readLock().unlock();
}
});
thread.start();
thread1.start();
}
}
writer task!
reader task!
other reader task!
reader task! end
Process finished with exit code 0
- 我们可以看到,锁降级过后,读锁就可以再次获取
- 而读锁是不能升级成写锁的,上面就说过,读锁和写锁不会同时存在!
2.2.5 自旋锁和阻塞锁
- 什么是自旋锁和阻塞锁?
- 阻塞或唤醒一个线程需要操作系统切换CPU状态来完成,这种状态转换需要耗费处理器时间
- 如果同步代码块中的内容过于简单,状态装换消耗的事件可能比用户代码执行的时间还要长
- 在许多场景中,同步资源锁定时间很短,为了这一小段时间去切换线程,线程挂起和恢复现场的花费可能会让系统得不偿失
- 如果物理机器有多个处理器,能够让两个或者以上的线程同时并行执行,我们就可以让后面那个请求锁的线程不放弃CPU的执行时间,看看持有锁的线程是否很快就会释放锁。
- 为了让当前线程等一下,我们就让当前线程进行自旋,如果在自旋完成后,前面锁定同步资源的线程以及释放了锁,那么当前线程就可以不必阻塞而是直接获取同步资源,从而避免切换线程的开销。这就是自旋锁
- 相反阻塞锁就是如果线程没有拿到锁,就会直接把线程阻塞,知道被唤醒。
- 自旋锁的缺点:如果锁的占用时间过长,那么自旋的线程就会白白浪费处理器资源,浪费资源随时间线性增长
原理和源码分析
- 在J.U.C下
atomic
包下的类基本都是自旋锁试下 如:AtomicInteger:自旋锁实现是CAS,AtomicInteger中调用了unsafe进行自增操作的源码中的do-while循环就是一个自旋操作,如果修改过程中遇到其他线程竞争导致没修改成功,就在while里面疯狂循环,直到修改成功
// AtomicInteger
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
// Unsafe
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
- 在J.U.C下
自己实现一个简单的自旋锁:
/**
* @author yiren
*/
public class SpinLock {
private static AtomicReference<Thread> sign = new AtomicReference<>();
private static void lock() {
Thread current = Thread.currentThread();
while (!sign.compareAndSet(null, current)) {
System.out.println("fail to set!");
}
}
private static void unlock() {
Thread thread = Thread.currentThread();
sign.compareAndSet(thread, null);
}
public static void main(String[] args) {
Runnable runnable = () -> {
System.out.println("start to get lock");
SpinLock.lock();
System.out.println("got lock successfully!");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
SpinLock.unlock();
}
};
Thread thread = new Thread(runnable);
Thread thread1 = new Thread(runnable);
thread.start();
thread1.start();
}
}
- 自旋锁使用场景:
- 自旋锁一般用于多核的服务器,在并发度不高的情况下,比阻塞锁效率高
- 用于临界区比较短小的情况下,否则线程一旦拿到锁,很久才释放,就会造成性能浪费了。
2.2.5 可中断锁与不可中断锁
- 在java中,synchronized就是不可中断锁,而Lock是可中断锁,可通过
tryLock(time)
和lockInterruptibly
来实现响应中断 上面Lock接口案例演示中已经演示过,可看第一部分的
LockInterruptibly
类2.3. 锁优化
2.3.1 JVM对锁的优化
自旋锁和自适应:比如自旋多少过后,它会把锁编程阻塞锁
- 锁消除:有些场景下,不需要加锁,JVM会分析出来,然后直接消除它
锁粗化:如果一系列操作都是对一个对象反复加锁,也会带来性能开销,所以JVM会把它们合成一次加解锁。
2.3.2 编码优化
缩小同步代码块,只锁需要锁的
- 尽量不要锁住方法
- 减少锁的请求次数,减少频繁获取锁的开销。
- 避免人为制造“热点”,比如一个集合你每次用大小都去遍历一遍计数
- 锁里面尽量不要包含锁
- 选择合适的锁的类型或者合适的工具类