第一部分:Java多线程基础

第一节 进程与线程

1. 进程与线程的关系

  • 进程是一个“执行中的程序”,是系统进行资源分配和调度的一个独立单位。
  • 线程进程的一个实体,一个进程中一般拥有多个线程,线程是程序执行的最小单元

2.线程的状态有哪些

  • Java线程包括以下6种状态
  • 关系图

一般而言把Blocked、Timed_waiting、Waiting都称为阻塞状态。

3.线程的属性

编号

线程的id是从1开始的,我们自己创建的线程id早已不是2,启动一个子线程的同时启动了一些别的线程。

名称

线程的名字默认也是从thread_0开始。

守护线程(相对的是用户线程)

作用:给用户线程提供服务。

线程类型默认继承自父线程,如用户线程创建的都是用户线程。

以上两者线程的区别:整体无区别,唯一区别在于是否影响JVM的退出。

开发中不需要把线程设置为守护线程。

优先级

  • java中共10个级别,默认是5

  • 程序设计不应该依赖于优先级
    因为不同的操作系统的优先级不同,如linux就没有优先级;优先级会被操作系统改变,操作系统会根据线程的执行情况改变资源分配策略。

4.线程的异常

  • 线程的未捕获异常(UncaughtExceptionHandler)
    主线程轻松发现异常而子线程却不行(子线程抛出异常后停止,主线程继续运行)
    主线程中不能捕获子线程的异常

    • 解决方法一,在子线程内部捕获异常
    • 解决方法二,利用UncaughtExceptionHandler捕获,全局处理

第二节 实现线程的方式

(Oracle)官方文档有二种实现线程的方法:

继承 Thread 类

实现 Runnable 接口

1.实现Runnable接口

  1. public class MyRunnable implements Runnable {
  2. @Override
  3. public void run() {
  4. // ... 方法体,写需要执行的任务
  5. }
  6. }
  7. public static void main(String[] args) {
  8. MyRunnable target = new MyRunnable();
  9. Thread thread = new Thread(target);
  10. thread.start();
  11. }

2.继承 Thread类

  1. public class ThreadStyle extends Thread {
  2. @Override
  3. public void run() {
  4. // ... 任务
  5. }
  6. }
  7. public static void main(String[] args) {
  8. ThreadStyle mt = new ThreadStyle();
  9. mt.start();
  10. }

3.实现Runnable 接口VS 继承 Thread

实现Runnable接口会更好一些,因为:

  1. 解耦, 具体执行的任务和线程的创建销毁应该独立
  2. 继承Thread类每次都需要创建和销毁一个线程,开销大
  3. java类单继承,继承Thread类后不能继承其他类
  • Thread 类中的run方法
  1. public void run() {
  2. if (target != null) {
  3. target.run();
  4. }
  5. }

说明:实现Runnable 接口实际上是将target传给run方法执行,继承thread类是重写了run方法。

  • 同时使用两种方法
    1. public class BothRunnableThread{
    2. public static void main(String[] args){
    3. new Thread(new Runnable(){
    4. @Override
    5. public void run(){
    6. System.out.println("我来自Runnable");
    7. }){
    8. @Override
    9. public void run(){
    10. System.out.println("我来自Thread");
    11. }
    12. }
    13. }.start();
    14. }
    15. }
  1. 结果:我来自Thread

说明:第一个run方法是runnable 接口实现的run方法,第二个是Thread类重写的run方法 因为runnable是作为target递给Thread中的run方法去执行的,而此时thread中的run被重写,所以最终结果,是执行的thread中被重写的run。

4.总结

准确的讲,创建一个线程就只有一种方式,就是new Thread()对象,而实现线程的任务单元也只有两种方式:

  • 一种是继承Thread()类直接重写Thread类的run方法。

  • 另一种是实现Runnable()接口里面的run方法,把个target对象传递给Thread()线程,让run方法去调用target里面实现的run。

一些错误观点

  1. 线程池也是一种创建线程的方式。
    原因:线程池的本质还是用new thread(runnable) 实现的。

  2. 通过callable 和 future Task 也是一种创建线程的方式。

  1. 原因:本质还是实现Runnable接口

一些面试题

  1. 有多少种实现多线程的方法?
    答题思路: 从表面到源码,从表象到本质。
    表面看有两种,从源码看两者的本质相同只有一种。

  2. 实现Runnable接口和继承Thread类那种方式更好?

第三节 启动线程的方式

1.start()run()解读

start()方法:启动新线程, 通知JVM在空闲的时候执行线程。

  1. 两次调用start()会报错:IllegalThreadStateException(非法的线程状态)。
  1. public synchronized void start() {
  2. if (threadStatus != 0)//判断线程状态 0状态是new状态(就绪)
  3. throw new IllegalThreadStateException();
  4. group.add(this);
  5. boolean started = false;
  6. try {
  7. start0();//调用start0 native方法
  8. started = true;
  9. } finally {
  10. try {
  11. if (!started) {
  12. group.threadStartFailed(this);
  13. }
  14. } catch (Throwable ignore) {
  15. /* do nothing. If start0 threw a Throwable then
  16. it will be passed up the call stack */
  17. }
  18. }
  19. }
  1. 既然start()方法会调用run()方法,为什么我们不直接调用run()方法呢?
    new一个Thread,该线程进入了新建状态,调用start()方法,会启动一个线程并使该线程进入了就绪状态,当分配到时间片之后,该线程就可以运行了。
    start()方法会执行线程的相应准备工作,然后自动执行run()方法的内容,是真正的创建了一个线程去执行任务。
    直接运行run()方法,会把run()方法当成一个main线程下的普通方法去执行,并不会创建出一个新的线程去执行run()方法中写的任务。在main线程中调用run()方法后,要等run方法执行完毕后,才能继续执行接下来的方法,所以这不是一个真正的多线程工作。

第四节 停止线程

1.原理介绍

原理介绍:使用thread.interrupt();来通知,而不是强制

java中线程具有最高决定权,线程自身决定停不停至以及何时停止。一般线程等到执行结束自动停止,由于开发者并不一定清楚线程正在执行的业务,以及能不能停止,所以java设计者将线程停止的最高决定权交给了线程自身。

2. 如何正确停止线程

需要请求方、被停止方、子方法被调用方相互配合。

thread.interrupt发出一个请求,线程中能够响应中断的部分(能使线程阻塞的部分)去检查信号,收到信号后catch异常然后让线程执行结束。

最佳实践:

  • run()调用的方法应该将异常向上传递给run
  • 或者恢复被sleep()等方法清除的线程中断信号让run通过Thread.currentThread().interrupt()检查线程状态。

3. 响应中断的方法总结

4. 错误的停止方法

  1. stop()来停止,会导致程序运行一半突然停止,没办法保证一个原子性,造成脏数据。它会释放已锁定的所有监视器。suspend()挂起线程不会释放锁,可能会导致死锁。

  2. volatile设置值一个boolean 标志位。
    当线程在某个位置阻塞时这个标志位将不会被判断。线程持续处于阻塞状态。而interrupt方法会通知这些能够响应中断的阻塞去抛出InterruptedException异常让线程结束。

5.相关函数解析

  1. Thread.interrupted()方法
    静态方法,判断当前执行它的线程的状态,不关心是哪个线程调用它。
    返回boolean值,并且会清除线程状态(设置为false)

  2. thread1.isInterrupted()方法
    非静态方法,判断调用它的线程的状态,不会清除线程状态。

6.面试题

  1. 如何停止一个线程(线程可以响应中断时)?

第五节Thread类和Object类中的常用方法

1.方法概述

方法名 简介
Thread sleep() 本表格的所有方法都是指重载的方法
join() 等待其他线程执行完毕
yield() 放弃已经获取到的CPU资源
current Thread() 获取当前线程的引用
start()、run() 启动线程相关
interrtupt() 中断线程
stop()、suspend()、resume() 已废弃
Object wait()、notify()、notifyAll() 让线程暂时休息和唤醒

2.wait()、notify()、notifyAll()方法详解

作用

让一些线程休息和唤醒。

  • 首次需要获取锁。

  • 会释放锁。

  • notify唤醒任意一个线程

  1. 手写生产者消费者问题要求使用wait 、notify实现
  1. public class ProducerConsumerMode{
  2. public static void main(String[] args) {
  3. }
  4. }
  5. class Consumer implements Runnable{
  6. private EventStroage es = null;
  7. public Consumer(EventStroage es){
  8. this.es = es;
  9. }
  10. @Override
  11. public void run(){
  12. while(true){
  13. try {
  14. Thread.sleep(100);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. es.take();
  19. }
  20. }
  21. }
  22. class Producer implements Runnable{
  23. private EventStroage es = null;
  24. public Producer(EventStroage es){
  25. this.es = es;
  26. }
  27. @Override
  28. public void run(){
  29. while(true){
  30. try {
  31. Thread.sleep(100);
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. es.put();
  36. }
  37. }
  38. }
  39. class EventStroage{
  40. private int maxSize;
  41. private LinkedList<Date> storage;
  42. public EventStroage(){
  43. maxSize = 10;
  44. storage = new LinkedList<>();
  45. }
  46. public synchronized void put(){
  47. if(maxSize >= storage.size()){
  48. Date date = new Date();
  49. storage.add(date);
  50. System.out.println(Thread.currentThread().getName() + "生产了产品" + date + ",仓库中现在有"+storage.size()+"个产品");
  51. this.notify();
  52. }else{
  53. try {
  54. this.wait();
  55. } catch (InterruptedException e) {
  56. e.printStackTrace();
  57. }
  58. }
  59. }
  60. public synchronized void take(){
  61. if(storage.size() > 0){
  62. Date date = storage.poll();
  63. System.out.println(Thread.currentThread().getName() + "消费了产品" + date + ",仓库中现在有"+storage.size()+"个产品");
  64. this.notify();
  65. }else{
  66. try {
  67. this.wait();
  68. } catch (InterruptedException e) {
  69. e.printStackTrace();
  70. }
  71. }
  72. }
  73. }
  1. 两个线程交替打印0~100的奇偶数
  1. 为什么wait() 需要在同步代码块里面使用,而sleep()不需要?

  2. 为什么线程通信的方法wait()、notify()、notifyAll()被定义在Object类里面,而sleep()定义在Thread里面?

  3. 调用Thread.wait会发生什么?

  4. 如何选择notify和notifyAll?

3.sleep 方法

作用:只想让线程在预期的时间内执行, 其他时间内不占用CPU资源。sleep不释放锁,休眠期被中断会响应异常。

  1. public void SleepDontReleaseLock implements Runnnable{
  2. private static final Lock lock = new ReentrantLock();
  3. @Override
  4. public void run(){
  5. lock.lock();
  6. //方法体
  7. try{
  8. Thread.sleep(1000);
  9. }catch(InterruptException e){
  10. e.printStackTrace();
  11. }
  12. lock.unlock();
  13. }
  14. }
  1. wait/notify 、sleep异同(属于那个对象?线程状态如何切换?)

4.join()方法

作用:因为新的线程加入了我们,所以我们要等他执行完毕再出发。

用法:main等待thread1执行完毕,注意谁等谁。

最普通的用法:

  1. public void JoinDemo{
  2. public static void main(String[] args){
  3. //Thread threadMain = Thread.currentThread();
  4. Thread thread = new Thread(new Runnable(){
  5. @Override
  6. public void run(){
  7. // threadMain.interrupt();
  8. //方法体
  9. }
  10. });
  11. thread.start();
  12. sout("开始等待子线程执行结束。");
  13. thread.join();
  14. //分析了join的底层原理,可以用以下三行代码来啊取代join
  15. // synchronized(thread){
  16. // thread.wait();
  17. // }
  18. sout("子线程执行完毕,继续执行后序部分。")
  19. }
  20. }

用法2:join让主线程进入WAITING状态,所以子线程可以通过threadMain.interrupt()信号让主线程中断阻塞状态结束等待。

join注意点:

  • 成熟的工具类CountDownLatchCyclicBarrier

join原理

  • 底层源码用wait()实现,每个线程的run()方法执行结束,就会执行一次类似于notify的操作来唤醒join

5. yield方法

作用:释放我的CPU时间片

定位:JVM不保证遵循,所以开发中一般不使用此方法。

第六节 线程安全与性能

1.线程安全

什么是线程安全?

什么情况下会出现线程安全问题?

两个线程同时访问a++时会出现线程安全问题

  • 死锁
    1. public class ThreadError implements Runnable{
    2. int flag = 0;
    3. static Object A = new Object();
    4. static Object B = new Object();
    5. public ThreadError(int flag){
    6. this.flag = flag;
    7. }
    8. @Override
    9. public void run(){
    10. if(flag == 1){
    11. synchronized(A){
    12. Tread.sleep(100);
    13. synchronized(B){
    14. //
    15. }
    16. }
    17. }else{
    18. synchronized(B){
    19. Tread.sleep(100);
    20. synchronized(A){
    21. //
    22. }
    23. }
    24. }
    25. }
    26. public static void main(String[] args){
    27. Thread thread1 = new (new ThreadError(1));
    28. Thread thread2 = new (new ThreadError(2));
    29. thread1.start();
    30. thread2.start();
    31. }
    32. }
  • 发布逸出(逃逸)
    方法返回一个私有的对象
    方法返回一个还未初始化的对象

  • 在构造函数中创建线程可能会导致异常
    如: 在构造函数中用线程初始化数据,在数据未初始化完毕的时候在外部去调用数据会导致空指针异常。

  • 访问共享的变量或者资源,会有并发的风险

2. 多线程的性能问题?

原因: 上下文切换、内存同步

  • 调度:上下文切换
    定义:保存线程现场,
    缓存开销:缓存失效
    解决: 设置最小的上下文切换周期。

第七节 java内存模型

1.什么是JMM?

  • JMM是一组规范,需要各个JVM的实现来遵守JMM规范,以便开发者可以利用这些规范更方便地开发多线程程序。如果没有这样的一个规范,那么很可能经过了不同JVM的重排序之后,导致不同虚拟机上运行的结果不一样。

  • volite 、synchronized、lock等的原理都是JMM

  • 如果没有JMM,那就需要我们自己指定什么时候用内存栅栏等。

重排序

什么是重排序:代码底层指令并不严格按照代码语句顺序执行就是发生了重排序。编译器、CPU都会优化指令而发生重排序。

好处

  • 提高处理速度

以上对比前后指令,优化了两行指令。

可见性

1.什么是可见性:线程读到的数据不一定是最新的。

解决:使用Volatile关键字修饰变量。

2.为什么会有可见性问题:

CPU结构存在多层缓存,会导致数据过期;每个核都有自己的独占缓存也因此导致了线程间数据共享的问题。

3.JMM如何解决可见性问题?

java作为高级语言屏蔽了CPU底层多级缓存的细节,抽象出了主内存和本地内存的概念。

主内存和本地内存是一个抽象上的概念。

4.Happens —Before原则

  • 什么是Happens —Before原则

  • Happens-Before规则有哪些?

    1. 单线程规则
  1. 说明: 单线程原则保证H-B原则
  1. 锁操作
  1. volatile 变量

……还有其他等等

5. volatile关键字

  • 什么是volatile?
    volatile是一种同步机制,比synchronize或者lock相关类更轻量

不适用volatile的场景:不适用a++操作原子性的保护

  • volatile的两点作用

原子性

1.什么是原子性?

定义:一系列操作,要么全部执行成功,要么全部执行不成功,不会出现执行一部分的情况,是不可分割的。(如ATM取款需要保证原子性)

  • 对一个没有用volatile修饰的long或double类型的写操作会被拆分成两次写,每次写该类型的32-bit数据,这就导致了在多线程的场景下,可能一个线程只看到了对这个64-bit数据类型写入的前32-bit数据。
  • 商用JVM已经保证了上述两操作的原子性,开发中不需要考虑。

常见面试问题

单例模式的八种写法

1、饿汉式(静态常量)[可用]

  1. //类在加载的过程中就完成了实例化,没有线程安全问题,但是提前加载浪费内存
  2. public class Singleton {
  3. private final static Singleton INSTANCE = new Singleton();
  4. private Singleton(){}
  5. public static Singleton getInstance(){
  6. return INSTANCE;
  7. }
  8. }

2、饿汉式(静态代码块)[可用]

  1. //与上述完全一致,只不过写法略有不同
  2. public class Singleton {
  3. private static final Singleton instance;
  4. static {
  5. instance = new Singleton();
  6. }
  7. private Singleton() {}
  8. public static Singleton getInstance() {
  9. return instance;
  10. }
  11. }

3、懒汉式(线程不安全)[不可用]

  1. //优点:懒汉式实现了懒加载,缺点:在多线程环境下有线程安全问题
  2. public class Singleton {
  3. private static Singleton singleton;
  4. private Singleton() {}
  5. public static Singleton getInstance() {
  6. //可能会有两个线程同时进入if
  7. if (singleton == null) {
  8. singleton = new Singleton();
  9. }
  10. return singleton;
  11. }
  12. }

4、懒汉式(线程安全,同步方法)[不推荐用]

  1. public class Singleton {
  2. private static Singleton singleton;
  3. private Singleton() {}
  4. //虽然解决了线程安全问题但是效率不高,每次获取都需要同步,但是实例化就只执行一次
  5. public static synchronized Singleton getInstance() {
  6. if (singleton == null) {
  7. singleton = new Singleton();
  8. }
  9. return singleton;
  10. }
  11. }

5、懒汉式(线程安全,同步代码块) [不可用]

  1. public class Singleton {
  2. private static Singleton singleton;
  3. private Singleton() {}
  4. //虽然解决了实例之后不需要再同步执行,但是没有解决线程安全问题
  5. public static Singleton getInstance() {
  6. if (singleton == null) {
  7. //可能有多个线程进入if
  8. synchronized (Singleton.class) {
  9. singleton = new Singleton();
  10. }
  11. }
  12. return singleton;
  13. }
  14. }

6、懒汉式(双重检查)[推荐用]

  1. public class Singleton {
  2. private static volatile Singleton singleton;//volatile关键字避免对象实例化过程重排序导致安全问题
  3. private Singleton() {}
  4. //双重检查实例化之后不需要再同步执行,并且实现了线程安全
  5. public static Singleton getInstance() {
  6. if (singleton == null) {//检查一
  7. synchronized (Singleton.class) {
  8. if (singleton == null) {//检查二
  9. singleton = new Singleton();
  10. }
  11. }
  12. }
  13. return singleton;
  14. }
  15. }

7、饿汉式(静态内部类)[推荐用]

  1. public class Singleton {
  2. //饿汉式的静态内部类实现了懒加载
  3. private Singleton() {}
  4. private static class SingletonInstance {
  5. private static final Singleton INSTANCE = new Singleton();
  6. }
  7. public static Singleton getInstance() {
  8. return SingletonInstance.INSTANCE;
  9. }
  10. }

8、枚举[推荐用](最优解)

  1. public enum Singleton {
  2. INSTANCE;
  3. public void whateverMethod() {
  4. }
  5. }

借助JDK1.5中添加的枚举来实现单例模式。不仅能避免多线程同步问题,而且还能防止反序列化重新创建新的对象。可能是因为枚举在JDK1.5中才添加,所以在实际项目开发中,很少见人这么写过。

第八节 死锁

死锁的产生于危害

1.什么是死锁?

死锁:两个或者多个线程之间互相持有对方所需求的资源又互不相让就造成了死锁。

2.什么是活锁?

活锁:任务或者执行者没有被阻塞,由于某些条件没有满足,导致一直重复尝试、失败、尝试、失败。在这期间线程状态会不停的改变。

3.什么是饥饿?

饥饿:一个或者多个线程因为种种原因无法获得所需要的资源,导致一直无法执行的状态。

原因:

1、高优先级线程吞噬所有的低优先级线程的 CPU 时间。

2、线程被永久堵塞在一个等待进入同步块的状态,因为其他线程总是能在它之前 持续地对该同步块进行访问。

3、线程在等待一个本身也处于永久等待完成的对象(比如调用这个对象的 wait 方 法。

第二部分: JUC(Java并发工具类)

第一节 线程池

线程池的参数

  • 创建和销毁线程是有很大开销的,线程池可以复用线程避免了为每一个任务创建一个线程的资源浪费。而且系统的线程数是有上限的。
  • 实现了对资源的统一管理。

停止线程池的正确方法

  1. shutdown()方法
    仅仅是启动停止线程,不是立即停止,启动后线程池将拒绝新增任务抛出异常,继续执行完已经在任务队列里面的任务。

  2. IsShutdown()
    返回true表示已经执行了shutdown()方法

  3. isTerminated()
    判断是否线程池已经执行完毕所有任务并终止

  4. awaitTermination(time)
    判断线程在time时间内线程池是否执行结束,返回布尔值

  5. shutdownNow()
    通知正在执行的任务结束执行,队列中的任务不再被执行将被记录或者重新执行

线程池的原理、源码

第二节 ThreadLocal

两个作用:

  1. 让某个需要用到的对象在线程间隔离(每个线程有自己的独立对象)
  2. 在任何方法中都能轻松的获取到该对象

第三节 锁

1. lock锁

2.锁的分类

可重入锁

第四节 原子类

第五节 CAS原理

1.什么是CAS?

  • CAS(Compare And Swap):比较并交换

是一种用来实现线程安全的思想或算法,同时也是一种CPU指令,如Compare And Swap这一条指令实现了比较并交换这样一个组合操作,不会被打断。在多线程的情况下,各个代码的执行顺序是不能确定的,所以为了保证并发安全,我们可以使用互斥锁。而 CAS 的特点是避免使用互斥锁,当多个线程同时使用 CAS 更新同一个变量时,只有其中一个线程能够操作成功,而其他线程都会更新失败。不过和同步互斥锁不同的是,更新失败的线程并不会被阻塞,而是被告知这次由于竞争而导致的操作失败,但还可以再次尝试。CAS 被广泛应用在并发编程领域中,以实现那些不会被打断的数据交换操作,从而就实现了无锁的线程安全。

  • CAS工作原理

CAS有三个操作数:内存值V、预期值A、要修改的值B;当且仅当预期值A和内存值V 相同时,才将内存值修改为B,否则什么都不做。最后返回old Value值。

  • 等价代码
  1. /**
  2. *描述: 模拟CAS操作, 等价代码
  3. */
  4. public class SimulatedCAS{
  5. private volatile int value;
  6. public synchronized int compareAndSwap(int expectedValue, int newValue){
  7. int oldValue = value;
  8. if(oldValue == expectedValue){
  9. value = newValue;
  10. }
  11. return oldValue;
  12. }
  13. }

compare And Swap 方法是被 synchronized 修饰的,我们用同步方法为 CAS 的等价代码保证了原子性。volatile 保证了共享变量的可见性,让线程能够看到最新值。

2.应用场景

  • 乐观锁
  • 并发容器
  • 原子类

3.Java中是如何实现CAS原子操作的?

  • 通过 Unsafe 类中的native方法 compareAndSwapXXX 来实现的

Unsafe 提供了非常底层的,操作内存、线程的方法Unsafe 对象不能直接调用,只能通过反射获得

4.CAS 的特点

  • 结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
  • CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
  • synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
  • CAS 体现的是无锁并发、无阻塞并发,因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一,但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响

为什么无锁的CAS效率高

无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻
线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。

5.CAS缺点

ABA问题

主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,如果主线程希望:只要有其它线程【动过了】共享变量,那么自己的CAS就算失败,这时,仅比较值是不够的,需要再加一个版本号

自旋时间过长
由于单次 CAS 不一定能执行成功,所以 CAS 往往是配合着循环来实现的,有的时候甚至是死循环,不停地进行重试,直到线程竞争不激烈的时候,才能修改成功。可是如果我们的应用场景本身就是高并发的场景,就有可能导致 CAS 一直都操作不成功,这样的话,循环时间就会越来越长。而且在此期间,CPU 资源也是一直在被消耗的,这会对性能产生很大的影响。所以这就要求我们,要根据实际情况来选择是否使用 CAS,在高并发的场景下,通常 CAS 的效率是不高的。

范围不能灵活控制

通常我们去执行 CAS 的时候,是针对某一个,而不是多个共享变量的,这个变量可能是 Integer 类型,也有可能是 Long 类型、对象类型等等,但是我们不能针对多个共享变量同时进行 CAS 操作,因为这多个变量之间是独立的,简单的把原子操作组合到一起,并不具备原子性。

第六节 final 关键字

1、final关键字的作用

  • 修饰变量:防止变量被修改
    注:修饰对象变量时表示的是当前对象的引用不能被修改,对象的内容是可以被修改的。
    要注意被修饰的变量的赋值时机。

  • 修饰方法:防止方法被重写
    注: 构造方法不能被修饰为final的,静态方法不能被重写。static 方法时与类绑定的

  • 修饰类: 防止类被继承
    注:String 类被final修饰

注: final 是线程安全的,不需要同步开销

第七节 并发容器

1.过时的同步容器

  • Vector : 相当于线程安全的ArrayList

  • Hashtable: 线程安全的HashMap

注: 由于它们两个类里面有很多方法被Synchronized 修饰,所以导致了效率不高所以被弃用

同步的HashMapArrayList

  • 以上两者不是线程安全的类,但是可以用Collections.synchronizedlist(new ArrayList<E>())Collections.synchronizedMap(new HashMAp<K,V>())包装让它变成线程安全的。

  • 源码分析 ```java public static List synchronizedList(List list) {

    1. return (list instanceof RandomAccess ? //ArrayList 实现了随机访问(RandomAccess)标志
    2. new SynchronizedRandomAccessList<>(list) : //所以进入这个构造方法
    3. new SynchronizedList<>(list));

    } //—————————————————————————————————————- SynchronizedRandomAccessList(List list) {

    1. super(list); //传递给父类

    }

//—————————————————————————————————————— static class SynchronizedList //collections 中的一个静态内部类

  1. SynchronizedList(List<E> list) {
  2. super(list);//向上传递
  3. this.list = list;
  4. }

//——————————————————————————————————— static class SynchronizedCollection implements Collection, Serializable { SynchronizedCollection(Collection c) { this.c = Objects.requireNonNull(c);//传递给了C mutex = this;//锁对象 } public boolean add(E e) { synchronized (mutex) {return c.add(e);}//当前静态内部类同样适用synchronized来同步 }

  1. <a name="1ca3e6cd"></a>
  2. ### 2.比较不错的同步容器
  3. <a name="13b8622b"></a>
  4. #### 1.`ConcurrentHashMap`
  5. 1.
  6. Map 简介
  7. <br />典型实现类
  8. 2. 为什么需要`ConcurrentHasMap`
  9. -
  10. `HashMap`为什么不安全
  11. - 同时put**碰撞**导致数据丢失
  12. - 同时put**扩容**导致数据丢失
  13. - **死循环造成CPU 100%**
  14. > 存在于JDK7 ,原因:多个线程同时扩容,会造成链表的死循环
  15. > sun公司的解释:Hash Map是在非并发的情况使用的,并发情况下使用发生的问题不是问题是正常的。
  16. -
  17. 源码分析
  18. <br />
  19. - concurrent has map 组合操作并不保证线程安全性
  20. ```java
  21. /**
  22. * 描述: 演示ConcurrentHashMap的组合操作不是线程安全的以及替换方式
  23. */
  24. public class ConcurrentHashMapDemo implements Runnable{
  25. private static ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap();
  26. public static void main(String[] args) throws InterruptedException {
  27. map.put("小明",0);
  28. //在此启动两个线程
  29. }
  30. @Override
  31. public void run() {
  32. //1.线程不安全的使用方式
  33. // for(int i = 0; i < 1000; i++){
  34. // int score = map.get("小明");
  35. // score = score + 1;
  36. // map.put("小明",score);
  37. // }
  38. //2.加同步代码块的使用方式; 效率不高没有用到ConcurrentHashMap的性质
  39. // for(int i = 0; i < 1000; i++){
  40. // synchronized (ConcurrentHashMapDemo.class){
  41. // int score = map.get("小明");
  42. // score = score + 1;
  43. // map.put("小明",score);
  44. // }
  45. // }
  46. //3.正确的使用方式
  47. for(int i = 0; i < 1000; i++){
  48. while (true){
  49. int oldValue = map.get("小明");
  50. boolean flag = map.replace("小明",oldValue,oldValue+1);//操作失败返回false
  51. if(flag) { break; }
  52. }
  53. }
  54. }
  55. }

2.CopyOnWriteArrayList(写入时复制)

  • CopyOnWriteArrayList 是jdk1.5以后并发包中提供的一种并发容器,写操作通过创建底层数组的新副本来实现,是一种读写分离的并发策略,我们也成为“写时复制容器”,类似的容器还有 CopyOnWriteArraySet

  • 集合框架中的ArrayList是非线程安全的,Vector虽然是线程安全的,但是处理方式简单粗暴(synchronized),性能较差。而CopyOnWriteArrayList提供了不同的处理并发的思路。

  • 很多时候,我们系统中处理的都是读多写少的并发场景。CopyOnWriterArrayList允许并发的读,读操作是无锁的,性能较高。写操作的话,比如向容器增加一个元素,则首先将当前容器复制一份,然后在新副本上执行写操作,结束之后再将原容器的引用指向新容器。

优缺点分析

  • 优点:

读操作性能很高,因为无需任何同步措施,比较适用于读多写少的并发场景。Java 的 list 在遍历时,若中途有其他线程对容器进行修改,则会抛出ConcurrentModificationException 异常。而CopyOnWriteArrayList由于其“读写分离”的思想,遍历和修改操作分别作用在不同的 list容器,所以迭代的时候不会抛出 ConcurrentModificationExecption异常了。

  • 缺点:

缺点也很明显,一是内存占用问题,毕竟每次执行写操作都要将原容器拷贝一份,数据量大时,对内存压力较大,甚至可能引起频繁GC,二是无法保证实时性,Vector 对读写操作均加锁同步,可以保证容器的读写强一致性,CopyOnWriteArrayList由于其实现策略的原因,写和读分别作用于不容容器上,在写的过程中,读是不会发生阻塞的,未切换索引置新容器时,是读不到刚写入的数据的。

源码分析

  1. public boolean add(E e) {
  2. //加锁,对写操作保证线程安全
  3. final ReentrantLock lock = this.lock;
  4. lock.lock();
  5. try {
  6. Object[] elements = getArray();
  7. int len = elements.length;
  8. //拷贝原容器,长度为原容器+1
  9. Object[] newElements = Arrays.copyOf(elements, len + 1);
  10. //在新副本执行添加操作
  11. newElements[len] = e;
  12. //底层数组指向新的数组
  13. setArray(newElements);
  14. return true;
  15. } finally {
  16. lock.unlock();
  17. }
  18. }
  19. 其中 底层数组定义如下:
  20. private transient volatile Object[] array;
  21. 增加内存可见性。
  1. public E get(int index) {
  2. return get(getArray(), index);//直接返回原数组的数据
  3. }
  4. final Object[] getArray() {
  5. return array;
  6. }

3.Queue

  • 并发队列关系图
  • BlockingQueue

对插入操作、移除操作、获取元素操作提供了四种不同的方法用于不同的场景中使用:

1、抛出异常;

2、返回特殊值(null 或 true/false,取决于具体的操作);

3、阻塞等待此操作,直到这个操作成功;

4、阻塞等待此操作,直到成功或者超时指定时间。

put(e)take()这两个方法是带阻塞的。
BlockingQueue不接受 null 值的插入,相应的方法在碰到 null 的插入时会抛出 NullPointerException 异常。null 值在这里通常用于作为特殊值返回(表格中的第三列),代表 poll 失败。所以,如果允许插入 null 值的话,那获取的时候,就不能很好地用 null 来判断到底是代表失败,还是获取的值就是 null 值。

一个 BlockingQueue可能是有界的,如果在插入的时候,发现队列满了,那么 put 操作将会阻塞。通常,在这里我们说的无界队列也不是说真正的无界,而是它的容量是Integer.MAX_VALUE(21亿多)。

  • ArrayBlockingQueue

BlockingQueue接口的有界队列实现类,底层采用数组来实现。

其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。
ArrayBlockingQueue 共有以下几个属性:

  1. // 用于存放元素的数组
  2. final Object[] items;
  3. // 下一次读取操作的位置
  4. int takeIndex;
  5. // 下一次写入操作的位置
  6. int putIndex;
  7. // 队列中的元素数量
  8. int count;
  9. // 以下几个就是控制并发用的同步器
  10. final ReentrantLock lock;
  11. private final Condition notEmpty;
  12. private final Condition notFull;

第三部分 Java并发编程 - 图1

ArrayBlockingQueue 实现并发同步的原理就是,读操作和写操作都需要获取到 AQS 独占锁才能进行操作。如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。

对于ArrayBlockingQueue,我们可以在构造的时候指定以下三个参数:

1.队列容量,其限制了队列中最多允许的元素个数;
2.指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;
3.可以指定用一个集合来初始化,将此集合中的元素在构造方法期间就先添加到队列中。

  • BlockingQueue 实现之LinkedBlockingQueue

底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。看构造方法:

  1. // 传说中的无界队列
  2. public LinkedBlockingQueue() {
  3. this(Integer.MAX_VALUE);
  4. }
  5. // 传说中的有界队列
  6. public LinkedBlockingQueue(int capacity) {
  7. if (capacity <= 0) throw new IllegalArgumentException();
  8. this.capacity = capacity;
  9. last = head = new Node<E>(null);
  10. }

我们看看这个类有哪些属性:

  1. // 队列容量
  2. private final int capacity;
  3. // 队列中的元素数量
  4. private final AtomicInteger count = new AtomicInteger(0);
  5. // 队头
  6. private transient Node<E> head;
  7. // 队尾
  8. private transient Node<E> last;
  9. // take, poll, peek 等读操作的方法需要获取到这个锁
  10. private final ReentrantLock takeLock = new ReentrantLock();
  11. // 如果读操作的时候队列是空的,那么等待 notEmpty 条件
  12. private final Condition notEmpty = takeLock.newCondition();
  13. // put, offer 等写操作的方法需要获取到这个锁
  14. private final ReentrantLock putLock = new ReentrantLock();
  15. // 如果写操作的时候队列是满的,那么等待 notFull 条件
  16. private final Condition notFull = putLock.newCondition();

这里用了两个锁,两个 Condition,简单介绍如下:

takeLocknotEmpty 怎么搭配:如果要获取(take)一个元素,需要获取takeLock 锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。

putLock 需要和 notFull搭配:如果要插入(put)一个元素,需要获取putLock 锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)。

首先,这里用一个示意图来看看LinkedBlockingQueue 的并发读写控制,然后再开始分析源码:
第三部分 Java并发编程 - 图2
看懂这个示意图,源码也就简单了,读操作是排好队的,写操作也是排好队的,唯一的并发问题在于一个写操作和一个读操作同时进行,只要控制好这个就可以了。

先上构造方法:

  1. public LinkedBlockingQueue(int capacity) {
  2. if (capacity <= 0) throw new IllegalArgumentException();
  3. this.capacity = capacity;
  4. last = head = new Node<E>(null);
  5. }

注意,这里会初始化一个空的头结点,那么第一个元素入队的时候,队列中就会有两个元素。读取元素时,也总是获取头节点后面的一个节点。count 的计数值不包括这个头节点。

  • BlockingQueue 实现之 SynchronousQueue

它是一个特殊的队列,它的名字其实就蕴含了它的特征 - - 同步的队列。为什么说是同步的呢?这里说的并不是多线程的并发问题,而是因为当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作。这里的 Synchronous 指的就是读线程和写线程需要同步,一个读线程匹配一个写线程。

提供任何空间(一个都没有)来存储元素。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。

你不能在 SynchronousQueue 中使用 peek 方法(在这里这个方法直接返回 null),peek 方法的语义是只读取不移除,显然,这个方法的语义是不符合 SynchronousQueue 的特征的。SynchronousQueue 也不能被迭代,因为根本就没有元素可以拿来迭代的。虽然 SynchronousQueue 间接地实现了 Collection 接口,但是如果你将其当做 Collection 来用的话,那么集合是空的。当然,这个类也是不允许传递 null 值的(并发包中的容器类好像都不支持插入 null 值,因为 null 值往往用作其他用途,比如用于方法的返回值代表操作失败)。

下面,再说说前面说的公平模式和非公平模式的区别。

相信大家心里面已经有了公平模式的工作流程的概念了,我就简单说说 TransferStack 的算法,就不分析源码了。

1.当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而栈中的元素也都是写线程)。这种情况下,将当前线程加入到等待栈中,等待配对。然后返回相应的元素,或者如果被取消了的话,返回 null。
2.如果栈中有等待节点,而且与当前操作可以匹配(如栈里面都是读操作线程,当前线程是写操作线程,反之亦然)。将当前节点压入栈顶,和栈中的节点进行匹配,然后将这两个节点出栈。配对和出栈的动作其实也不是必须的,因为下面的一条会执行同样的事情。
3.如果栈顶是进行匹配而入栈的节点,帮助其进行匹配并出栈,然后再继续操作。

  • BlockingQueue 实现之PriorityBlockingQueue

带排序的BlockingQueue 实现,其并发控制采用的是ReentrantLock,队列为无界队列(ArrayBlockingQueue是有界队列,LinkedBlockingQueue也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。

简单地说,它就是PriorityQueue的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

它的源码相对比较简单,本节将介绍其核心源码部分。

我们来看看它有哪些属性:

  1. // 构造方法中,如果不指定大小的话,默认大小为 11
  2. private static final int DEFAULT_INITIAL_CAPACITY = 11;
  3. // 数组的最大容量
  4. private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
  5. // 这个就是存放数据的数组
  6. private transient Object[] queue;
  7. // 队列当前大小
  8. private transient int size;
  9. // 大小比较器,如果按照自然序排序,那么此属性可设置为 null
  10. private transient Comparator<? super E> comparator;
  11. // 并发控制所用的锁,所有的 public 且涉及到线程安全的方法,都必须先获取到这个锁
  12. private final ReentrantLock lock;
  13. // 这个很好理解,其实例由上面的 lock 属性创建
  14. private final Condition notEmpty;
  15. // 这个也是用于锁,用于数组扩容的时候,需要先获取到这个锁,才能进行扩容操作
  16. // 其使用 CAS 操作
  17. private transient volatile int allocationSpinLock;
  18. // 用于序列化和反序列化的时候用,对于 PriorityBlockingQueue 我们应该比较少使用到序列化
  19. private PriorityQueue q;

此类实现了 Collection 和 Iterator 接口中的所有接口方法,对其对象进行迭代并遍历时,不能保证有序性。如果你想要实现有序遍历,建议采用 Arrays.sort(queue.toArray()) 进行处理。PriorityBlockingQueue提供了drainTo 方法用于将部分或全部元素有序地填充(准确说是转移,会删除原队列中的元素)到另一个集合中。还有一个需要说明的是,如果两个对象的优先级相同(compare 方法返回 0),此队列并不保证它们之间的顺序。

PriorityBlockingQueue使用了基于数组的二叉堆来存放元素,所有的 public 方法采用同一个 lock 进行并发控制。

总结

ArrayBlockingQueue底层是数组,有界队列,如果我们要使用生产者-消费者模式,这是非常好的选择。

LinkedBlockingQueue 底层是链表,可以当做无界和有界队列来使用,所以大家不要以为它就是无界队列。

SynchronousQueue本身不带有空间来存储任何元素,使用上可以选择公平模式和非公平模式。

PriorityBlockingQueue 是无界队列,基于数组,数据结构为二叉堆,数组第一个也是树的根节点总是最小值。

第八节、 控制并发流程

1. What?

  • 控制并发流程的工具类,作用就是帮助我们程序员更容易得让线程之间合作

  • 工具类

2.详解

1.CountDownLatch

  • 可以理解为一个倒数门阀

  • 主要方法

    • CountDownLatch( int count): 仅有一个构造函数,参数count是要倒数的数值
    • await(): 调用await()方法的线程会被挂起,等待直到count值为0才继续执行。
    • countDown(): 将count值减1,直到为0时,等待的线程会被唤醒。
  • 工作示意图
  • 代码演示
  1. /**
  2. * 演示CountDownLatch的两种用法
  3. * 描述: 用CountDownLatch模拟5名运动员一声令下共同起跑,到达终点比赛结束
  4. */
  5. public class CountDownLatchDemo {
  6. public static void main(String[] args) throws InterruptedException {
  7. CountDownLatch begin = new CountDownLatch(1);
  8. CountDownLatch end = new CountDownLatch(5);
  9. ExecutorService service = Executors.newFixedThreadPool(5);
  10. for(int i = 1; i <= 5; i++){
  11. final int no = i;
  12. Runnable runnable = new Runnable() {
  13. @Override
  14. public void run() {
  15. System.out.println("No."+no+"开始等待,准备起跑。");
  16. try {
  17. begin.await();
  18. System.out.println("No."+no+"等带结束,起跑");
  19. Thread.sleep((long)(Math.random()*10000));
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. } finally {
  23. System.out.println("No."+no+"到达终点。");
  24. end.countDown();
  25. }
  26. }
  27. };
  28. service.execute(runnable);
  29. }
  30. System.out.println("准备完毕开始等待。");
  31. Thread.sleep(500);
  32. System.out.println("等带结束,开始起跑。");
  33. begin.countDown();
  34. end.await();
  35. System.out.println("比赛结束。");
  36. service.shutdown();
  37. }
  38. }
  • 注意点 : 用法可以是1 等多,或者多等1;不能够重用

2.Semaphore:信号量

  • 用许可证来控制或限制有限的资源的使用。
  • 使用流程

    • new Semaphore(int permits,boolean fair)初始化许可证数量和公平策略(true 表示Semaphore 会把之前等待的线程放到FIFO队列里,以便让等的时间最长的线程优先获得许可证)
    • 使用acquire() 或者 acquireUninterruptibly()方法获取许可证(前者可以响应中断,后者不可以响应中断)
    • 执行结束release()释放许可证
  • 注意点

3. Condition接口

作用

第九节、AQS(AbstractQueue)原理

什么是AQS

全称 AbstractQueuedSynchronizer,它是一个框架,为同步状态的原子性管理线程的阻塞和唤醒以及队列模型提供一种通用机制。

JAVA中的同步器(ReentrantLockCountDownLatchSemphore等等)都基于他所构建的

为什么要学

  • 理解各类同步器是怎么实现的,理解并发
  • 理解这个框架的设计思路和方法,可以学到一些抽象的思维
  • 变的更强

基本功能

AQS定义了一个同步器至少包含两种方法

  • acquire:阻塞线程,直到同步状态允许其继续执行
  • release:释放线程,通过某种方式改变同步状态,使得一或多个被Acquire的线程继续执行

j.u.c包中并没有对同步器的API做一个统一的定义。有一些类定义了通用的接口(如Lock),而另外一些则定义了其专有的版本。因此在不同的类中,以acquire和release操作的名字和形式会各有不同。 例如:Lock.lockSemaphore.acquireCountDownLatch.awaitFutureTask.get,在这个框架里,这些方法都是acquire操作

基于约定,每一个同步器还要实现以下的功能

  • 阻塞和非阻塞的尝试(例如tryLock)
  • 可选的超时设置,让调用者可以放弃等待
  • 通过中断实现的任务取消

为了使框架能得到广泛应用,要支持以下两种模式的同步器

  • 独占式 exclusive:在同一时间只有一个线程可以通过阻塞点
  • 共享式 shared:允许多个线程通过阻塞点

例如ReentrantLock是使用独占式模式实现的,而CountDownLatch用的是共享式。

设计

同步器背后的基本思想非常简单。

acquire操作伪代码如下:

  1. while (synchronization state does not allow acquire) {
  2. enqueue current thread if not already queued;
  3. possibly block current thread;
  4. }
  5. dequeue current thread if it was queued;
  6. 复制代码

翻译如下:

  1. while (当同步状态不允许获取的时候) {
  2. if(该线程没有入队){
  3. 入队
  4. }
  5. 阻塞当前线程(可能)
  6. }
  7. 将当前线程出队(如果入队)
  8. 复制代码

release操作位伪代码如下:

  1. update synchronization state;
  2. if (state may permit a blocked thread to acquire)
  3. unblock one or more queued threads;
  4. 复制代码

翻译如下:

  1. 更新线程的同步状态
  2. if(状态允许一个阻塞的线程去获取){
  3. 释放一个或者多个在入列的线程
  4. }
  5. 复制代码

为了实现上面的acquirerelease操作,需要下面这三个组件相互协作

  • 同步状态的原子性管理
  • 线程的阻塞与解除阻塞
  • 队列的管理

创建一个框架分别实现这三个组件是有可能的。但是,这会让整个框架既难用又没效率。例如:存储在队列节点的信息必须与解除阻塞所需要的信息一致,而暴露出的方法的签名必须依赖于同步状态的特性。

所以AQS的核心其实是为以上三个组件提供一个具体的实现

下面我们来聊一下这个具体的实现

实现

同步状态

AQS类使用单个int(32位)来保存同步状态,并暴露出getStatesetState以及compareAndSet操作来读取和更新这个状态。这些方法都依赖于j.u.c.atomic包的支持。

这个包提供了volatile在读和写上的语义,并且通过使用本地的compare-and-swapload-linked/store-conditional指令来实现compareAndSetState,使得仅当同步状态拥有一个期望值的时候,才会被原子地设置成新值。这个也就是我们常说的CAS操作

基于AQS的具体实现类必须根据暴露出的状态相关的方法定义tryAcquiretryRelease方法,以控制acquirerelease操作。

当同步状态满足时,tryAcquire方法必须返回true

而当新的同步状态允许后续acquire时,tryRelease方法也必须返回true。

这些方法都接受一个int类型的参数用于传递想要的状态。

这个参数主要用来实现不同子类功能的,例如ReentrantLock使用该参数去操作线程的同步状态实现了重入的计数

阻塞

AQS没有采用Thread.suspendThread.resume这两种方式,以上两种方式都有严重的安全问题,例如容易造成死锁等。

AQS采用了j.u.c包下的LockSupport类。该类可以响应中断操作,可以设置超时时间等。

队列

整个框架的关键就是如何管理被阻塞的线程的队列,该队列是严格的FIFO队列,因此,框架不支持基于优先级的同步。

AQS的锁策略采用的CLH而不是MCS,原因是CLH要比MCS更适合处理取消和超时。

因此我们选择了CLH锁作为实现的基础。但是最终的设计已经与原来的CLH锁有较大的出入。


这里简单介绍一下CLH

CLH队列实际上并不那么像队列,因为它的入队和出队操作都与它的用途(即用作锁)紧密相关。它是一个链表队列,通过两个字段headtail来存取,这两个字段是可原子更新的,两者在初始化时都指向了一个空节点。

第三部分 Java并发编程 - 图3

一个新的节点,node,通过一个原子操作入队:

  1. do {
  2. pred = tail;
  3. } while(!tail.compareAndSet(pred, node));
  4. 复制代码

每一个节点的“释放”状态都保存在其前驱节点中。因此,自旋锁的“自旋”操作就如下:

  1. while (pred.status != RELEASED); // spin
  2. 复制代码

自旋后的出队操作只需将head字段指向刚刚得到锁的节点:

  1. head = node;
  2. 复制代码

使用CLH锁有以下优点

  • 入队和出队操作是快速、无锁的,以及无障碍的(即使在竞争下,某个线程总会赢得一次插入机会而能继续执行)
  • 判断是否有线程正在等待也很快(测试一下head是否与tail相等)
  • “释放”状态是分散的(几乎每个节点都保存了这个状态,当前节点保存了其前驱节点的“释放”状态,因此它们是分散的,不是集中于一块的),避免了一些不必要的内存竞争。

为了将CLH队列用于阻塞式同步器,AQS做出了以下改进:

  • 给每一个节点增加next

在自旋锁中,一个节点只需要改变其状态,下一次自旋中其后继节点就能注意到这个改变,所以节点间的链接并不是必须的

但在阻塞式同步器中,一个节点需要显式地唤醒(unpark)其后继节点

所以AQS增加了节点node访问其后继节点的next

由于AQS队列是双向队列,所以CAS操作也没有很好的方式对两个方向都做到完全的原子性更新。后继结点的更新就采用了下面的简单赋值

  1. pred.next = node;
  2. 复制代码

next链接仅是一种优化。如果通过某个节点的next字段发现其后继结点不存在(或看似被取消了),总是可以使用pred字段从尾部开始向前遍历来检查是否真的有后续节点

  • 每个节点都有自己的状态字段用于控制阻塞而非自旋

论文这里作者用了很大的篇幅去写节点状态位的东西,我简单的归纳成两个问题:

  • 一个released状态位够不够?
  • 如果不够,还要哪些?加这些状态位有什么好处?

问题1解答:只有一个released位是不够的,AQS还需要当一个活动线程在头结点时候仅调用tryAcquire

在同步器框架中,仅在线程调用具体子类中的tryAcquire方法返回true时,队列中的线程才能从acquire操作中返回

单个“released”位是不够的,还需要确保一个活动的线程仅在队列的头部,调用tryAcquire方法,这时的acquire可能会失败,然后(重新)阻塞

这时候不需要一个前驱的状态去判断是否阻塞,直接可以判断这个前驱的节点是不是头部,不像自旋锁需要内存复制的竞争

但是取消状态还是要读前驱的状态

这个节点的状态还可以避免不必要的park和unpark,虽然这些方法跟阻塞原语一样快,但在跨越Java和JVM以及操作系统边界时仍有可避免的开销。

在调用park前,线程设置一个“唤醒(signal)”位,然后再一次检查同步和节点状态。一个释放的线程会清空其自身状态,这样线程就不必频繁地尝试阻塞。

  • 依赖JVM回收节点内存,这就避免了一些复杂性和开销

AQS主要使用在出队的时候置null方式回收节点内存,这可以有效的避免复杂的处理和瓶颈。

抛开这些细节,基本的acquire操作的最终实现的一般形式如下

  1. if(!tryAcquire(arg)) {
  2. node = create and enqueue new node;
  3. pred = node's effective predecessor;
  4. while (pred is not head node || !tryAcquire(arg)) {
  5. if (pred's signal bit is set)
  6. park()
  7. else
  8. compareAndSet pred's signal bit to true;
  9. pred = node's effective predecessor;
  10. }
  11. head = node;
  12. }
  13. 复制代码

翻译如下

  1. if (!tryAcquire(arg)) {
  2. node = 创建队列并且新入队节点;
  3. pred = 节点的有效前驱节点;
  4. while (pred 不是头节点 || !tryAcquire(arg)) {
  5. if (pred的状态位是Signal信号)
  6. park();
  7. else
  8. CAS操作设置predSignal信号;
  9. pred = node节点的有效前驱节点;
  10. }
  11. head = node;
  12. }
  13. 复制代码

release的操作如下

  1. if(tryRelease(arg) && head node's signal bit is set) {
  2. compareAndSet head's bit to false;
  3. unpark head's successor, if one exist
  4. }
  5. 复制代码

翻译如下

  1. release {
  2. if (tryRelease(arg) && 头节点的状态是Signal) {
  3. 将头节点的状态设置为不是Signal;
  4. 如果头节点的后继结点存在,则将其唤醒。
  5. }
  6. }
  7. 复制代码

acquire操作的主循环次数依赖于具体实现类中tryAcquire的实现方式。

另一方面,在没有“取消”操作的情况下,每一个组件的acquirerelease都是一个O(1)的操作(忽略park中发生的所有操作系统线程调度)

第十节 、 Future和Callable

Runnable

  1. public abstract void run();
  • 不能返回值;因为Runnable接口中定义了run方法的返回值为void

  • run 方法无法抛出checked Exception,同样是由于接口中run 方法没有异常的抛出

思考: 为什么run方法如此设计?

如果抛出异常或者返回值,接受者是Thread类,如何处理将不再是由我们规定。

Callable

  1. V call() throws Exception;
  • 弥补Runnable的不足

Future

  • Future 核心思想: 一个方法的计算可能比较耗时,主线程不需要原地等待,让子线程去执行;主线程需要结果的时候future会去获取结果;、

Future和Callable的关系

1. Future中的方法