自旋:
- 缺点:耗费cpu资源,没有竞争到锁的线程会一直占用cpu资源进行cas操作,假如一个线程获得锁后要花费n秒处理业务逻辑,娜另外一个线程就会白白花费n秒的cpu资源。
思路:让得不到锁的线程让出cpu
Object类中的wait和notify方法
- wait和notify方法必须在同步代码块中,即必须先拿到对象锁才能wait和notify。底层是线程wait之后进入了waitSet队列,然后等待别人唤醒去和entryList中阻塞的队列抢锁(非公平)
- notify方法在wait方法之前执行不能唤醒之后的wait线程
- juc包中的condition接口的await方法和notify方法
- 效果同上
- lockSupport可以阻塞当前线程和唤醒指定被阻塞的线程:park与unpark
}
<a name="Tyl2K"></a>
## 1.1 常量字段
```java
static final Node SHARED = new Node(); // 标识共享锁节点
static final Node EXCLUSIVE = null; // 标识排它锁节点
// 以下常量,都是waitStatus字段可能的值
static final Node SHARED = new Node(); // 标识共享锁节点
static final Node EXCLUSIVE = null; // 标识排它锁节点
// 以下常量,都是waitStatus字段可能的值
static final int CANCELLED = 1; // 取消状态,也就是不想申请或释放了
static final int SIGNAL = -1; // 等待通知状态,当节点变为这个状态,唤醒后继节点
static final int CONDITION = -2; // 条件状态,await/signal 场景下使用
static final int PROPAGATE = -3; // 传播状态,传播共享状态
1.2 变量字段
volatile int waitStatus; // 用于标识每一个等待节点状态
volatile Node prev; // 前置节点指针
volatile Node next; // 后继节点指针
volatile Thread thread; // 持有的线程对象,可以理解为每个node代表一个线程
Node nextWaiter; // 等待队列下一个节点指针(Condition中使用)
1.3 方法
// 判断是否为申请共享锁的节点;
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前置节点,如果为null抛出异常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 构造方法
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
2.AQS(abstract queuee synchronizer)
2.1 AQS成员变量
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state; // 同步器状态,它的值决定了是否持有锁,通过CAS自旋完成对state值的修改
// 0表示没人,自由状态;>=1表示有人占用,进入队列等着
2.2 抽象方法
AQS中包含5个未实现方法,也就是等待子类来实现个性化逻辑的,他们是:
protected boolean tryAcquire(int arg); // 尝试获取排他锁
protected boolean tryRelease(int arg); // 尝试释放排他锁
protected int tryAcquireShared(int arg); // 尝试获取共享锁
protected boolean tryReleaseShared(int arg); // 尝试释放共享锁
protected boolean isHeldExclusively(); // 判断当前线程是否持有排他锁
由这5个方法可知,AQS中涉及了排它锁和共享锁两类的操作,每种锁又包含申请和释放两类操作流程,每个流程中又涉及到多个实例方法和抽象方法调用。
在AQS中,方法分为可中断的、不可中断的、同步的、带超时时间的同步的,目前整理的流程是“不可中断的同步的”。
2.3 结构
虚拟双向队列。将请求资源的线程封装成队列的节点,通过CAS、自旋以及LockSupport.park()方式,维护state变量的状态,使并发达到同步的控制效果。state+CLH
3. ReentrantLock
3.1 加锁
3.1.1 公平锁
final void lock() {
acquire(1);
}
// AQS的方法,
public final void acquire(int arg) {
// tryAcquire 进入1 ,addWaiter进入2, acquireQueued进入3
// tryAcquire拿到了锁就不会执行下面的
// 没拿到锁;进入addWaiter(加入等待队列)方法(标识排他锁)(把当前线程封装成为node)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果是被打断的,acquireQueued方法会返回true,设置打断标记,让自己知道是被打断唤醒的
selfInterrupt();
}
// 1.尝试获取锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 拿到线程当前的状态c
int c = getState();
// c=0,没人占用锁,我要去上锁.(上一个人恰好处理完解锁,我恰好来上锁)
if (c == 0) {
// hasQueuedPredecessors 进入1.1
/**
* 通过对比源码发现,公平锁比非公平锁多了这块代码: !hasQueuedPredecessors()
* hasQueuedPredecessors() 是做什么呢?就是判断当前同步队列中是否存在节点,如果存在节点呢,
* 就返回true,由于前面有个 !,那么就是false,再根据 && 逻辑运算符的特性,不会继续执行了;
*
* tryAcquire()方法直接返回false,后面的逻辑就和非公平锁的一致了,就是创建Node节点,并将
* 节点加入到同步队列尾; 公平锁:发现当前同步队列中存在节点,有线程在自己前面已经申请可锁,那
* 自己就得乖乖的向后面排队去。
*
* 友情提示:在生活中,我们也需要按照先来后到去排队,保证素质; 还有就是怕你们不排队被别人打了。
*/
// 我前面的等待队列中没人排队(我是第一个),有人排队我也要排队
if (!hasQueuedPredecessors() &&
// 设置值为1
compareAndSetState(0, acquires)) {
// 将该线程赋给当前锁对象(独占锁)
setExclusiveOwnerThread(current);
return true;
}
}
// 线程必须是当前存入的,重入;如果不是当前线程,不进。
else if (current == getExclusiveOwnerThread()) {
// 重入,把state的值+1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// AQS方法 1.1 判断队列中有没有节点在排队,没人排队返回true,有人排队返回false
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
// AQS 方法 2.加入等待队列(把当前线程封装成为node)
// 第一次添加节点的时候会new一个双向链表等待队列,返回当前节点
private Node addWaiter(Node mode) {
// 把当前线程封装成为node,mode为独占锁
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 队列为空,需要初始化队列,不会进(第一次队列还未初始化);
if (pred != null) {
// 队列不为空,进入队列中,将自己设置为tail节点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 2.1 加入队列
enq(node);
return node;
}
// AQS方法 2.1 进入队列尾部
private Node enq(final Node node) {
for (;;) { // 自旋
Node t = tail;
// 如果尾结点是空的,先初始化;new一个哑结点,放在队头
if (t == null) { // Must initialize
// new哑结点,并将头尾指针指向这个哑结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 再添加一个当前的节点(自己)
node.prev = t;
// 把自己设置为队尾节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// AQS方法3.
// 判断我之前的节点是park,那么我也park;如果上一个节点不是park,竞争锁
// 自旋过程,每个节点来的时候每次进入自旋都会判断我能不能拿到锁,如果不能拿到锁,就先把上一个节点
// 的waitStatus改成-1,自己进入阻塞状态
final boolean acquireQueued(final Node node, int arg) {
// 异常1:这个结点自己不想等了
boolean failed = true;
try {
// 异常2:排队的过程被打断了
boolean interrupted = false;
// 自旋过程,每一次自旋都会去尝试获得一次锁,获得失败才会走下面的流程自旋
for (;;) {
// 拿到上一个节点p
final Node p = node.predecessor();
// 如果上一个节点p是head(即自己是进入等待队列自己是第二个节点,排队中的第一个节点)
// unpark拿到锁之后会进入这个方法
if (p == head && tryAcquire(arg)) {
// 获取到锁,就把自己变成head(哑结点),并清空当前存的线程,这个时候如果不是队尾,waitStatus就是-1
setHead(node);
// 把前面的p节点gc
p.next = null; // help GC
// 正常结束,没有被取消
failed = false;
// 如果被打断,返回true;不被打断,返回false
return interrupted;
}
// 上一个节点不是head(自己不是第二个节点)或者锁还是被占用着,进入3.1 shouldParkAfterFailedAcquire:
// 判断上一个节点p是否需要唤醒当前节点
// 如果是-1,进入3.2 parkAndCheckInterrupt 则将自己park住,等待上一个节点唤醒
// 如果不是-1,设置成-1,并进行下一次自旋
// 整个流程:第一趟:将哨兵节点的值变成-1,第二趟:返回true,然后将自己park住(暂停执行,等待恢复执行)
// 如果是被打断的,parkAndCheckInterrupt方法会返回true,interrupted就置为true表示自己是被打断的
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//AQS 方法 3.1 判断前一个节点的waitStatus值,如果是-1,返回true;如果是1,取消;如果是其他,将值改成-1,返回false进行下次自旋
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果是-1,需要唤醒下个节点,返回true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 1表示线程取消判断
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 否则,修改这个值为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// AQS方法 park住自己
private final boolean parkAndCheckInterrupt() {
// 线程挂起,程序不会向下执行
LockSupport.park(this);
// 根据park的api描述,程序会在以下三种情况向下执行:
// 1.被unpark
// 2.被中断(interrupt)
// 3.其他不合逻辑的返回
// 因上述三种情况程序执行到此处,返回当前线程的中断状态,并清空中断状态
// 如果由于被中断,该方法会返回true
return Thread.interrupted();
}
3.1.2 非公平锁的不同点
final void lock() {
// 先竞争,竞争失败再排队
if (compareAndSetState(0, 1))
// 设置占用线程为当前线程,独占锁
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 不用在乎自己是不是队首。
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
3.2 解锁
没有公平不公平之分
public void unlock() {
sync.release(1);
}
//AQS方法
public final boolean release(int arg) {
// 尝试释放锁 进入1.1
if (tryRelease(arg)) {
Node h = head;
// 队列不为空
if (h != null &&
// 且waitStatus == Node.SIGNAL 才需要 unpark(如果是0,那么他是队尾,就不用唤醒了)
h.waitStatus != 0)
// 唤醒下一个线程 进入1.2
unparkSuccessor(h);
return true;
}
return false;
}
//AQS方法 1.1
protected final boolean tryRelease(int releases) {
// 更改c的状态值,getState表示当前重入锁的数量值
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果c为0,表示没有人占用锁了(无重入),只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
// 将独占该锁的线程置为null
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//AQS方法 1.2 唤醒线程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
// 更改waitStatus状态为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 拿到头结点的下个节点s
Node s = node.next;
// 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 下个节点s不为null,唤醒下一个节点s
// unpark后,从之前park的地方执行,即parkAndCheckInterrupt()方法
if (s != null)
LockSupport.unpark(s.thread);
}
3.3 可打断原理
- 不可打断模式:
在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了(阻塞状态被打断会清除打断标记)。
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
// ...
public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
// 如果打断状态为 true
selfInterrupt();
}
}
static void selfInterrupt() {
// 重新产生一次中断,(让线程知道自己是被打断阻塞而唤醒的,因为interrupt打断阻塞线程后会清除打断标记)这时候线程是如果正常运行的状态,那么不是处于sleep等状态,interrupt方法就不会报错
Thread.currentThread().interrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 再次循环拿到了锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
// 更改failed为失败状态
failed = false;
// 获得锁后, 返回打断状态
return interrupted;
}
if (
shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()
) {
// 如果是因为 interrupt 被唤醒, 返回打断状态为 true
interrupted = true;
}
}
} finally {
// 只有获得锁出现异常,才会执行该方法(自己实现的AQS类可能会出异常)
if (failed)
// 取消加锁
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
// 如果打断标记已经是 true, 则 park 会失效
LockSupport.park(this);
// unpark之后执行,判断阻塞是否被别人打断
// interrupted 会清除打断标记
return Thread.interrupted();
}
}
可打断模式: ```java static final class NonfairSync extends Sync {
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 如果没有获得到锁, 进入 ㈠
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// ㈠ 可打断的获取锁流程 private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
// 在 park 过程中如果被 interrupt 会进入此
// 这时候抛出异常, 而不会再次进入 for (;;)
throw new InterruptedException();
}
}
} finally {
if (failed)
cancelAcquire(node);
}
} }
``` 区别在于不可打断模式是将打断标记置为true,再次进入for循环(在AQS队列中);可打断模式直接抛出异常,不会进入for循环。