在 Java 5.0 提供了 java.util.concurrent(简称JUC)包,在此包中增加了在并发编程中很常用的工具类,用于定义类似于线程的自定义子系统,包括线程池,异步 IO 和轻量级任务框架;还提供了设计用于多线程上下文中的 Collection 实现等;
1、Volatile关键字
轻量级的同步机制
一旦一个共享变量(类的成员变量、类的静态成员变量)被volatile修饰之后,那么就具备了两层语义:
- 保证了**可见性**,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
- 禁止进行指令重排序。
无法保证原子性
并发:并发是指一个处理器同时处理多个任务
并行:并行是指多个处理器或者是多核的处理器同时处理多个不同的任务。
3、进程与线程
进程是一个程序,跟操作系统有关,是系统进行资源分配的最小单位。
- 线程依赖于进程,共享进程获取到的资源,跟语言有关系。
两者区别
- 简而言之,一个程序至少有一个进程,一个进程至少有一个线程.
- 线程的划分尺度小于进程,使得多线程程序的并发性高。
- 另外,进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。
- 每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
从逻辑角度来看,多线程的意义在于一个应用程序中,有多个执行部分可以同时执行。但操作系统并没有将多个线程看做多个独立的应用,来实现进程的调度和管理以及资源分配。这就是进程和线程的重要区别
4、Synchronized与Lock
4.1、Synchronized(不见不散)
关键字synchronized可以保证在同一时刻,只有一个线程可以执行某个方法或某个代码块,同时synchronized可以保证一个线程的变化可见(可见性),即可以代替volatile。
synchronized可以保证方法或者代码块在运行时,同一时刻只有一个方法可以进入到临界区,同时它还可以保证共享变量的内存可见性
确保线程互斥的访问同步代码
默认也是非公平锁
Synchronized源码是有两个解锁的,为了保证程序发生异常也能解锁。
synchronized的三种应用方式普通同步方法(实例方法),锁是当前实例对象 ,进入同步代码前要获得当前实例的锁
- 静态同步方法,锁是当前类的class对象 ,进入同步代码前要获得当前类对象的锁
同步方法块,锁是括号里面的对象,对给定对象加锁,进入同步代码库前要获得给定对象的锁。
4.1.1、Synchronized怎么实现可重入
每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。
当执行monitorenter时,如果目标锁对象的计数器为零,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1。
在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么Java虚拟机可以将其计数器加1,否则需要等待,直至持有线程释放该锁。
当执行monitorexit时,Java虚拟机则需将锁对象的计数器减1。计数器为零代表锁已被释放。4.2、Lock(过时不候)
在java.util.concurrent.locks包中有很多Lock的实现类,常用的有ReentrantLock、ReadWriteLock(实现类ReentrantReadWriteLock),其实现都依赖java.util.concurrent.AbstractQueuedSynchronizer类。通常都是用可重入锁(递归锁)ReentrantLock。默认情况下为非公平锁。
4.3、区别
来源:
lock是一个接口,而synchronized是java的一个关键字,synchronized是内置的语言实现;- 异常是否释放锁:
synchronized在发生异常时候会自动释放占有的锁,因此不会出现死锁;而lock发生异常时候,不会主动释放占有的锁,必须手动unlock来释放锁,可能引起死锁的发生。(所以最好将同步代码块用try catch包起来,finally中写入unlock,避免死锁的发生。) - 是否响应中断
lock等待锁过程中可以用interrupt来中断等待,而synchronized只能等待锁的释放,不能响应中断; - 是否知道获取锁
Lock可以通过trylock来知道有没有获取锁,而synchronized不能; - Lock可以提高多个线程进行读操作的效率。(可以通过readwritelock实现读写分离)
- 在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择。
- synchronized使用Object对象本身的wait 、notify、notifyAll调度机制,而Lock可以使用Condition进行线程之间的调度,
详情见https://blog.csdn.net/hefenglian/article/details/82383569
5、创建线程的方式
run()为线程类的核心方法,相当于主线程的main方法,是每个线程的入口。一个线程调用两次start()方法将会抛出线程状态异常,也就是的start()只可以被调用一次 。run()方法是由jvm创建完本地操作系统级线程后回调的方法,不可以手动调用(否则就是普通方法)
5.1、继承Thread类创建线程
-
5.2、实现Runnable接口创建线程(推荐)
-
5.3、使用Callable和Future创建线程
创建Callable接口的实现类,并实现call()方法,该call()方法作为线程的执行体,并且有返回值
- 创建Callable实现类的实例,使用FutureTask类来包装Callable对象,该FutureTask对象封装了该Callable对象的call()方法作为返回值
- 使用FutureTask对象作业Thread对象的target创建并启动新线程
调用FutureTask对象的get()方法来获得子线程执行结束后的返回值
5.4、使用线程池
线程池一共有四种:
newCachedThreadPool:用来创建一个可以无限扩大的线程池,适用于负载较轻的场景,执行短期异步任务。(可以使得任务快速得到执行,因为任务时间执行短,可以很快结束,也不会造成cpu过度切换)
- newFixedThreadPool:创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于负载较重的场景,对当前线程数量进行限制。(保证线程数可控,不会造成线程过多,导致系统负载更为严重)
- newSingleThreadExecutor:创建一个单线程的线程池,适用于需要保证顺序执行各个任务。
- newScheduledThreadPool:适用于执行延时或者周期性任务。
线程池的七大参数:
public ThreadPoolExecutor(int corePoolSize,//核心线程大小
int maximumPoolSize,//线程池最大线程数量
long keepAliveTime,//空闲线程存活时间
TimeUnit unit,//空间线程存活时间单位
BlockingQueue<Runnable> workQueue,//工作队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler//拒绝策略
)
5.4.1、corePoolSize 核心线程大小
线程池中最小的线程数量,即使处理空闲状态,也不会被销毁,除非设置了allowCoreThreadTimeOut。
- CPU密集型:核心线程数 = CPU核数 + 1
- IO密集型:核心线程数 = CPU核数 * 2+1
注:IO密集型(某大厂实践经验),核心线程数 = CPU核数 / (1-阻塞系数),例如阻塞系数 0.8,CPU核数为4,则核心线程数为20
5.4.2、maximumPoolSize 线程池最大线程数量
一个任务被提交后,首先会被缓存到工作队列中,等工作队列满了,则会创建一个新线程,处理从工作队列中的取出一个任务。
5.4.3、keepAliveTime 空闲线程存活时间
当线程数量大于corePoolSize时,一个处于空闲状态的线程,在指定的时间后会被销毁。
5.4.4、unit 空间线程存活时间单位
5.4.5、workQueue工作队列
jdk中提供了四种工作队列,新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。
- ArrayBlockingQueue基于数组的有界阻塞队列,按FIFO排序。
- LinkedBlockingQuene基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。
- ③SynchronousQuene一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
PriorityBlockingQueue具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
5.4.6、threadFactory 线程工厂
创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等
5.4.7、handler 拒绝策略
当工作队列中的任务已满并且线程池中的线程数量也达到最大,这时如果有新任务提交进来,拒绝策略就是解决这个问题的,jdk中提供了4中拒绝策略:
CallerRunsPolicy该策略下,在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,则直接抛弃任务。
- AbortPolicy该策略下,直接丢弃任务,并抛出RejectedExecutionException异常。
- DiscardPolicy该策略下,直接丢弃任务,什么都不做。
- DiscardOldestPolicy该策略下,抛弃最早进入队列的那个任务,然后尝试把这次拒绝的任务放入队列
线程池的好处。
- 线程使应用能够更加充分合理的协调利用cpu 、内存、网络、i/o等系统资源。
- 线程的创建需要开辟虚拟机栈,本地方法栈、程序计数器等线程私有的内存空间。
- 在线程的销毁时需要回收这些系统资源。频繁的创建和销毁线程会浪费大量的系统资源,增加并发编程的风险。
- 另外,在服务器负载过大的时候,如何让新的线程等待或者友好的拒绝服务?这些丢失线程自身无法解决的。所以需要通过线程池协调多个线程,并实现类似主次线程隔离、定时执行、周期执行等任务。
线程池的作用包括:
- 利用线程池管理并复用线程、控制最大并发数等。
- 实现任务线程队列缓存策略和拒绝机制。
- 实现某些与时间相关的功能,如定时执行、周期执行等。
隔离线程环境。比如,交易服务和搜索服务在同一台服务器上,分别开启两个线程池,交易线程的资源消耗明显要大;因此,通过配置独立的线程池,将较慢的交易服务与搜索服务隔开,避免个服务线程互相影响。
5.4.8、线程池的原理
在创建了线程池后,等待提交过来的任务请求
- 当调用execute()方法添加一个请求任务时,线程池会做出如下判断
- 如果正在运行的线程池数量小于corePoolSize,那么马上创建线程运行这个任务
- 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列
- 如果这时候队列满了,并且正在运行的线程数量还小于maximumPoolSize,那么还是创建非核心线程like运行这个任务;
- 如果队列满了并且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行
- 当一个线程完成任务时,它会从队列中取下一个任务来执行
当一个线程无事可做操作一定的时间(keepAliveTime)时,线程池会判断:
AbortPolicy:默认,直接抛出RejectedExcutionException异常,阻止系统正常运行
- DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常,如果运行任务丢失,这是一种好方案
- CallerRunsPolicy:该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者
DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务
5.4.10、手写线程池
为什么不用Executors中JDK提供的?
根据阿里巴巴手册:并发控制这章线程资源必须通过线程池提供,不允许在应用中自行显式创建线程
- 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题,如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题
- 线程池不允许使用Executors去创建,而是通过ThreadToolExecutors的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
- Executors返回的线程池对象弊端如下:
- FixedThreadPool和SingleThreadPool:允许的请求队列长度为:Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM
- CacheThreadPool和ScheduledThreadPool:允许创建的的线程数量为:Integer.MAX_VALUE,可能会创建大量线程,从而导致OOM
- Executors返回的线程池对象弊端如下:
线程池的合理参数
这个是根据具体业务来配置的,分为CPU密集型和IO密集型
- CPU密集型
- CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行
- CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程)
- 而在单核CPU上,无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些
- CPU密集型任务配置尽可能少的线程数量:
- 一般公式:CPU核数 + 1个线程数
IO密集型
新建(NEW):新创建了一个线程对象。
- 可运行(RUNNABLE):线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取cpu 的使用权 。
- 运行(RUNNING):可运行状态(runnable)的线程获得了cpu 时间片(timeslice) ,执行程序代码。
- 阻塞(BLOCKED):阻塞状态是指线程因为某种原因放弃了cpu 使用权,也即让出了cpu timeslice,暂时停止运行。直到线程进入可运行(runnable)状态,才有机会再次获得cpu timeslice 转到运行(running)状态。阻塞的情况分三种:
- 等待阻塞:运行(running)的线程执行o.wait()方法,JVM会把该线程放入等待队列(waitting queue)中。
- 同步阻塞:运行(running)的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入锁池(lock pool)中。
- 其他阻塞:运行(running)的线程执行Thread.sleep(long ms)或t.join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入可运行(runnable)状态。
死亡(DEAD):线程run()、main() 方法执行结束,或者因异常退出了run()方法,则该线程结束生命周期。死亡的线程不可再次复生。
7、死锁
7.1、什么是死锁
所谓死锁,是指多个进程在运行过程中因争夺资源而造成的一种僵局,当进程处于这种僵持状态时,若无外力作用,它们都将无法再向前推进。 因此我们举个例子来描述,如果此时有一个线程A,按照先锁a再获得锁b的的顺序获得锁,而在此同时又有另外一个线程B,按照先锁b再锁a的顺序获得锁。如下图所示:
7.2、死锁产生的原因
竞争资源,系统中的资源可以分为两类:
- 可剥夺资源,是指某进程在获得这类资源后,该资源可以再被其他进程或系统剥夺,CPU和主存均属于可剥夺性资源;
- 另一类资源是不可剥夺资源,当系统把这类资源分配给某进程后,再不能强行收回,只能在进程用完后自行释放,如磁带机、打印机等。
- 进程间推进顺序非法
若P1保持了资源R1,P2保持了资源R2,系统处于不安全状态,因为这两个进程再向前推进,便可能发生死锁
例如,当P1运行到P1:Request(R2)时,将因R2已被P2占用而阻塞;当P2运行到P2:Request(R1)时,也将因R1已被P1占用而阻塞,于是发生进程死锁
7.3、写一个死锁程序
package Future.Tech.technologyTest.lock;
import java.util.concurrent.TimeUnit;
/**
* @author WJ
* @ClassName DeadLock
* @Description TODO
* @data 2021/7/31 18:12
* @Version 1.0
*/
class MyDeadLock implements Runnable{
String lockA;
String lockB;
public MyDeadLock(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
public String getLockA() {
return lockA;
}
public void setLockA(String lockA) {
this.lockA = lockA;
}
public String getLockB() {
return lockB;
}
public void setLockB(String lockB) {
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA){
System.out.println("线程:"+Thread.currentThread().getName()+"正在持有线程,尝试获取B");
try{
TimeUnit.SECONDS.sleep(3);}catch(InterruptedException e){e.printStackTrace();}
synchronized (lockB){
System.out.println("线程:"+Thread.currentThread().getName()+"正在持有线程,尝试获取A");
}
}
}
}
public class DeadLock {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new MyDeadLock(lockA,lockB),"A").start();
new Thread(new MyDeadLock(lockB,lockA),"B").start();
}
}
public class DeadLockDemo {
public static void main(String[] args) {
Object lockA = new Object();
Object lockB = new Object();
new Thread(new Runnable() {
@Override
public void run() {
String name = Thread.currentThread().getName();
synchronized (lockA) {
System.out.println(name + " got lockA, want LockB");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println(name + " got lockB");
System.out.println(name + ": say Hello!");
}
}
}
}, "线程-A").start();
new Thread(new Runnable() {
@Override
public void run() {
String name = Thread.currentThread().getName();
synchronized (lockB) {
System.out.println(name + " got lockB, want LockA");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockA) {
System.out.println(name + " got lockA");
System.out.println(name + ": say Hello!");
}
}
}
}, "线程-B").start();
}
}
7.4、怎么解决死锁问题
7.4.1、预防死锁
- 资源一次性分配:一次性分配所有资源,这样就不会再有请求了:(破坏请求条件)
- 只要有一个资源得不到分配,也不给这个进程分配其他的资源:(破坏请保持条件)
- 可剥夺资源:即当某进程获得了部分资源,但得不到其它资源,则释放已占有的资源(破坏不可剥夺条件)
资源有序分配法:系统给每类资源赋予一个编号,每一个进程按编号递增的顺序请求资源,释放则相反(破坏环路等待条件)
7.4.1.1、以确定的顺序获得锁
如果必须获取多个锁,那么在设计的时候需要充分考虑不同线程之前获得锁的顺序。按照上面的例子,两个线程获得锁的时序图如下:
如果此时把获得锁的时序改成:
那么死锁就永远不会发生。 针对两个特定的锁,开发者可以尝试按照锁对象的hashCode值大小的顺序,分别获得两个锁,这样锁总是会以特定的顺序获得锁,那么死锁也不会发生。问题变得更加复杂一些,如果此时有多个线程,都在竞争不同的锁,简单按照锁对象的hashCode进行排序(单纯按照hashCode顺序排序会出现“环路等待”),可能就无法满足要求了,这个时候开发者可以使用银行家算法,所有的锁都按照特定的顺序获取,同样可以防止死锁的发生,该算法在这里就不再赘述了,有兴趣的可以自行了解一下。7.4.1.2、超时放弃
当使用synchronized关键词提供的内置锁时,只要线程没有获得锁,那么就会永远等待下去,然而Lock接口提供了boolean tryLock(long time, TimeUnit unit) throws InterruptedException方法,该方法可以按照固定时长等待锁,因此线程可以在获取锁超时以后,主动释放之前已经获得的所有的锁。通过这种方式,也可以很有效地避免死锁。 还是按照之前的例子,时序图如下:
7.4.2、避免死锁
预防死锁的几种策略,会严重地损害系统性能。因此在避免死锁时,要施加较弱的限制,从而获得较满意的系统性能。由于在避免死锁的策略中,允许进程动态地申请资源。因而,系统在进行资源分配之前预先计算资源分配的安全性。若此次分配不会导致系统进入不安全的状态,则将资源分配给进程;否则,进程等待。其中最具有代表性的避免死锁算法是银行家算法。
银行家算法:首先需要定义状态和安全状态的概念。系统的状态是当前给进程分配的资源情况。因此,状态包含两个向量Resource(系统中每种资源的总量)和Available(未分配给进程的每种资源的总量)及两个矩阵Claim(表示进程对资源的需求)和Allocation(表示当前分配给进程的资源)。安全状态是指至少有一个资源分配序列不会导致死锁。当进程请求一组资源时,假设同意该请求,从而改变了系统的状态,然后确定其结果是否还处于安全状态。如果是,同意这个请求;如果不是,阻塞该进程知道同意该请求后系统状态仍然是安全的。7.4.3、检测死锁
jps -l jstack两个命令
首先为每个进程和每个资源指定一个唯一的号码;
-
7.4.4、解除死锁
当发现有进程死锁后,便应立即把它从死锁状态中解脱出来,常采用的方法有:
剥夺资源:从其它进程剥夺足够数量的资源给死锁进程,以解除死锁状态;
撤消进程:可以直接撤消死锁进程或撤消代价最小的进程,直至有足够的资源可用,死锁状态.消除为止;所谓代价是指优先级、运行代价、进程的重要性和价值等。
7.4.5、死锁检测
Jstack命令
jstack是java虚拟机自带的一种堆栈跟踪工具。jstack用于打印出给定的java进程ID或core file或远程调试服务的Java堆栈信息。 Jstack工具可以用于生成java虚拟机当前时刻的线程快照。线程快照是当前java虚拟机内每一条线程正在执行的方法堆栈的集合,生成线程快照的主要目的是定位线程出现长时间停顿的原因,如线程间死锁、死循环、请求外部资源导致的长时间等待等。 线程出现停顿的时候通过jstack来查看各个线程的调用堆栈,就可以知道没有响应的线程到底在后台做什么事情,或者等待什么资源。
- JConsole工具
Jconsole是JDK自带的监控工具,在JDK/bin目录下可以找到。它用于连接正在运行的本地或者远程的JVM,对运行在Java应用程序的资源消耗和性能进行监控,并画出大量的图表,提供强大的可视化界面。而且本身占用的服务器内存很小,甚至可以说几乎不消耗
7.5、产生死锁的必要条件
- 互斥条件:进程要求对所分配的资源进行排它性控制,即在一段时间内某资源仅为一进程所占用。
- 请求和保持条件:当进程因请求资源而阻塞时,对已获得的资源保持不放。
- 不剥夺条件:进程已获得的资源在未使用完之前,不能剥夺,只能在使用完时由自己释放。
环路等待条件:在发生死锁时,必然存在一个进程—资源的环形链。
8、CAS
CAS(compare and swap)的缩写,中文翻译成比较并交换。
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。 如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该 位置的值。8.1、CAS三大问题
ABA问题。因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。关于ABA问题参考文档: http://blog.hesey.net/2011/09/resolve-aba-by-atomicstampedreference.html
- 循环时间长开销大。自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。
只能保证一个共享变量的原子操作。当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。
8.2、CAS原理
unsafe类,unsafe类相当于编译原语,默认就是原子性的
- 自旋锁
9、公平锁和非公平锁
- 如果一个线程组里,能保证每个线程都能拿到锁,那么这个锁就是公平锁,按照顺序,先来后到。公平锁维护了一个FIFO的队列,先进先出
- 相反,如果保证不了每个线程都能拿到锁,也就是存在有线程饿死,允许加塞,那么这个锁就是非公平锁。
非公平锁性能高于公平锁性能。首先,在恢复一个被挂起的线程与该线程真正运行之间存在着严重的延迟。而且,非公平锁能更充分的利用cpu的时间片,尽量的减少cpu空闲的状态时间。
10、乐观锁和悲观锁
- Synchronized属于悲观锁,悲观地认为程序中的并发情况严重,所以严防死守。
- CAS属于乐观锁,乐观地认为程序中的并发情况不那么严重,所以让线程不断去尝试更新。
11、可重入锁
广义上的可重入锁指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁(前提得是同一个对象或者class),这样的锁就叫做可重入锁。也就是说,线程可以进入任何一个它已经拥有的锁所同步着的代码块。ReentrantLock和synchronized都是可重入锁,11.1、证明Synchronized
```java /**- 可重入锁(也叫递归锁)
- 指的是同一线程外层函数获得锁之后,内层递归函数仍然能获取到该锁的代码,在同一线程在外层方法获取锁的时候,在进入内层方法会自动获取锁
- 也就是说:
线程可以进入任何一个它已经拥有的锁所同步的代码块
- @author: fengyi0827
- @create: 2021-07-15-12:12 */
/**
资源类 */ class Phone {
/**
- 发送短信
@throws Exception */ public synchronized void sendSMS() throws Exception{ System.out.println(Thread.currentThread().getName() + “\t invoked sendSMS()”);
// 在同步方法中,调用另外一个同步方法 sendEmail(); }
/**
- 发邮件
- @throws Exception */ public synchronized void sendEmail() throws Exception{ System.out.println(Thread.currentThread().getId() + “\t invoked sendEmail()”); } } public class ReenterLockDemo {
public static void main(String[] args) {
Phone phone = new Phone();
// 两个线程操作资源列
new Thread(() -> {
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}, "t2").start();
}
} /**
- 结果
t1 invoked sendSMS() t1线程在外层方法获取锁的时候
t1 invoked sendEmail() t1在进入内层方法会自动获取锁
t2 invoked sendSMS() t2线程在外层方法获取锁的时候
t2 invoked sendEmail() t2在进入内层方法会自动获取锁
*/
```
11.2、证明ReentrantLock
```java import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
/**
资源类 */ class Phone implements Runnable{
Lock lock = new ReentrantLock();
/**
set进去的时候,就加锁,调用set方法的时候,能否访问另外一个加锁的set方法 */ public void getLock() { lock.lock(); try {
System.out.println(Thread.currentThread().getName() + "\t get Lock"); setLock();
} finally {
lock.unlock();
} }
public void setLock() { lock.lock(); try {
System.out.println(Thread.currentThread().getName() + "\t set Lock");
} finally {
lock.unlock();
} }
@Override public void run() { getLock(); } } public class ReenterLockDemo {
public static void main(String[] args) {
Phone phone = new Phone();
/**
* 因为Phone实现了Runnable接口
*/
Thread t3 = new Thread(phone, "t3");
Thread t4 = new Thread(phone, "t4");
t3.start();
t4.start();
}
} //结果 //t3 get Lock //t3 set Lock //t4 get Lock //t4 set Lock
- 如果加锁的数量==解锁的数量,程序不会报错
- 如果加锁的数量>解锁的数量,程序卡死,线程锁死,等待解锁,也就是死锁
- 如果加锁的数量<解锁的数量,程序报错java.lang.IllegalMonitorStateException
<a name="nGFU1"></a>
# 12、自旋锁(SpinLock)
是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少上下文切换的消耗,缺点是循环会消耗CPU的资源
```java
/**
* 手写一个自旋锁
*
* 循环比较获取直到成功为止,没有类似于wait的阻塞
*
* 通过CAS操作完成自旋锁,A线程先进来调用myLock方法自己持有锁5秒,B随后进来发现当前有线程持有锁,不是null,所以只能通过自旋等待,直到A释放锁后B随后抢到
* @author: fengyi0827
* @create: 2021-07-15-15:46
*/
public class SpinLockDemo {
// 现在的泛型装的是Thread,原子引用线程
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void myLock() {
// 获取当前进来的线程
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t come in ");
// 开始自旋,期望值是null,更新值是当前线程,如果是null,则更新为当前线程,否者自旋
while(!atomicReference.compareAndSet(null, thread)) {
}
}
/**
* 解锁
*/
public void myUnLock() {
// 获取当前进来的线程
Thread thread = Thread.currentThread();
// 自己用完了后,把atomicReference变成null
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName() + "\t invoked myUnlock()");
}
public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();
// 启动t1线程,开始操作
new Thread(() -> {
// 开始占有锁
spinLockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 开始释放锁
spinLockDemo.myUnLock();
}, "t1").start();
// 让main线程暂停1秒,使得t1线程,先执行
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 1秒后,启动t2线程,开始占用这个锁
new Thread(() -> {
// 开始占有锁
spinLockDemo.myLock();
// 开始释放锁
spinLockDemo.myUnLock();
}, "t2").start();
}
}
13、独占锁和共享锁
独占锁:指该锁一次只能被一个线程所持有。对ReentrantLock和Synchronized而言都是独占锁
共享锁:指该锁可以被多个线程锁持有
对ReentrantReadWriteLock其读锁是共享,其写锁是独占
写的时候只能一个人写,但是读的时候,可以多个人同时读
/**
* 读写锁
* 多个线程 同时读一个资源类没有任何问题,所以为了满足并发量,读取共享资源应该可以同时进行
* 但是,如果一个线程想去写共享资源,就不应该再有其它线程可以对该资源进行读或写
*
* @author: fengyi0827
* @create: 2021-07-15-16:59
*/
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 资源类
*/
class MyCache {
/**
* 缓存中的东西,必须保持可见性,因此使用volatile修饰
*/
private volatile Map<String, Object> map = new HashMap<>();
/**
* 创建一个读写锁
* 它是一个读写融为一体的锁,在使用的时候,需要转换
*/
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
/**
* 定义写操作
* 满足:原子 + 独占
* @param key
* @param value
*/
public void put(String key, Object value) {
// 创建一个写锁
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t 正在写入:" + key);
try {
// 模拟网络拥堵,延迟0.3秒
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "\t 写入完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 写锁 释放
rwLock.writeLock().unlock();
}
}
/**
* 获取
* @param key
*/
public void get(String key) {
// 读锁
rwLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t 正在读取:");
try {
// 模拟网络拥堵,延迟0.3秒
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object value = map.get(key);
System.out.println(Thread.currentThread().getName() + "\t 读取完成:" + value);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 读锁释放
rwLock.readLock().unlock();
}
}
/**
* 清空缓存
*/
public void clean() {
map.clear();
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
// 线程操作资源类,5个线程写
for (int i = 1; i <= 5; i++) {
// lambda表达式内部必须是final
final int tempInt = i;
new Thread(() -> {
myCache.put(tempInt + "", tempInt + "");
}, String.valueOf(i)).start();
}
// 线程操作资源类, 5个线程读
for (int i = 1; i <= 5; i++) {
// lambda表达式内部必须是final
final int tempInt = i;
new Thread(() -> {
myCache.get(tempInt + "");
}, String.valueOf(i)).start();
}
}
}
14、CountDownLatch
点火公式 - -
让一些线程阻塞直到另一些线程完成一系列操作才被唤醒
CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,调用线程就会被阻塞。其它线程调用CountDown方法会将计数器减1(调用CountDown方法的线程不会被阻塞),当计数器的值变成零时,因调用await方法被阻塞的线程会被唤醒,继续执行
package com.moxi.interview.study.thread;
import java.util.concurrent.CountDownLatch;
/**
* @author: fengyi0827
* @create: 2021-07-15-19:03
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 计数器
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 上完自习,离开教室");
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "\t 班长最后关门");
}
}
17、CyclicBarrier
CyclicBarrier的字面意思就是可循环(cyclic)使用的屏障(Barrier)。它要求做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await方法
/**
* CyclicBarrier循环屏障
*
* @author: fengyi0827
* @create: 2021-07-16-14:40
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
/**
* 定义一个循环屏障,参数1:需要累加的值,参数2 需要执行的方法
*/
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("召唤神龙");
});
for (int i = 0; i < 7; i++) {
final Integer tempInt = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 收集到 第" + tempInt + "颗龙珠");
try {
// 先到的被阻塞,等全部线程完成后,才能执行方法
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
16、Semaphore
信号量,作用
- 一个是用于共享资源的互斥使用
另一个用于并发线程数的控制 ```java /**
- 信号量Demo
- @author: fengyi0827
@create: 2021-07-16-15:01 */ public class SemaphoreDemo {
public static void main(String[] args) {
/** * 初始化一个信号量为3,默认是false 非公平锁, 模拟3个停车位 */ Semaphore semaphore = new Semaphore(3, false); // 模拟6部车 for (int i = 0; i < 6; i++) { new Thread(() -> { try { // 代表一辆车,已经占用了该车位 semaphore.acquire(); // 抢占 System.out.println(Thread.currentThread().getName() + "\t 抢到车位"); // 每个车停3秒 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t 离开车位"); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 释放停车位 semaphore.release(); } }, String.valueOf(i)).start(); }
} }
<a name="mEM3t"></a>
# 17、阻塞队列(BlockingQueue)
<br />线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素
- 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞
- 当蛋糕店的柜子空的时候,无法从柜子里面获取蛋糕
- 当阻塞队列是满时,从队列中添加元素的操作将会被阻塞
- 当蛋糕店的柜子满的时候,无法继续向柜子里面添加蛋糕了
BlockingQueue阻塞队列是属于一个接口,底下有七个实现类
- **ArrayBlockQueue:由数组结构组成的有界阻塞队列**
- **LinkedBlockingQueue:由链表结构组成的有界(但是默认大小 Integer.MAX_VALUE)的阻塞队列**
**有界,但是界限非常大,相当于无界,可以当成无界**
- PriorityBlockQueue:支持优先级排序的无界阻塞队列
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列
- **SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列**
生产一个,消费一个,不存储元素,不消费不生产
- LinkedTransferQueue:由链表结构组成的无界阻塞队列
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列
<br />
<a name="M3b1Z"></a>
## 17.1、生产者消费者模式
```java
/**
* 生产者消费者 传统版
* 题目:一个初始值为0的变量,两个线程对其交替操作,一个加1,一个减1,来5轮
* @author: fengyi0827
* @create: 2021-07-16-21:38
*/
/**
* 线程 操作 资源类
* 判断 干活 通知
* 防止虚假唤醒机制
*/
/**
* 资源类
*/
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws Exception{
// 同步代码块,加锁
lock.lock();
try {
// 判断
while(number != 0) {
// 等待不能生产
condition.await();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() throws Exception{
// 同步代码块,加锁
lock.lock();
try {
// 判断
while(number == 0) {
// 等待不能消费
condition.await();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ProdConsumerTraditionDemo {
public static void main(String[] args) {
// 高内聚,低耦合 内聚指的是,一个空调,自身带有调节温度高低的方法
ShareData shareData = new ShareData();
// t1线程,生产
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "t1").start();
// t2线程,消费
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "t2").start();
}
}
PLUS版
/**
* 生产者消费者 阻塞队列版
* 使用:volatile、CAS、atomicInteger、BlockQueue、线程交互、原子引用
*
* @author: 陌溪
* @create: 2020-03-17-11:13
*/
class MyResource {
// 默认开启,进行生产消费
// 这里用到了volatile是为了保持数据的可见性,也就是当TLAG修改时,要马上通知其它线程进行修改
private volatile boolean FLAG = true;
// 使用原子包装类,而不用number++
private AtomicInteger atomicInteger = new AtomicInteger();
// 这里不能为了满足条件,而实例化一个具体的SynchronousBlockingQueue
BlockingQueue<String> blockingQueue = null;
// 而应该采用依赖注入里面的,构造注入方法传入
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
// 查询出传入的class是什么
System.out.println(blockingQueue.getClass().getName());
}
/**
* 生产
* @throws Exception
*/
public void myProd() throws Exception{
String data = null;
boolean retValue;
// 多线程环境的判断,一定要使用while进行,防止出现虚假唤醒
// 当FLAG为true的时候,开始生产
while(FLAG) {
data = atomicInteger.incrementAndGet() + "";
// 2秒存入1个data
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if(retValue) {
System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data + "成功" );
} else {
System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data + "失败" );
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "\t 停止生产,表示FLAG=false,生产介绍");
}
/**
* 消费
* @throws Exception
*/
public void myConsumer() throws Exception{
String retValue;
// 多线程环境的判断,一定要使用while进行,防止出现虚假唤醒
// 当FLAG为true的时候,开始生产
while(FLAG) {
// 2秒存入1个data
retValue = blockingQueue.poll(2L, TimeUnit.SECONDS);
if(retValue != null && retValue != "") {
System.out.println(Thread.currentThread().getName() + "\t 消费队列:" + retValue + "成功" );
} else {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t 消费失败,队列中已为空,退出" );
// 退出消费队列
return;
}
}
}
/**
* 停止生产的判断
*/
public void stop() {
this.FLAG = false;
}
}
public class ProdConsumerBlockingQueueDemo {
public static void main(String[] args) {
// 传入具体的实现类, ArrayBlockingQueue
MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 生产线程启动");
System.out.println("");
System.out.println("");
try {
myResource.myProd();
System.out.println("");
System.out.println("");
} catch (Exception e) {
e.printStackTrace();
}
}, "prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "consumer").start();
// 5秒后,停止生产和消费
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("");
System.out.println("");
System.out.println("5秒中后,生产和消费线程停止,线程结束");
myResource.stop();
}
}
18、AQS
AbstractQueuedSynchronizer 抽象的队列同步器
是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类型变量表示持有锁的状态。
AQS原理:
加锁:
- 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
- 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;(队列的头部是傀儡节点)
- acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
释放锁:
release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。