image.png

简介

什么是juc

**JUC**就是**java.util.concurrent**工具包的简称。这是一个处理线程的工具包,jdk1.5开始出现;

线程和进程概念

进程与线程

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。 在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令、数据及其组织形式的描述,进程是程序的实体。
线程(thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
总结来说:

  • 进程:指在系统中正在运行的一个应用程序,程序一旦运行就是进程,进程一一资源分配的最小单位。
  • 线程:系统分配处理器时间资源的基本单元,或者说进程之内独立执行的一个单元执行流。线程一一程序执行的最小单位。

    线程的状态

  • NEW新建

  • RUNNABLE准备就绪
  • BLOCKED阻塞
  • WAITING不见不散
  • TIMIED_WAITING过时不候
  • TERMINATED终结

    waitsleep区别

  1. sleepThread的静态方法,waitObject的方法,任何对象实例都能调用。
  2. sleep不会释放锁,它也不需要占用锁。wait会释放锁,但调用它的前提是当前线程占有锁(即代码要在synchronized中)。
  3. 它们都可以被interrupted方法中断。

    并发和执行

    串行模式

  • 串行表示所有任务都依次按先后顺序进行。串行意味着必须先装完一车柴才能运送这车柴,只有运送到了,才能卸下这车柴,并且只有完成了这整个三个步骤,才能进行下一个步骤。
  • 串行是一次只能取得一个任务,并执行这个任务。

    并行模式

  • 并行意味着可以同时取得多个任务,并同时去执行所取得的这些任务。并行模式相当于将长长的一条队列,划分成了多条短队列,所以并行缩短了任务队列的长度。并行的效率从代码层次上强依赖于多进程/多线程代码,从硬件角度上则依赖于多核CPU。

    并发

    并发(concurrent)指的是多个程序可以同时运行的现象,更细化的是多进程可以同时运行或者多指令可以同时运行。但这不是重点,在描述并发的时候也不会去扣这种字眼是否精确,并发的重点在于它是一种现象,并发描述的是多进程同时运行的现象。但实际上,对于单核心 CPU来说,同一时刻只能运行一个线程。所以,这里的”同时运行”表示的不是真的同一时刻有多个线程运行的现象,这是并行的概念,而是提供一种功能让用户看来多个程序同时运行起来了,但实际上这些程序中的进程不是一直霸占CPU的,而是执行一会停一会。

    小结

    并发:同一时刻多个线程在访问同一个资源,多个线程对一个点;

  • 例子:春运抢票 电商秒杀.

并行:多项工作一起执行,之后再汇总。

  • 例子:泡方便面,电水壶烧水,一边撕调料倒入桶中。

    管程(Monitor)

  • Monitor翻译过来是监视器,我们平时所说的就是一种监视器,是一种同步机制,保证同一个时间只能有一个线程能去访问被保护的数据或代码;

  • jvm同步是基于进入或退出,使用管程的对象实现的;

    用户线程and守护线程

    用户线程

    自定义线程,平时用到的线程;
    特定:主线程结束,用户线程还在运行,jvm存活;

    1. Thread aa = new Thread(() -> {
    2. //线程名:Thread.currentThread().getName()
    3. //是否是守护线程:Thread.currentThread().isDaemon()
    4. ...
    5. }, "用户线程名");
    6. aa.start();

    守护线程

    特殊线程,运行在后台,比如垃圾回收;
    特定:主线程结束,没有用户线程,守护线程结束,jvm结束;

    1. Thread aa = new Thread(() -> {
    2. //线程名:Thread.currentThread().getName()
    3. //是否是守护线程:Thread.currentThread().isDaemon()
    4. ...
    5. }, "守护线程名");
    6. aa.setDaemon(true);
    7. aa.start();

    Lock接口

    关键字Synchronized

    作用范围

    synchronizedJava中的关键字,是一种同步锁。它修饰的对象有以下几种:

  • 修饰一个代码块,被修饰的代码块称为同步语句块,

    • 其作用的范围是大括号{括起来的代码,作用的对象是调用这个代码块的对象;。
  • 修饰一个方法,被修饰的方法称为同步方法,

    • 其作用的范围是整个方法,作用的对象是调用这个方法的对象;。

      多线程编程步骤

  • 第一步:创建资源类,在资源类创建属性和操作方法;

  • 第二步:创建多个线程,调用资源类的操作方法;
    1. //第一步:创建资源类,在资源类创建属性和操作方法;
    2. class Ticket{
    3. private int number=30;
    4. public synchronized void sale(){
    5. //加上synchronized关键字就可对方法加锁,保证数据安全性
    6. if(number>0){
    7. System.out.print(Thread.currentThread().getName());
    8. System.out.println(":卖出:1张票,还剩下:"+--number);
    9. }
    10. }
    11. }
    12. public class SaleTicker {
    13. public static void main(String[] args) {
    14. Ticket ticket=new Ticket();
    15. //第二步:创建多个线程,调用资源类的操作方法;
    16. new Thread(new Runnable() {
    17. @Override
    18. public void run() {
    19. for(int i=0;i<30;i++){
    20. ticket.sale();
    21. }
    22. }
    23. },"线程1").start();
    24. new Thread(new Runnable() {
    25. @Override
    26. public void run() {
    27. for(int i=0;i<30;i++){
    28. ticket.sale();
    29. }
    30. }
    31. },"线程2").start();
    32. new Thread(new Runnable() {
    33. @Override
    34. public void run() {
    35. for(int i=0;i<30;i++){
    36. ticket.sale();
    37. }
    38. }
    39. },"线程3").start();
    40. }
    41. }

    什么是lock接口

    介绍

    Lock锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象。Lock提供了比synchronized更多的功能。
    Lock与的Synchfonized区别:
    Lock不是Java语言内置的,synchronizedJava语言的关键字,因此是内置特性。Lock是一个类,通过这个类可以实现同步访问。
    Locksynchronized有一点非常大的不同,采用synchronized不需要用户去手动释放锁,当synchronized方法或者synchronized代码块执行完之后,系统会自动让线程释放对锁的占用;而 Lock则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象。。

    使用

    可重入锁:可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁;
    1. class Ticket{
    2. private int number=30;
    3. private final ReentrantLock lock=new ReentrantLock();//创建可重入锁
    4. public void sale(){
    5. lock.lock();//上锁
    6. if(number>0){
    7. System.out.print(Thread.currentThread().getName());
    8. System.out.println(":卖出:1张票,还剩下:"+--number);
    9. }
    10. lock.unlock();//解锁
    11. }
    12. }
    Locksynchronized有以下几点不同:
  1. Lock是一个接口,而 synchronized是Java中的关键字,synchronized是内置的语言实现;
  2. synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁。
  3. Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断。
  4. 通过Lock可以知道有没有成功获取锁,而synchronized却无法办到。
  5. Lock可以提高多个线程进行读操作的效率:
    1. 在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized

      线程间通信

      jdk官方解释:

      Class Object

      void notify()
      译:唤醒在此对象监视器上等待的单个线程; void notifyAll()
      译:唤醒在此对象监视器上等待的所有线程; void wait( )
      译:导致当前的线程等待,直到其他线程调用此对象的notify()方法或notifyAll()方法; void wait(long timeout)
      译:导致当前的线程等待,直到其他线程调用此对象的notify()方法或notifyAll()方法,或者指定的时间过完。 void wait(long timeout, int nanos)
      译:导致当前的线程等待,直到其他线程调用此对象的notify()方法或notifyAll()方法,或者其他线程打断了当前线程,或者指定的时间过完。

小总结:

  • wait()notify()notifyAll()都不属于Thread类,而是属于Object基础类,也就是每个对象都有wait()notify()notifyAll()功能,因为每个对象都有锁,锁是每个对象的基础,当然操作锁的方法也是最基础了。
  • 当需要调用以上的方法的时候,一定要对竞争资源进行加锁,如果不加锁的话,则会报IllegalMonitorStateException异常;
  • 当想要调用wait()进行线程等待时,必须要取得这个锁对象的控制权(对象监视器),一般是放到synchronized(obj)代码中。
  • 在while循环里而不是if语句下使用wait,这样,会在线程暂停恢复后都检查wait的条件,并在条件实际上并未改变的情况下处理唤醒通知;
  • 调用obj.wait()释放了obj的锁,否则其他线程也无法获得obj的锁,也就无法在synchronized(obj){obj.notify()}代码段内唤醒A。
  • notify()方法只会通知等待队列中的第一个相关线程(不会通知优先级比较高的线程);
  • notifyAll()通知所有等待该竞争资源的线程(也不会按照线程的优先级来执行);
  • 假设有三个线程执行了obj.wait(),那么obj.notifyAll()则能全部唤醒tread1thread2thread3,但是要继续执行obj.wait()的下一条语句,必须获得obj锁,因此,tread1thread2thread3只有一个有机会获得锁继续执行,例如tread1,其余的需要等待thread1释放obj锁之后才能继续执行。
  • 当调用obj.notify/notifyAll后,调用线程依旧持有obj锁,因此,tread1thread2thread3虽被唤醒,但是仍无法获得obj锁。直到调用线程退出synchronized块,释放obj锁后,thread1,thread2,thread3中的一个才有机会获得锁继续执行。

    实操

    class Share{
      private int number=0;//初始值
      public synchronized void incr() throws InterruptedException{
          if(number!=0){//如果number不是0,等待
              this.wait();//wait()方法特点:在哪里等待,在哪里被唤醒
          }
          number++;//number值是0,加1
          System.out.println(Thread.currentThread().getName()+"::"+number);
          this.notifyAll();//通知其他所有线程【线程间通信】
      }
      public synchronized void decr() throws InterruptedException{
          if(number!=1){//如果number不是1,等待
              this.wait();
          }
          number--;//number值是0,减1
          System.out.println(Thread.currentThread().getName()+"::"+number);
          this.notifyAll();//通知其他所有线程【线程间通信】
      }
    }
    public class SaleTicker {
      public static void main(String[] args) {
          Share share=new Share();
          new Thread(()->{  //lambda表达式方式实现Runable
              for(int i=0;i<10;i++){
                  try {
                      share.incr();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          },"加一线程").start();
          new Thread(()->{
              for(int i=0;i<10;i++){
                  try {
                      share.decr();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          },"减一线程").start();
      }
    }
    

    虚假唤醒问题

    image.png
    解决方法:while()来替代if()

    class Share{
      private int number=0;//初始值
      public synchronized void incr() throws InterruptedException{
          while(number!=0){//如果number不是0,等待
          //使用while()防止虚假唤醒问题
              this.wait();
          }
          number++;//number值是0,加1
          System.out.println(Thread.currentThread().getName()+"::"+number);
          this.notifyAll();//通知其他所有线程【线程间通信】
      }
      public synchronized void decr() throws InterruptedException{
          while(number!=1){//如果number不是1,等待
              this.wait();
          }
          number--;//number值是0,减1
          System.out.println(Thread.currentThread().getName()+"::"+number);
          this.notifyAll();//通知其他所有线程【线程间通信】
      }
    }
    

    不同方法实现线程间通信总结

    synchronized方法

    如上:

  • 资源类方法名使用synchronized关键字修饰,

  • 等待使用this.wait();
  • 通知其他所有线程使用this.notifyAll();

    Lock方法

  • Lock替换synchronized方法和语句的使用,Condition取代了对象监视器方法的使用。

    class Share1{
      private int number=0;
      private Lock lock=new ReentrantLock();//创建一个可重入锁
      private Condition condition=lock.newCondition();
      //new一个Condition对象,使用这个对象来等待和通知其他线程
      public void incr() throws InterruptedException {
          lock.lock();//上锁
          try {
              while (number!=0){//使用while()防止虚假唤醒问题
                  condition.await();//等待
              }
              number++;
              System.out.println(Thread.currentThread().getName()+"::"+number);
              condition.signalAll();//通知其他all线程
          }finally {
              lock.unlock();//释放锁
          }
      }
    }
    

    线程间定制化通信

    就是使线程按照约定的顺序进行调用;

  • 没有什么新技术,就是设置一个标志位,例如private int flag=1;然后使用while()对这个标志位进行判断是wait还是进行操作,并且进行操作后对这个标志位进行赋值,然后另一个也使用while()进行判断。

  • 总结来说就是使用while()来判断标志位,之后根据判断结果wait或进行操作,抢占顺序之类的没有影响,因为有这个while()判断;

    集合的线程安全

    线程不安全演示

    ArrayList

    List<String> list=new ArrayList<>();
    for(int i=0;i<10;i++){
      new Thread(()->{
          list.add(UUID.randomUUID().toString().substring(0,8));
          System.out.println(list);
      },String.valueOf(i)).start();
    }
    
    报错:

    Exception in thread “8” java.util.ConcurrentModificationException

    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
    at java.util.ArrayList$Itr.next(ArrayList.java:861)
    at java.util.AbstractCollection.toString(AbstractCollection.java:461)
    at java.lang.String.valueOf(String.java:2994)
    at java.io.PrintStream.println(PrintStream.java:821)
    

问题就出在System.out.println(list);这,主要由于ArrayListadd()没加synchronized关键字:

public boolean add(E e) {
    ensureCapacityInternal(size + 1);  // Increments modCount!!
    elementData[size++] = e;
    return true;
}

也就是线程不安全的,由于在高并发的情况下使得输出的时候可能其他线程正在add()中,使两者产生冲突报错;

HashSet

Set<String> set= new HashSet<>();
for(int i=0;i<1000;i++){
    new Thread(()->{
        set.add(UUID.randomUUID().toString().substring(0,8));
        System.out.println(set);
    },String.valueOf(i)).start();
}

并发现修改问题,报错如下:

Exception in thread “998” Exception in thread “186” java.util.ConcurrentModificationException at java.util.HashMap$HashIterator.nextNode(HashMap.java:1469) at java.util.HashMap$KeyIterator.next(HashMap.java:1493) at java.util.AbstractCollection.toString(AbstractCollection.java:461) at java.lang.String.valueOf(String.java:2994) at java.io.PrintStream.println(PrintStream.java:821)

HashMap

同样如上,这里就不演示了;

解决方案

ArrayList

Vector方案

使用Vector来替代ArrayList:同ArrayList一样,Vector也是List接口的实现类,并且观察Vector的源码发现,他的大多数实现方法都是加了synchronized关键字的;

public synchronized boolean add(E e) {
    modCount++;
    ensureCapacityHelper(elementCount + 1);
    elementData[elementCount++] = e;
    return true;
}

但这种方案效率低,不推荐

collections方案

可以通过collections类里面的一个静态方法,来修饰ArrayList,使其线程安全;主要操作如下:

List<String> list= Collections.synchronizedList(new ArrayList<>());

同样collections类里面还有其他类似的静态方法
image.png
但这种方案依然比较古老,不推荐;

CopyOnWriteArrayList方案

JUCjava.util.concurrent包中的CopyOnWriteArrayList类来替代ArrayList

List<String> list= new CopyOnWriteArrayList<>();

称为写时复制技术,原理如下:
image.png

HashSet

同样使用JUCjava.util.concurrent包中的类来实现,这里使用CopyOnWriteArraySet类来替代HashSet

Set<String> set= new CopyOnWriteArraySet<>();

HashMap

同样使用JUCjava.util.concurrent包中的类来实现,这里使用ConcurrentHashMap类来替代HashMap

Map<String,String> map=new ConcurrentHashMap<>();

多线程锁

synchronized锁的范围分析

当一个资源类里的非静态方法使用synchronized修饰时:

  • 就好比我们把这个资源类比作是厕所,方法比作是单间坑位,synchronized锁其中一个厕所里所有的有锁坑位,并不影响去另一个厕所的人,只影响去这个厕所里有锁的坑位的人;

当一个资源类里的静态方法static使用synchronized修饰时:

  • 这时锁的就不是当前对象(厕所)的此方法(坑位)this,而是当前资源类的class字节码对象;

总结:
synchronized实现同步的基础:Java中的每一个对象都可以作为锁,具体表现为以下3种形式。

  • 对于普通同步方法,锁是当前实例对象。
  • 对于静态同步方法,锁是当前类的class对象。
  • 对于同步方法块,锁是Synchonized括号里配置的对象

    公平锁和非公平锁

    公平锁:private Lock lock=new ReentrantLock(true);

  • 所有线程都会干到活;

  • 执行效率相对较低;

非公平锁:private Lock lock=new ReentrantLock(false);private Lock lock=new ReentrantLock();

  • 是抢占式的,抢的多抢的少全靠自己,可能出现一个线程把活全干了,其他线程饿死的情况;
  • 优点是执行效率高;

    public ReentrantLock() {
      sync = new NonfairSync();
    }
    //NonfairSync():非公平锁
    public ReentrantLock(boolean fair) {
      sync = fair ? new FairSync() : new NonfairSync();
    }
    //FairSync():公平锁
    

    可重入锁

  • 可重入锁ReentrantLock可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁;

  • synchronized(隐式)和lock(显式)都是可重入锁;

    死锁

    概念:两个或以上的线程由于争夺对方锁资源,会不退让,同时等待对方释放锁的现象,如果没有外力干涉,程序无法再执行下去;
    产生原因
  1. 系统资源不足;
  2. 进程运行推进顺序不合理;
  3. 资源分配不当;

    死锁演示

    public class DeadLock {
     public static String a=new String();
     public static String b=new String();
     public static void main(String[] args) {
         Thread t1= new Thread(() -> {
             synchronized (a) {
                 System.out.println(Thread.currentThread().getName() + "已获取a的锁,正在争取b的锁");
                 synchronized (b) {
                     System.out.println(Thread.currentThread().getName() + "已获取ab的锁");
                 }
             }
         });
         Thread t2= new Thread(() -> {
             synchronized (b) {
                 System.out.println(Thread.currentThread().getName() + "已获取b的锁,正在争取a的锁");
                 synchronized (a) {
                     System.out.println(Thread.currentThread().getName() + "已获取ba的锁");
                 }
             }
         });
         t1.start();
         t2.start();
     }
    }
    
    死锁验证,首先控制台输入命令jps -l(类似于linux中的**ps -ef**)查看所有进程,选择一个你认为有死锁的进程,记住进程号,之后控制台输入jstack 进程号,如果出现下面情况说明有死锁:
    image.png

    Callable接口

    image.png
    目前我们学习了有两种创建线程的方法-一种是通过创建Thread类,另一种是通过使用Runnable创建线程。但是,Runnable缺少的一项功能是:当线程终止时(即run()完成时),我们无法使线程返回结果。为了支持此功能,Java中提供了Callable接口。
    Callable 接口的特点如下(重点):
  • 为了实现Runnable,需要实现不返回任何内容的run()方法,而对于Callable,需要实现在完成时返回结果的call()方法。
  • call()方法可以引发异常,而run()则不能。
  • 为实现Callable而必须重写call()方法。

    FutureTask

    image.png

    import java.util.concurrent.Callable;
    import java.util.concurrent.FutureTask;
    class MyCallableThread implements Callable {
      @Override
      public Object call() throws Exception {
          return 250;
      }
    }
    public class CallableTest {
      public static void main(String[] args) {
          FutureTask<Integer> futureTask1=new FutureTask<>(new MyCallableThread());
          //传统方法
          FutureTask<Integer> futureTask2=new FutureTask<>(()->{
              return 666;
          });//使用lambda表达式方法
      }
    }
    

    FutureTask创建线程

    使用FutureTask(未来任务)有一个特点:只有第一次才进行计算,之后调用get()方法得到返回值都是直接返回第一次计算出的结果;

    FutureTask<Integer> futureTask2=new FutureTask<>(()->{
      System.out.println(Thread.currentThread().getName()+"来了");
      return 666;
    });//使用lambda表达式方法
    new Thread(futureTask2,"线程2").start();
    System.out.println("返回结果:"+futureTask2.get());
    

    JUC-辅助类

    减少计数CountDownLatch

    CountDownLatch类可以设置一个计数器,然后

  • 通过countDown方法来进行减1的操作,

  • 使用await方法等待计数器不大于0,然后继续执行await方法之后的语句。

CountDownLatch主要有两个方法,

  • 当一个或多个线程调用await方法时,这些线程会阻塞。
  • 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞)。
  • 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
    import java.util.concurrent.CountDownLatch;
    public class CountDownLatchTest {
      public static void main(String[] args) {
          /*使的最后才锁门*/
          CountDownLatch countDownLatch=new CountDownLatch(6);
          //创建一个CountDownLatch对象,初始值设置为6
          for(int i=0;i<6;i++){
              new Thread(()->{
                  System.out.println(Thread.currentThread().getName()+"号离开教室");
                  countDownLatch.countDown();//计数器-1
              },String.valueOf(i)).start();
          }
          try {
              countDownLatch.await();//等待,直到计数器值为0才执行下面代码
              System.out.println("班长锁门了");
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
    }
    
    输出(同学的离开顺序不确定,但是最后锁门是已经确定的)

    0号离开教室 2号离开教室 3号离开教室 1号离开教室 4号离开教室 5号离开教室 班长锁门了

循环栅栏CyclicBarrier

允许一组线程全部等待彼此达到共同屏障点的同步辅助。循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。
CyclicBarrier看英文单词可以看出大概就是循环阻塞的意思,在使用中CyclicBarrier的构造方法第一个参数是目标障碍数,每次执行CyclicBarrierawait()方法一次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await()之后的语句和初始化后设定的达到障碍数之后做的事情。可以将CyclicBarrierawait()方法理解为加1操作。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
    private static int NUMVER=7;
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier=new CyclicBarrier(NUMVER,
                  ()->{System.out.println("\n集齐了7颗龙珠了");});
        //第1个参数是目标障碍数;第2个参数是达到障碍数之后做的事情(Runnable对象)
        for (int i=1;i<NUMVER+1;i++){
            new Thread(()->{
                try {
                    System.out.print(" 龙珠"+Thread.currentThread().getName());
                    cyclicBarrier.await();
                    System.out.print ("许愿望 ");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}

输出:

龙珠1 龙珠3 龙珠4 龙珠2 龙珠5 龙珠6 龙珠7 集齐了7颗龙珠了 许愿望 许愿望 许愿望 许愿望 许愿望 许愿望 许愿望

信号塔Semaphore

一个计数信号量。在概念上,信号量维持一组许可证。如果有必要,每个acquire()(从该信号量获取许可证)都会阻塞,直到许可证可用,然后才能使用它。每个release()(释放许可证,将其返回到信号量)添加许可证,潜在地释放阻塞获取方。但是,没有使用实际的许可证对象;Semaphore只保留可用数量的计数,并相应地执行。信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源

import java.util.concurrent.Semaphore;
public class SemaphoreTest {
    public static void main(String[] args) {
        Semaphore semaphore=new Semaphore(3);
        for(int i=1;i<=6;i++){
            new Thread(()->{
                System.out.println("线程"+Thread.currentThread().getName()+"等待中***");
                try {
                    semaphore.acquire();//抢占
                    System.out.println("线程"+Thread.currentThread().getName()+"抢到<----");
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("线程"+Thread.currentThread().getName()+"将要释放---->");
                    semaphore.release();//释放
                }
            },String.valueOf(i)).start();
        }
    }
}

结果:

线程1等待中 线程2等待中 线程2抢到武器<—— 线程3等待中 线程3抢到武器<—— 线程5等待中 线程1抢到武器<—— 线程4等待中 线程6等待中 线程3将要释放——> 线程2将要释放——> 线程1将要释放——> 线程4抢到武器<—— 线程5抢到武器<—— 线程6抢到武器<—— 线程6将要释放——> 线程5将要释放——> 线程4将要释放——>

JUC-读写锁ReentrantReadWriteLock

  • 读锁:共享锁;
  • 写锁:独占锁;
  • 读锁和写锁之间会发生死锁,写锁和写锁之间会发生死锁,读锁和读锁之间不会发生死锁;
  • 对于不同线程之间读读共享读写、写写互斥;
  • 但是对于同一个线程来说获取读锁时也可以获取其写锁(必须获取写锁再获取读锁才行);

    使用

    private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
    //首先声明一个ReentrantReadWriteLock对象 readWriteLock.writeLock().lock();//添加写锁并上锁 readWriteLock.writeLock().unlock();//释放写锁 readWriteLock.readLock().lock();//添加读锁并上锁 readWriteLock.readLock().unlock();//释放读锁

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class MyCache{
    private volatile Map<String,Object> map=new HashMap<>();
    /*
    volatile是Java提供的一种轻量级的同步机制。Java语言包含两种内在的同步机制:
    同步块(或方法)和 volatile变量,相比于synchronized(synchronized通常称为重量级锁),
    volatile更轻量级,因为它不会引起线程上下文的切换和调度。但是volatile变量的同步性较差
    (有时它更简单并且开销更低),而且其使用也更容易出错。
     */
    private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
    //首先声明一个ReentrantReadWriteLock对象
    //放数据
    public void put(String key,Object value){
        readWriteLock.writeLock().lock();//添加写锁并上锁
        System.out.println(Thread.currentThread().getName()+"正在对《"+key+"》进行写操作");
        try {
            Thread.sleep(300);
            map.put(key,value);
            System.out.println(Thread.currentThread().getName()+"写入数据《"+key+"》成功");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            readWriteLock.writeLock().unlock();//释放写锁
        }
    }
    //取数据
    public Object get(String key){
        readWriteLock.readLock().lock();//添加读锁并上锁
        System.out.println(Thread.currentThread().getName()+"正在对《"+key+"》进行读操作");
        Object r=new Object();
        try {
            Thread.sleep(300);
            r= map.get(key);
            System.out.println("数据《"+r+"》已读到");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            readWriteLock.readLock().unlock();//释放读锁
        }
        return r;
    }
}
public class readWrite {
    public static void main(String[] args) {
        MyCache myCache=new MyCache();
        for (int i=0;i<5;i++){
            final int num=i;
            new Thread(()->{
                myCache.put(num+"",num+"");
            },String.valueOf(i)).start();
        }
        for (int i=0;i<5;i++){
            final int num=i;
            new Thread(()->{
                myCache.get(num+"");
            },String.valueOf(i)).start();
        }
    }
}

读写锁的演变

image.png
**Synchronized****ReentrantLock**区别:

  • 这两种同步方式有很多相似之处,它们都是加锁方式同步,而且都是阻塞式的同步,也就是说当如果一个线程获得了对象锁,进入了同步块,其他访问该同步块的线程都必须阻塞在同步块外面等待,而进行线程阻塞和唤醒的代价是比较高的;
  • 这两种方式最大区别就是对于Synchronized来说,它是java语言的关键字,是原生语法层面的互斥,需要jvm实现。而ReentrantLock它是JDK 1.5之后提供的API层面的互斥锁,需要lock()unlock()方法配合try/finally语句块来完成
  • 便利性:很明显Synchronized的使用比较方便简洁,并且由编译器去保证锁的加锁和释放,而ReentrantLock需要手工声明来加锁和释放锁,为了避免忘记手工释放锁造成死锁,所以最好在finally中声明释放锁。
  • 锁的细粒度和灵活度:很明显ReentrantLock优于Synchronized

    读写锁的降级

    image.png
    读锁不能升级为写锁

    ReentrantReadWriteLock rrwl=new ReentrantReadWriteLock();
    ReentrantReadWriteLock.WriteLock writeLock = rrwl.writeLock();
    ReentrantReadWriteLock.ReadLock readLock = rrwl.readLock();
    new Thread(()->{
      writeLock.lock();//1、获取写锁
      .....
      readLock.lock();//2、获取读锁
      .....
      writeLock.unlock();//3、释放写锁(此步就是锁降级操作)
      readLock.unlock();//4、释放读锁
    },"1").start();
    

    阻塞队列BlockingQueue

    介绍

    image.png
    image.png
    好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。

    常见的BlockingQueue

    ArrayBlockingQueue(常用)

  • 基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。

  • ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。
  • ArrayBlockingQueueLinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
  • 一句话总结:由数组结构组成的有界阻塞队列。

    LinkedBlockingQueue(常用)

  • 基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

  • ArrayBlockingQueueLinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。
  • 一句话总结∶由链表结构组成的有界(大小默认值为Integer.MaxValue)阻塞队列。

    DelayQueue

  • DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

  • 一句话总结∶使用优先级队列实现的延迟无界阻塞队列。

    PriorityBlockingQueue

  • 基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。

  • 因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
  • 一句话总结∶支持优先级排序的无界阻塞队列。

    SynchronousQueue

  • 一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。

  • 一句话总结:不存储元素的阻塞队列,也即是单个元素的队列;
  • 声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。

    公平模式和非公平模式的区别:

  • 公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;。

  • 非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

    LinkedTransferQueue

  • LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfertransfer方法。

  • LinkedTransferQueue采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。
  • 一句话总结:由链表组成的无界阻塞队列。

    LinkedBlockingDeque

  • LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。

  • 对于一些指定的操作,在插入或者获取队列元素时如果队列状态不允许该操作可能会阻塞住该线程直到队列状态变更为允许操作,这里的阻塞一般有两种情况:
    • 插入元素时:如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时再讲该元素插入,该操作可以通过设置超时参数,超时后返回false表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出InterruptedException异常。
    • 读取元素时:如果当前队列为空会阻塞住直到队列不为空然后返回元素,同样可以通过设置超时参数。
  • 一句话总结:由链表组成的双向阻塞队列;

    BlockingQueue核心方法

    image.png

    使用

    BlockingQueue blockingQueue=new ArrayBlockingQueue(3);
    blockingQueue.xxx
    

    ThreadPool线程池

    线程池简介

  • 线程池(英语∶thread pool):一种线程使用模式。线程过多会带来调度开销进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

  • 线程池的优势:线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
  • 它的主要特点为:

    • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
    • 提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行。
    • 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
    • Java中的线程池是通过Executor框架实现的,该框架中用到了ExecutorExecutorsExecutorServiceThreadPoolExecutor这几个类;
      • image.png

        线程池使用方式

        Executors.newFixedThreadPool(int)

        (1池n线程)
  • 作用:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。

  • 特征:线程池中的线程处于一定的量,可以很好的控制线程的并发量。线程可以重复被使用,在显示关闭之前,都将一直存在。超出一定量的线程被提交时候需在队列中等待;
    //newFixedThreadPool1池n线程
    ExecutorService threadPool1 = Executors.newFixedThreadPool(5);//这里只有5个线程
    try {
      for(int i=0;i<10;i++){
          threadPool1.execute(()->{
              System.out.println("线程池名字:"+Thread.currentThread().getName());
          });
      }
    }finally {
      threadPool1.shutdown();//连接放回线程池
    }
    
    输出:

    线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-5 线程池名字:pool-1-thread-5 线程池名字:pool-1-thread-4 线程池名字:pool-1-thread-3 线程池名字:pool-1-thread-2 线程池名字:pool-1-thread-3 线程池名字:pool-1-thread-4 线程池名字:pool-1-thread-5 线程池名字:pool-1-thread-1

Executors.newSingleThreadExecutor()

(一个任务一个任务执行,一池一线程)

//一池一线程
ExecutorService threadExecutor = Executors.newSingleThreadExecutor();//1个线程
try {
    for(int i=0;i<10;i++){
        threadExecutor.execute(()->{
            System.out.println("线程池名字:"+Thread.currentThread().getName());
        });
    }
}finally {
    threadExecutor.shutdown();//连接放回线程池
}

输出:

线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1

Executors.newCachedThreadPool()

(线程池根据需求创建线程,可扩容,遇强则强)

//可扩容线程
ExecutorService threadPool = Executors.newCachedThreadPool();
try {
    for(int i=0;i<10;i++){
        threadPool.execute(()->{
            System.out.println("线程池名字:"+Thread.currentThread().getName());
        });
    }
}finally {
    threadPool.shutdown();//连接放回线程池
}

输出:

线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-6 线程池名字:pool-1-thread-3 线程池名字:pool-1-thread-4 线程池名字:pool-1-thread-7 线程池名字:pool-1-thread-8 线程池名字:pool-1-thread-2 线程池名字:pool-1-thread-5 线程池名字:pool-1-thread-10 线程池名字:pool-1-thread-9

源码

//newFixedThreadPool,1池n线程
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
//newSingleThreadExecutor,//一池一线程
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
//newCachedThreadPool,可扩容线程
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

观察源码发现,三者的源码都是ThreadPoolExecutor

ThreadPoolExecutor七个参数介绍

   public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory,
                             RejectedExecutionHandler handler) {...
  • **corePoolSize**:核心线程数
    • 核心线程会一直存活,即使没有任务需要执行;
    • 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理;
    • 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭;
  • **maximumPoolSize**:最大线程数;
  • **keepAliveTime**:非核心线程存活时间;
  • **unit**:存活时间单位;
  • **workQueue**:阻塞队列,当核心线程都用完后,请求就会放到阻塞队列中进行等待;
  • **threadFactory**:线程工厂,用于创建线程;
  • **handler**:超过阻塞队列时新来的线程请求拒绝策略;

    ThreadPoolExecutor工作流程

    image.png
    image.png

    自定义线程池

    image.png

    //自定义线程池 
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
      2, 5, 2L,
      TimeUnit.SECONDS,
      new ArrayBlockingQueue<>(3),
      Executors.defaultThreadFactory(),
      new ThreadPoolExecutor.AbortPolicy()
    );
    

    Fork/Join分支合并框架

    简介

    Fork/Join它可以将一个大的任务拆分成多个子任务(子任务分配线程来操作,实现多线程)进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join框架要完成两件事情:

  • Fork:把一个复杂任务进行分拆,大事化小。

  • Join:把分拆任务的结果进行合并。

    步骤

  • 任务分割:首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割。

  • 执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。

image.png
image.png

使用示例

实现一个从1加到100的操作,一次只能两个加数相加,并且两个加数的差必须小于10

package juc.lock;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class MyTask extends RecursiveTask<Integer>{
    //拆分时两个加数差值不能超过10
    private static final Integer VALUE=10;
    private int begin;
    private int end;
    private int result;
    public MyTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        System.out.println("线程名子:"+Thread.currentThread().getName());
        if((end-begin)<VALUE){
            for(int i=begin;i<=end;i++){
                result+=i;
            }
        }else {
            int mid=(begin+end)/2;//求出中间值
            MyTask myTask1=new MyTask(begin,mid);//左边
            MyTask myTask2=new MyTask(mid+1,end);//右边
            myTask1.fork();//调用方法拆分
            myTask2.fork();
            result=myTask1.join()+myTask2.join();//合并
        }
        return result;
    }
}
public class Fork_join_Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyTask myTask=new MyTask(1,100);
        //创建我们自定义的RecursiveTask对象
        ForkJoinPool forkJoinPool=new ForkJoinPool();
        ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
        //创建分支合并对象
        System.out.println("最终结果:"+forkJoinTask.get());
        forkJoinPool.shutdown();//关闭池对象
    }
}

输出:

线程名子:ForkJoinPool-1-worker-12 …… 最终结果:5050

CompletableFuture异步回调

image.png

//异步调用,无返回值
CompletableFuture<Void> completableFuture1=CompletableFuture.runAsync(()->{
    System.out.println("无返回值线程名称:"+Thread.currentThread().getName());
});
completableFuture1.get();
//异步调用,有返回值
CompletableFuture<Integer> completableFuture2=CompletableFuture.supplyAsync(()->{
    System.out.println("有返回值线程名称:"+Thread.currentThread().getName());
    return 110;
});
completableFuture2.whenComplete((t,u)->{
    System.out.println("-----t:"+t);//返回值
    System.out.println("-----u:"+u);//异常信息
}).get();