同步器 AbstractQueuedSynchronizer(简称同步器) 是用来构建锁或者其他同步组件的基础框架,它使用了一个 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成资源获取线程的排队工作。

同步器主要使用方式是继承,子类通过继承同步器并实现它的部分方法来管理同步状态。

同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态。独占锁模式下,每次只能有一个线程持有锁,比如上一章例子中的 ReentrantLock 就是以独占方式实现的互斥锁;共享锁模式下,允许多个线程同时获取锁,并发访问共享资源,比如 ReentrantReadWriteLock。

锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。

同步器的接口和示例

同步器的设计是基于模板方法模式的,我们只需要继承同步器并重写指定的方法,然后将同步器组合在自定义同步组件的实现中。
image.png

重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。

方法名称 描述
protected final int getState() 获取当前同步状态
protected final void setState(int newState) 设置当前同步状态
protected final boolean compareAndSetState(int expect, int update) 使用 CAS 设置当前状态,该方法能够保证状态设置的原子性

同步器可重写的方法与描述如表所示。

方法名称 描述
protected boolean tryAcquire(int arg) 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行 CAS 设置同步状态
protected final void setState(int newState) 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
方法名称 描述
protected int tryAcquireShared(int arg) 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之,获取失败
protected boolean tryReleaseShared(int arg) 共享式释放同步状态
protected boolean isHeldExclusively() 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程独占
方法名称 描述
protected int tryAcquireShared(int arg) 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之,获取失败
protected boolean tryReleaseShared(int arg) 共享式释放同步状态
protected boolean isHeldExclusively() 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程独占

实现自定义同步组件时,将会调用同步器提供的模板方法,这些(部分)模板方法与描述如表所示。

方法名称 描述
void acquire(int arg) 独占式获取同步状态,如果当前线程获取同步状态成功,则会返回,否则当前线程会进入同步队列等待,该方法需要调用重写的 tryAcquire(int arg) 方法
void acquireInterruptibly(int arg) 与 acquire(int arg) 相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出 InterruptedException 并返回
boolean tryAcquireNanos(int arg, long nanosTimeout) 在 acquireInterruptibly(int arg) 基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回 false,如果获取到了返回 true
void acquireShared(int arg) 共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态
void acquireSharedInterruptibly(int arg) 与 acquireShared(int arg) 相同,该方法响应中断
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 在 acquireSharedInterruptibly(int arg) 基础上增加了超时限制
boolean release(int arg) 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒
boolean releaseShared(int arg) 共享式的释放同步状态
Collection getQueuedThreads() 获取等待在同步队列上的线程集合

同步器提供的模板方法基本上分为3类:独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况。自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。

通过一个独占锁的示例来深入了解一下同步器的工作原理。

  1. /**
  2. * 独占锁
  3. */
  4. public class Mutex implements Lock {
  5. /**
  6. * 静态内部类,自定义同步器
  7. */
  8. private static class Sync extends AbstractQueuedSynchronizer {
  9. /**
  10. * 当状态为0的时候获取锁
  11. *
  12. * @param arg
  13. * @return
  14. */
  15. @Override
  16. protected boolean tryAcquire(int arg) {
  17. if (compareAndSetState(0, 1)) {
  18. setExclusiveOwnerThread(Thread.currentThread());
  19. return true;
  20. }
  21. return false;
  22. }
  23. /**
  24. * 释放锁,将状态设置为0
  25. *
  26. * @param arg
  27. * @return
  28. */
  29. @Override
  30. protected boolean tryRelease(int arg) {
  31. if (getState() == 0) {
  32. throw new IllegalMonitorStateException();
  33. }
  34. setExclusiveOwnerThread(null);
  35. setState(0);
  36. return true;
  37. }
  38. /**
  39. * 是否处于占用状态
  40. *
  41. * @return
  42. */
  43. @Override
  44. protected boolean isHeldExclusively() {
  45. return getState() == 1;
  46. }
  47. /**
  48. * 返回一个Condition
  49. *
  50. * @return
  51. */
  52. Condition newCondition() {
  53. return new ConditionObject();
  54. }
  55. }
  56. private final Sync sync = new Sync();
  57. @Override
  58. public void lock() {
  59. sync.acquire(1);
  60. }
  61. @Override
  62. public void lockInterruptibly() throws InterruptedException {
  63. sync.acquireInterruptibly(1);
  64. }
  65. @Override
  66. public boolean tryLock() {
  67. return sync.tryAcquire(1);
  68. }
  69. @Override
  70. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  71. return sync.tryAcquireNanos(1, unit.toNanos(time));
  72. }
  73. @Override
  74. public void unlock() {
  75. sync.release(1);
  76. }
  77. @Override
  78. public Condition newCondition() {
  79. return sync.newCondition();
  80. }
  81. }

测试代码。

  1. public class Test2 {
  2. private static Mutex lock = new Mutex();
  3. public static void test() {
  4. lock.lock();
  5. try {
  6. Thread.sleep(180000);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. System.out.println(Thread.currentThread().getName() + " test");
  11. lock.unlock();
  12. }
  13. public static void main(String[] args) {
  14. new Thread(() -> {
  15. test();
  16. }, "thread-1").start();
  17. new Thread(() -> {
  18. test();
  19. }, "thread-2").start();
  20. }
  21. }

假设 thread-1 先进入同步块,thread-2 在进入同步块的时候发现锁已经被 thread-1 占用,thread-2 进入同步队列等待。thread-2 进入同步队列的执行过程如下图所示。

image.png

thread-2 在执行 acquireQueued() 方法是会执行 LockSupport.park 方法,thread-2 进入等待状态,当 thread-1 执行完同步块后,执行 unlock() 方法,释放锁,并唤醒 thread-2 线程。

关于 acquireQueued() 方法会在同步器的原理分析中详细描述,该方法实现了节点自旋获取同步状态的行为。

image.png

参考

《Java并发编程的艺术》参考 5.2 队列同步器

作者:殷建卫 链接:https://www.yuque.com/yinjianwei/vyrvkf/gyxdeo 来源:殷建卫 - 架构笔记 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。