为何需要多线程

一个运行的程序往往需要三个模块的协调运行:CPU,内存,IO设备。众所周知,这三者的速度差异是较大的,所以为了平衡差异,操作系统、计算机体系结构、编译程序都采取了相关措施:

  • CPU增加了缓存,以均衡与内存的速度差异。导致了 可见性 问题。
  • 操作系统增加了进程、线程的概念,分时复用CPU,均衡CPU与IO设备的速度差异。导致了 原子性 问题。
  • 编译程序优化指令执行的顺序,使缓存能够更合理地利用。导致了 有序性 的问题。

Java中线程的6种状态

通过 Thread.State 的源码可以看到,线程分为6个状态:

状态 解释 例子
NEW 刚刚创建完尚未启动的线程。
RUNNABLE 包括操作系统中的RUNNING
READY
两种概念,表示当前线程是可执行的。
BLOCKED 阻塞态,等待一个排他锁,这个事件将在另一个线程放弃这个锁的时候发生。所以此状态是多线程竞争锁失败时的状态。 程序等待进入同步块的时候
TIMED_WAITING 限期等待,不会分配处理机,在一定时间后由系统自动唤醒 Thread::sleep
,设置了Timeout参数的Thread::wait
WAITING 无限等待,不会分配处理机,需要由其他线程显示唤醒 Thread::join
, Object::wait
TERMINATED 线程结束。
  1. /**
  2. * 高内聚资源
  3. * 所有资源只由自己操作
  4. */
  5. class Ticket {
  6. private int number = 30;
  7. private Lock lock = new ReentrantLock();
  8. // 对外暴露接口
  9. public void sale() {
  10. lock.lock();
  11. try { // 加锁后,此块代码只能由一个线程同步访问
  12. if (number > 0) {
  13. System.out.println(Thread.currentThread().getName()
  14. + " sold the" + number + "th ticket, left" + number);
  15. number--;
  16. }
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. } finally {
  20. lock.unlock();
  21. }
  22. }
  23. }
  24. /**
  25. *
  26. * 三个售票员 卖出 30张票
  27. * 如何编写企业级 高内聚低耦合 的多线程代码
  28. * 固定的编程套路+模板是什么
  29. *
  30. * 1. 在高内聚低耦合的前提下 线程 操作 资源类
  31. * 先创建一个资源类
  32. * 2. 不要再 Thread t = new Thread()啦,要与时俱进使用java8的新特性
  33. * 直接lambda表达式
  34. */
  35. public class SaleTicketDemo {
  36. public static void main(String[] args) { // 主线程,main只是程序的入口,业务逻辑写外边
  37. testSafe();
  38. }
  39. private static void testSafe() {
  40. // 创建资源类
  41. Ticket tickets = new Ticket();
  42. // 线程操作资源类
  43. new Thread(() -> { for (int i = 0; i < 40; ++i) tickets.sale(); }, "A").start();
  44. new Thread(() -> { for (int i = 0; i < 40; ++i) tickets.sale(); }, "B").start();
  45. new Thread(() -> { for (int i = 0; i < 40; ++i) tickets.sale(); }, "C").start();
  46. }
  47. }

设计规范

原则:高内聚低耦合,线程操作资源类

就是将资源类高度封装,所有对资源的直接操作都由资源类自己内部操作,而尽量不依赖外部操作,对外只暴露方法接口。业务逻辑部分由多个线程来分别操作资源。

生产者消费者模型

线程操作资源类

多个线程操作资源类,一部分线程作为生产者生产资源;一部分线程作为消费者消费资源;且对于资源的数量是有约束的。

使用四个线程来操作一个数字,两个线程用来增加数字,两个线程用来减少数字,要求数字的范围约束在0和1,每个线程都操作10轮。

题目中的数字就是资源,加数的线程就是生产者,减数的线程就是消费者。由于要约束资源的数量,所以我们需要线程之间的相互通信

JUC - 图1

Java的线程库中有两种方式:

  • 比较老的版本,synchroninzed关键字锁住资源,线程访问的时候使用循环判断,若不能操作则wait()等待,注意wait()的等待时线程交出资源的控制权,而sleep()的等待是单纯阻塞,不会交出资源控制权;若能操作,则操作完之后用notifyAll()通知唤醒其余线程。
  • 比较新的版本,使用Lock类来加锁。从官方的API可以知道Lock类是比synchronized更加强大的新技术。其线程之间通信的API也有更新:
    • 不再使用Object类提供的wait() notify()方法,使用Condition类的await() signal()方法。

JUC - 图2

总的原则

  • 线程操作资源类。
  • 判断 -> 操作 -> 通知。
  • 多线程交互,防止线程的虚假唤醒,使用循环判断
  1. class NumSource {
  2. private int number = 0;
  3. Lock lock = new ReentrantLock();
  4. Condition condition = lock.newCondition();
  5. public void increment() {
  6. lock.lock();
  7. try {
  8. while (number != 0) {
  9. condition.await();
  10. }
  11. number++;
  12. condition.signalAll();
  13. System.out.println(Thread.currentThread().getName() + "\t" + number);
  14. } catch (Exception e) { e.printStackTrace(); } finally {
  15. lock.unlock();
  16. }
  17. }
  18. public void decrement() {
  19. lock.lock();
  20. try {
  21. while (number == 0) {
  22. condition.await();
  23. }
  24. number--;
  25. condition.signalAll();
  26. System.out.println(Thread.currentThread().getName() + "\t" + number);
  27. } catch (Exception e) { e.printStackTrace(); } finally {
  28. lock.unlock();
  29. }
  30. }
  31. }
  32. public class WaitNotifyDemo {
  33. public static void main(String[] args) {
  34. NumSource src = new NumSource();
  35. new Thread(() -> { for (int i = 0; i < 10; ++i) src.increment(); }, "A").start();
  36. new Thread(() -> { for (int i = 0; i < 10; ++i) src.decrement(); }, "B").start();
  37. new Thread(() -> { for (int i = 0; i < 10; ++i) src.increment(); }, "C").start();
  38. new Thread(() -> { for (int i = 0; i < 10; ++i) src.decrement(); }, "D").start();
  39. }
  40. }

精确通知

新API中LockCondition的结合使用相比起synchronized,最大的不同就在于其能够支持线程之间的精确通知(即同步),即A线程执行完后可以指定下一个执行的线程必须为B,增强了线程操作的可控性

多个线程之间按照顺序调用,实现 A->B->C。三个线程的启动要求如下:AA打印5次,BB打印10次,CC打印15次;AA打印5次,BB打印10次,CC打印15次……一共来10轮。

多线程之间的精确通知可以通过一个标志位来辅助实现,基本原则:

  • 线程操作资源类
  • 判断、操作、通知
  • 多线程交互,防止虚假唤醒,循环判断
  • 标志位修改和定位
  1. /**
  2. * Lock主要和Condition结合使用
  3. * 精准通知,精准唤醒
  4. */
  5. class ConditionSource {
  6. private Lock lock = new ReentrantLock();
  7. private Condition condition1 = lock.newCondition(); // 每一个条件服务一个线程
  8. private Condition condition2 = lock.newCondition();
  9. private Condition condition3 = lock.newCondition();
  10. private int flag = 1; // 标志位
  11. // 精确唤醒,实现同步
  12. public void Asay() {
  13. lock.lock();
  14. try {
  15. while (flag != 1) {
  16. condition1.await();
  17. }
  18. for(int i = 1; i <= 5; ++i)
  19. System.out.println(Thread.currentThread().getName() + "\t" + i);
  20. flag = 2;
  21. condition2.signal();
  22. } catch (Exception e) { e.printStackTrace(); } finally {
  23. lock.unlock();
  24. }
  25. }
  26. public void Bsay() {
  27. lock.lock();
  28. try {
  29. while (flag != 2) {
  30. condition2.await();
  31. }
  32. for(int i = 1; i <= 10; ++i)
  33. System.out.println(Thread.currentThread().getName() + "\t" + i);
  34. flag = 3;
  35. condition3.signal();
  36. } catch (Exception e) { e.printStackTrace(); } finally {
  37. lock.unlock();
  38. }
  39. }
  40. public void Csay() {
  41. lock.lock();
  42. try {
  43. while (flag != 3) {
  44. condition3.await();
  45. }
  46. for(int i = 1; i <= 15; ++i)
  47. System.out.println(Thread.currentThread().getName() + "\t" + i);
  48. flag = 1;
  49. condition1.signal();
  50. } catch (Exception e) { e.printStackTrace(); } finally {
  51. lock.unlock();
  52. }
  53. }
  54. }
  55. public class ConditionDemo {
  56. public static void main(String[] args) {
  57. ConditionSource source = new ConditionSource();
  58. for (int i = 0; i < 10; ++i) new Thread(source::Asay, "A").start();
  59. for (int i = 0; i < 10; ++i) new Thread(source::Bsay, "B").start();
  60. for (int i = 0; i < 10; ++i) new Thread(source::Csay, "C").start();
  61. }
  62. }

获得多线程的方式

实现Runnable接口

  1. public class MyRunnable implements Runnable {
  2. @Override
  3. public void run() {
  4. }
  5. }
  6. public static void main(String[] args) {
  7. MyRunnable inst = new MyRunnable();
  8. Thread thread = new Thread(inst);
  9. thread.start(); // 创建一个线程并启动
  10. }

继承Thread类

  1. public class MyThread extends Thread {
  2. @Override
  3. public void run() {
  4. }
  5. }
  6. public static void main(String[] args) {
  7. MyThread th = new MyThread();
  8. th.start();
  9. }

实现Callable接口并用FutureTask包装

  1. class MyCall implements Callable<Integer> {
  2. @Override
  3. public Integer call() throws XXXException {
  4. return XXX;
  5. }
  6. }
  7. public class CallableDemo {
  8. public test() throws XXXException {
  9. FutureTask<Integer> task = new FutureTask<>(new MyCall());
  10. new Thread(task).start();
  11. task.get(); // 阻塞,等待线程执行完后的返回值
  12. }
  13. }

注意点:

  • .start()创建一个线程执行run()方法;
  • .run()只是调用run()方法。

JMM

Java内存模型是一种多线程访问共享变量的规范,屏蔽了各种硬件和操作系统内存访问的差异,以实现Java在各个平台下都能达到一致的内存访问效果。规范要求保证可见性、原子性、有序性
image.png

  • 可见性问题,一是由于 CPU、缓存、内存等存储结构的速度差异所造成;二是由于线程工作在自己的私有工作内存中,使得某一个线程对某个变量的更新不能够立即又其他线程看见。
  • 原子性问题,是由于多线程/多进程情况下分时复用CPU所造成;
  • 有序性问题,是编译器尝试优化程序而对指令重排列所造成。

JMM 规范内容

  1. 所有变量都存储在主内存中。
  2. 每一个线程都有一个私有的工作内存,工作内存存储了线程要读写的共享变量副本。
  3. 线程不能够直接操作主内存,必须通过拷贝到私有工作内存的方式来进行操作。
  4. 不同线程的私有内存相互之间不可见。

JUC - 图4
以上就是 Java 内存模型的基本内容,具体采用了 8 种操作来进行实现。

操作 作用对象 效果

1. lock
主内存的共享变量 将变量锁定,为某一个线程占有

2. unlock
主内存的共享变量 解除锁定,可以被多个线程来访问

3. read
主内存的共享变量 主存变量拷贝到线程工作内存

4. load
线程工作内存的变量 读取工作内存的变量值到变量中

5. use
线程工作内存的变量 将工作内存的值传递给执行引擎

6. assign
线程工作内存的变量 执行引擎中的值赋值给工作内存变量

7. store
线程工作内存的变量 工作内存的变量传到主内存

8. write
主内存的共享变量 主内存将工作内存传入的值赋值到对应的变量中

image.png

volatile 实现原理

image.png
image.png
image.png

可见性

volatile 底层实际上就是 MESI 协议来实现共享变量的可见性:CPU 监听总线,一旦发现共享变量发生改变,则将私有内存的拷贝失效,下一次使用的时候就是从主内存得到的最新值。
image.png
给变量加上 volatile 关键字,会在修改变量的时候多一个 lock 前缀指令,保证变量变动后立即执行写回到主内存操作(store write),在 store 的时候通过总线,其他 CPU 能够感知到变量的变化。

可以发现这个加锁的点在 store、write,这样子保证了其他 CPU 读主存的时候,最新的变量已经 write 到了主存。

与 synchronized 进一步比较,synchronized 相当于把锁加在了 read 开始,所以整个过程包括 use、assign 等都被一个线程独占,必然保证了可见性、原子性,但是锁的粒度太大,若 use 是一个耗时操作则大大降低了整体的并发度;相比之下,volatile 在只锁住了 store write,大大减小了锁的粒度,所以 volatile 是一个轻量级锁,同时解决了并发与性能问题!

有序性

happens-before 是一系列重排序的原则,让 CPU 在不影响单线程执行结果的前提下,重排序指令来最大限度发挥机器性能。as-if-serial 也是类似,要求在单线程下 CPU 对指令重排序后最终的执行结果,要和顺序执行是一样的。

happens-before 原则的例子,比如 volatile 变量完全写入内存必须在下一次读之前完成;同步代码块不能与其前后的语句调换顺序;存在依赖关系的指令不能调换顺序。

在单线程条件下,依赖以上原则能够保证结果的正确,多线程环境下则会出现问题。所以需要使用 volatile 来禁止指令重排序。

volatile VS synchronized

synchronized 由于锁的粒度从 read 开始,所以能够完全保证原子性、可见性、有序性;volatile 不能保证原子性
image.png
之前与 synchronized 比较的过程中其实已经提到过,volatile 仅仅锁住了 store write 这个过程,所以 use 等修改操作可能会丢失。比如,两个 CPU 同时修改变量 num,其中一个 CPU 先修改完成并进行 store write,则另一个 CPU 检测到这个 num 变化后就将自己工作内存的 num 失效,导致了修改的丢失。

所以,对于这种情况,应该进一步拓大锁的范围,使用 synchronized 或其他手段在 read 的时候就进行锁定。

  1. public class Test {
  2. private volatile int flag = 0;
  3. public void incr() {
  4. // 由于++不是原子操作,多线程操作时,修改可能会丢失
  5. flag++;
  6. }
  7. public void safeAssign() {
  8. // 赋值是原子操作,多线程操作是安全的
  9. flag = 2;
  10. }
  11. public synchronized void safeIncr1() {
  12. // synchronized 从 read 开始就加锁,保证修改不会丢失
  13. flag += 1;
  14. }
  15. }

创建对象重排序问题

DCL单例模式

  1. class Singleton {
  2. private static volatile Singleton instance = null;
  3. public Singleton getInstance() {
  4. if (instance == null) {
  5. synchronized(Singleton.class) {
  6. if (instance == null)
  7. instance = new Singleton();
  8. }
  9. }
  10. return instance;
  11. }
  12. }

在这个 DCL 单例中如果 instance 对象不使用 volatile 修饰,则存在指令重排序的风险。一个对象在创建的时候分为三步:

  1. instance = new XXXX();
  2. ==> 伪代码
  3. memory = allocate(); // 1. 内存中分配对象空间
  4. instance_init(memory); // 2. 初始化对象
  5. instance = memory; // 3. 获取对象的引用,接着instance != null

可以发现步骤2和步骤3是不存在数据依赖关系的,所以是有可能调换顺序的,即先获取引用后初始化对象,这样就会有操作部分初始化对象的风险!也就是说,另外一个线程可能引用到一个部分初始化的 singleton 对象。

所以需要增加 volatile,这样就使得 instance = new Singleton() 这条语句不会重排序,最终完整地初始完对象后才返回引用。

CAS

比较并交换,是一种操作系统的原语。原子包装类中的许多操作底层就是通过unsafe类调用操作系统cas原语。

这个原语的基本逻辑就是:通过内存寻址获取工作内存变量,比较当前工作内存的值是否与主内存中的值相同,相同则说明变量没有被其他线程修改,可以进行操作;否则不进行修改,再循环来判断直至成功。

  • var1:工作内存中的值;
  • var2:内存偏移量;
  • var4:expect;
  • var5:update;
  1. public final native boolean compareAndSwapInt(Object val, long offset, int expect, int update);

问题缺陷

  1. 更改失败,变成自旋锁一直循环。
  2. 出现ABA问题,即值被修改过,过程中出现过变化,但最初值和当前值一样。由于cas只进行值比较,所以不能够检测出中间过程的变化
    解决方法:使用带有版本号的原子引用创建对象,AtomicStampedReference, 这样再cas的过程中就不仅比较值而且比较对象的当前修改版本。

    1. public void test() {
    2. AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(100, 1);
    3. new Thread(() -> {
    4. int stamp = ref.getStamp();
    5. ref.compareAndSet(100, 101, ref.getStamp(), ref.getStamp() + 1); // AB
    6. ref.compareAndSet(101, 100, ref.getStamp(), ref.getStamp() + 1); // BA
    7. }).start();
    8. int stamp = ref.getStamp();
    9. new Thread(() -> {
    10. System.out.println("获取版本号\t" + stamp);
    11. // 。。。sleep for a while
    12. ref.compareAndSet(100, 111, stamp, stamp + 1);
    13. }).start();
    14. }

线程的同步机制

synchronize和ReentrantLock

  1. 构成:
    synchronized是JVM提供的同步机制关键字底层是管程对象来管理
    ReentranrLock则是API层面的一个Lock接口实现类
  2. 使用:
    synchronized一切由底层的管程控制;
    ReentantLock需要用户主动释放锁,否则可能会出现阻塞、死锁等状况。
  3. 中断:
    synchronized不可中断,除非抛出异常或者运行完成;
    ReentrantLock可以中断, trylock(timeout, unit), lock.lockInterruptibly()
  4. 公平:
    • synchronized非公平;
    • ReentrantLock默认非公平,但可以设置参数改变。
  5. 通知:
    • synchronized只能够随机通知;``
    • ReentrantLock可以和Condition接口结合使用,实现精确通知

使用选择

除非要使用ReentrantLock的一些高级功能,否则优先使用synchronized。因为synchronized是JVM实现的一种锁机制,JVM原生支持,而且由于用户对线程的可控度更小,意味着使用起来更加简单,误操作的概率更小。

并发中的集合类

List

线程不安全:ArrayList, LinkedList

故障现象:ConcurrentModificationException,并发修改异常

解决方案

  1. Vector: 内部方法加了重锁synchronized,不推荐使用
  2. Conllections.synchronizedList(new ArrayList):内部方法内使用synchronized封锁代码块。
  3. CopyOnWriteArrayList,写时复制,并发读。写操作使用lock接口锁住代码块:将共享数组拷贝一份,在副本上进行修改之后将修改后的数组设置为共享数组。

JUC - 图11

Map

线程不安全:HashMap

解决方案

  1. Collections.synchronizedMap() (使用synchronized在方法内部进行代码块封锁)
  2. ConcurrentHashMap (实现基本和HashMap一致,在修改操作部分使用synchronized来封锁代码)

Set

线程不安全:HashSet

解决方案

  1. Collections.synchronizedSet(),底层其实就是CopyOnWriteArrayList。
  2. CopyOnWriteArraySet。

BlockingQueue接口

实现类有:

  1. ArrayBlockingQueue:底层Object数组。
  2. LinkedBlockingQueue:链表结构的有界阻塞队列(默认大小Integer.MAX_VALUE)。
  3. PriorityBlockingQueue.
  4. DelayQueue.
  5. SynchronizedQueue:不存储元素的阻塞队列,每一个take操作要等待一个put操作,反之亦然。可以用于实现线程间的某个变量的传递。
  6. LinkedTransferQueue.
  7. LinkedBlockingDeque:链表结构的双向阻塞队列。

阻塞队列很好地表现出了生产者消费者的模型,队列的大小就是资源的的数目上界。其内部的底层就是利用Lock和Condition实现的,已经设计好了线程的操作时的阻塞唤醒,不再需要设计者控制这些线程的细节。最好使用设置超时类型

方法类型 抛出异常 特殊值 阻塞 有限阻塞,特殊值
插入 add(e) offer(e)->false put(e) offer(e,time,unit)
删除 remove() poll()->null take() poll(time,unit)
访问 element() peek() / /

JUC - 图12

手动实现

① 使用Lock和condition(jdk源码的实现方法)

  1. public class MyBlockingQueue1<T> {
  2. private Queue<T> queue;
  3. private final int capacity;
  4. private int size;
  5. private Lock lock = new ReentrantLock();
  6. private Condition notFull = lock.newCondition();
  7. private Condition notEmpty = lock.newCondition();
  8. public MyBlockingQueue1(int capacity) {
  9. this.capacity = capacity;
  10. queue = new LinkedList<>();
  11. }
  12. public void put(T data) {
  13. lock.lock();
  14. try {
  15. while (size == capacity) {
  16. notFull.await();
  17. }
  18. queue.add(data);
  19. size++;
  20. notEmpty.signal();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. } finally {
  24. lock.unlock();
  25. }
  26. }
  27. public T take() {
  28. T e = null;
  29. lock.lock();
  30. try {
  31. while (size == 0) {
  32. notEmpty.await();
  33. }
  34. e = queue.remove();
  35. size--;
  36. notFull.signal();
  37. } catch (InterruptedException interruptedException) {
  38. interruptedException.printStackTrace();
  39. } finally {
  40. lock.unlock();
  41. }
  42. return e;
  43. }
  44. }

② 使用信号量Semaphore实现 最简洁的方式,因为临界的判断已经封装在Semaphore内部,只需要在初始化信号量的时候指定阈值即可。

  1. public class MyBlockingQueue2<T> {
  2. private final Semaphore empty;
  3. private final Semaphore full;
  4. private final Semaphore mutex = new Semaphore(1);
  5. private Queue<T> queue = new LinkedList<>();
  6. public MyBlockingQueue2(int capacity) {
  7. this.full = new Semaphore(0); // 初始可以取的内容
  8. this.empty = new Semaphore(capacity); // 初始的可供选择的空位置
  9. }
  10. public void put(T data) {
  11. try {
  12. empty.acquire(); // 先同步后互斥,防止死锁
  13. mutex.acquire();
  14. queue.add(data);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. } finally {
  18. mutex.release();
  19. full.release();
  20. }
  21. }
  22. public T take() {
  23. T t = null;
  24. try {
  25. full.acquire();
  26. mutex.acquire();
  27. t = queue.remove();
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. } finally {
  31. mutex.release();
  32. empty.release();
  33. }
  34. return t;
  35. }
  36. }

③ 最传统的方式,使用synchronized关键字

  1. public class MyBlockingQueue3<T> {
  2. private final int capacity;
  3. private int size = 0;
  4. private Queue<T> queue = new LinkedList<>();
  5. public MyBlockingQueue3(int capacity) {
  6. this.capacity = capacity;
  7. }
  8. public void put(T data) {
  9. synchronized (this) {
  10. try {
  11. while (size == capacity) {
  12. this.wait();
  13. }
  14. queue.add(data);
  15. size++;
  16. this.notifyAll();
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }
  22. public T take() {
  23. T t = null;
  24. synchronized (this) {
  25. try {
  26. while (size == 0) {
  27. this.wait();
  28. }
  29. t = queue.remove();
  30. size--;
  31. this.notifyAll();
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. return t;
  37. }
  38. }

lambda

Stream

Java8之后的一个重要新特性就是链式编程、流式计算。stream是一种数据的渠道,有些类似管道将每一段处理的结果往后传,最终获得一个新的结果。

基本的使用套路就是:

  1. 创建一个Stream:一个数据源(数组、集合)。
  2. 中间计算操作:处理数据源。
  3. 终止操作:一个终止操作,执行中间操作链,产生最终的结果。

四大函数接口

lambda用于简化实现函数式接口,函数式接口需要保证其中的方法无二义性,即只能有一个未定义的方法,可以有多个staticdefault方法。

Java中将常用的接口的类型高度封装成了四大类型:

类型 泛型
Consumer 消费性接口 输入参数类型,无返回值。
Supplier 供给型接口 无输入参数,有返回值类型。
Function 操作型接口 T输入参数类型,R返回值类型。
Predicate 判断型接口 T输入参数类型,boolean返回值类型

线程池

为什么用

在普通情况下,一个线程的使用过程基本就是创建、使用、销毁,但是当我们使用的线程较多的时候就会有较多的时间消耗在创建和销毁这两个对工作无用的过程,同时过多的线程会使系统不堪负重,所以引入了线程池的概念,来增加线程的复用性

优势作用

线程池的工作就是维护一定运行线程的数量,接受到任务后,从池中派出空闲的线程去处理,处理完后不销毁线程而是留着之后复用。如果请求的线程数超出了限定的数量,超出的线程排队等待,等其他线程执行完任务后,在从阻塞队列中取出任务来执行。

主要的特点就是:

  1. 降低系统资源消耗。复用线程,减少创建和销毁的消耗。
  2. 提高响应速度。任务到达时,不需要再等待线程的创建,而是直接使用已经创建好的线程。
  3. 提高可管理性,控制最大并发数。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性。线程池则可以将限定数量的多个线程进行统一的管理分配。

如何使用

架构说明

Java中的线程池基于Executor接口,进一步基于ExecutorService接口。最终线程池用到的就是ThreadPoolExecutor类。

JUC - 图13

三种常用实现

利用线程的工具类Executors来实现创建线程池。

  1. 固定长度的线程池。

    1. Executors.newFixedThreadPool(5); // 一池5个线程
  2. 一个池一个线程。

    1. Executors.newSingleThreadExecutor(); // 一个池1个线程,一个任务一个任务地执行
  3. 可伸缩的线程池。

    1. Executors.newCachedThreadPool(); // 可伸缩的线程池

以上三种的底层本质都是ThreadPoolExecutor

JUC - 图14

JUC - 图15

JUC - 图16

线程池7个重要参数

  1. corePoolSize: 线程池中的常驻核心线程。
  2. maximumPoolSize: 池中最大线程数,即并发上限。
  3. keepAlive: 拓容后,多于常驻数量的空闲线程的最大存活时间,用于控制线程池的伸缩。最终线程的数量维护在corePoolSize的水平。
  4. unit: 计量的时间单位。
  5. workQueue: 等待执行的任务阻塞队列。
  6. threadFactory: 创建线程池中线程的工厂,一般使用默认的即可。
  7. handler: 当池子和阻塞队列都已经满后,对于新的请求的拒绝策略。默认为AbortPolicy,即中断程序。

JUC - 图17

设置合理参数

  1. 线程池的最大容量。一般对于CPU密集型的系统,最大容量设置在CPU核数+1左右。因为线程就是处理的分配单位,过多的线程会使得机器忙不过来。
  2. 拒绝策略。拒绝策略作为ThreadPoolExecutor的内部类有四种
    • AbortPolicy: 直接抛出RejectedExecutionException异常,终止程序运行。
    • CallerRunsPolicy: 一种向上一级返回任务的处理机制,若当前任务处理不过来,不抛出异常,而是将某些任务回退给调用者处理,从而降低新任务的流量。

JUC - 图18

  • DiscardOldestPoicy: 直接抛弃阻塞队列中等待得最久的任务,然后把当前任务加入阻塞队列中。长江后浪推前浪,前浪死在沙滩上。
  • DiscardPolicy: 默默丢弃无法处理的任务,无异常抛出。如果运行任务可丢失,这是最好的一种策略

线程池底层工作原理

JUC - 图19

step1:主线程通过execute()方法提交任务请求给线程池,线程池优先使用corePool中的核心常驻线程来处理任务。(corePool中的线程是随着任务的提交逐个建立起来的)

step2:若核心线程池满了,将多余的任务请求放入阻塞队列

step3:若阻塞队列也满了,立即拓容到最大容量,先处理阻塞队列中的任务。

step4:拓展到最大容量后仍然满,开始启用饱和拒绝策略,不再接受新任务。

step5:拓容后,空闲的线程存活时间达到keepAliveTime,则将其销毁,逐渐收缩回核心池的大小。

线程池的配置

  1. CPU密集型:任务需要大量的运算,基本没有阻塞,所以最大池容量可设为(CPU核数+1)。
  2. IO密集型:任务需要大量的IO,意味着大量的阻塞,所以尽可能配置更多的线程,一个参考配置是(CPU核数/1-阻塞系数)

小注意点

  • shutdown(): 停止接收任务,并且会完成已经接收的任务。

AQS

AbstractQueuedSynchronizer 抽象队列同步器,将多线程情况下基本的获取锁、释放锁的行为封装,供构建其他同步器组件时复用。

内置双向队列来完成线程获取资源的排队机制,使用一个volatile int变量state来表示资源的当前状态。

  • state=0,表示资源没有被占用;
  • state=1,表示资源被占用。

CountDownLatch

倒计数

用来控制一个或者多个线程等待多个线程。维护了一个计数器cnt,每一次调用countDown()方法,计数器值减1,当减到0时,await之后的代码就可以执行。

  1. CountDownLatch countDownLatch = new CountDownLatch(6);
  2. for (int i = 0; i < 6; i++) {
  3. int finalI = i;
  4. new Thread(() -> {
  5. System.out.println(finalI + " get out");
  6. countDownLatch.countDown(); // 计数器减一
  7. }).start();
  8. }
  9. try {
  10. countDownLatch.await(); // 阻塞之后的代码,当计数器<=0时放行
  11. System.out.println("count down finish");
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }

CyclicBarrier

正计数

控制多个线程相互等待,当多个线程都同时到达时,之后的任务才继续执行。

  1. public class CyclicBarrierDemo {
  2. public static void main(String[] args) {
  3. CyclicBarrier barrier = new CyclicBarrier(5);
  4. ExecutorService executorService = Executors.newCachedThreadPool();
  5. for (int i = 0; i < 5; i++) {
  6. executorService.execute(() -> {
  7. System.out.println("before...");
  8. try {
  9. barrier.await();
  10. } catch (InterruptedException | BrokenBarrierException e) {
  11. e.printStackTrace();
  12. }
  13. System.out.println("after barrier await");
  14. });
  15. }
  16. executorService.shutdown();
  17. }
  18. }

Semaphore

信号量的Java封装版本,初始化的时候指定阈值即可,api 有 acquire release

Java锁

公平锁、非公平锁

公平锁就是满足先来后到地获取锁,维护一个等待队列;

非公平锁允许插队抢锁,抢锁失败才排到队尾 ==> 效率更高,但会出现饿死。

ReentrantLock默认就是非公平锁,synchronized也是非公平锁。

可重入锁(递归锁)

锁的粒度是线程,而不是调用,所以一个线程可以进入任何一个它拥有的锁同步的代码块。这样做的好处是可以减少死锁的出现

  1. class Test {
  2. public void test() {
  3. synchronized (this) { // 第一次申请锁之后,monitor会维护一个计数器
  4. ...
  5. synchronized (this) { ... }
  6. }
  7. }
  8. public void test2() {
  9. synchronized (this) {
  10. test();
  11. }
  12. }
  13. Lock lock = new ReentrantLock();
  14. public void sendMS(int count) {
  15. if (count == 4) return;
  16. lock.lock();
  17. try {
  18. sendMS(count + 1);
  19. } finally {
  20. lock.unlock();
  21. }
  22. }
  23. }

自旋锁

自旋锁就是不阻塞而循环尝试获取锁,循环的时候占用着处理机,使得处理机的利用率下降,但是减少了线程上下文切换的开销。其本质就是while和CAS

  1. // 手写自旋锁
  2. // 本质 while + CAS,锁的粒度为线程
  3. public class SpinLock {
  4. AtomicReference<Thread> threadRef = new AtomicReference<>();
  5. public void lock() {
  6. Thread thread = Thread.currentThread();
  7. while (!threadRef.compareAndSet(null, thread)) {
  8. // 自旋
  9. }
  10. }
  11. // 解锁不需要while
  12. // 需要解锁说明已经获得了锁,否则就是非法操作
  13. public void unlock() {
  14. Thread thread = Thread.currentThread();
  15. threadRef.compareAndSet(thread, null);
  16. }
  17. }

独占锁、共享锁

独占锁(互斥锁)指该锁一次只能被一个线程持有,ReentrantLock和synchronized都是独占锁。

共享锁指一个锁能够被多个线程共同持有。

加锁的原本目的就是保证在高并发的情况下数据的一致性,然而读操作并不会修改数据,所以读操作往往使用共享锁;写操作会改变数据,所以写操作需要独占锁。故读读无需互斥,可以提高并发性。juc包中提供的ReentrantReadWriteLock中的读锁就是共享锁,写锁就是独占锁。

死锁处理

排查

死锁是指多个线程在占有着锁/资源的同时又去申请另一个线程占有的锁/资源,导致互相等待。排查方式可以使用jdk提供的命令

  1. $ jps -l # 查看当前jvm运行的进程,定位进程号
  2. $ jstack [pid] # 通过进程id查看进程的栈情况,找到死锁查看

JUC - 图20

JUC - 图21

  1. public class DeadLockDemo {
  2. public void acquireLock(String lock1, String lock2) {
  3. synchronized (lock1) {
  4. try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
  5. synchronized (lock2) { ... }
  6. }
  7. }
  8. public static void main(String[] args) {
  9. String lock1 = "lock1";
  10. String lock2 = "lock2";
  11. DeadLockDemo source = new DeadLockDemo();
  12. new Thread(() -> { source.acquireLock(lock1, lock2); }).start();
  13. new Thread(() -> { source.acquireLock(lock2, lock1); }).start();
  14. }
  15. }

处理

处理没什么好说的,重启吧。

虚拟机内部锁优化

我们知道一个JVM提供的一个同步关键字synchronized,其底层是一个管程进行锁访问计数:

  • monitorenter尝试获取锁成功则计数器+1。
    • 第一次获取该锁,则线程持有该锁对象。
    • 已经持有该锁,则直接访问。
  • monitorexit释放锁,则计数器-1。一旦计数器的值=0,则该锁对象就背释放了。

但从执行成本来看,获取锁是一个重量级的操作。因为Java的线程本质上是映射到操作系统的原生内核线程之上的,所以如果要对线程进行操作,底层实际上是要申请操作系统来进行管理,即涉及到系统调用接着就是用户态和核心态的转换,这种状态的转换对处理器的资源开销是很大的。

然而,有时候同步块执行时间并不长,所以这种对互斥量的申请开销就显得更加大了。

自旋锁、自适应自旋

自旋忙等开销线程切换开销的权衡。

针对上述提到的问题:在许多应用上,共享数据的锁定状态持续时间很短。那么,为了这段时间而进行线程的挂起、恢复就非常不值得,所以可以使用让线程自旋忙等一会的方式,不放弃处理机的时间进行短时间的等待。

这就是对自旋忙等开销与线程切换开销的一种权衡。一旦自旋的次数超过一定范围,再恢复进行传统的方式挂起线程。

所谓的自适应自旋就是在自旋次数上的一种调配,根据之前成功访问锁定状态的自旋次数,来合理分配当前次数。例如,若上一次的自旋很快就成功进入临界区,那么本次的自旋也就认为能够比较快成功,故设定的等待时间就会更加长。

锁消除

锁消除就是指虚拟机编译器在运行时,对于一些代码要求同步,但是对被检测到不可能存在共享数据竞争的锁进行消除。这个判断依据主要源于逃逸分析的数据支持,若发现在堆上的所有数据都不会逃逸出去被其他线程访问,则可以去掉同步措施只有对象线程逃逸时,才需要保留同步措施

锁粗化

锁粗化的意思就是:如果一系列的连续操作都是对一个对象反复加锁、解锁,且对象的作用域限定在这一系列操作范围内,通过扩大加锁同步的范围来减小连续加锁、减锁的开销

  1. public String concatString(String s1, String s2, String s3) {
  2. StringBuffer b = new StringBuffer();
  3. b.append(s1);
  4. b.append(s2);
  5. b.append(s3);
  6. return b.toString();
  7. }

典型的例子就是拼接字符串,append操作内部就是一个互斥同步的过程,所以这一系列连续的append操作就相当于连续进行同步互斥,这么频繁的互斥同步操作导致了不必要的开销。所以虚拟机检测到这种情况,就会把加锁同步的范围拓展到整个操作序列外部

但是有一个重要的前提就是b的作用域是限定在这一系列操作范围内的,外部不会再对b进行引用。

  1. 锁定 {
  2. b.append(s1)
  3. b.append(s2)
  4. b.append(s3)
  5. }

偏向锁

当前锁对象认为只会有一个线程来访问它。

线程只在最初始的时候通过CAS修改对象偏向的线程ID,之后访问都只需要比对ID,消除了所有同步操作。没有第二个线程来,就一直不会释放偏向锁。一旦有另一个线程来临,就会检查当前偏向锁的持有线程是否执行完,执行完则对象恢复到无锁状态重新偏向,否则偏向锁升级为轻量级锁

轻量级锁

没有多线程,就不同步

线程通过CAS来获取一个锁对象,通过CAS来释放锁对象。在执行同步代码块的时候,一律都不进行同步互斥的操作。

  • 一旦另一条线程自旋忙等超过一定时间,则锁升级。
  • 一旦有两条以上的线程竞争同一个锁对象,那么轻量级锁就要升级为重量级锁,即进行同步互斥操作,调用操作系统的服务阻塞竞争失败的线程。

所谓的获取锁对象就是将对象的Mark Word更新为指向线程私有栈中的Lock Record字段的指针。

JUC - 图22

锁升级

偏向锁,轻量级锁都是乐观锁,所谓乐观就是认为不存在竞争或者是竞争很小,通过自旋就能够解决; 重量级锁是悲观锁,认为竞争有必要让操作系统来处理,阻塞竞争失败的线程。

JUC - 图23

偏向锁 -> 轻量级锁

一个对象刚开始实例化的时候,没有任何线程来访问它,它是可偏向的,认为只可能有一个线程来访问它。那么。当第一个线程来访问它的时候,线程通过CAS修改对象头的锁状态、ThreadID为自己的ID,之后该线程就可以通过ID来顺利访问该对象,虚拟机不再进行任何同步操作(加锁、解锁、修改Mark Word)。此时的对象就偏向了这个线程

因为偏向锁不会主动释放,所以第二个线程来访问此对象的时候就会发现偏向锁不是偏向自己,就会检查该锁的持有线程是否还在同步块中工作:

  • 如果不在,则可以将对象变为无锁状态,然后重新偏向新的线程;
  • 否则,存在竞争,修改Mark Word升级为轻量级锁,进行自旋等待。

JUC - 图24

轻量级锁 -> 重量级锁

轻量级锁认为竞争存在,但是竞争的程度很轻,一个线程可以通过自旋忙等来稍微等待另一个线程完成任务。 但是当自旋超过一定的次数;或者有第三个线程参与竞争,轻量级锁升级为重量级锁。重量级锁就开始调用操作系统的服务,阻塞竞争失败的线程。

JUC - 图25

锁升级带来的好处

能够不使用同步互斥措施就不使用,实在需要一定措施的话就尽量在用户态解决同步问题,减少操作系统级别的同步互斥的开销。