首先,来看看Java实现线程安全的几种方式:

  • 互斥同步:synchronized和ReentrantLock,可视为悲观锁
  • 非阻塞同步:CAS,AtomicXxxx,可视为乐观锁
  • 无同步方案:栈封闭,Thread Local,可重入代码

    1. CAS原子指令

  • CAS的全称为Compare-And-Swap,直译就是对比交换。是一条CPU的原子指令,其作用是让CPU先进行比较两个值是否相等,然后原子地更新某个位置的值,经过调查发现,其实现方式是基于硬件平台的汇编指令,就是说CAS是靠硬件实现的,JVM只是封装了汇编调用,那些AtomicInteger类便是使用了这些封装后的接口。

  • 简单解释:CAS操作需要输入两个数值,一个旧值(期望操作前的值)和一个新值,在操作期间先比较下在旧值有没有发生变化,如果没有发生变化,才交换成新值,发生了变化则不交换。 CAS操作是原子性的,所以多线程并发使用CAS更新数据时,可以不使用锁。JDK中大量使用了CAS来更新数据而防止加锁(这里特指synchronized重量级锁)来保持原子更新。
  • 通过JMM我们可以知道,synchronized关键字是重量级锁,底层使用监视器实现,意味着阻塞、性能低;而JUC包里的Lock类(使用AQS实现),其底层就大量使用了CAS原语,CAS相比而言更轻量,通过自旋而非阻塞的形式保证程序的性能,某种角度来说,CAS可以看作为乐观锁,synchronized为悲观锁。

    1.1 CAS的问题

    ABA问题

  • 因为CAS需要在操作值的时候,检查值有没有发生变化,比如没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时则会发现它的值没有发生变化,但是实际上却变化了。

  • ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A->B->A就会变成1A->2B->3A,这也是完整的乐观锁实现方式
  • 从Java 1.5开始,JDK的Atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

    循环时间长开销大

  • 自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令,那么效率会有一定的提升。

  • pause指令有两个作用:第一,它可以延迟流水线执行命令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零;第二,它可以避免在退出循环的时候因内存顺序冲突(Memory Order Violation)而引起CPU流水线被清空(CPU Pipeline Flush),从而提高CPU的执行效率。

    只能保证一个共享变量的原子操作

  • 当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁。

  • 还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如,有两个共享变量i = 2,j = a,合并一下ij = 2a,然后用CAS来操作ij。 从Java 1.5开始,JDK提供了AtomicReference类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。

    1.2 UnSafe类

  • Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全操作的方法,如直接访问系统内存资源、自主管理内存资源等,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。但由于Unsafe类使Java语言拥有了类似C语言指针一样操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险。在程序中过度、不正确使用Unsafe类会使得程序出错的概率变大,使得Java这种安全的语言变得不再“安全”,因此对Unsafe的使用一定要慎重。

  • 这个类尽管里面的方法都是 public 的,但是并没有办法使用它们,JDK API 文档也没有提供任何关于这个类的方法的解释。总而言之,对于 Unsafe 类的使用都是受限制的,只有授信的代码才能获得该类的实例,当然 JDK 库里面的类是可以随意使用的。
  • Unsafe类的底层调用的是C++编写的native方法,因此具备直接操作硬件的能力(Java是不具备这种能力的),CAS指令就需要这样的方法才能实现。

    底层的核心方法

    ```java public final native boolean compareAndSwapObject(Object paramObject1, long paramLong, Object paramObject2, Object paramObject3);

public final native boolean compareAndSwapInt(Object paramObject, long paramLong, int paramInt1, int paramInt2);

public final native boolean compareAndSwapLong(Object paramObject, long paramLong1, long paramLong2, long paramLong3);

  1. - Unsafe只提供了3CAS方法:compareAndSwapObjectcompareAndSwapIntcompareAndSwapLong。都是C++编写的native方法。
  2. - 这些方法会先判断系统的CPU类型,进而使用其指令集直接操作系统硬件来实现相关的CAS操作。
  3. <a name="m35Yv"></a>
  4. ## 1.3 三大原子操作类
  5. Java的基本类型(intboolean)、数组类型、普通引用类型是非线程安全的,他们并不能保证原子性,因此即便用volatile关键字修饰,也不能确保线程安全,JUC针对这三大类,基于CAS提供了对应的AtomicXXX类,以此确保在不加synchronized重量级锁的情况下达到线程安全的目的。
  6. <a name="XFPmm"></a>
  7. ### 原子更新基本类型
  8. - AtomicBoolean: 原子更新布尔类型。
  9. - AtomicInteger: 原子更新整型。
  10. - AtomicLong: 原子更新长整型。
  11. - 这几个类的实现原理都是使用volatile修饰基本值,然后提供基于CAS实现的修改方法,例如:自增、自减、设置新值、获取当前值等。
  12. <a name="4rGBj"></a>
  13. ### 原子更新数组
  14. 通过原子的方式更新数组里的某个元素,Atomic包提供了以下的4个类:
  15. - AtomicIntegerArray: 原子更新整型数组里的元素。
  16. - AtomicLongArray: 原子更新长整型数组里的元素。
  17. - AtomicReferenceArray: 原子更新引用类型数组里的元素。
  18. - 这三个类的最常用的方法是如下两个方法:
  19. - get(int index):获取索引为index的元素值。
  20. - compareAndSet(int i,E expect,E update): 如果当前值等于预期值,则以原子方式将数组位置i的元素设置为update值。
  21. <a name="TiAV2"></a>
  22. ### 原子更新引用类型
  23. Atomic包提供了以下三个类(注意,更新的是整个引用对象,而非对象里具体的某个字段):
  24. - AtomicReference: 原子更新引用类型。
  25. - AtomicStampedReference: 原子更新引用类型, 内部使用Pair来存储元素值及其版本号。
  26. - AtomicMarkableReferce: 原子更新带有标记位的引用类型。
  27. - 这三个类提供的方法都差不多,首先构造一个引用对象,然后把引用对象setAtomic类,然后调用compareAndSet等一些方法去进行原子操作,原理都是基于Unsafe实现。
  28. <a name="sR6Gb"></a>
  29. ### 原子更新字段类
  30. Atomic包提供了四个类进行原子字段更新(这里就是对象里具体的某个字段了):
  31. - AtomicIntegerFieldUpdater: 原子更新整型的字段的更新器。
  32. - AtomicLongFieldUpdater: 原子更新长整型字段的更新器。
  33. - AtomicStampedFieldUpdater: 原子更新带有版本号的引用类型的更新器。
  34. - AtomicReferenceFieldUpdater: 原子更新引用类型字段的更新器。
  35. - 这四个类的使用方式都差不多,是基于反射的原子更新字段的值。要想原子地更新字段类需要两步:
  36. - 第一步,因为原子更新字段类都是抽象类,每次使用的时候必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
  37. - 第二步,更新类的字段必须使用public volatile修饰,若字段不具备可见性,CAS也就无法操作(因为要对比值)。
  38. <a name="Mn6oj"></a>
  39. # 2. Java线程的五大状态
  40. ![](https://cdn.nlark.com/yuque/0/2021/jpeg/772705/1614136057901-d64cc250-ec6e-4a52-8891-83c4f44bed8c.jpeg#align=left&display=inline&height=436&margin=%5Bobject%20Object%5D&originHeight=480&originWidth=700&size=0&status=done&style=none&width=636)<br />
  41. - **新建(New):** 线程对象被创建后,就进入了新建状态。例如,Thread thread = new Thread()。
  42. - **就绪(Runnable):** 也被称为“可执行状态”。线程对象被创建后,其它线程调用了该对象的start()方法,从而来启动该线程。例如,thread.start()。处于就绪状态的线程,随时可能被CPU调度执行。
  43. - **运行(Running):** 线程获取CPU权限进行执行。需要注意的是,线程只能从就绪状态进入到运行状态。
  44. - **阻塞(Blocked):** 阻塞状态是线程因为某种原因放弃CPU使用权,暂时停止运行。直到线程进入就绪状态,才有机会转到运行状态。阻塞的情况分三种:
  45. - 等待阻塞 -- 通过调用线程的wait()方法,让线程等待某工作的完成。
  46. - 同步阻塞 -- 线程在获取synchronized同步锁失败(因为锁被其它线程所占用),它会进入同步阻塞状态。
  47. - 其他阻塞 -- 通过调用线程的sleep()或join()或发出了I/O请求时,线程会进入到阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、I/O处理完毕时,线程重新转入就绪状态。
  48. - **死亡(Dead):** 线程执行完了或者因异常退出了run()方法,该线程结束生命周期。
  49. <a name="Vp7K8"></a>
  50. ## 2.1 为什么线程需要阻塞?
  51. - 通常在后端服务中,真正属于cpu处理的时间其实非常短,但为什么访问仍旧很慢呢?因为大量的时间损耗在了IO上,无论是数据库IO、文件IO,还是网络IO,都需要等待一段时间才能收到结果,此时若线程一直占用cpu等待结果,就会显得非常浪费,因此不如将其阻塞,让cpu先处理其他线程,等到结果返回了,再唤醒执行。
  52. - 当多个线程同时访问一个资源时,必定引发竞争问题,阻塞可以保证线程安全。
  53. <a name="6wmm8"></a>
  54. ### 线程同步的意义
  55. - 线程同步其实就是**线程排队**,为了防止**多个线程**访问**共享资源**时造成冲突,就需要大家排好队,按先后次序访问,避免冲突。
  56. - 多个线程访问共享资源的代码有可能是同一份代码,也有可能是不同的代码;无论是否执行同一份代码,只要这些线程的代码访问同一份可变的**共享资源**,这些线程之间就需要同步。
  57. <a name="ur4OO"></a>
  58. # 3. LockSupport阻塞原语
  59. - LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。简而言之,当调用LockSupport.park时,表示当前线程将会等待,直至获得许可,当调用LockSupport.unpark时,必须把等待获得许可的线程作为参数进行传递,好让此线程继续运行。
  60. - 它和CAS指令共同构建了JUC线程安全类的基石,即AQS框架。
  61. <a name="pAVXk"></a>
  62. ## 3.2 核心函数
  63. LockSupport仅有两个核心函数,这两个函数都是基于Unsafe类中定义的parkunpark函数的重载版本(正常开发是不允许使用Unsafe类的),底层都是基于C++实现的native方法。
  64. ```java
  65. public native void park(boolean isAbsolute, long time);
  66. public native void unpark(Thread thread);
  • park函数,阻塞线程,并且该线程在下列情况发生之前都会被阻塞:
    1. 调用unpark函数,释放该线程的许可。
    2. 该线程被中断。
    3. 设置的阻塞时间到了,并且当time为绝对时间时,isAbsolute为true,否则,isAbsolute为false。当time为0时,表示无限等待,直到unpark发生。
  • unpark函数,释放线程的许可,即激活被park阻塞的线程。这个函数不是安全的,调用这个函数时要确保线程依旧存活。

    park函数

    park函数有两个重载版本,两个函数的区别在于park()函数没有没有blocker参数,因此没有给线程设置parkBlocker字段:

    1. public static void park();
    2. public static void park(Object blocker);

    来看看第二个有参版本的park函数源码,非常的简洁:

    1. public static void park(Object blocker) {
    2. // 获取当前线程
    3. Thread t = Thread.currentThread();
    4. // 设置Blocker
    5. setBlocker(t, blocker);
    6. // 获取许可,阻塞线程
    7. UNSAFE.park(false, 0L);
    8. // 恢复运行后设置Blocker为null
    9. setBlocker(t, null);
    10. }
  • 流程:获取当前线程→设置Blocker字段→调用Unsafe类的park函数→将Blocker设为null

  • 那么,为什么要在此park函数中要调用两次setBlocker函数呢?原因其实很简单,调用park函数时,当前线程首先设置好Blocker字段,接着调用Unsafe的park函数,这时候当前线程就被阻塞了,也就是加锁;在该线程的unpark函数被调用后,末尾的setBlocker函数会将Blocker重置为空,这里可以视为一次锁的释放。
  • 如果没有第二个setBlocker,那么后续线程会拿到前一个线程设置的Blocker对象,这就非常不科学了。

另外一个无参重载版本,park()函数如下:

  1. public static void park() {
  2. // 获取许可,设置时间为无限长,直到可以获取许可
  3. UNSAFE.park(false, 0L);
  4. }

调用了该函数后,会直接禁用当前线程,除非许可可用。在以下三种情况之一发生之前,当前线程都将处于无限休眠状态,即下列情况发生时,当前线程会获取许可,可以继续运行。

  • 其他某个线程将当前线程作为目标调用 unpark。
  • 其他某个线程中断当前线程。
  • 该调用不合逻辑地(即毫无理由地)返回。

    parkNanos函数

    此函数会在许可可用前禁用当前线程,并最多等待指定的时间,到期后会自动释放,具体函数如下:
    1. public static void parkNanos(Object blocker, long nanos) {
    2. if (nanos > 0) { // 时间大于0
    3. // 获取当前线程
    4. Thread t = Thread.currentThread();
    5. // 设置Blocker
    6. setBlocker(t, blocker);
    7. // 获取许可,并设置了时间
    8. UNSAFE.park(false, nanos);
    9. // 设置许可
    10. setBlocker(t, null);
    11. }
    12. }

    parkUntil函数

    此函数表示在指定的时限前禁用当前线程,除非许可可用,与上一个函数的区别是,需要指定一个具体的时间点(Unix时间戳)作为释放时机,而非过期时长:
    1. public static void parkUntil(Object blocker, long deadline) {
    2. // 获取当前线程
    3. Thread t = Thread.currentThread();
    4. // 设置Blocker
    5. setBlocker(t, blocker);
    6. UNSAFE.park(true, deadline);
    7. // 设置Blocker为null
    8. setBlocker(t, null);
    9. }

    4. Java四种阻塞方式的区别

    | 函数 | 是否释放锁 | 唤醒机制 | | :—-: | :—-: | :—-: | | Thread.sleep() | 否 | 1、必须传入阻塞时长。
    2、到期后自动唤醒,期间外部无法介入。
    3、其底层就是一个独立的native方法。
    4、抛出了中断异常,需要处理。
    5、由于没有释锁,所以唤醒后一定执行。 | | Object.wait() | 是 | 1、可传时间,自动唤醒。
    2、可不传时间,必须由其他线程主动使用Object.notify()唤醒。
    3、抛出了中断异常,需要处理。
    4、由于释锁了,唤醒后不一定执行,如果锁被其他线程占用,那么线程就只能等待。
    5、必须严格按wait→notify的顺序执行,否则报错。 | | Condition.await() | 是 | 1、和wait()基本一致,只是底层使用了LockSupport.park()实现阻塞,但相比而言会主动释锁。
    2、由于主动释锁,唤醒后也不一定会执行,同wait()。
    3、由于底层使用了park,因此不需要处理中断异常。 | | LockSupport.park() | 否 | 1、底层借助于Unsafe类的native方法。
    2、可不传时间,由LockSupport.unpark()唤醒。
    3、有多个重载方法,可支持个性化时间定制。
    4、不会抛出中断异常,无需处理。
    5、唤醒后一定会执行,因为阻塞前不会释锁。
    6、park和unpark执行顺序可颠倒,不会报错。 |

4.1 二元信号量

  • park()、unpark()底层的原理是“二元信号量”,可以把它想象成只有一个许可证的Semaphore,只不过这个信号量在重复执行unpark()的时候也不会再增加许可证,整个过程有且仅有一个许可证。
  • Linux中,基于system V的进程间通信包含一种称为二元信号量的通信方式,但严格意义上来说,信号量并不具备数据交换的功能,它本质是一把数据操作锁,通过控制其他的通信资源(⽂件,外部设备)来实现进程间通信,它本⾝只是⼀种外部资源的标识。信号量在此过程中负责数据操作的互斥、同步等功能。

    为什么要使用信号量

  • 为了防⽌出现因多个程序同时访问⼀个共享资源⽽引发的⼀系列问题,我们需要⼀种⽅法,它可以通过⽣成并使⽤令牌来做资源授权,且在任⼀时刻只能有⼀个执⾏线程可以访问代码的临界区域。

  • 临界区域是指执⾏数据更新的代码需要独占式地执⾏。⽽信号量就可以提供这样的⼀种访问机制,让⼀个临界区同⼀时间只有⼀个线程在访问它, 也就是说信号量是⽤来协调进程对共享资源的访问的,其中共享内存的使⽤就要⽤到信号量。
  • 信号量本质就是一个数字,当为1时,有进程来需要使用这个邻界资源,那么要对信号量减一将其变为0,而减一的操作并不是一步执行的,而是由cpu从内存读入1到寄存器中,再对1减一,最后再将0写入到内存中,可见,有三步操作,因此信号量是天生具备原子性的。

    5. 核心框架:AQS

  • 基于CAS原子操作和LockSupport类,Java实现了一个用来构建锁和同步器的框架,其包含一个核心抽象类:AbstractQueuedSynchronizer,因此简称为AQS。

  • 使用AQS能简单且高效地构造出应用广泛的大量同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。

    5.1 AQS 核心思想

  • AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞、等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

  • CLH(Craig Landin and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系),AQS会将每条请求共享资源的线程封装进一个CLH锁队列,并成为其中的一个节点(Node),以此来实现锁的分配。
  • AQS使用一个int成员变量来表示同步状态,通过内置的队列来完成线程的排队工作。AQS使用CAS对该同步状态进行原子操作,并通过volatile关键字保证其可见性。
  • 状态信息通过procted类型的getState,setState,compareAndSetState进行操作

    1. //共享变量,使用volatile修饰保证线程可见性
    2. private volatile int state;
    3. //返回同步状态的当前值
    4. protected final int getState() {
    5. return state;
    6. }
    7. // 设置同步状态的值
    8. protected final void setState(int newState) {
    9. state = newState;
    10. }
    11. //原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
    12. protected final boolean compareAndSetState(int expect, int update) {
    13. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    14. }

    5.2 AQS对资源的共享方式

    AQS定义两种资源共享方式

  • Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:

    • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
    • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
  • Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock。

    设计模式

    不同的同步器争用共享资源的方式也不同,但我们在实现同步器时只需要实现共享资源 state 的获取与释放即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。

AQS框架是基于模板方法模式编写的,如果需要自定义同步器,一般的方式是这样:

  • 使用者继承AbstractQueuedSynchronizer并重写指定的方法。这些重写方法很简单,无非是对于共享资源state的获取和释放)将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

    1. isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
    2. tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
    3. tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
    4. tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    5. tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
  • 默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。

  • 以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

    5.3 CLH虚拟队列

  • 上面提到过CLH是一个虚拟双向队列,作为队列,就要满足先进先出的原则,AQS底层借助链表来模拟队列结构。

  • Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。
  • Condition queue不是必须的,是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue,和同步队列的区别是:在此队列的线程将处于等待状态(调用await()后自动入队),不会尝试获取锁,直至调用signal()后,才会加入同步队列,开始竞争锁资源。

底层核心原理 - 图1

节点状态

可以发现,节点类的核心成员就是前驱、后继节点,以及节点的当前状态:

  • CANCELLED,值为1,表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。。
  • SIGNAL,值为-1,表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL,代表后继节点希望前驱尽快执行。
  • CONDITION,值为-2,表示当前节点在等待condition,也就是处于condition queue中,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。。
  • PROPAGATE,值为-3,表示共享模式下无条件传播唤醒事件,即除了唤醒后继,还会唤醒后继的后继。
  • 值为0,新节点进入同步队列后的默认状态,表示当前节点已处于sync queue中,等待着获取锁。

    5.4 核心方法

    acquire()独占模式获取资源

    该方法以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。源码如下:

    1. public final void acquire(int arg) {
    2. // 与操作符是顺序执行的,能执行acquireQueued则说明tryAcquire != true
    3. if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4. selfInterrupt();
    5. }

    由源码可以发现当一个线程调用acquire时,流程如下:
    底层核心原理 - 图2
    整体逻辑:

  • 首先调用tryAcquire方法,调用此方法的线程会试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。在AbstractQueuedSynchronizer源码中,默认会抛出一个异常,所以子类必须重写此方法,完成自己的逻辑,根据资源占用情况返回true、false。

  • 若tryAcquire失败,则调用addWaiter方法,addWaiter方法完成的功能是将调用此方法的线程封装成为一个结点并放入Sync queue队列,排队等待执行。
  • 调用acquireQueued方法,此方法完成的功能是让Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。

再来看acquireQueued方法的逻辑:

  1. 首先获取当前节点的前驱节点,如果前驱节点是头结点并且能够获取资源,则说明当前节点排第二顺位,马上就到自己执行,那么当前节点就能够尝试获取资源(占有锁)。若成功就将当前节点设置为头结点并返回,否则进入finally块中放弃获取资源。
  2. 若步骤 1 未能获取到资源,则判断是否需要park自己,判断原理是查看前驱结点的状态是否为SIGNAL(-1),若是,则说明上一节点已经unpark并在执行了,那么当前结点自然需要park自己,并等待前驱执行完毕。
  3. 若park了自己,之后某个线程又对本线程unpark、并且本线程也获得机会运行,那么将会继续进行步骤 1 的判断。

    release()独占模式释放资源

    该方法以独占模式释放资源,实际上就是判断当前的头节点是否有效,然后unpark第二顺位的节点线程,源码如下:
    1. public final boolean release(int arg) {
    2. if (tryRelease(arg)) { // 释放成功
    3. // 保存头结点
    4. Node h = head;
    5. if (h != null && h.waitStatus != 0) // 头结点不为空并且头结点状态不为0
    6. unparkSuccessor(h); //释放头结点的后继结点
    7. return true;
    8. }
    9. return false;
    10. }

    5.5 AQS总结

    对于AbstractQueuedSynchronizer的分析,最核心的就是sync queue的分析:
  • 每一个结点都是由前一个结点唤醒(队列特性,先进先出)。
  • 当结点发现前驱结点是head并且尝试获取资源成功时,则会轮到该节点对应的线程运行。
    • 在发现前驱节点是头结点时,前驱节点甚至都已经执行完毕了,cpu速度是非常快的,因此这时候自己直接执行并无不妥
  • condition queue中的结点向sync queue中转移是通过signal操作完成的,而线程调用await后会进入condition queue。
  • 当结点的状态为SIGNAL时,表示后面的结点需要运行,这是一种督促自己尽快运行的信号。

    6. JUC两大锁类

    6.1 ReentrantLock

  • ReentrantLock(可重入锁)实现了Lock接口,Lock接口中定义了lock与unlock操作,并且还存在newCondition方法,表示生成一个条件。

  • ReentrantLock类内部总共存在Sync、NonfairSync、FairSync三个类,NonfairSync与FairSync类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类:

底层核心原理 - 图3
Sync是内部基类,他继承自AQS抽象类,并实现了具体的方法,而NonfairSync与FairSync类顾名思义分别是非公平锁和公平锁。

NonfairSync非公平锁

  1. static final class NonfairSync extends Sync {
  2. final void lock() {
  3. if (compareAndSetState(0, 1)) // 比较并设置状态成功,状态0表示锁没有被占用
  4. // 把当前线程设置独占了锁
  5. setExclusiveOwnerThread(Thread.currentThread());
  6. else // 锁已经被占用,或者set失败
  7. // 以独占模式获取对象,忽略中断
  8. acquire(1);
  9. }
  10. protected final boolean tryAcquire(int acquires) {
  11. return nonfairTryAcquire(acquires);
  12. }
  13. }

可以发现,非公平锁的lock方法非常粗暴地查询锁、获取锁,如果刚好这时候锁被释放了,它就可以直接抢占锁资源,而不是排入同步队列尾部等待。

FairSyn公平锁

  1. static final class FairSync extends Sync {
  2. final void lock() {
  3. // 以独占模式获取对象,忽略中断
  4. acquire(1);
  5. }
  6. // 尝试公平获取锁
  7. protected final boolean tryAcquire(int acquires) {
  8. // 获取当前线程
  9. final Thread current = Thread.currentThread();
  10. // 获取状态
  11. int c = getState();
  12. if (c == 0) { // 状态为0
  13. if (!hasQueuedPredecessors() &&
  14. compareAndSetState(0, acquires)) { // 不存在已经等待更久的线程并且比较并且设置状态成功
  15. // 设置当前线程独占
  16. setExclusiveOwnerThread(current);
  17. return true;
  18. }
  19. }
  20. else if (current == getExclusiveOwnerThread()) { // 状态不为0,即资源已经被线程占据
  21. // 下一个状态
  22. int nextc = c + acquires;
  23. if (nextc < 0) // 超过了int的表示范围
  24. throw new Error("Maximum lock count exceeded");
  25. // 设置状态
  26. setState(nextc);
  27. return true;
  28. }
  29. return false;
  30. }
  31. }

公平锁则重写了AQS的tryAcquire方法,在获取锁之前,它会先判断当前同步队列里是否有等待了更长时间的线程,有的话就把自己加入队列尾部,而不是直接插队抢锁,这里也就体现了公平原则。

6.2 ReentrantReadWriteLock

  • ReentrantReadWriteLock(可重入读写锁)将锁分成了读写两部分,读锁ReadLock和写锁WriteLock,可以通过这两种锁实现线程间的同步。
  • 它的底层也是基于AQS实现的,内部类的关系如下,相比ReentrantLock来说,它在对Lock接口的实现上做了拆分,区分了读和写:

底层核心原理 - 图4

6.2.1 核心类:Sync

可重入读写锁的业务代码最终都会转入Sync类中处理,因此,只分析该内部类即可。

  1. abstract static class Sync extends AbstractQueuedSynchronizer {
  2. // 计数器
  3. static final class HoldCounter {
  4. // 计数
  5. int count = 0;
  6. // Use id, not reference, to avoid garbage retention
  7. // 获取当前线程的TID属性的值
  8. final long tid = getThreadId(Thread.currentThread());
  9. }
  10. // 本地线程计数器
  11. static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
  12. // 重写初始化方法,在没有进行set的情况下,获取的都是该HoldCounter值
  13. public HoldCounter initialValue() {
  14. return new HoldCounter();
  15. }
  16. }
  17. }
  • Sync类内部存在两个内部类,分别为HoldCounter和ThreadLocalHoldCounter,其中HoldCounter主要与读锁配套使用,主要有两个属性,count和tid,其中count表示某个读线程重入的次数,tid表示该线程的tid字段的值,该字段可以用来唯一标识一个线程。
  • ThreadLocalHoldCounter则继承自ThreadLocal类(该类存储线程的信息), 并且重写了其中的initialValue方法。在没有进行set的情况下,get到的均是initialValue方法里面生成的那个HolderCounter对象。

    6.2.2 构造器与成员属性

  • 注意成员属性里有:读锁、写锁线程的最大量,本地线程计数器

  • 注意构造函数中设置了本地线程计数器和AQS的状态state。

    1. abstract static class Sync extends AbstractQueuedSynchronizer {
    2. // 版本序列号
    3. private static final long serialVersionUID = 6317671515068378041L;
    4. // 高16位为读锁,低16位为写锁
    5. static final int SHARED_SHIFT = 16;
    6. // 读锁单位
    7. static final int SHARED_UNIT = (1 << SHARED_SHIFT);
    8. // 读锁最大数量
    9. static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
    10. // 写锁最大数量
    11. static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    12. // 本地线程计数器
    13. private transient ThreadLocalHoldCounter readHolds;
    14. // 缓存的计数器
    15. private transient HoldCounter cachedHoldCounter;
    16. // 第一个读线程
    17. private transient Thread firstReader = null;
    18. // 第一个读线程的计数
    19. private transient int firstReaderHoldCount;
    20. // 构造函数
    21. Sync() {
    22. // 本地线程计数器
    23. readHolds = new ThreadLocalHoldCounter();
    24. // 设置AQS的状态
    25. setState(getState());
    26. }
    27. }

    6.2.3 核心函数

    读写锁计数

    1. abstract static class Sync extends AbstractQueuedSynchronizer {
    2. // 读锁数量
    3. static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
    4. // 写锁数量
    5. static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    6. }
  • 正常来说,我们可以用两个int变量来表示占有读、写锁的数量,但JDK非常巧妙的只用一个state变量即可表示,原理就是通过位操作符,分别取高16位作为读锁,低16位做写锁。

  • 假设同步状态为s,则写状态为s & 0x0000FFFF,相当于高16位全部抹除,置为0,这样就只剩低16位;同理,读状态则为s >>> 16,也就是右移16位,相当于高位补0,把写锁的低16位去掉了。然后再判断s是否大于0, s>0,有读锁,反之没有。
  • 比较复杂的就是读状态变化的时候,不是简单的s +1,因为这样的话就加到了写操作上。因此这里使用s + 0x00010000,即把低位全部补上0。类似的与操作、位移等等,JDK中都有大量的应用,比如hashmap中确定元素所在链表等操作都有应用。

    释放写锁tryRelease()

  • 此函数用来释放写锁,先判断该线程是否为独占线程,若不为独占线程,则抛出异常;否则,计算释放资源后的写锁的数量,若为0,表示成功释放,资源不将被占用,否则,表示资源还被占用。

    获取写锁tryAcquire()

  • 此函数用于获取写锁,首先会获取state,判断是否为0,若为0,表示此时没有读锁线程。

  • 再判断写线程是否应该被阻塞,若在非公平策略下,总是不会被阻塞;若在公平策略下,会进行判断(判断同步队列中是否有等待时间更长的线程,若存在,则需要被阻塞,否则,无需阻塞)。
  • 之后设置状态state,然后返回true。若state不为0,则表示此时存在读锁或写锁线程,若写锁线程数量为0或者当前线程为独占锁线程,则返回false,表示不成功;否则,判断写锁线程的重入次数是否大于了最大值,若是,则抛出异常,否则,设置状态state,返回true,表示成功。

    释放读锁tryReleaseShared()

  • 首先判断当前线程是否为第一个读线程firstReader,若是,则判断第一个读线程占有的资源数firstReaderHoldCount是否为1,若是,则设置第一个读线程firstReader为null,否则,将第一个读线程占有的资源数firstReaderHoldCount减1;

  • 若当前线程不是第一个读线程,那么首先会获取缓存计数器(上一个读锁线程对应的计数器 );
  • 若计数器为空或者tid不等于当前线程的tid值,则获取当前线程的计数器;
  • 若计数器的计数count小于等于1,则移除当前线程对应的计数器;
  • 如果计数器的count小于等于0,则抛出异常,之后再减少计数即可。无论何种情况,都会进入无限循环,该循环可以确保成功设置状态state。

    获取读锁tryAcquireShared()

  • 首先判断写锁是否为0并且当前线程不占有独占锁,若不满足上述条件直接返回-1;

  • 然后判断读线程是否需要被阻塞、读锁数量是否小于最大值、状态是否设置成功,若当前没有读锁,则设置第一个读线程firstReader为当前线程,并把firstReaderHoldCount设为1;
  • 若当前线程线程为第一个读线程,则对firstReaderHoldCount加1;否则,将设置当前线程对应的HoldCounter对象的值。

    构造函数

    1. abstract static class Sync extends AbstractQueuedSynchronizer {
    2. public ReentrantReadWriteLock() {
    3. this(false);
    4. }
    5. public ReentrantReadWriteLock(boolean fair) {
    6. // 公平策略或者是非公平策略
    7. sync = fair ? new FairSync() : new NonfairSync();
    8. // 读锁
    9. readerLock = new ReadLock(this);
    10. // 写锁
    11. writerLock = new WriteLock(this);
    12. }
    13. }
  • 空参构造器会构造非公平锁,带参构造器可自由选择非公平或公平锁。

    6.2.4 什么是锁升降级

  • 锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。

  • 锁降级的过程是:先把持住当前拥有的写锁,再获取到读锁,随后释放先前拥有的写锁。
  • 锁降级中读锁的获取是否必要呢?答案是必要的。
    • 降级主要是为了保证数据的可见性,如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新,此时发生脏读。
    • 如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞(不可写),直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。
  • RentrantReadWriteLock不支持锁升级(把持读锁、获取写锁,最后释放读锁的过程)。目的也是保证数据可见性,假如读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,一旦锁升级,写锁发生的数据更新对其他获取到读锁的线程是不可见的。

    7. 新增的StampedLock

    JDK1.8新增了一个锁类StampedLock。

ReentrantReadWriteLock 在沒有任何读写锁时,才可以取得写入锁,这可用于实现了悲观读取(Pessimistic Reading),即如果执行中进行读取时,经常可能有另一执行要写入的需求,为了保持同步,ReentrantReadWriteLock 的读取锁定就可派上用场。

然而,如果读取执行情况很多,写入很少的情况下,使用 ReentrantReadWriteLock 可能会使写入线程遭遇饥饿(Starvation)问题,也就是写入线程吃吃无法竞争到锁定而一直处于等待状态。

StampedLock控制锁有三种模式(写,读,乐观读),一个StampedLock状态是由版本和模式两个部分组成,锁获取方法返回一个数字作为票据stamp,它用相应的锁状态表示并控制访问,数字0表示没有写锁被授权访问。在读锁上分为悲观锁和乐观锁。

所谓的乐观读模式,也就是若读的操作很多,写的操作很少的情况下,你可以乐观地认为,写入与读取同时发生几率很少,因此不悲观地使用完全的读取锁定,程序可以在查看读取资料之后,根据是否遭到写入执行的变更,再采取后续的措施(重新读取变更信息,或者抛出异常) ,这一个小小改进,可大幅度提高程序的吞吐量!!