并发问题的产生
CPU性能优化手段
- CPU缓存:通过CPU多级缓存来增加CPU读取数据的速度
- 运行时指令重排
两个问题
- 同一时间点,各个CPU看到的同一内存地址的数据值不同。
- 多线程指令重排可能会出现乱序执行,出现错误。
内存屏障
Java线程基本概念
Java线程基础
- run() 和 start()
run() 只是定义了启动线程需要执行的方法,直接调用就和调用普通的方法一样,会在主线程中顺序执行.
start() 内部调用start0()本地方法,是真正创建子线程的方法,会创建一个子线程并启动。 - 实现方式
- 继承Thread类 - 本质也是实现了 Runnable接口
- 实现Runnable接口 - 接口内部只有一个run方法
- 实现Callable接口 -
**Callable**
接口可以返回结果或抛出检查异常 - 从线程池获取
实现 Runnable 接口和 Callable 接口的区别
Runnable
自 Java 1.0 以来一直存在,但Callable
仅在 Java 1.5 中引入,目的就是为了来处理Runnable
不支持的用例。**Runnable**
接口不会返回结果或抛出检查异常,但是**Callable**
接口可以。
所以,如果任务不需要返回结果或抛出异常推荐使用Runnable
接口,这样代码看起来会更加简洁。 ```java @FunctionalInterface public interface Runnable { /**
- 被线程执行,没有返回值也无法抛出异常 */ public abstract void run(); } ```
@FunctionalInterface
public interface Callable<V> {
/**
* 计算结果,或在无法这样做时抛出异常。
* @return 计算得出的结果
* @throws 如果无法计算结果,则抛出异常
*/
V call() throws Exception;
}
Java线程状态
终止线程
使用interrupt()方法,或者在while循环中判断标识位。
Sleep和Wait
并发三特性
概览图
- 可见性 : 一个线程修改共享变量后,其他线程可见(CPU缓存引发 也可由jit优化引发)。
- 有序性: 对象半初始化问题,单例模式双重检验,由于指令重排序引起对象未初始化便被其他线程得到。
- 原子性 : 非原子操作要么全部成功,要么全部失败
可见性
Java内存模型 (JMM)
在 JDK1.2 之前,Java 的内存模型实现总是从主存(即共享内存)读取变量,是不需要进行特别的注意的。而在当前的 Java 内存模型下,线程可以把变量保存工作内存(比如机器的寄存器)中,而不是直接在主存中进行读写。这就可能造成一个线程在主存中修改了一个变量的值,而另外一个线程还继续使用它在寄存器中的变量值的拷贝,造成数据的不一致。
两种解决方案:
1)通过在总线加LOCK#锁的方式 (效率低下、解决原子性)
2)通过缓存一致性协议(volatile)
有序性
重排序 遵循 as-if-serial 和happens-before 规则 (规定了不能重排序的情况 )
原子性
原子操作
原子操作三种实现方式:
- CAS (Compare and Swap) : 会引发ABA问题 -> 解决方案 : 版本号更新
- 加锁
CAS 和 AQS
这两者都是并发编程的基础。
CAS
什么是CAS
CAS,compare and swap的缩写,即:比较并交换。
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。
如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。
CAS “我认为位置 V 此刻的值是 A;如果确实如此,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。”
CAS通过这种方法保证在 “我” 操作位置V的这个时间段内,位置V的值A没有被 “其他人” 修改,从而保证操作的原子性。
CAS存在的问题
CAS虽然很高效的解决原子操作,但是CAS仍然存在三大问题。
- ABA问题。因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。
从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。 - 循环时间长开销大。自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。
- 只能保证一个共享变量的原子操作。当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。
concurrent包中 CAS 的实现
由于java的CAS同时具有 volatile 读和volatile写的内存语义,因此Java线程之间的通信现在有了下面四种方式:
- A线程写volatile变量,随后B线程读这个volatile变量。
- A线程写volatile变量,随后B线程用CAS更新这个volatile变量。
- A线程用CAS更新一个volatile变量,随后B线程用CAS更新这个volatile变量。
- A线程用CAS更新一个volatile变量,随后B线程读这个volatile变量。
Java的CAS会使用现代处理器上提供的高效机器级别原子指令,这些原子指令以原子方式对内存执行读-改-写操作,这是在多处理器中实现同步的关键(从本质上来说,能够支持原子性读-改-写指令的计算机器,是顺序计算图灵机的异步等价机器,因此任何现代的多处理器都会去支持某种能对内存执行原子性读-改-写操作的原子指令)。同时,volatile变量的读/写和CAS可以实现线程之间的通信。把这些特性整合在一起,就形成了整个concurrent包得以实现的基石。如果我们仔细分析concurrent包的源代码实现,会发现一个通用化的实现模式:
- 首先,声明共享变量为volatile;
- 然后,使用CAS的原子条件更新来实现线程之间的同步;
- 同时,配合以volatile的读/写和CAS所具有的volatile读和写的内存语义来实现线程之间的通信。
Atomic 原子类(CAS + volatile)
并发包 java.util.concurrent
的原子类都存放在java.util.concurrent.atomic
AQS
AQS是并发编程中非常重要的概念,它是juc包下的许多并发工具类,如CountdownLatch,CyclicBarrier,Semaphore 和锁, 如ReentrantLock, ReaderWriterLock的实现基础,提供了一个基于int状态码和队列来实现的并发框架。
AQS基本概念
AQS(AbstractQueuedSynchronizer)是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量来表示状态,通过内置的FIFO(first in,first out)队列来完成资源获取线程的排队工作。
同步状态
AQS中维持一个全局的int状态码(state),线程通过修改(加/减指定的数量)是否成功来决定当前线程是否成功获取到同步状态。
独占or共享模式
AQS支持两种获取同步状态的模式既独占式和共享式。
独占式模式同一时刻只允许一个线程获取同步状态,而共享模式则允许多个线程同时获取。
同步队列
同步队列(一个FIFO双向队列)是AQS的核心,用来完成同步状态的管理,当线程获取同步状态失败时,AQS会将当前线程以及等待状态等信息构造成一个节点并加入到同步队列,同时会阻塞当前线程。
基于AQS的并发工具类
- CountDownLatch
让主线程等待一组事件发生后继续执行,子线程执行countDown方法,使主线程的cnt值-1,减到零时主线程继续执行。
创建子线程时可以传入CountDownLatch - CyclicBarrier
阻塞当前线程,等待其他线程、所有线程到达栅栏处后,才能继续执行
T1.await()栅栏计数器减一,若不为零t1阻塞
创建子线程时可以传入CyclicBarrier - Semaphore
控制某个资源可被多少个线程同时访问、semp.acquire()获取许可、semp.release()释放
创建子线程时可以传入Semaphore - Exchanger
线程之间用于数据交换,两个线程到达同步点之后,相互交换数据,先到的会阻塞(只能用于两个线程)
volatile、synchronized、ReetrantLock
volatile
volatile 通过底层通过对其修饰变量的读写操作前后加入内存屏障实现禁止指令重排序 (有序性)
通过 立即写、缓存一致协议 保证可见性
- MESI缓存一致协议 : 当CPU写数据时,如果发现操作的变量是共享变量,即在其他CPU中也存在该变量的副本,会发出信号通知其他CPU将该变量的缓存行置为无效状态
- 使得加锁的过程仅存在于线程向主存中共享变量写的过程,极大的提升了效率。
volatile 保证可见性和有序性,不保证原子性
在某一时刻线程1将i的值load取出来,放置到cpu缓存中,然后再将此值放置到寄存器A中,然后A中的值自增1(寄存器A中保存的是中间值,没有直接修改i,因此其他线程并不会获取到这个自增1的值)。如果在此时线程2也执行同样的操作,获取值i==10,自增1变为11,然后马上刷入主内存。此时由于线程2修改了i的值,实时的线程1中的i==10的值缓存失效,重新从主内存中读取,变为11。接下来线程1恢复。将自增过后的A寄存器值11赋值给cpu缓存i。这样就出现了线程安全问题。
synchronized
概述
锁升级过程
不同锁级别,对象markword的对应结构
ReetrantLock
synchronized 和 ReentrantLock 的区别
- 两者都是可重入锁
“可重入锁” 指的是自己可以再次获取自己的内部锁。比如一个线程获得了某个对象的锁,此时这个对象锁还没有释放,当其再次想要获取这个对象的锁的时候还是可以获取的,如果不可锁重入的话,就会造成死锁。同一个线程每次获取锁,锁的计数器都自增 1,所以要等到锁的计数器下降为 0 时才能释放锁。 - synchronized 依赖于 JVM 而 ReentrantLock 依赖于 API
synchronized 是依赖于 JVM 实现的,前面我们也讲到了 虚拟机团队在 JDK1.6 为 synchronized 关键字进行了很多优化,但是这些优化都是在虚拟机层面实现的,并没有直接暴露给我们。ReentrantLock 是 JDK 层面实现的(也就是 API 层面,需要 lock() 和 unlock() 方法配合 try/finally 语句块来完成),所以我们可以通过查看它的源代码,来看它是如何实现的。 - ReentrantLock 比 synchronized 增加了一些高级功能
相比synchronized,ReentrantLock增加了一些高级功能。主要来说主要有三点:
- 等待可中断 : ReentrantLock提供了一种能够中断等待锁的线程的机制,通过 lock.lockInterruptibly() 来实现这个机制。也就是说正在等待的线程可以选择放弃等待,改为处理其他事情。
- 可实现公平锁 : ReentrantLock可以指定是公平锁还是非公平锁。而synchronized只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁。ReentrantLock默认情况是非公平的,可以通过 ReentrantLock类的ReentrantLock(boolean fair)构造方法来制定是否是公平的。
- 可实现选择性通知(锁可以绑定多个条件): synchronized关键字与wait()和notify()/notifyAll()方法相结合可以实现等待/通知机制。ReentrantLock类当然也可以实现,但是需要借助于Condition接口与newCondition()方法。
- 选择性通知:
Condition是 JDK1.5 之后才有的,它具有很好的灵活性,比如可以实现多路通知功能也就是在一个Lock对象中可以创建多个Condition实例(即对象监视器),线程对象可以注册在指定的Condition中,从而可以有选择性的进行线程通知,在调度线程上更加灵活。 在使用notify()/notifyAll()方法进行通知时,被通知的线程是由 JVM 选择的,用ReentrantLock类结合Condition实例可以实现“选择性通知” ,这个功能非常重要,而且是 Condition 接口默认提供的。而synchronized关键字就相当于整个 Lock 对象中只有一个Condition实例,所有的线程都注册在它一个身上。如果执行notifyAll()方法的话就会通知所有处于等待状态的线程这样会造成很大的效率问题,而Condition实例的signalAll()方法 只会唤醒注册在该Condition实例中的所有等待线程。
线程池
概述
为什么要使用线程池 ?
七大参数
**ThreadPoolExecutor**
七大参数:
**corePoolSize**
: 核心线程数 - 核心线程数定义了最小可以同时运行的线程数量。**maximumPoolSize**
: 最大线程数 - 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。**workQueue**
: 等待队列 - 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。**keepAliveTime**
:保活时间 - 当线程池中的线程数量大于corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
才会被回收销毁;**unit**
:keepAliveTime
参数的时间单位。**threadFactory**
: 线程工厂 - executor 创建新线程的时候会用到。**handler**
: 饱和策略 - 当前运行线程数为最大线程数,等待队列已满时对于新任务的处理策略。
执行流程
- 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
- 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
- 当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
- 当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
- 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,释放空闲线程
- 当设置allowCoreThreadTimeOut(true)时,该参数默认false,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
饱和策略
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任时,ThreadPoolTaskExecutor
定义一些策略:
**ThreadPoolExecutor.AbortPolicy**
: 抛出RejectedExecutionException
来拒绝新任务的处理。**ThreadPoolExecutor.CallerRunsPolicy**
: 调用执行自己的线程运行任务,也就是直接在调用execute
方法的线程中运行(run
)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。**ThreadPoolExecutor.DiscardPolicy**
: 不处理新任务,直接丢弃掉。**ThreadPoolExecutor.DiscardOldestPolicy**
: 此策略将丢弃最早的未处理的任务请求。
线程池的状态
线程池的5种状态:Running、ShutDown、Stop、Tidying、Terminated。
线程池各个状态切换框架图:
1、RUNNING
状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!
2、 SHUTDOWN
(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
3、STOP
(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
4、TIDYING
(1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。
当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
5、 TERMINATED
(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。
(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。
线程池的选择
根据任务种类选择线程的数量:
- CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
- I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N
JDK自带四种线程池
Java通过Executors提供四种线程池,分别为
- newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
- newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
- newScheduledThreadPool 创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行。
- newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
等待队列(阻塞队列)
- ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列
- LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列(常用)
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列
- DelayQueue: 一个使用优先级队列实现的无界阻塞队列
- SynchronousQueue: 一个不存储元素的阻塞队列(常用)
- LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列
- LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列
异步编排
通过 CompletableFuture 类可以异步向线程池提交任务,可以做到在A任务执行后利用A的结果再向线程池提交新的任务。
CompletableFuture<> future = CompletableFuture.supplyAsync(()->{},线程池)
.whenComplete((返回值,异常)) 感知线程返回信息,无法改变结果
.handle感知线程返回信息,并作相应处理
.thenRun等方法,串行执行其他任务
等更多串行方式
CompletableFuture<返回值> result = CompletableFuture.supplyAsync(Callanle,线程池)需要返回值
result.thenAcceptAcceptAsync((result的返回值),线程池) 不需要返回值