JUC笔记

读写锁:一个资源可以被多个读线程访问,或者可以被一个写线程访问,但是不能同时存在读写线程,读写互斥,读读共享

一.JUC概述

1.juc简介:

在 Java 中,线程部分是一个重点,本篇文章说的 JUC 也是关于线程的。JUC 就是 java.util .concurrent 工具包的简称。这是一个处理线程的工具包,JDK 1.5 开始出现的。

2.线程和进程

进程:指在系统中正在运行的一个应用程序;程序一旦运行就是进程;进程— —资源分配的最小单位。

线程:系统分配处理器时间资源的基本单元,或者说进程之内独立执行的一个 单元执行流。线程——程序执行的最小单位。

3.线程的状态

public enum State {

  • NEW,(新建)
  • RUNNABLE,(准备就绪)
  • BLOCKED,(阻塞)//这种状态是指一个阻塞线程在等待monitor锁。
  • WAITING,(不见不散)//一个线程在等待另一个线程执行一个动作时在这个状态
  • TIMED_WAITING,(过时不候)//一个线程在一个特定的等待时间内等待另一个线程完成一个动作会在这个状态
  • TERMINATED;(终结)

}

waiting和timed_waiting的区别:

当线程调用以下方法时会进入WAITING状态:

  1. Object#wait() 而且不加超时参数
  2. Thread#join() 而且不加超时参数
  3. LockSupport#park()

调用了以下方法的线程会进入TIMED_WAITING

  1. Thread#sleep()
  2. Object#wait() 并加了超时参数
  3. Thread#join() 并加了超时参数
  4. LockSupport#parkNanos()
  5. LockSupport#parkUntil()

4.wait/sleep 的区别

(1)sleep 是 Thread 的静态方法,wait 是 Object 的方法,任何对象实例都 能调用。

(2)sleep 不会释放锁,它也不需要占用锁。wait 会释放锁,但调用它的前提 是当前线程占有锁(即代码要在 synchronized 中)

(3)它们都可以被 interrupted 方法中断。

5.管程

管程(monitor)是保证了同一时刻只有一个进程在管程内活动,即管程内定义的操作在同 一时刻只被一个进程调用(由编译器实现).但是这样并不能保证进程以设计的顺序执行 。

JVM 中同步是基于进入和退出管程(monitor)对象实现的,每个对象都会有一个管程 (monitor)对象,管程(monitor)会随着 java 对象一同创建和销毁

执行线程首先要持有管程对象,然后才能执行方法,当方法完成之后会释放管程,方 法在执行时候会持有管程,其他线程无法再获取同一个管程

6.用户线程和守护线程

用户线程:平时用到的普通线程,自定义线程

守护线程:运行在后台,是一种特殊的线程,比如垃圾回收

当主线程结束后,用户线程还在运行,JVM 存活

如果没有用户线程,都是守护线程,JVM 结束

  1. //isDaemon false代表用户线程,true代表守护线程
  2. System.out.println(Thread.currentThread().getName()+"::"+Thread.currentThread().isDaemon());

二.Lock接口

1 .Synchronized 关键字回顾

synchronized 是 Java 中的关键字,是一种同步锁。它修饰的对象有以下几种:

  1. 修饰一个代码块,被修饰的代码块称为同步语句块,其作用的范围是大括号{} 括起来的代码,作用的对象是调用这个代码块的对象;
  2. 修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用 的对象是调用这个方法的对象;
  3. 修改一个静态的方法,其作用的范围是整个静态方法,作用的对象是这个类的 所有对象;
  4. 修改一个类,其作用的范围是 synchronized 后面括号括起来的部分,作用主 的对象是这个类的所有对象。

虽然可以使用 synchronized 来定义方法,但 synchronized 并不属于方法定 义的一部分,因此,synchronized 关键字不能被继承。如果在父类中的某个方 法使用了 synchronized 关键字,而在子类中覆盖了这个方法,在子类中的这 个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上 synchronized 关键字才可以。当然,还可以在子类方法中调用父类中相应的方 法,这样虽然子类中的方法不是同步的,但子类调用了父类的同步方法,因此, 子类的方法也就相当于同步了。

2.售票案例

```java package com.zjl.test.estest.JUCTest;

class Ticket { //100张票 private int num = 100;

public synchronized void sale() { if (num > 0) { System.out.println(Thread.currentThread().getName() + “:卖出” + (num—) + “,剩余:” + num); } } }

public class SaleTicket {

public static void main(String[] args) { Ticket ticket = new Ticket(); new Thread(()->{ for (int i = 0; i <100 ; i++) { ticket.sale(); }

  1. },"AA").start();
  2. new Thread(()->{
  3. for (int i = 0; i <100 ; i++) {
  4. ticket.sale();
  5. }
  6. },"BB").start();
  7. new Thread(()->{
  8. for (int i = 0; i <100 ; i++) {
  9. ticket.sale();
  10. }
  11. },"CC").start();

} } //解释和缺陷 如果一个代码块被 synchronized 修饰了,当一个线程获取了对应的锁,并执 行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里 获取锁的线程释放锁只会有两种情况: 1)获取锁的线程执行完了该代码块,然后线程释放对锁的占有; 2)线程执行发生异常,此时 JVM 会让线程自动释放锁。 那么如果这个获取锁的线程由于要等待 IO 或者其他原因(比如调用 sleep 方法)被阻塞了,但是又没有释放锁,其他线程便只能干巴巴地等待,试想一 下,这多么影响程序执行效率。 因此就需要有一种机制可以不让等待的线程一直无期限地等待下去(比如只等 待一定的时间或者能够响应中断),通过 Lock 就可以办到。

  1. <a name="c1ebb3cc"></a>
  2. ### 3.Lock概述
  3. > Lock 锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允 许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对 象。Lock 提供了比 synchronized 更多的功能。
  4. > Lock 与的 Synchronized 区别
  5. > • Lock 不是 Java 语言内置的,synchronized 是 Java 语言的关键字,因此是内 置特性。Lock 是一个类,通过这个类可以实现同步访问;
  6. > • Lock 和 synchronized 有一点非常大的不同,采用 synchronized 不需要用户 去手动释放锁,当 synchronized 方法或者 synchronized 代码块执行完之后, 系统会自动让线程释放对锁的占用;而 Lock 则必须要用户去手动释放锁,如 果没有主动释放锁,就有可能导致出现死锁现象。
  7. > public interface Lock {
  8. > void lock();
  9. > void lockInterruptibly() throws InterruptedException;
  10. > boolean tryLock();
  11. > boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  12. > void unlock();
  13. > Condition newCondition();
  14. > }
  15. <a name="e2fd3062"></a>
  16. ### 4.lock方法概述
  17. > lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他 线程获取,则进行等待。
  18. > 采用 Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一 般来说,使用 Lock 必须在 try{}catch{}块中进行,并且将释放锁的操作放在 finally 块中进行,以保证锁一定被被释放,防止死锁的发生。通常使用 Lock 来进行同步的话,是以下面这种形式去使用的:
  19. > Lock lock = ...;
  20. > lock.lock();
  21. > try{
  22. >

//处理任务

  1. > }catch(Exception ex){ }
  2. > finally{
  3. > lock.unlock(); //释放锁
  4. > }
  5. > //使用lock关键字完成买票
  6. > ```java
  7. package com.zjl.test.estest.JUCTest.lock;
  8. import java.util.concurrent.locks.ReentrantLock;
  9. class Ticket {
  10. private int num = 100;
  11. private final ReentrantLock lock=new ReentrantLock();
  12. public void sale() {
  13. lock.lock();
  14. try {
  15. if (num > 0) {
  16. System.out.println(Thread.currentThread().getName() + ":卖出" + (num--) + ",剩余:" + num);
  17. }
  18. }finally {
  19. lock.unlock();
  20. }
  21. }
  22. }
  23. public class LockSaleTicket {
  24. public static void main(String[] args) {
  25. Ticket ticket = new Ticket();
  26. new Thread(()->{
  27. for (int i = 0; i <100 ; i++) {
  28. ticket.sale();
  29. }
  30. },"AA").start();
  31. new Thread(()->{
  32. for (int i = 0; i <100 ; i++) {
  33. ticket.sale();
  34. }
  35. },"BB").start();
  36. new Thread(()->{
  37. for (int i = 0; i <100 ; i++) {
  38. ticket.sale();
  39. }
  40. },"CC").start();
  41. }
  42. }

5.newCondition

关键字 synchronized 与 wait()/notify()这两个方法一起使用可以实现等待/通 知模式, Lock 锁的 newContition()方法返回 Condition 对象,Condition 类 也可以实现等待/通知模式。

用 notify()通知时,JVM 会随机唤醒某个等待的线程, 使用 Condition 类可以 进行选择性通知, Condition 比较常用的两个方法:

  1. await()会使当前线程等待,同时会释放锁,当其他线程调用 signal()时,线程会重 新获得锁并继续执行。
  2. signal()用于唤醒一个等待的线程。

注意:在调用 Condition 的 await()/signal()方法前,也需要线程持有相关 的 Lock 锁,调用 await()后线程会释放这个锁,在 singal()调用后会从当前 Condition 对象的等待队列中,唤醒 一个线程,唤醒的线程尝试获得锁, 一旦 获得锁成功就继续执行。

6.线程间的通信

a.使用Synchronized

  1. package com.zjl.test.estest.JUCTest.Sync;
  2. class Share{
  3. //创建资源类
  4. //资源类里,判断干活通知
  5. private int num=0;
  6. public synchronized void incr() throws InterruptedException {
  7. while (num!=0) {
  8. this.wait();
  9. }
  10. num++;
  11. System.out.println(Thread.currentThread().getName()+"::"+num);
  12. this.notifyAll();
  13. }
  14. public synchronized void decr() throws InterruptedException {
  15. while (num!=1) {
  16. this.wait();
  17. }
  18. num--;
  19. System.out.println(Thread.currentThread().getName()+"::"+num);
  20. this.notifyAll();
  21. }
  22. }
  23. public class JUCThread2 {
  24. public static void main(String[] args) {
  25. Share share = new Share();
  26. new Thread(()->{
  27. for (int i = 0; i <10 ; i++) {
  28. try {
  29. share.incr();
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. },"AA").start();
  35. new Thread(()->{
  36. for (int i = 0; i <10 ; i++) {
  37. try {
  38. share.decr();
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. },"BB").start();
  44. // new Thread(()->{
  45. // for (int i = 0; i <10 ; i++) {
  46. // try {
  47. // share.incr();
  48. // } catch (InterruptedException e) {
  49. // e.printStackTrace();
  50. // }
  51. // }
  52. // },"CC").start();
  53. // new Thread(()->{
  54. // for (int i = 0; i <10 ; i++) {
  55. // try {
  56. // share.decr();
  57. // } catch (InterruptedException e) {
  58. // e.printStackTrace();
  59. // }
  60. // }
  61. // },"DD").start();
  62. }
  63. }
  64. //虚假唤醒,如果多个线程操作(>2)不是while判断会出现虚假唤醒

b.使用lock

  1. package com.zjl.test.estest.JUCTest.lock;
  2. import java.util.concurrent.locks.Condition;
  3. import java.util.concurrent.locks.Lock;
  4. import java.util.concurrent.locks.ReentrantLock;
  5. class Share{
  6. //创建资源类
  7. //资源类里,判断干活通知
  8. private int num=0;
  9. private Lock lock=new ReentrantLock();
  10. private Condition condition=lock.newCondition();
  11. public void incr() throws InterruptedException {
  12. lock.lock();
  13. try {
  14. while (num!=0) {
  15. condition.await();
  16. }
  17. num++;
  18. System.out.println(Thread.currentThread().getName()+"::"+num);
  19. condition.signalAll();
  20. }finally {
  21. lock.unlock();
  22. }
  23. }
  24. public void decr() throws InterruptedException {
  25. lock.lock();
  26. try {
  27. while (num!=1) {
  28. condition.await();
  29. }
  30. num--;
  31. System.out.println(Thread.currentThread().getName()+"::"+num);
  32. condition.signalAll();
  33. }finally {
  34. lock.unlock();
  35. }
  36. }
  37. }
  38. public class LockThread2 {
  39. public static void main(String[] args) {
  40. Share share = new Share();
  41. new Thread(()->{
  42. for (int i = 0; i <10 ; i++) {
  43. try {
  44. share.incr();
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. },"AA").start();
  50. new Thread(()->{
  51. for (int i = 0; i <10 ; i++) {
  52. try {
  53. share.decr();
  54. } catch (InterruptedException e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. },"BB").start();
  59. new Thread(()->{
  60. for (int i = 0; i <10 ; i++) {
  61. try {
  62. share.incr();
  63. } catch (InterruptedException e) {
  64. e.printStackTrace();
  65. }
  66. }
  67. },"CC").start();
  68. new Thread(()->{
  69. for (int i = 0; i <10 ; i++) {
  70. try {
  71. share.decr();
  72. } catch (InterruptedException e) {
  73. e.printStackTrace();
  74. }
  75. }
  76. },"DD").start();
  77. }
  78. }

7.线程间的定制化通信

需求:启动三个线程,按照如下要求打印:AA打印5次,BB打印10次,CC打印15次,进行10轮

  1. package com.zjl.test.estest.JUCTest.lock;
  2. import java.util.concurrent.locks.Condition;
  3. import java.util.concurrent.locks.Lock;
  4. import java.util.concurrent.locks.ReentrantLock;
  5. //创建一个资源类
  6. class ShareResource{
  7. private int flag=1;
  8. private Lock lock=new ReentrantLock();
  9. private Condition condition1=lock.newCondition();
  10. private Condition condition2=lock.newCondition();
  11. private Condition condition3=lock.newCondition();
  12. public void print5(int loop){
  13. lock.lock();
  14. try {
  15. while(flag!=1){
  16. condition1.await();
  17. }
  18. for (int i = 1; i <=5 ; i++) {
  19. System.out.println(Thread.currentThread().getName()+"::"+i+":轮数:"+loop);
  20. }
  21. flag=2;
  22. condition2.signal();
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. } finally {
  26. lock.unlock();
  27. }
  28. }
  29. public void print10(int loop){
  30. lock.lock();
  31. try {
  32. while(flag!=2){
  33. condition2.await();
  34. }
  35. for (int i = 1; i <=10 ; i++) {
  36. System.out.println(Thread.currentThread().getName()+"::"+i+":轮数:"+loop);
  37. }
  38. flag=3;
  39. condition3.signal();
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. } finally {
  43. lock.unlock();
  44. }
  45. }
  46. public void print15(int loop){
  47. lock.lock();
  48. try {
  49. while(flag!=3){
  50. condition3.await();
  51. }
  52. for (int i = 1; i <=15 ; i++) {
  53. System.out.println(Thread.currentThread().getName()+"::"+i+":轮数:"+loop);
  54. }
  55. flag=1;
  56. condition1.signal();
  57. } catch (InterruptedException e) {
  58. e.printStackTrace();
  59. } finally {
  60. lock.unlock();
  61. }
  62. }
  63. }
  64. public class LockThread3 {
  65. public static void main(String[] args) {
  66. ShareResource shareResource = new ShareResource();
  67. new Thread(()->{
  68. for (int i = 1; i <=10 ; i++) {
  69. shareResource.print5(i);
  70. }
  71. },"AA").start();
  72. new Thread(()->{
  73. for (int i = 1; i <=10 ; i++) {
  74. shareResource.print10(i);
  75. }
  76. },"BB").start();
  77. new Thread(()->{
  78. for (int i = 1; i <=10 ; i++) {
  79. shareResource.print15(i);
  80. }
  81. },"CC").start();
  82. }
  83. }

三.集合的线程安全

1.List

不安全案例

  1. public class NotSafeDemo {
  2. /**
  3. * 多个线程同时对集合进行修改
  4. * @param args
  5. */
  6. public static void main(String[] args) {
  7. List list = new ArrayList();
  8. for (int i = 0; i < 100; i++) {
  9. new Thread(() ->{
  10. list.add(UUID.randomUUID().toString());
  11. System.out.println(list);
  12. }, "线程" + i).start();
  13. }
  14. }
  15. }
  16. //异常内容
  17. java.util.ConcurrentModificationException

解決方法

  • 使用vector(里面的操作方法全都加synchronized,是线程安全的)
  • Collections 提供了方法 synchronizedList 保证 list 是同步线程安全的
  • CopyOnWriteArrayList

CopyOnWriteArrayList讲解:

  1. 它最适合于具有以下特征的应用程序:List 大小通常保持很小,只读操作远多 于可变操作,需要在遍历期间防止线程间的冲突。
  2. 它是线程安全的。
  3. 因为通常需要复制整个基础数组,所以可变操作(add()、set() 和 remove() 等等)的开销很大。
  4. 迭代器支持 hasNext(), next()等不可变操作,但不支持可变 remove()等操作。
  5. 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代 器时,迭代器依赖于不变的数组快照。

原理:

  1. 独占锁效率低:采用读写分离思想解决
  2. 写线程获取到锁,其他写线程阻塞
  3. 复制思想:当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容 器进行 Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素 之后,再将原容器的引用指向新的容器。 这时候会抛出来一个新的问题,也就是数据不一致的问题。如果写线程还没来 得及写会内存,其他的线程就会读到了脏数据。

示例

  1. //vector
  2. public static void main(String[] args) {
  3. List list = new Vector();
  4. for (int i = 0; i < 100; i++) {
  5. new Thread(() ->{
  6. list.add(UUID.randomUUID().toString());System.out.println(list);
  7. }, "线程" + i).start();
  8. }
  9. }
  10. //Collections
  11. public static void main(String[] args) {
  12. List list = Collections.synchronizedList(new ArrayList<>());
  13. for (int i = 0; i < 100; i++) {
  14. new Thread(() ->{
  15. list.add(UUID.randomUUID().toString());
  16. System.out.println(list);
  17. }, "线程" + i).start();
  18. }
  19. }
  20. //CopyOnWriteArrayList
  21. public static void main(String[] args) {
  22. List list = new CopyOnWriteArrayList();
  23. for (int i = 0; i < 100; i++) {
  24. new Thread(() ->{
  25. list.add(UUID.randomUUID().toString());
  26. System.out.println(list);
  27. }, "线程" + i).start();
  28. }
  29. }
  30. //CopyOnWriteArrayList的add源码
  31. public boolean add(E e) {
  32. final ReentrantLock lock = this.lock;
  33. lock.lock();
  34. try {
  35. Object[] elements = getArray();//elements使用volatile修饰
  36. int len = elements.length;
  37. Object[] newElements = Arrays.copyOf(elements, len + 1);
  38. newElements[len] = e;
  39. setArray(newElements);
  40. return true;
  41. } finally {
  42. lock.unlock();
  43. }
  44. }
  45. /*
  46. 原因分析(重点):==动态数组与线程安全==
  47. 下面从“动态数组”和“线程安全”两个方面进一步对
  48. CopyOnWriteArrayList 的原理进行说明。
  49. • “动态数组”机制
  50. o 它内部有个“volatile 数组”(array)来保持数据。在“添加/修改/删除”数据
  51. 时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该
  52. 数组赋值给“volatile 数组”, 这就是它叫做 CopyOnWriteArrayList 的原因
  53. o 由于它在“添加/修改/删除”数据时,都会新建数组,所以涉及到修改数据的
  54. 操作,CopyOnWriteArrayList 效率很低;但是单单只是进行遍历查找的话,
  55. 效率比较高。
  56. • “线程安全”机制
  57. o 通过 volatile 和互斥锁来实现的。
  58. o 通过“volatile 数组”来保存数据的。一个线程读取 volatile 数组时,总能看
  59. 到其它线程对该 volatile 变量最后的写入;就这样,通过 volatile 提供了“读
  60. 取到的数据总是最新的”这个机制的保证。
  61. o 通过互斥锁来保护数据。在“添加/修改/删除”数据时,会先“获取互斥锁”,
  62. 再修改完毕之后,先将数据更新到“volatile 数组”中,然后再“释放互斥
  63. 锁”,就达到了保护数据的目的。
  64. */

2.HashSet线程不安全

解决方案 CopyOnWriteArraySet

3.HashMap线程不安全

解决方案 ConcurrentHashMap

四.多线程锁

1.演示锁的八种情况

  1. class Phone {
  2. public static synchronized void sendSMS() throws Exception {
  3. //停留 4 秒
  4. TimeUnit.SECONDS.sleep(4);
  5. System.out.println("------sendSMS");
  6. }
  7. public synchronized void sendEmail() throws Exception {
  8. System.out.println("------sendEmail");
  9. }
  10. public void getHello() {
  11. System.out.println("------getHello");
  12. }
  13. }
  14. /**
  15. * @Description: 8 锁
  16. *
  17. 1 标准访问,先打印短信还是邮件
  18. ------sendSMS
  19. ------sendEmail
  20. 2 停 4 秒在短信方法内,先打印短信还是邮件
  21. ------sendSMS
  22. ------sendEmail
  23. 3 新增普通的 hello 方法,是先打短信还是 hello
  24. ------getHello
  25. ------sendSMS
  26. 4 现在有两部手机,先打印短信还是邮件
  27. ------sendEmail
  28. ------sendSMS
  29. 5 两个静态同步方法,1 部手机,先打印短信还是邮件
  30. ------sendSMS
  31. ------sendEmail6 两个静态同步方法,2 部手机,先打印短信还是邮件
  32. ------sendSMS
  33. ------sendEmail
  34. 7 1 个静态同步方法,1 个普通同步方法,1 部手机,先打印短信还是邮件
  35. ------sendEmail
  36. ------sendSMS
  37. 8 1 个静态同步方法,1 个普通同步方法,2 部手机,先打印短信还是邮件
  38. ------sendEmail
  39. ------sendSMS
  40. 结论:
  41. 一个对象里面如果有多个 synchronized 方法,某一个时刻内,只要一个线程去调用其中的
  42. 一个 synchronized 方法了,
  43. 其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些
  44. synchronized 方法
  45. 锁的是当前对象 this,被锁定后,其它的线程都不能进入到当前对象的其它的
  46. synchronized 方法
  47. 加个普通方法后发现和同步锁无关
  48. 换成两个对象后,不是同一把锁了,情况立刻变化。
  49. synchronized 实现同步的基础:Java 中的每一个对象都可以作为锁。
  50. 具体表现为以下 3 种形式。
  51. 对于普通同步方法,锁是当前实例对象。
  52. 对于静态同步方法,锁是当前类的 Class 对象。
  53. 对于同步方法块,锁是 Synchonized 括号里配置的对象
  54. 当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。
  55. 也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方
  56. 法必须等待获取锁的方法释放锁后才能获取锁,
  57. 可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁,
  58. 所以毋须等待该实例对象已获取锁的非静态同步方法释放锁就可以获取他们自己的锁。
  59. 所有的静态同步方法用的也是同一把锁——类对象本身,这两把锁是两个不同的对象,所
  60. 以静态同步方法与非静态同步方法之间是不会有竞态条件的。
  61. 但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才
  62. 能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同
  63. 步方法之间,只要它们同一个类的实例对象!
  64. */

2.公平锁和非公平锁

  • 公平锁:直接调用线程执行。线程饿死,效率高。
  • 非公平锁:调用hasQueuedPredecessors询问线程是否执行过,阳光普照,效率较低。

3.可重入锁

  1. public static void main(String[] args) {
  2. final Object o = new Object();
  3. new Thread(()->{
  4. synchronized (o){
  5. System.out.println("1");
  6. synchronized (o){
  7. System.out.println("2");
  8. synchronized (o){
  9. System.out.println("3");
  10. }
  11. }
  12. }
  13. },"AA").start();
  14. }

4.死锁

  1. public static void main(String[] args) {
  2. final Object a = new Object();
  3. final Object b = new Object();
  4. new Thread(()->{
  5. synchronized (a){
  6. System.out.println(Thread.currentThread().getName()+":持有锁A,试图获取锁B");
  7. try {
  8. TimeUnit.SECONDS.sleep(1);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. synchronized (b){
  13. System.out.println(Thread.currentThread().getName()+":持有锁A,试图获取锁B");
  14. }
  15. }
  16. },"A").start();
  17. new Thread(()->{
  18. synchronized (b){
  19. System.out.println(Thread.currentThread().getName()+":持有锁B,试图获取锁A");
  20. try {
  21. TimeUnit.SECONDS.sleep(1);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. synchronized (a){
  26. System.out.println(Thread.currentThread().getName()+":持有锁B,试图获取锁A");
  27. }
  28. }
  29. },"B").start();
  30. }

五.Callable接口

1.Callable概述

目前我们学习了有两种创建线程的方法-一种是通过创建 Thread 类,另一种是 通过使用 Runnable 创建线程。但是,Runnable 缺少的一项功能是,当线程 终止时(即 run()完成时),我们无法使线程返回结果。为了支持此功能, Java 中提供了 Callable 接口。

现在我们学习的是创建线程的第三种方案—-Callable 接口

Callable 接口的特点如下(重点)

• 为了实现 Runnable,需要实现不返回任何内容的 run()方法,而对于 Callable,需要实现在完成时返回结果的 call()方法。

• call()方法可以引发异常,而 run()则不能。

• 为实现 Callable 而必须重写 call 方法

• 不能直接替换 runnable,因为 Thread 类的构造方法根本没有 Callable

2.Future接口

当 call()方法完成时,结果必须存储在主线程已知的对象中,以便主线程可 以知道该线程返回的结果。为此,可以使用 Future 对象。将 Future 视为保存结果的对象–它可能暂时不保存结果,但将来会保存(一旦 Callable 返回)。Future 基本上是主线程可以跟踪进度以及其他线程的结果的 一种方式。要实现此接口,必须重写 5 种方法,这里列出了重要的方法,如下:

• public boolean cancel(boolean mayInterrupt):用于停止任务。 如果尚未启动,它将停止任务。如果已启动,则仅在 mayInterrupt 为 true 时才会中断任务。

• public Object get()抛出 InterruptedException,ExecutionException: 用于获取任务的结果。 如果任务完成,它将立即返回结果,否则将等待任务完成,然后返回结果。

• public boolean isDone():如果任务完成,则返回 true,否则返回 false 可以看到 Callable 和 Future 做两件事-Callable 与 Runnable 类似,因为它封 装了要在另一个线程上运行的任务,而 Future 用于存储从另一个线程获得的结 果。实际上,future 也可以与 Runnable 一起使用。 要创建线程,需要 Runnable。为了获得结果,需要 future。

3.FutureTask

Java 库具有具体的 FutureTask 类型,该类型实现 Runnable 和 Future,并方 便地将两种功能组合在一起。 可以通过为其构造函数提供 Callable 来创建 FutureTask。然后,将 FutureTask 对象提供给 Thread 的构造函数以创建 Thread 对象。因此,间接地使用 Callable 创建线程。

核心原理(重点)

在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些 作业交给 Future 对象在后台完成

• 当主线程将来需要时,就可以通过 Future 对象获得后台作业的计算结果或者执 行状态

• 一般 FutureTask 多用于耗时的计算,主线程可以在完成自己的任务后,再去 获取结果。

• 仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法

• 一旦计算完成,就不能再重新开始或取消计算

• get 方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完 成状态,然后会返回结果或者抛出异常

• get 只计算一次,因此 get 方法放到最后

Demo**

  1. /**
  2. * CallableDemo 案列
  3. */
  4. public class CallableDemo {
  5. /**
  6. * 实现 runnable 接口
  7. */
  8. static class MyThread1 implements Runnable{
  9. /**
  10. * run 方法
  11. */
  12. @Override
  13. public void run() {
  14. try { System.out.println(Thread.currentThread().getName() + "线程进入了 run
  15. 方法");
  16. }catch (Exception e){
  17. e.printStackTrace();
  18. }
  19. }
  20. }
  21. /**
  22. * 实现 callable 接口
  23. */
  24. static class MyThread2 implements Callable{
  25. /**
  26. * call 方法
  27. * @return
  28. * @throws Exception
  29. */
  30. @Override
  31. public Long call() throws Exception {
  32. try {
  33. System.out.println(Thread.currentThread().getName() + "线程进入了 call
  34. 方法,开始准备睡觉");
  35. Thread.sleep(1000);
  36. System.out.println(Thread.currentThread().getName() + "睡醒了");
  37. }catch (Exception e){
  38. e.printStackTrace();
  39. } return System.currentTimeMillis();
  40. }
  41. }
  42. public static void main(String[] args) throws Exception{
  43. //声明 runable
  44. Runnable runable = new MyThread1();
  45. //声明 callable
  46. Callable callable = new MyThread2();
  47. //future-callable
  48. FutureTask<Long> futureTask2 = new FutureTask(callable);
  49. //线程二
  50. new Thread(futureTask2, "线程二").start();
  51. for (int i = 0; i < 10; i++) {
  52. Long result1 = futureTask2.get();
  53. System.out.println(result1);
  54. }
  55. //线程一
  56. new Thread(runable,"线程一").start();
  57. }
  58. }

六.JUC辅助类

JUC 中提供了三种常用的辅助类,通过这些辅助类可以很好的解决线程数量过 多时 Lock 锁的频繁操作。这三种辅助类为:

• CountDownLatch: 减少计数

• CyclicBarrier: 循环栅栏

• Semaphore: 信号灯

1.CountDownLatch: 减少计数

CountDownLatch 类可以设置一个计数器,然后通过 countDown 方法来进行 减 1 的操作,使用 await 方法等待计数器不大于 0,然后继续执行 await 方法 之后的语句。

• CountDownLatch 主要有两个方法,当一个或多个线程调用 await 方法时,这 些线程会阻塞

• 其它线程调用 countDown 方法会将计数器减 1(调用 countDown 方法的线程 不会阻塞)

• 当计数器的值变为 0 时,因 await 方法阻塞的线程会被唤醒,继续执行

场景: 6 个同学陆续离开教室后值班同学才可以关门。

  1. package com.zjl.test.estest.JUCTest.JUCAuxiliaryClass;
  2. import java.util.concurrent.CountDownLatch;
  3. public class CountDownLatchDemo {
  4. public static void main(String[] args) throws InterruptedException {
  5. final CountDownLatch countDownLatch = new CountDownLatch(6);
  6. for (int i = 1; i <=6 ; i++) {
  7. new Thread(()->{
  8. System.out.println(Thread.currentThread().getName()+"号同学离开教师");
  9. countDownLatch.countDown();
  10. },""+i).start();
  11. }
  12. countDownLatch.await();
  13. System.out.println("教师锁门");
  14. }
  15. }

2 循环栅栏 CyclicBarrier CyclicBarrier

看英文单词可以看出大概就是循环阻塞的意思,在使用中 CyclicBarrier 的构造方法第一个参数是目标障碍数,每次执行 CyclicBarrier 一 次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后 的语句。可以将 CyclicBarrier 理解为加 1 操作

场景: 集齐 7 颗龙珠就可以召唤神龙

  1. package com.zjl.test.estest.JUCTest.JUCAuxiliaryClass;
  2. import java.util.concurrent.BrokenBarrierException;
  3. import java.util.concurrent.CyclicBarrier;
  4. public class CyclicBarrierDemo {
  5. private static final Integer NUMBER=7;
  6. public static void main(String[] args) {
  7. CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER,()->{
  8. System.out.println("已集齐七颗龙珠");
  9. });
  10. for ( int i = 1; i <=7; i++) {
  11. new Thread(()->{
  12. System.out.println(Thread.currentThread().getName()+"号龙珠被收集");
  13. try {
  14. cyclicBarrier.await();
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. } catch (BrokenBarrierException e) {
  18. e.printStackTrace();
  19. }
  20. },""+i).start();
  21. }
  22. }
  23. }

3 信号灯 Semaphore

Semaphore 的构造方法中传入的第一个参数是最大信号量(可以看成最大线 程池),每个信号量初始化为一个最多只能分发一个许可证。使用 acquire 方 法获得许可证,release 方法释放许可 场景: 抢车位, 6 部汽车 3 个停车位

  1. package com.zjl.test.estest.JUCTest.JUCAuxiliaryClass;
  2. import java.util.Random;
  3. import java.util.concurrent.Semaphore;
  4. import java.util.concurrent.TimeUnit;
  5. public class SemaphoreDemo {
  6. public static void main(String[] args) {
  7. //设置三个停车位
  8. Semaphore semaphore = new Semaphore(3);
  9. //模拟六辆车
  10. for (int i = 1; i <=6 ; i++) {
  11. new Thread(()->{
  12. try {
  13. //抢占车位
  14. semaphore.acquire();
  15. System.out.println(Thread.currentThread().getName()+"号车抢到了车位");
  16. TimeUnit.SECONDS.sleep(new Random().nextInt(5));
  17. System.out.println(Thread.currentThread().getName()+"--------离开了车位");
  18. } catch (InterruptedException e) {
  19. }finally {
  20. semaphore.release();
  21. }
  22. },""+i).start();
  23. }
  24. }
  25. }

七.读写锁

概述

现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那 么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以 应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源, 就不应该允许其他线程对该资源进行读和写的操作了。

针对这种场景,JAVA 的并发包提供了读写锁 ReentrantReadWriteLock, 它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称 为排他锁

1.线程进入读锁的前提条件:

  1. 没有其他线程的写锁
  2. 没有写请求, 或者==有写请求,但调用线程和持有锁的线程是同一个(可重入 锁)。

2.线程进入写锁的前提条件:

  1. 没有其他线程的读锁
  2. 没有其他线程的写锁 而读写锁有以下三个重要的特性:
  3. 1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公 平优于公平。
  4. 2)重进入:读锁和写锁都支持线程重进入。
  5. 3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为 读锁。

1.悲观锁和乐观锁

2.读写锁

3.volatile关键字解释

volatile 关键字详解:https://www.cnblogs.com/dolphin0520/p/3920373.html

4.demo

  1. package com.zjl.test.estest.JUCTest.writereadlock;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import java.util.concurrent.TimeUnit;
  5. import java.util.concurrent.locks.ReentrantReadWriteLock;
  6. class MyCache{
  7. private volatile Map<String,Object> map=new HashMap<>();
  8. ReentrantReadWriteLock lock= new ReentrantReadWriteLock();
  9. public Object get(String key){
  10. lock.readLock().lock();
  11. System.out.println(Thread.currentThread().getName()+"正在取"+key);
  12. Object result=null;
  13. try {
  14. TimeUnit.MICROSECONDS.sleep(300);
  15. result=map.get(key);
  16. System.out.println(Thread.currentThread().getName()+"取完了"+key);
  17. } catch (InterruptedException e) {
  18. }finally {
  19. lock.readLock().unlock();
  20. }
  21. return result;
  22. }
  23. public void put(String key,Object value){
  24. lock.writeLock().lock();
  25. System.out.println(Thread.currentThread().getName()+"正在操作"+key);
  26. try {
  27. TimeUnit.MICROSECONDS.sleep(300);
  28. map.put(key,value);
  29. System.out.println(Thread.currentThread().getName()+"操作完了"+key);
  30. } catch (InterruptedException e) {
  31. }finally {
  32. lock.writeLock().unlock();
  33. }
  34. }
  35. }
  36. public class WriteReadLockDemo {
  37. public static void main(String[] args) {
  38. MyCache myCache = new MyCache();
  39. for (int i = 1; i <=5 ; i++) {
  40. final int num=i;
  41. new Thread(()->{
  42. myCache.put(num+"",num);
  43. },i+"").start();
  44. }
  45. for (int i = 1; i <=5 ; i++) {
  46. final int num=i;
  47. new Thread(()->{
  48. final Object o = myCache.get(num + "");
  49. System.out.println(o);
  50. },i+"").start();
  51. }
  52. }
  53. }

5.锁降级

6.小结

• 在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发 现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。

• 在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写 锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。

原因: 当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把 获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写 锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释 放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。

八.阻塞队列

1. BlockingQueue 简介

Concurrent 包中,BlockingQueue 很好的解决了多线程中,如何高效安全 “传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建 高质量的多线程程序带来极大的便利。本文详细介绍了 BlockingQueue 家庭 中的所有成员,包括他们各自的功能以及常见使用场景。 阻塞队列,顾名思义,首先它是一个队列, 通过一个共享的队列,可以使得数据 由队列的一端输入,从另外一端输出;

当队列是空的,从队列中获取元素的操作将会被阻塞

当队列是满的,从队列中添加元素的操作将会被阻塞

试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素

试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多 个元素或者完全清空,使队列变得空闲起来并后续新增

常用的队列主要有以下两种

• 先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。 从某种程度上来说这种队列也体现了一种公平性

• 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发 生的事件(栈)

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起 的线程又会自动被唤起

为什么需要 BlockingQueue

好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了

在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细 节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和 “消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我 们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准 备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地 解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一 发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度 大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么 生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的 数据处理完毕,反之亦然。

• 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起), 直到有数据放入队列

• 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起), 直到队列中有空的位置,线程被自动唤醒

2. BlockingQueue 核心方法

BlockingQueue 的核心方法:

1.放入数据

• offer(anObject):表示如果可能的话,将 anObject 加到 BlockingQueue 里,即 如果 BlockingQueue 可以容纳,则返回 true,否则返回 false.(本方法不阻塞当 前执行方法的线程)

• offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定 的时间内,还不能往队列中加入 BlockingQueue,则返回失败

• put(anObject):把 anObject 加到 BlockingQueue 里,如果 BlockQueue 没有 空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续.

2.获取数据

• poll(time): 取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 null

• poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象, 如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知 道时间超时还没有数据可取,返回失败。

• take(): 取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断 进入等待状态直到 BlockingQueue 有新的数据被加入;

• drainTo(): 一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定 获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加 锁或释放锁。

3.案例

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.concurrent.ArrayBlockingQueue;
  4. import java.util.concurrent.BlockingQueue;
  5. import java.util.concurrent.TimeUnit;
  6. /**
  7. * 阻塞队列
  8. */
  9. public class BlockingQueueDemo {
  10. public static void main(String[] args) throws InterruptedException {// List list = new ArrayList();
  11. BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
  12. //第一组
  13. // System.out.println(blockingQueue.add("a"));
  14. // System.out.println(blockingQueue.add("b"));
  15. // System.out.println(blockingQueue.add("c"));
  16. // System.out.println(blockingQueue.element());
  17. //System.out.println(blockingQueue.add("x"));
  18. // System.out.println(blockingQueue.remove());
  19. // System.out.println(blockingQueue.remove());
  20. // System.out.println(blockingQueue.remove());
  21. // System.out.println(blockingQueue.remove());
  22. // 第二组
  23. // System.out.println(blockingQueue.offer("a"));
  24. // System.out.println(blockingQueue.offer("b"));
  25. // System.out.println(blockingQueue.offer("c"));
  26. // System.out.println(blockingQueue.offer("x"));
  27. // System.out.println(blockingQueue.poll());
  28. // System.out.println(blockingQueue.poll());
  29. // System.out.println(blockingQueue.poll());
  30. // System.out.println(blockingQueue.poll());
  31. // 第三组
  32. // blockingQueue.put("a");
  33. // blockingQueue.put("b");
  34. // blockingQueue.put("c");
  35. // //blockingQueue.put("x");
  36. // System.out.println(blockingQueue.take());
  37. // System.out.println(blockingQueue.take());
  38. // System.out.println(blockingQueue.take());
  39. // System.out.println(blockingQueue.take());
  40. // 第四组
  41. System.out.println(blockingQueue.offer("a"));
  42. System.out.println(blockingQueue.offer("b"));
  43. System.out.println(blockingQueue.offer("c"));
  44. System.out.println(blockingQueue.offer("a",3L, TimeUnit.SECONDS));}
  45. }

4 常见的 BlockingQueue

ArrayBlockingQueue(常用)

基于数组的阻塞队列实现,在 ArrayBlockingQueue 内部,维护了一个定长数 组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数 组外,ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的 头部和尾部在数组中的位置。 ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个 锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于 LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue 完全可 以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea 之 所以没这样去做,也许是因为 ArrayBlockingQueue 的数据写入和获取操作已 经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其 在性能上完全占不到任何便宜。 ArrayBlockingQueue 和 LinkedBlockingQueue 间还有一个明显的不同之处在于,前者在插入或删除 元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的 Node 对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于 GC 的影响还是存在一定的区别。而在创建 ArrayBlockingQueue 时,我们还 可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

一句话总结: 由数组结构组成的有界阻塞队列。

LinkedBlockingQueue(常用)

基于链表的阻塞队列,同 ArrayListBlockingQueue 类似,其内部也维持着一 个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据 时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回; 只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue 可以通过 构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。 而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生 产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发 的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列 的并发性能。 ArrayBlockingQueue 和 LinkedBlockingQueue 是两个最普通也是最常用 的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个 类足以。

一句话总结: 由链表结构组成的有界(但大小默认值为 integer.MAX_VALUE)阻塞队列。

DelayQueue

DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到 该元素。DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的 操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻 塞。

一句话总结: 使用优先级队列实现的延迟无界阻塞队列。

PriorityBlockingQueue

基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来 决定),但需要注意的是 PriorityBlockingQueue 并不会阻塞数据生产者,而 只会在没有可消费的数据时,阻塞数据的消费者。 因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费 数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。 在实现 PriorityBlockingQueue 时,内部控制线程同步的锁采用的是公平锁。

一句话总结: 支持优先级排序的无界阻塞队列。

SynchronousQueue

一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产 者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须 亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么 对不起,大家都在集市等待。相对于有缓冲的 BlockingQueue 来说,少了一 个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经 销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以 库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式 会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得 产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能 可能会降低。 声明一个 SynchronousQueue 有两种不同的方式,它们之间有着不太一样的 行为。 公平模式和非公平模式的区别: • 公平模式:SynchronousQueue 会采用公平锁,并配合一个 FIFO 队列来阻塞 多余的生产者和消费者,从而体系整体的公平策略; • 非公平模式(SynchronousQueue 默认):SynchronousQueue 采用非公平 锁,同时配合一个 LIFO 队列来管理多余的生产者和消费者,而后一种模式, 如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有 某些生产者或者是消费者的数据永远都得不到处理。

一句话总结: 不存储元素的阻塞队列,也即单个元素的队列。

LinkedTransferQueue

LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队 列。相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。 LinkedTransferQueue 采用一种预占模式。意思就是消费者线程取元素时,如 果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素 为 null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时 发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的 方法返回。 一句话总结: 由链表组成的无界阻塞队列。 9.4.7 LinkedBlockingDeque LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,即可以从队 列的两端插入和移除元素。 对于一些指定的操作,在插入或者获取队列元素时如果队列状态不允许该操作 可能会阻塞住该线程直到队列状态变更为允许操作,这里的阻塞一般有两种情 况 • 插入元素时: 如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时 再讲该元素插入,该操作可以通过设置超时参数,超时后返回 false 表示操作 失败,也可以不设置超时参数一直阻塞,中断后抛出 InterruptedException 异 常 • 读取元素时: 如果当前队列为空会阻塞住直到队列不为空然后返回元素,同样可 以通过设置超时参数

一句话总结: 由链表组成的双向阻塞队列

5.小结

  1. 在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件 满足,被挂起的线程又会自动被唤起
  2. 为什么需要 BlockingQueue? 在 concurrent 包发布以前,在多线程环境下, 我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全, 而这会给我们的程序带来不小的复杂度。使用后我们不需要关心什么时候需要 阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手 包办了

九.ThreadPool 线程池

1 线程池简介

线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销, 进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理 者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代 价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

例子: 10 年前单核 CPU 电脑,假的多线程,像马戏团小丑玩多个球,CPU 需 要来回切换。 现在是多核电脑,多个线程各自跑在独立的 CPU 上,不用切换 效率高。

线程池的优势: 线程池做的工作只要是控制运行的线程数量,处理过程中将任 务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量, 超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

它的主要特点为:

• 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。

• 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。

• 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资 源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

• Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors, ExecutorService,ThreadPoolExecutor 这几个类

2.七大参数

常用参数(重点)

• corePoolSize 线程池的核心线程数

• maximumPoolSize 能容纳的最大线程数

• keepAliveTime 空闲线程存活时间

• unit 存活的时间单位

• workQueue 存放提交但未执行任务的队列

• threadFactory 创建线程的工厂类

• handler 等待队列满后的拒绝策略

线程池中,有三个重要的参数,决定影响了拒绝策略:corePoolSize - 核心线 程数,也即最小的线程数。workQueue - 阻塞队列 。 maximumPoolSize - 最大线程数

当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻 塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池 的拒绝策略了。

总结起来,也就是一句话,当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。

3.拒绝策略(重点)

CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用 线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由 于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效 率上必然的损失较大

AbortPolicy: 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常 信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执 行流程,影响后续的任务执行。

DiscardPolicy: 直接丢弃,其他啥都没有

DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞 队列 workQueue 中最老的一个任务,并将新任务加入

4.线程池的种类与创建

newCachedThreadPool(常用)

作用:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空 闲线程,若无可回收,则新建线程.

特点:

  1. 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE
  2. 线程池中的线程可进行缓存重复利用和回收(回收默认时间为 1 分钟)
  3. 当线程池中,没有可用线程,会重新创建一个线程

创建方式:

  1. /**
  2. * 可缓存线程池
  3. * @return
  4. */
  5. public static ExecutorService newCachedThreadPool(){
  6. /**
  7. * corePoolSize 线程池的核心线程数
  8. * maximumPoolSize 能容纳的最大线程数
  9. * keepAliveTime 空闲线程存活时间
  10. * unit 存活的时间单位
  11. * workQueue 存放提交但未执行任务的队列
  12. * threadFactory 创建线程的工厂类:可以省略
  13. * handler 等待队列满后的拒绝策略:可以省略
  14. */
  15. return new ThreadPoolExecutor(0,
  16. Integer.MAX_VALUE,
  17. 60L,
  18. TimeUnit.SECONDS,
  19. new SynchronousQueue<>(),
  20. Executors.defaultThreadFactory(),
  21. new ThreadPoolExecutor.AbortPolicy());
  22. }

场景: 适用于创建一个可无限扩大的线程池,服务器负载压力较轻,执行时间较 短,任务多的场景

newFixedThreadPool(常用)

作用:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这 些线程。在任意点,在大多数线程会处于处理任务的活动状态。如果在所有线 程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中 等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线 程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池 中的线程将一直存在。

特征:

  1. 线程池中的线程处于一定的量,可以很好的控制线程的并发量
  2. 线程可以重复被使用,在显示关闭之前,都将一直存在
  3. 超出一定量的线程被提交时候需在队列中等待

创建方式:

  1. /**
  2. * 固定长度线程池
  3. * @return
  4. */
  5. public static ExecutorService newFixedThreadPool(){
  6. /**
  7. * corePoolSize 线程池的核心线程数
  8. * maximumPoolSize 能容纳的最大线程数
  9. * keepAliveTime 空闲线程存活时间
  10. * unit 存活的时间单位
  11. * workQueue 存放提交但未执行任务的队列
  12. * threadFactory 创建线程的工厂类:可以省略
  13. * handler 等待队列满后的拒绝策略:可以省略
  14. */
  15. return new ThreadPoolExecutor(10,
  16. 10,
  17. 0L,
  18. TimeUnit.SECONDS,
  19. new LinkedBlockingQueue<>(),
  20. Executors.defaultThreadFactory(),
  21. new ThreadPoolExecutor.AbortPolicy());
  22. }

场景: 适用于可以预测线程数量的业务中,或者服务器负载较重,对线程数有严 格限制的场景

newSingleThreadExecutor(常用)

作用:创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该 线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程, 那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各 个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的 newFixedThreadPool 不同,可保证无需重新配置此方法所返回的执行程序即 可使用其他的线程。 特征: 线程池中最多执行 1 个线程,之后提交的线程活动将会排在队列中以此 执行

创建方式:

  1. /**
  2. * 单一线程池
  3. * @return
  4. */
  5. public static ExecutorService newSingleThreadExecutor(){
  6. /**
  7. * corePoolSize 线程池的核心线程数
  8. * maximumPoolSize 能容纳的最大线程数
  9. * keepAliveTime 空闲线程存活时间
  10. * unit 存活的时间单位
  11. * workQueue 存放提交但未执行任务的队列
  12. * threadFactory 创建线程的工厂类:可以省略
  13. * handler 等待队列满后的拒绝策略:可以省略
  14. */
  15. return new ThreadPoolExecutor(1,
  16. 1,
  17. 0L,
  18. TimeUnit.SECONDS,
  19. new LinkedBlockingQueue<>(),
  20. Executors.defaultThreadFactory(),
  21. new ThreadPoolExecutor.AbortPolicy());
  22. }

场景: 适用于需要保证顺序执行各个任务,并且在任意时间点,不会同时有多个 线程的场景

newScheduleThreadPool(了解)

作用: 线程池支持定时以及周期性执行任务,创建一个 corePoolSize 为传入参 数,最大线程数为整形的最大数的线程池

特征: (1)线程池中具有指定数量的线程,即便是空线程也将保留 (2)可定时或者 延迟执行线程活动

创建方式:

  1. public static ScheduledExecutorService newScheduledThreadPool(int
  2. corePoolSize,
  3. ThreadFactory threadFactory) {
  4. return new ScheduledThreadPoolExecutor(corePoolSize,
  5. threadFactory);
  6. }

场景: 适用于需要多个后台线程执行周期任务的场景

newWorkStealingPool

jdk1.8 提供的线程池,底层使用的是 ForkJoinPool 实现,创建一个拥有多个 任务队列的线程池,可以减少连接数,创建当前可用 cpu 核数的线程来并行执 行任务

创建方式:

  1. public static ExecutorService newWorkStealingPool(int parallelism) {
  2. /**
  3. * parallelism:并行级别,通常默认为 JVM 可用的处理器个数
  4. * factory:用于创建 ForkJoinPool 中使用的线程。
  5. * handler:用于处理工作线程未处理的异常,默认为 null
  6. * asyncMode:用于控制 WorkQueue 的工作模式:队列---反队列*/
  7. return new ForkJoinPool(parallelism,
  8. ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  9. null,
  10. true);
  11. }

场景: 适用于大耗时,可并行执行的场景

5.案例

场景: 火车站 3 个售票口, 10 个用户买

  1. package com.atguigu.test;
  2. import java.util.concurrent.*;
  3. /**
  4. * 入门案例
  5. */
  6. public class ThreadPoolDemo1 {
  7. /**
  8. * 火车站 3 个售票口, 10 个用户买票
  9. * @param args
  10. */
  11. public static void main(String[] args) {
  12. //定时线程次:线程数量为 3---窗口数为 3
  13. ExecutorService threadService = new ThreadPoolExecutor(3,
  14. 3,
  15. 60L,
  16. TimeUnit.SECONDS,
  17. new LinkedBlockingQueue<>(),
  18. Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy());
  19. try {
  20. //10 个人买票
  21. for (int i = 1; i <= 10; i++) {
  22. threadService.execute(()->{
  23. try {
  24. System.out.println(Thread.currentThread().getName() + "
  25. 窗口,开始卖票");
  26. Thread.sleep(5000);
  27. System.out.println(Thread.currentThread().getName() + "
  28. 窗口买票结束");
  29. }catch (Exception e){
  30. e.printStackTrace();
  31. }
  32. });
  33. }
  34. }catch (Exception e){
  35. e.printStackTrace();
  36. }finally {
  37. //完成后结束
  38. threadService.shutdown();
  39. }
  40. }
  41. }

6. 线程池底层工作原理(重要)

  1. 在创建了线程池后,线程池中的线程数为零

  2. 当调用 execute()方法添加一个请求任务时,线程池会做出如下判断:
    2.1 如 果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
    2.2 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入 队列;
    2.3 如果这个时候队列满了且正在运行的线程数量还小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
    2.4 如 果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程 池会启动饱和拒绝策略来执行。

  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行

  4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
    4.1 如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。
    4.2 所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

7.注意事项(重要)

  1. 项目中创建多线程时,使用常见的三种线程池创建方式,单一、可变、定长都 有一定问题,原因是 FixedThreadPool 和 SingleThreadExecutor 底层都是用 LinkedBlockingQueue 实现的,这个队列最大长度为 Integer.MAX_VALUE, 容易导致 OOM。所以实际生产一般自己通过 ThreadPoolExecutor 的 7 个参 数,自定义线程池

  2. 创建线程池推荐适用 ThreadPoolExecutor 及其 7 个参数手动创建
    o corePoolSize 线程池的核心线程数
    o maximumPoolSize 能容纳的最大线程数
    o keepAliveTime 空闲线程存活时间
    o unit 存活的时间单位
    o workQueue 存放提交但未执行任务的队列
    o threadFactory 创建线程的工厂类
    o handler 等待队列满后的拒绝策略

  3. 为什么不允许适用不允许 Executors.的方式手动创建线程池,如下图