思路:https://www.cnblogs.com/waterystone/p/4920797.html
image.png
它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
  再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

tryAcquire()是自定义重写的,ReentrantLock重写即校验、修改state的值,更新当前ExclusiveOwnerThread值,除此之外没有其他逻辑。

结点(Node)状态waitStatus

这里我们说下Node。Node结点是对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等。变量waitStatus则表示当前Node结点的等待状态,共有5种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。

  • CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
  • SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
  • CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  • 0:新结点入队时的默认状态。

acquire(int)

此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。

  1. 1 public final void acquire(int arg) {
  2. 2 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  3. 4 selfInterrupt();
  4. 5 }

函数流程如下:

  1. tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待);
  2. addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

AQS(AbstractQueuedSynchronizer)是一个抽象同步队列,JUC(java.util.concurrent)中很多同步锁都是基于AQS实现的。
AQS的基本原理就是当一个线程请求共享资源的时候会判断是否能够成功操作这个共享资源,如果可以就会把这个共享资源设置为锁定状态,如果当前共享资源已经被锁定了,那就把这个请求的线程阻塞住,也就是放到队列中等待。

image.png

CAS

AQS中采用CAS进行并发处理部分源码如下

  1. //属性值偏移量
  2. private static final long nextOffset;
  3. static {
  4. try {
  5. //获取属性值偏移量
  6. nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
  7. } catch (Exception ex) { throw new Error(ex); }
  8. }
  9. /**
  10. * CAS next field of a node.
  11. */
  12. private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
  13. /**
  14. * @param:node CAS操作的对象
  15. * @param:nextOffset 对象内部的属性相对偏移量,配合node找到属性地址
  16. * @param:期望旧值
  17. * @param:更新值
  18. */
  19. return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
  20. }
  21. /**
  22. * CAS配合自旋
  23. **/
  24. for (;;) {
  25. Node h = head;
  26. if (h != null && h != tail) {
  27. int ws = h.waitStatus;
  28. if (ws == Node.SIGNAL) {
  29. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  30. continue; // loop to recheck cases
  31. unparkSuccessor(h);
  32. }
  33. else if (ws == 0 &&
  34. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  35. continue; // loop on failed CAS
  36. }
  37. if (h == head) // loop if head changed
  38. break;
  39. }

doAcquireNanos:带超时时间的独占锁获取方法

  1. private boolean doAcquireNanos(int arg, long nanosTimeout)
  2. throws InterruptedException {
  3. if (nanosTimeout <= 0L)
  4. return false;
  5. final long deadline = System.nanoTime() + nanosTimeout;
  6. //加入到阻塞队列中,并将node标记为独占锁方式
  7. final Node node = addWaiter(Node.EXCLUSIVE);
  8. boolean failed = true;
  9. try {
  10. for (;;) {
  11. final Node p = node.predecessor();
  12. if (p == head && tryAcquire(arg)) {
  13. setHead(node);
  14. p.next = null; // help GC
  15. failed = false;
  16. return true;
  17. }
  18. nanosTimeout = deadline - System.nanoTime();
  19. if (nanosTimeout <= 0L)
  20. return false;
  21. if (shouldParkAfterFailedAcquire(p, node) &&
  22. nanosTimeout > spinForTimeoutThreshold)
  23. LockSupport.parkNanos(this, nanosTimeout);
  24. if (Thread.interrupted())
  25. throw new InterruptedException();
  26. }
  27. } finally {
  28. if (failed)
  29. /**
  30. * 将node的waitStatus置为CANCEL,并移除队列。如果移除的是head的后继节点,
  31. * 唤醒移除的节点的后继节点
  32. **/
  33. cancelAcquire(node);
  34. }
  35. }

cancelAcquire:节点取消竞争获取锁

  1. private void cancelAcquire(Node node) {
  2. // Ignore if node doesn't exist
  3. if (node == null)
  4. return;
  5. //1. node不再关联到任何线程
  6. node.thread = null;
  7. //2. 跳过被cancel的前继node,找到一个有效的前继节点pred
  8. // Skip cancelled predecessors
  9. Node pred = node.prev;
  10. while (pred.waitStatus > 0)
  11. node.prev = pred = pred.prev;
  12. // predNext is the apparent node to unsplice. CASes below will
  13. // fail if not, in which case, we lost race vs another cancel
  14. // or signal, so no further action is necessary.
  15. Node predNext = pred.next;
  16. //3. 将node的waitStatus置为CANCELLED
  17. // Can use unconditional write instead of CAS here.
  18. // After this atomic step, other Nodes can skip past us.
  19. // Before, we are free of interference from other threads.
  20. node.waitStatus = Node.CANCELLED;
  21. //4. 如果node是tail,更新tail为pred,并使pred.next指向null
  22. // If we are the tail, remove ourselves.
  23. if (node == tail && compareAndSetTail(node, pred)) {
  24. compareAndSetNext(pred, predNext, null);
  25. } else {
  26. // If successor needs signal, try to set pred's next-link
  27. // so it will get one. Otherwise wake it up to propagate.
  28. //
  29. int ws;
  30. //5. 如果node既不是tail,又不是head的后继节点
  31. //则将node的前继节点的waitStatus置为SIGNAL
  32. //并使node的前继节点指向node的后继节点
  33. if (pred != head &&
  34. ((ws = pred.waitStatus) == Node.SIGNAL ||
  35. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  36. pred.thread != null) {
  37. Node next = node.next;
  38. if (next != null && next.waitStatus <= 0)
  39. compareAndSetNext(pred, predNext, next);
  40. } else {
  41. //6. 如果node是head的后继节点,则直接唤醒node的后继节点
  42. unparkSuccessor(node);
  43. }
  44. node.next = node; // help GC
  45. }
  46. }

acquire:获取独占锁方法

  1. public final void acquire(int arg) {
  2. //如果tryAcquire失败(返回false),则调用acquireQueued方法,
  3. //将当前线程加入到等待队列中,并中断当前线程,等待唤醒
  4. if (!tryAcquire(arg) &&
  5. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  6. selfInterrupt();
  7. }

tryAcquire()是子类实现的方法,AQS中直接抛出异常,重写了tryAcquire()即竞争独占锁逻辑。

ReentrantLock lock()

ReentrantLock包含3个内部类:一个抽象类Sync,两个Sync的子类FairSync(公平锁)和NonfairSync(非公平锁),ReentrantLock调用lock()方法即调用的Sync的lock()方法,Sync的lock()抽象方法即多态的方式调用两个子类中具体实现的一个。
下面为FairSync类的代码,调用AQS的acquire()方法,重写tryAcquire()实现了lock();

  1. static final class FairSync extends Sync {
  2. private static final long serialVersionUID = -3000897897090466540L;
  3. final void lock() {
  4. acquire(1);
  5. }
  6. /**
  7. * Fair version of tryAcquire. Don't grant access unless
  8. * recursive call or no waiters or is first.
  9. */
  10. protected final boolean tryAcquire(int acquires) {
  11. final Thread current = Thread.currentThread();
  12. int c = getState();
  13. if (c == 0) {
  14. if (!hasQueuedPredecessors() &&
  15. compareAndSetState(0, acquires)) {
  16. setExclusiveOwnerThread(current);
  17. return true;
  18. }
  19. }
  20. else if (current == getExclusiveOwnerThread()) {
  21. int nextc = c + acquires;
  22. if (nextc < 0)
  23. throw new Error("Maximum lock count exceeded");
  24. setState(nextc);
  25. return true;
  26. }
  27. return false;
  28. }
  29. }

与非公平锁的tryAcquire()方法相比,公平锁只是多了hasQueuedPredecessors()一个判断条件,hasQueuedPredecessors()是判断阻塞队列(AQS维护的tail、head)存在更久的等待锁的节点,如果存在不会尝试CAS进行锁竞争,老老实实排队(体现公平原则)。
这里的tryAcquire()方法体现了一个DRY(Don’t Repeat Yourself)原则的解释,代码重复不等于功能语义重复,并不一定需要优化。DRY原则侧重表达的是不要写语义重复的代码,语义不重复但代码相似度高的代码是可以接受的。