多线程

入门的多线程

线程的状态

线程的状态主要有6个,分别是 NEW RUNNABLE BLOCKED WAITING TIMED_WAITING TERMINAL

其中RUNNABLE 中包含着2个小状态,分别是就绪ready 和 run

wait & sleep

二者都可以暂停当前线程。

wait用于线程之间的通信,交互 ,而 sleep 则是用来暂停当前的线程

wait 不会自动唤醒线程,需要其他的线程使用notifyAll 来进行唤醒操作 sleep可以自动唤醒线程。

上下文切换

所谓的上下文切换,是指当线程数超过了CPU的核数,那么我们为了保证这些线程能够并行执行,采用了时间片轮转法,为每一个线程分配一个时间片,当时间片走完,线程将保存自己的状态,让出CPU执行权给下一个线程,当下一次执行时,线程可以把自己的状态加载出来,这就是上下文切换。

synchronized 和 volatile 的区别:

synchronized 修饰的是方法,是代码块 ,而volatile 主要修饰的是变量

volatile 主要是保证共享变量在多个线程之间的可见性,而synchronized主要是保证了多线程之间访问资源的同步性

volatile 可以保证可见性,但是不保证原子性, synchronized 可以保证原子性(因为这是个锁);

单例模式:双重判断锁

之所以将静态变量修饰为volatile

避免指令重排,因为new 一个对象的过程是分3步骤的 1.分配内存空间 2.初始化 3. 指向内存地址 ,如果说在执行中打乱顺序先13 后 2 那么在其他线程进入的时候,会发现a不是null,就会返回一个没有初始化的对象!

  1. class A {
  2. private static volatile A a;
  3. private A(){}
  4. public static A getInstance(){
  5. if (a ==null){
  6. synchronized (A.class){
  7. if (a == null){
  8. a = new A();
  9. }
  10. }
  11. }
  12. return a;
  13. }
  14. }

synchronized + wait + notify

生产者和消费者的问题:

  1. //生产者和消费者的最初版本
  2. public class pc1 {
  3. public int capacity = 0;
  4. //有一个生产者
  5. public synchronized void product() throws InterruptedException {
  6. while(capacity==1){
  7. this.wait();
  8. }
  9. System.out.println("线程名"+Thread.currentThread().getName()+" 容量"+capacity);
  10. capacity++;
  11. this.notify();
  12. }
  13. //有一个消费者
  14. public synchronized void consume() throws InterruptedException {
  15. while(capacity==0){
  16. this.wait();
  17. }
  18. System.out.println("线程名"+Thread.currentThread().getName()+" 容量"+capacity);
  19. capacity--;
  20. this.notify();
  21. }
  22. }

注意:

1.在写生产者和消费者时,我们首先要注意的是,1.判断阻塞是否阻塞当前进程的时候,不要用 if 要使用的是 while 因为 一个线程阻塞后再次被唤醒后,会继续之前的执行到的位置进行执行,所以当我们有多个生产者和消费者的时候,我们如果使用 if 那么notifyAll ( ) 后,所有的wait 线程都会被唤醒,且因为之前已经判断过了一次条件,那么就会使阻塞失效,生产和消费不平衡;

2.要在runnable中的run中去设置循环,不要循环的创建线程,而是让线程内部进行循环!

JUC

lock + condition

完整的生活者和消费者

  1. import java.util.concurrent.locks.Condition;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. @SuppressWarnings({"all"})
  5. public class pc2{
  6. private int capacity = 0;
  7. //锁
  8. Lock lock = new ReentrantLock();
  9. //监视器
  10. Condition condition = lock.newCondition();
  11. //生产者代码
  12. public void product(){
  13. //关门
  14. lock.lock();
  15. try{
  16. while (capacity!=0){
  17. //如果说我们的库存还有,就进行等待
  18. condition.await();
  19. }
  20. System.out.println(Thread.currentThread().getName() + " "+capacity);
  21. capacity++;
  22. System.out.println("生产者进行生产");
  23. condition.signalAll();
  24. }catch (Exception e){
  25. e.printStackTrace();
  26. }finally{
  27. //开门
  28. lock.unlock();
  29. }
  30. }
  31. //消费者代码
  32. public void consume(){
  33. lock.lock();
  34. try{
  35. while (capacity==0){
  36. //没有库存了 阻塞,
  37. condition.await();
  38. }
  39. System.out.println(Thread.currentThread().getName() + " "+capacity);
  40. capacity--;
  41. System.out.println("大爷来消费了");
  42. condition.signalAll();
  43. } catch (InterruptedException e) {
  44. e.printStackTrace();
  45. }finally {
  46. lock.unlock();
  47. }
  48. }
  49. public static void main(String[] args) {
  50. pc2 pc = new pc2();
  51. new Thread(new Runnable() {
  52. @Override
  53. public void run() {
  54. for (int i = 0; i < 50; i++) {
  55. pc.product();
  56. }
  57. }
  58. }).start();
  59. new Thread(new Runnable() {
  60. @Override
  61. public void run() {
  62. for (int i = 0; i < 50; i++) {
  63. pc.product();
  64. }
  65. }
  66. }).start();
  67. new Thread(new Runnable() {
  68. @Override
  69. public void run() {
  70. for (int i = 0; i < 50; i++) {
  71. pc.consume();
  72. }
  73. }
  74. }).start();
  75. new Thread(new Runnable() {
  76. @Override
  77. public void run() {
  78. for (int i = 0; i < 50; i++) {
  79. pc.consume();
  80. }
  81. }
  82. }).start();
  83. }
  84. }

针对于notify 和 notifyAll 的区别 ,每一个锁有一个 waitset 当通知 notifyAll 时,所有 waitset 中的线程都会被激活,去抢夺资源,但是如果仅仅是 notify 结果是从 waitset 中随机的释放一个线程,而如果这个线程无法解除其他线程的状态,那么整个进程也就会处于死锁状态;

锁到底是谁的?

锁属于调用方法的对象 。这个锁因为对象的不同,而不同。

如果是静态的方法 锁属于Class 对象。这个锁全局只有一个。

集合的线程问题

1.HashMap的线程不安全问题:

当我们开启多个线程对于一个HashMap进行写入,删除等操作的时候,就会触发 ConcurrentModificationException 异常 ,解决这个问题的方法有2种:

1.我们可以去调用Collections的工具类中的API来使我们的集合编程线程安全的

2.我们可以使用ConcurrentHashmap 创建一个map的实例对象:

ConcurrentHashmap 和 HashTable相比的优点在于: 锁的粒度更小了,HashTable是针对于整个Table表进行加锁,每次读写,都要锁住整张表,这样一来,读写的效率就会变差 ,而ConcurrentHashmap 是桶加锁,所谓的桶在我看来就是Table中的每一条链表或者说是红黑树,锁住这个桶,并不会影响其他桶的读写操作,所以说如果一个size是16的map ,当我们使用ConcurrentHashmap就可以同时完成16条线程同时进行读写操作,但是 HashTable 只能支持一条线程。

Callable

callable 和 runnable 和 thread 之间的关系;callable相比runnable的优势在于这个方法可以拥有返回值,可以抛出异常,线程最终调用的方法时 call 不是 run

callable的使用方法

  1. import java.util.concurrent.Callable;
  2. import java.util.concurrent.ExecutionException;
  3. import java.util.concurrent.FutureTask;
  4. import java.util.concurrent.TimeUnit;
  5. public class callabledemo {
  6. public static void main(String[] args) throws ExecutionException, InterruptedException {
  7. FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
  8. @Override
  9. public String call() throws Exception {
  10. System.out.println("线程正在执行");
  11. TimeUnit.SECONDS.sleep(5);
  12. System.out.println("执行结束,返回String");
  13. return "String";
  14. }
  15. });
  16. new Thread(futureTask).start();
  17. new Thread(new Runnable() {
  18. @Override
  19. public void run() {
  20. try {
  21. TimeUnit.SECONDS.sleep(1);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. System.out.println("不会阻塞");
  26. }
  27. }).start();
  28. String res = futureTask.get();//会阻塞当前程序直到结果产生
  29. System.out.println(res);
  30. }
  31. }

对比2个线程,我们可以发现问题的所在首先callable要借助futureTask 这个实现了runnable的接口来启动一个线程,那可以把FutureTask理解成一个适配器类,我们实际上运行的还是callable中的call方法,另外,我们可以在上面2个线程中看到,runnable只可以在内部处理异常,但是callable可以将可能出现的异常向上抛出,另外执行程序后,在futureTak.get( ) 方法执行的时候,会阻塞当前的主线程,所以我们下面的输出方法,会等到get方法执行结束后,才能继续执行;

三个辅助类

CyclicBarrier 这个类的特点在于,在达到了某个条件的情况下,线程会自动的触发。基本的使用方法如下:

我们设置了数字3,这个三代表当有3个线程被我们阻塞后,可以开始执行我们的程序,当然,当我们的程序执行结束之后,被阻塞的三个线程也会自动唤醒,继续执行;

主要的代码就是 await 方法,相当于 每阻塞一个线程都会 使当前进度+1

  1. import java.util.concurrent.Callable;
  2. import java.util.concurrent.CyclicBarrier;
  3. import java.util.concurrent.FutureTask;
  4. import java.util.concurrent.TimeUnit;
  5. public class tool1 {
  6. public static void main(String[] args) {
  7. CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
  8. @Override
  9. public void run() {
  10. System.out.println("开始执行");
  11. }
  12. });
  13. new Thread(new FutureTask<String>(new Callable<String>() {
  14. @Override
  15. public String call() throws Exception {
  16. System.out.println("前置1");
  17. barrier.await();
  18. TimeUnit.SECONDS.sleep(2);
  19. System.out.println("后置1");
  20. return null;
  21. }
  22. })).start();
  23. new Thread(new FutureTask<String>(new Callable<String>() {
  24. @Override
  25. public String call() throws Exception {
  26. System.out.println("前置2");
  27. barrier.await();
  28. TimeUnit.SECONDS.sleep(2);
  29. System.out.println("后置2");
  30. return null;
  31. }
  32. })).start();
  33. new Thread(new FutureTask<String>(new Callable<String>() {
  34. @Override
  35. public String call() throws Exception {
  36. System.out.println("前置3");
  37. barrier.await();
  38. TimeUnit.SECONDS.sleep(2);
  39. System.out.println("后置3");
  40. return null;
  41. }
  42. })).start();
  43. }
  44. }

CountDownLatch

这个辅助类可以理解为当某些线程已经全部执行完毕后,才能继续接下来的线程,主要的方法有两个,1. countDown ( ) 代表当前的线程执行结束,构建对象时设置的目标数-1 2. await ( ) 这里代表当前的对象阻拦的位置,当我们的目标数不减到0时,进程会一直处于阻塞状态。

  1. import java.util.concurrent.CountDownLatch;
  2. import java.util.concurrent.TimeUnit;
  3. public class tool2 {
  4. public static void main(String[] args) throws InterruptedException {
  5. CountDownLatch countDownLatch = new CountDownLatch(3);
  6. for (int i = 0; i < 3; i++) {
  7. new Thread(new Runnable() {
  8. @Override
  9. public void run() {
  10. System.out.println("执行");
  11. try {
  12. TimeUnit.SECONDS.sleep(2);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. countDownLatch.countDown();
  17. }
  18. }).start();
  19. }
  20. countDownLatch.await();
  21. System.out.println("执行完毕");
  22. }
  23. }

semaphore 信号量

主要方法2个 acquire 获取一个位置 release 释放一个空间

  1. import java.util.concurrent.Semaphore;
  2. import java.util.concurrent.TimeUnit;
  3. public class tool3 {
  4. public static void main(String[] args) {
  5. Semaphore semaphore = new Semaphore(3);
  6. //这个就是限流操作。只允许最多有几个进程能够正常运行,可以理解为这是一个大厕所,最多3个人可以进去,如果其他人想进去就要等待!
  7. for (int i = 0; i < 6; i++) {
  8. new Thread(new Runnable() {
  9. @Override
  10. public void run() {
  11. try {
  12. semaphore.acquire();
  13. System.out.println("我在上厕所");
  14. TimeUnit.SECONDS.sleep(2);
  15. System.out.println("我上完了");
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }finally{
  19. semaphore.release();
  20. }
  21. }
  22. }).start();
  23. }
  24. }
  25. }

线程池

1.快速创建线程池的三种方式:

通过Executors这个工具类我们就可以快速的构建出所需的线程池,其底层实际上是调用了ThreadPoolExecutor 根据我们所传入参数设计线程池。

  1. ExecutorService pool1 = Executors.newCachedThreadPool();//任意个线程池,随着当前并发事件的增加而增加
  2. ExecutorService pool2 = Executors.newSingleThreadExecutor();//单个线程池,自己去处理所有的线程
  3. ExecutorService pool3 = Executors.newFixedThreadPool(5);//固定的线程池,通过5个线程来处理并发

2.自定义线程池:

我们自定义的线程池可以更具体细致的设计线程池 主要有核心线程数 最大线程数 阻塞队列这三个 ,一个线程池,最大能同时容纳的线程数为 最大线程数 + 阻塞队列 ,在使用默认的拒绝策略的情况下,如果当前没有空闲的线程,则会抛出异常。当一个线程执行完毕后,会自动回到池子,也就是说执行全部任务后,仍可以在空闲情况下继续接受任务,但是如果线程池关掉了,就无法执行其他的任务!

  1. import java.sql.Time;
  2. import java.util.concurrent.*;
  3. public class threadpool {
  4. public static void main(String[] args) throws InterruptedException {
  5. //手动创建线程池
  6. ThreadPoolExecutor threadPoolExecutor =
  7. new ThreadPoolExecutor(2, 3, 3, TimeUnit.SECONDS,
  8. new LinkedBlockingQueue<Runnable>(3)
  9. );
  10. //手动创建线程池,参数的意义:核心线程数,代表我们正常运行时会处于工作状态的线程数;
  11. // 最大线程数:代表,当核心线程数正在工作状态且阻塞队列已满的时候就会把最大线程数-核心线程数个进程唤醒工作;
  12. for (int i = 0; i < 6; i++) {
  13. threadPoolExecutor.execute(new Runnable() {
  14. @Override
  15. public void run() {
  16. System.out.println(Thread.currentThread().getName());
  17. try {
  18. TimeUnit.SECONDS.sleep(1);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. });
  24. }
  25. //一个线程跑完了就会回到池子里,就可以重新调用了;
  26. TimeUnit.SECONDS.sleep(7);
  27. threadPoolExecutor.execute(new Runnable() {
  28. @Override
  29. public void run() {
  30. System.out.println(Thread.currentThread().getName());
  31. }
  32. });
  33. threadPoolExecutor.shutdown();//彻底关闭线程池子;
  34. //这是关了几个?
  35. threadPoolExecutor.execute(new Runnable() {
  36. @Override
  37. public void run() {
  38. System.out.println("over");
  39. }
  40. });
  41. }
  42. }

IO密集型: 代表大多数时间在用来读写,很少进行计算,可以开启多个线程进行IO的输入输出

CPU密集型:代表当前运算工作非常的多,要跑满整个cpu,而不是单核进行运算

volatile

有三个属性:可见性,有序性,不保证原子性

可见性

再聊可见性之前,务必先了解什么是JMM java 内存模型

而这种模型的问题就是每一个模块是独立的,就是说,线程1在从本地内存中读取到主存中的数据后,后面的运算也会接着这个本地内存中的数据进行后续操作,但是如果线程2刚好对主存中数据进行了修改,那么线程1就会发生无效的计算。所以volatile的出现就解决了这个问题,volatile在读取数据的时候,不再通过本地内存进行数据的读,而是直接从主存中进行去读,确保数据的可见性。

有序性

对于有序行实际上是避免了编译器对于代码的指令重新排序,因为在单个线程中,我们有些指令和指令之间的数据是可以进行修改先后顺序的,但是如果多个线程之下,这个重新排序就会产生问题,举个例子:

如果是想先执行1再执行2,但是可能就会因为指令重排的原因发生先执行1的flag = ture 再执行2 结束后 a = 1 这就导致了结果会出现错误,而volatile可以保证这个顺序的不发生改变,这也就是有序性;

  1. public void f1(){
  2. try {
  3. TimeUnit.SECONDS.sleep(1);
  4. } catch (InterruptedException e) {
  5. e.printStackTrace();
  6. }
  7. a = 1;
  8. flag = true;
  9. }
  10. public void f2(){
  11. if (flag){
  12. System.out.println(a);
  13. }
  14. }

不保证原子性

是说volatile这个关键字,在读取主存中的数据的时候,可以被多个线程获取到,而多个线程读到同一个数据的时候,就会都认为自己拿到的数据是正确的,然后就会对这个数据进行相关的操作,在写回答内存的时候,就会发生问题,i++ ,多个线程同时读取到后,各自进行++操作,再写会主存,现在就会有多个线程会同时读到一个数据,++后写回。

ThreadLocal

什么是ThreadLocal呢?

ThreadLocal的用途主要是用于线程与线程之间的数据隔离,每个线程都有自己的独一份的变量。

但是这个变量并不是存储在ThreadLocal自己的属性中,而是在Thread中每个Thread都有一个自己的ThreadLocalMap 这个map可以称为是Thread 的本地内存,而ThreadLocal 正是去获取到当前线程的map , 然后将自己这个ThreadLocal的对象和value 一起存放在map中,这个就是所谓的ThreadLocal。

ThreadLocal中的内存泄漏,是说ThreadLocalMap中的Key是弱引用类型的,所谓所引用类型,就是指,只要当gc发现它没有栈空间的变量指向它的时候,gc就可以回收掉它,也就是说当我们的ThreadLocal不在指向这个K的时候,这个K就会自动回收,但是由于value是强引用,所以我们在gc回收了key后,就会留下一个 key:null value:obj的元素,gc就无法再去回收它,而在ThreaLocal中在调用set get remove的时候,都会检查当前的key是不是null 如果是null 就会把value也置为null ,进而是gc可以回收掉