重新回到这个acquire加锁的方法,如果tryAcquire加锁失败了就走第2步操作
第二步操作又可以分成两步,一个是addWaiter(Node.EXCLUSIVE),另一个是acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
addWaiter,线程入队
addWaiter(Node.EXCLUSIVE)
这个EXCLUSIVE代表的就是排他性,独占锁,同一时间只能有一个线程获取到锁,此时是排他锁,独占锁。
private Node addWaiter(Node mode) {
//1.封装node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//2.
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//3
enq(node);
return node;
}
1.Node node = new Node(Thread.currentThread(), mode);
- 将当前线程(抢锁的线程)封装成了一个Node,mode=EXCLUSIVE
- 具体看一下Node
- volatile int waitStatus; :一个线程无法获取到锁的时候,就会阻塞住,当作线程挂起,但是阻塞状态又细分为很多不同的阻塞状态CANCELLED、SIGNAL、CONDITION、PROPAGATE
- volatile Node prev;:一个节点可以有上一个节点、prev指针
- volatile Node next;:一个节点可以有下一个节点、next指针
- volatile Thread thread;:保存当前node的Thread
- Node nextWaiter;:可以认为是下一个等待线程
获取不到锁,处于等待状态的线程,会封装为一个Node,而且有指针,最后多个处于阻塞等待状态的线程可以封装为一个Node双向链表。
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
Node nextWaiter;
/**
* The synchronization state.
*/
private volatile int state;
3.enq(node);队列初始化
private Node enq(final Node node(这个node就是封装线程2的node对象)) {
for (;;) {
//1
Node t = tail;
//2
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
第一轮循环
- Node t = tail; tail此时还是null,t=null
- if (t == null)这一步成立,随后通过cas操作unsafe.compareAndSwapObject(this, headOffset, null, update);,判断一下head变量是否为null,如果是null就new 了一个Node()并放到head指针上,执行tail = head,头尾相连。
第二轮循环
- Node t = tail; 此时tail指向HeadNode,t也就是指向了HeadNode
- node.prev = t; 线程2node的prev指针指向t指向的那个headNode
- compareAndSetTail(t, node);尝试比较tail的变量是否是t节点,如果是tail就指向线程2的node
- t.next = node;把那个创建的HeadNode节点的next指针和Node关联上,这俩形成一个双向链表
- return t;这个t实际上就是图中的HeadNode。到此,双向队列初始化完成
addWaiter结束后,线程2的Node就已经被加入到队列里面去了
acquireQueued,第二个线程尝试加锁或阻塞
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//1
final Node p = node.predecessor();
//2
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//3
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
第二个线程加锁成功过程
- final Node p = node.predecessor();获取线程2Node的上一个节点,
- p == head && tryAcquire(arg);根据图中已经创建好的队列来看,Node上一个节点就是head指向的Node,此时p==head成立,接下来就要再次尝试获取一下锁tryAcquire(arg);
- 如果加锁成功,将线程2对应的node从队列中移除。
重新构造队列,
setHead(node);该方法把head的指针指向线程2的Node,同时把线程2的thread干掉设置为null,同时线程2Node的prev指针也与之前的HeadNode取消关联。
p.next = null; 原来的HeadNode的next指针与原来的线程2Node取消关联,这个时候HeadNode就孤立了,不被任何变量引用,因此就能被GC掉(也就是为什么注释里写了help GC的原因)
return interrupted;此时这个interrupted一定是一个false,因为已经被唤醒了线程的interrupt肯定不能是true
(疑问:执行了LockSupport.park代码是不会往下走了吗?同时这个park执行了后都改了线程的什么东西)
setHead(node);
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
p.next = null; // help GC
failed = false;
return interrupted;
第二个线程加锁失败过程
加锁失败进入shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()判断。判断一下,是否需要将当前线程挂起,阻塞等待。如果需要,使用park操作挂起当前线程
shouldParkAfterFailedAcquire(p, node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
int ws = pred.waitStatus;,pred就是Head指向的node就是头节点,头节点这个watiStatus一开始的时候是个0
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);进入这个else代码块,把这个pred的也就是这个头节点的waitStatus设置成SIGNAL。
- 返回false
又跳回上层acquireQueued的代码,这个代码中是个for死循环
- final Node p = node.predecessor();又拿一次那个headNode
- p == head && tryAcquire(arg);尝试加锁,又失败
- shouldParkAfterFailedAcquire(p, node)进入该方法做判断,这个时候ws == Node.SIGNAL判断成立,因此就返回true
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
parkAndCheckInterrupt(),挂起线程
private final boolean parkAndCheckInterrupt() {
//1
LockSupport.park(this);
//2
return Thread.interrupted();
}
LockSupport.park(this);将线程进行挂起,必须有另外一个线程来对当前线程执行unpark操作,唤醒当前挂起的线程(为什么叫异步阻塞,这里的park就必须被别的线程所唤醒,这个过程就是异步的)
- Thread.interrupted(); 就是修改线程的interrupte标记位改为true,不让当前线程再运行了。
设置interrupted
第三个线程加锁失败过程
tryAcquire失败仍然是做,addWaiter(Node.EXCLUSIVE)和acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
addWaiter
private Node addWaiter(Node mode) {
//1.封装node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//2.
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//3
enq(node);
return node;
}
- 封装node
- Node pred = tail;
- pred != null 成立,执行node.prev = pred;
- compareAndSetTail(pred, node),由于pred保存的tail的引用,cas操作后就把tail指向了线程3的Node
- pred.next = node;在第2步的时候pred保存的tail指针,而原来tail指针指向的线程2的Node,因此pred就是线程2的Node,此时线程2Node的next指针指向线程3的Node指针
-
acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
第一次循环
final Node p = node.predecessor();拿到上一个节点,线程3Node的上一个节点就是线程2Node,此时p指向线程2Node
- p == head && tryAcquire(arg);首先p ==head就不成立,意味着如果上一个节点不是head那就意味着还有别的线程在等待加锁,因此后面的tryAcquire压根就不让他走
shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()
- shouldParkAfterFailedAcquire(p, node) 过程是一样的,pred指向线程2的Node,直接走的else块,把ws设置成signal
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- shouldParkAfterFailedAcquire(p, node) 过程是一样的,pred指向线程2的Node,直接走的else块,把ws设置成signal
又跳回上层acquireQueued的代码,这个代码中是个for死循环,shouldParkAfterFailedAcquire(p, node)此时由于线程2Node的ws是SIGNAL因此这个时候这个方法会返回true
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
parkAndCheckInterrupt();直接挂起当前线程3。