AbstractQueuedSynchronizer抽象类(以下简称AQS)是整个java.util.concurrent包的核心,该包中的大多数同步器都是基于AQS来构建的。
AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer

  1. public abstract class AbstractQueuedSynchronizer
  2. extends AbstractOwnableSynchronizer
  3. implements java.io.Serializable

CountDownLatch

用来控制一个或者多个线程等待多个线程。

维护了一个计数器 cnt,每次调用 countDown() 方法会让计数器的值减 1,减到 0 的时候,那些因为调用 await() 方法而在等待的线程就会被唤醒。

图片.png

  1. public class CountdownLatchExample {
  2. public static void main(String[] args) throws InterruptedException {
  3. final int totalThread = 10;
  4. CountDownLatch countDownLatch = new CountDownLatch(totalThread);
  5. ExecutorService executorService = Executors.newCachedThreadPool();
  6. for (int i = 0; i < totalThread; i++) {
  7. executorService.execute(() -> {
  8. System.out.print("run..");
  9. countDownLatch.countDown();
  10. });
  11. }
  12. countDownLatch.await();
  13. System.out.println("end");
  14. executorService.shutdown();
  15. }
  16. }
  1. run..run..run..run..run..run..run..run..run..run..end

CyclicBarrier

用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。

和 CountdownLatch 相似,都是通过维护计数器来实现的。线程执行 await() 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await() 方法而在等待的线程才能继续执行。

CyclicBarrier 和 CountdownLatch 的一个区别是,CyclicBarrier 的计数器通过调用 reset() 方法可以循环使用,所以它才叫做循环屏障。

CyclicBarrier 有两个构造函数,其中 parties 指示计数器的初始值,barrierAction 在所有线程都到达屏障的时候会执行一次。

  1. public CyclicBarrier(int parties, Runnable barrierAction) {
  2. if (parties <= 0) throw new IllegalArgumentException();
  3. this.parties = parties;
  4. this.count = parties;
  5. this.barrierCommand = barrierAction;
  6. }
  7. public CyclicBarrier(int parties) {
  8. this(parties, null);
  9. }

图片.png

  1. public class CyclicBarrierExample {
  2. public static void main(String[] args) {
  3. final int totalThread = 10;
  4. CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
  5. ExecutorService executorService = Executors.newCachedThreadPool();
  6. for (int i = 0; i < totalThread; i++) {
  7. executorService.execute(() -> {
  8. System.out.print("before..");
  9. try {
  10. cyclicBarrier.await();
  11. } catch (InterruptedException | BrokenBarrierException e) {
  12. e.printStackTrace();
  13. }
  14. System.out.print("after..");
  15. });
  16. }
  17. executorService.shutdown();
  18. }
  19. }
  1. before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..

Semaphore

Semaphore 类似于操作系统中的信号量,可以控制对互斥资源的访问线程数。

以下代码模拟了对某个服务的并发请求,每次只能有 3 个客户端同时访问,请求总数为 10。

  1. public class SemaphoreExample {
  2. public static void main(String[] args) {
  3. final int clientCount = 3;
  4. final int totalRequestCount = 10;
  5. Semaphore semaphore = new Semaphore(clientCount);
  6. ExecutorService executorService = Executors.newCachedThreadPool();
  7. for (int i = 0; i < totalRequestCount; i++) {
  8. executorService.execute(()->{
  9. try {
  10. semaphore.acquire();
  11. System.out.print(semaphore.availablePermits() + " ");
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. } finally {
  15. semaphore.release();
  16. }
  17. });
  18. }
  19. executorService.shutdown();
  20. }
  21. }
  1. 2 1 2 2 2 2 2 1 2 2

同步状态

AQS中维护了一个名叫state的字段,是由volatile修饰的,它就是所谓的同步状态

  1. private volatile int state;

与state字段的相关方法

方法名 描述
protected final int getState() 获取state
的值
protected final void setState(int newState) 设置state
的值
protected final boolean compareAndSetState(int expect, int update) 使用CAS
方式更新state
的值

同步队列

AQS中维护了同步队列,这个队列的节点类Node被定义成了一个静态内部类,Node类中有一个Thread类型的字段,这表明每一个节点都代表一个线程。

  1. static final class Node {
  2. static final Node SHARED = new Node();
  3. static final Node EXCLUSIVE = null;
  4. static final int CANCELLED = 1;
  5. static final int SIGNAL = -1;
  6. static final int CONDITION = -2;
  7. static final int PROPAGATE = -3;
  8. volatile int waitStatus;
  9. volatile Node prev;
  10. volatile Node next;
  11. volatile Thread thread;
  12. Node nextWaiter;
  13. }

任何想要获得锁的线程都需要来竞争同步状态,获得锁的线程可以继续业务流程的执行,而没有获得锁的线程会被放到一个FIFO的队列中去,等待再次竞争同步变量来获得锁,AbstractQueuedSynchronizer为每个没有获得锁的线程封装成一个Node再放到队列中去,AQS中定义一个头节点引用,一个尾节点引用。可以在队列上进行诸如插入和移除操作

  1. private transient volatile Node head;
  2. private transient volatile Node tail;

静态变量 值 描述 Node.CANCELLED 1 节点对应的线程已经被取消了(我们后边详细会说线程如何被取消) Node.SIGNAL -1 表示后边的节点对应的线程处于等待状态 Node.CONDITION -2 表示节点在等待队列中(稍后会详细说什么是等待队列) Node.PROPAGATE -3 表示下一次共享式同步状态获取将被无条件的传播下去(稍后再说共享式同步状态的获取与释放时详细唠叨) 无 0 初始状态

  • CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
  • SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
  • CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
  • 0状态:值为0,代表初始化状态

独占模式Exclusive

在独占模式下,同一个时刻只能有一个线程获取到同步状态。其他同时去获取同步状态的线程会被包装成一个Node节点放到同步队列中,直到获取到同步状态的线程释放掉同步状态才能继续执行。在独占模式需要我们定义AQS的子类并且重写下边这些方法:

方法名 描述
protected boolean tryAcquire(int arg) 独占式的获取同步状态,获取成功返回true,否则false
protected boolean tryRelease(int arg) 独占式的释放同步状态,释放成功返回true,否则false
protected boolean isHeldExclusively() 在独占模式下,如果当前线程已经获取到同步状态,则返回 true;其他情况则返回 false

AQS里定义了一些调用它们的方法,这些方法都是由public final修饰的:

方法名 描述
void acquire(int arg) 独占式获取同步状态,如果获取成功则返回,如果失败则将当前线程包装成Node
节点插入同步队列中。
void acquireInterruptibly(int arg) 与上个方法意思相同,只不过一个线程在执行本方法过程中被别的线程中断,则抛出InterruptedException
异常。
boolean tryAcquireNanos(int arg, long nanos) 在上个方法的基础上加了超时限制,如果在给定时间内没有获取到同步状态,则返回false
,否则返回true
boolean release(int arg) 独占式的释放同步状态。

acquire(int)

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

函数流程如下:

  1. tryAcquire()尝试直接去获取同步状态,如果返回true则结束;
  2. addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

    tryAcquire(int)

    此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。AQS是一个抽象类,在
    tryAcquire只是抛出了异常,具体资源的获取/释放方式交由自定义同步器去实现 ```java protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
  1. <a name="ZN4di"></a>
  2. #### addWaiter(Node)
  3. 此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点
  4. ```java
  5. private Node addWaiter(Node mode) {
  6. Node node = new Node(Thread.currentThread(), mode); //构造一个新节点
  7. Node pred = tail;
  8. if (pred != null) { //尾节点不为空,插入到队列最后
  9. node.prev = pred;
  10. if (compareAndSetTail(pred, node)) { //更新tail,并且把新节点插入到列表最后
  11. pred.next = node;
  12. return node;
  13. }
  14. }
  15. enq(node);
  16. return node;
  17. }
  18. private Node enq(final Node node) {
  19. for (;;) {
  20. Node t = tail;
  21. if (t == null) { //tail节点为空,初始化队列
  22. if (compareAndSetHead(new Node())) //设置head节点
  23. tail = head;
  24. } else { //tail节点不为空,开始真正插入节点
  25. node.prev = t;
  26. if (compareAndSetTail(t, node)) {
  27. t.next = node;
  28. return t;
  29. }
  30. }
  31. }
  32. }

acquireQueued

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. final Node p = node.predecessor(); //获取前一个节点
  7. if (p == head && tryAcquire(arg)) { 前一个节点是头节点再次尝试获取同步状态
  8. setHead(node);
  9. p.next = null; // help GC
  10. failed = false;
  11. return interrupted;
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) &&
  14. parkAndCheckInterrupt())
  15. interrupted = true;
  16. }
  17. } finally {
  18. if (failed)
  19. cancelAcquire(node);
  20. }
  21. }

共享模式Share

方法名 描述
protected int tryAcquireShared(int arg) 共享式的获取同步状态,获取成功返回true,否则false
protected boolean tryReleaseShared(int arg) 共享式的释放同步状态,释放成功返回true,否则false

这个方法会调用我们自定义的AQS子类中的tryAcquireShared方法去获取同步状态,只不过tryAcquireShared的返回值是一个int值,该值不小于0的时候表示获取同步状态成功,则acquireShared方法直接返回;如果该返回值小于0的时候,表示获取同步状态失败,则会把该线程包装成Node节点插入同步队列

  1. public final void acquireShared(int arg) {
  2. if (tryAcquireShared(arg) < 0)
  3. doAcquireShared(arg);
  4. }
  5. protected int tryAcquireShared(int arg) {
  6. throw new UnsupportedOperationException();
  7. }

doAcquireShared(int)

  1. private void doAcquireShared(int arg) {
  2. final Node node = addWaiter(Node.SHARED);//加入队列尾部
  3. boolean failed = true;//是否成功标志
  4. try {
  5. boolean interrupted = false;//等待过程中是否被中断过的标志
  6. for (;;) {
  7. final Node p = node.predecessor();//前驱
  8. if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
  9. int r = tryAcquireShared(arg);//尝试获取资源
  10. if (r >= 0) {//成功
  11. setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程
  12. p.next = null; // help GC
  13. if (interrupted)//如果等待过程中被打断过,此时将中断补上。
  14. selfInterrupt();
  15. failed = false;
  16. return;
  17. }
  18. }
  19. //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
  20. if (shouldParkAfterFailedAcquire(p, node) &&
  21. parkAndCheckInterrupt())
  22. interrupted = true;
  23. }
  24. } finally {
  25. if (failed)
  26. cancelAcquire(node);
  27. }
  28. }

releaseShared(int)

释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }
  8. private void doReleaseShared() {
  9. for (;;) {
  10. Node h = head;
  11. if (h != null && h != tail) {
  12. int ws = h.waitStatus;
  13. if (ws == Node.SIGNAL) {
  14. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  15. continue; // loop to recheck cases
  16. unparkSuccessor(h);
  17. }
  18. else if (ws == 0 &&
  19. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  20. continue; // loop on failed CAS
  21. }
  22. if (h == head) // loop if head changed
  23. break;
  24. }
  25. }
  26. private void unparkSuccessor(Node node) {
  27. int ws = node.waitStatus;
  28. if (ws < 0)
  29. compareAndSetWaitStatus(node, ws, 0);
  30. Node s = node.next;
  31. if (s == null || s.waitStatus > 0) {
  32. s = null;
  33. for (Node t = tail; t != null && t != node; t = t.prev)
  34. if (t.waitStatus <= 0)
  35. s = t;
  36. }
  37. if (s != null)
  38. LockSupport.unpark(s.thread);
  39. }

模板模式

AQS框架,分离了构建同步器时的一系列关注点,它的所有操作都围绕着资源——同步状态(synchronization state)来展开,并替用户解决了如下问题:

  1. 资源是可以被同时访问?还是在同一时间只能被一个线程访问?(共享/独占功能)
  2. 访问资源的线程如何进行并发管理?(等待队列)
  3. 如果线程等不及资源了,如何从等待队列退出?(超时/中断)

这其实是一种典型的模板方法设计模式:父类(AQS框架)定义好骨架和内部操作细节,具体规则由子类去实现。
我们所熟知的ReentrantLock、CountDownLatch、CyclicBarrier等同步器,其实都是通过内部类实现了AQS框架暴露的API,以此实现各类同步器功能。这些同步器的主要区别其实就是对同步状态(synchronization state)的定义不同。AQS框架将剩下的一个问题留给用户:什么是资源?如何定义资源是否可以被访问?
我们来看下几个常见的同步器对这一问题的定义:

同步器 资源的定义
ReentrantLock 资源表示独占锁。State为0表示锁可用;为1表示被占用;为N表示重入的次数
CountDownLatch 资源表示倒数计数器。State为0表示计数器归零,所有线程都可以访问资源;为N表示计数器未归零,所有线程都需要阻塞。
Semaphore 资源表示信号量或者令牌。State≤0表示没有令牌可用,所有线程都需要阻塞;大于0表示由令牌可用,线程每获取一个令牌,State减1,线程没释放一个令牌,State加1。
ReentrantReadWriteLock 资源表示共享的读锁和独占的写锁。state逻辑上被分成两个16位的unsigned short,分别记录读锁被多少线程使用和写锁被重入的次数。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
  以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
  再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
  一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

Mutex(互斥锁)

Mutex是一个不可重入的互斥锁实现。锁资源(AQS里的state)只有两种状态:0表示未锁定,1表示锁定。

  1. class Mutex implements Lock, java.io.Serializable {
  2. // 自定义同步器
  3. private static class Sync extends AbstractQueuedSynchronizer {
  4. // 判断是否锁定状态
  5. protected boolean isHeldExclusively() {
  6. return getState() == 1;
  7. }
  8. // 尝试获取资源,立即返回。成功则返回true,否则false。
  9. public boolean tryAcquire(int acquires) {
  10. assert acquires == 1; // 这里限定只能为1个量
  11. if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!
  12. setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源
  13. return true;
  14. }
  15. return false;
  16. }
  17. // 尝试释放资源,立即返回。成功则为true,否则false。
  18. protected boolean tryRelease(int releases) {
  19. assert releases == 1; // 限定为1个量
  20. if (getState() == 0)//既然来释放,那肯定就是已占有状态了。只是为了保险,多层判断!
  21. throw new IllegalMonitorStateException();
  22. setExclusiveOwnerThread(null);
  23. setState(0);//释放资源,放弃占有状态
  24. return true;
  25. }
  26. }
  27. // 真正同步类的实现都依赖继承于AQS的自定义同步器!
  28. private final Sync sync = new Sync();
  29. //lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。
  30. public void lock() {
  31. sync.acquire(1);
  32. }
  33. //tryLock<-->tryAcquire。两者语义一样:尝试获取资源,要求立即返回。成功则为true,失败则为false。
  34. public boolean tryLock() {
  35. return sync.tryAcquire(1);
  36. }
  37. //unlock<-->release。两者语文一样:释放资源。
  38. public void unlock() {
  39. sync.release(1);
  40. }
  41. //锁是否占有状态
  42. public boolean isLocked() {
  43. return sync.isHeldExclusively();
  44. }
  45. }