并行:多个 cpu 实例或者多台机器同时执行一段处理逻辑,是真正的同时。 并发:通过 cpu 调度算法,让用户看上去同时执行,实际上从 cpu 操作层面不是真正的同时。
线程与进程的区别归纳:
a.地址空间和其它资源:进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
b.通信:进程间通信 IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
c.调度和切换:线程上下文切换比进程上下文切换要快得多。
d.在多线程 OS 中,进程不是一个可执行的实体。
守护线程:
守护线程是运行在后台的一种特殊进程。它独立于控制终端并且周期性地执行某种任务或等待处理某些发生的事件。在 Java 中垃圾回收线程就是特殊的守护线程。
5.1:原理/方法机制
多线程执行时,在栈内存中,其实每一个执行线程都有一片自己所属的栈内存空间。进行方法的压栈和弹栈。
当执行线程的任务结束了,线程自动在栈内存中释放了。但是当所有的执行线程都结束了,那么进程就结束了。
synchronized, wait, notify 是任何对象都具有的同步工具。
wait/notify 必须存在于 synchronized 块中,因为如果wait执行到了notify后面,可能导致wait线程一直没有办法唤醒。比如:
并且,这三个关键字针对的是同一个监视器(某对象的监视器)。
volatile:多线程的内存模型:main memory(主存)、working memory(线程栈),在处理数据时,线程会把值从主存 load 到本地栈,完成操作后再 save 回去(volatile 关键词的作用:每次针对该变量的操作都激发一次 load and save)。
5.2:线程状态
当线程被创建并启动以后,它既不是一启动就进入了执行状态,也不是一直处于执行状态。在线程的生命周期中,枚举中给出了六种线程状态:
就绪(Runnable):线程准备运行,不一定立马就能开始执行。
运行中(Running):进程正在执行线程的代码。
等待中(Waiting):线程处于阻塞的状态,等待外部的处理结束。
睡眠中(Sleeping):线程被强制睡眠。
I/O 阻塞(Blocked on I/O):等待 I/O 操作完成。
同步阻塞(Blocked on Synchronization):等待获取锁。
死亡(Dead):线程完成了执行
多个线程处理同一个资源,需要线程间通信解决线程对资源的占用,避免对同一资源争夺。及引入等待唤醒机制(wait(),notify())
(a)wait()方法:线程调用 wait()方法,释放它对锁的拥有权,然后等待另外的线程来通知它(通知的方式是 notify()或者 notifyAll()方法),这样它才能重新获得锁的拥有权和恢复执行。
要确保调用 wait()方法的时候拥有锁,即,wait()方法的调用必须放在synchronized方法或synchronized块中。
(b)notify()方法:唤醒一个等待当前对象的锁的线程。唤醒在此对象监视器上等待的单个线程。
(c)notifAll()方法:唤醒在此对象监视器上等待的所有线程。
sleep 执行后线程进入阻塞状态,sleep(0)表示当前线程被冻结了一下,让其他线程有机会优先执行,当前线程暂时放弃了 CPU,相当于一个让位动作,让当前线程立即回到就绪队列
yield 执行后线程进入就绪状态
join 执行后线程进入阻塞状态
join()中止当前线程(也就是 a),等待指定(也就是 b)线程结束,然后再运行当前线程
中断
Stop 方式过于野蛮,直接停止,可能会导致一些资源没有释放导致死锁。 Thread.interupt() 并不会立即中断,而是打上中断标记,具体啥时候中断还是交给操作系统处理,防止持久资源没法释放。Thread 类提供了 interrupted 方法测试当前线程是否中断,isInterrupted 方法测试线程是否已经中断
线程 A 和 B 都要获取对象 O 的锁定,假设 A 获取了对象 O 锁,B 将等待 A 释放对 O 的锁定,
如果使用 synchronized ,如果 A 不释放,B 将一直等下去,不能被中断
如果 使用 ReentrantLock,如果 A 不释放,可以使 B 在等待了足够长的时间以后,中断等待,而干别的事情
public static void main(String[] args){
Thread t = new DemoThread();
t.start();
t.interrupt();
System.out.println("t线程是否已经停止:" + t.isInterrupted());//true
System.out.println("当前线程是否已经停止" + Thread.interrupted());//false,main线程依然运行
}
public static class DemoThread extends Thread{
@Override
public void run(){
while(!isInterrupted()){
System.out.println("没结束!")
}
}
}
5.2:Thread 类
构造
- public Thread() :分配一个新的线程对象。
- public Thread(String name) :分配一个指定名字的新的线程对象。
- public Thread(Runnable target) :分配一个带有指定目标新的线程对象。
- public Thread(Runnable target,String name) :分配一个带有指定目标新的线程对象并指定名字。
常用方法:
- public String getName() :获取当前线程名称。
- public void start() :导致此线程开始执行; Java 虚拟机调用此线程的 run 方法。
- public void run() :此线程要执行的任务在此处定义代码。
- public static void sleep(long millis) :使当前正在执行的线程以指定的毫秒数暂停(暂时停止执行)。
- public static Thread currentThread() :返回对当前正在执行的线程对象的引用。
public boolean isAlive():测试线程是否已经启动
5.2:ThreadLocal
为每一个线程创建一个副本,实现线程上下文的变量传递。
线程变量
ThreadLocal 提供了线程内存储变量的能力,这些变量不同之处在于每一个线程读取的变量是对应的互相独立的。通过 get 和 set 方法就可以得到当前线程对应的值。ThreadLocal 实例通常来说都是 private static 类型的,用于关联线程和线程上下文。
好处:传递数据:保存每个线程绑定的数据,在需要的地方可以直接获取,避免参数直接传递带来的代码耦合问题
- 线程隔离:各线程之间的数据相互隔离却又具备并发性,避免同步方式带来的性能损失
使用举例:
public class MyDemo01 {
// 变量
private String content;
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public static void main(String[] args) {
MyDemo01 myDemo01 = new MyDemo01();
ThreadLocal<String> threadLocal = new ThreadLocal<>();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
threadLocal.set(Thread.currentThread().getName() + "的数据");
System.out.println("-----------------------------------------");
System.out.println(Thread.currentThread().getName() + "\t " + threadLocal.get());
}, String.valueOf(i)).start();
}
}
}
输出
4 4的数据
————————————————————-
3 3的数据
————————————————————-
2 2的数据
————————————————————-
1 1的数据
0 0的数据
ThreadLocal 与 Synchronized 的区别:
虽然 ThreadLocal 模式与 Synchronized 关键字都用于处理多线程并发访问变量的问题,不过两者处理问题的角度和思路不同。
Synchronized | ThreadLocal | |
---|---|---|
原理 | 同步机制采用 以空间换时间 的方式,只提供了一份变量,让不同的线程排队访问 | ThreadLocal 采用以空间换时间的概念,为每个线程都提供一份变量副本,从而实现同时访问而互不干扰 |
侧重点 | 多个线程之间访问资源的同步 | 多线程中让每个线程之间的数据相互隔离 |
总结:在刚刚的案例中,虽然使用 ThreadLocal 和 Synchronized 都能解决问题,但是使用 ThreadLocal 更为合适,因为这样可以使程序拥有更高的并发性。
在 JDK8 中 ThreadLocal 的设计是:每个 Thread 维护一个 ThreadLocalMap,这个 Map 的 key 是 ThreadLocal 实例本身,value 才是真正要存储的值 object。具体的过程是这样的:
- 每个 Thread 线程内部都有一个 Map(ThreadLocalMap)
- Map 里面存储 ThreadLocal 对象(key)和线程的变量副本(value)
- Thread 内部的 Map 是由 ThreadLocal 维护的,由 ThreadLocal 负责向 map 获取和设置线程的变量值。
- 对于不同的线程,每次获取副本值时,别的线程并不能获取到当前线程的副本值,形成了副本的隔离,互不干扰。
面这张图详细的揭示了 ThreadLocal 和 Thread 以及 ThreadLocalMap 三者的关系。
1、Thread 中有一个 map,就是 ThreadLocalMap
2、ThreadLocalMap 的 key 是 ThreadLocal,值是我们自己设定的。
3、ThreadLocal 是一个弱引用,当为 null 时,会被当成垃圾回收
为什么 key 设置成弱引用?
当线程结束时,将线程设为 null,如果 key 也设置成强引用指向 ThreadLocal,那么在线程结束时 ThreadLocal 不能被回收,,容易发生内存泄漏。
4、重点来了,突然我们 ThreadLocal 是 null 了,也就是要被垃圾回收器回收了,但是此时我们的 ThreadLocalMap 生命周期和 Thread 的一样,它不会回收,这时候就出现了一个现象。那就是 ThreadLocalMap 的 key 没了,但是 value 还在,这就造成了内存泄漏。
解决办法:使用完 ThreadLocal 后,执行 remove 操作,避免出现内存溢出情况。
线程池禁用 ThreadLocal,如果没有 remove 掉,则容易造成垃圾。
//#################
//#ThreadLocal
//#################
1:数据结构如下
Thread
|->当前线程
Thread#ThreadLocal.ThreadLocalMap threadLocals
|->当前线程成员字段
ThreadLocal.ThreadLocalMap#Entry[] table;
|->当前线程成员字段threadLocals的成员字段table,存放所有 ThreadLocal
table索引位:ThreadLocal实例.threadLocalHashCode & (数组长度 - 1);
table索引位的值:
T referent:ThreadLocal实例,即父类WeakReference的软引用
Object value:ThreadLocal实例.set的入参
ThreadLocal 泄露的原因:
1:Thread 与 Thread#ThreadLocal.ThreadLocalMap 的声明周期一样长
2:ThreadLocal 是 Entry extends WeakReference
3:Thread#ThreadLocal.ThreadLocalMap.Entry[] table 的索引 (ThreadLocal.hashcode & (table.length-1)) 的值在 步骤2被回收后就出现内存泄露
ThreadLocal 泄露的解决方式:
1:set/get/remove
图解:假设 threadLocal2 被回收了,但是 table[idx2] 的 WeakReference
一旦Thread一直活着,就进而导致 table[idx2] 的value 不能回收,最后导致 memory leak 内存泄露
+——+———+——————-+—————+——+———+—————+
|Thread#ThreadLocal.ThreadLocalMap |
+——+———+——————-+—————+——+———+—————+
| |Entry[] table |
| |索引 索引值 |
| |threadLocal1.hash & (table.length - 1) 用户的值 |
| |threadLocal2.hash & (table.length - 1) 用户的值 |
| | |
| | |
| | |
| | |
+——+———+——————-+—————+——+———+—————+
ThreadLocal 内存泄露原因
http://yucliuh.imwork.net:32880/static/thread_local_analyse.png
核心源码
除了构造方法之外,ThreadLocal 对外暴露的方法有以下 4 个
方法声明 | 描述 |
---|---|
protected T initialValue() | 返回当前线程局部变量的初始值 |
public void set(T value) | 返回当前线程绑定的局部变量 |
public T get() | 获取当前线程绑定的局部变量 |
public void remove() | 移除当前线程绑定的局部变量 |
以下是这 4 个方法的详细源码分析
了解到 ThreadLocal 的操作实际上是围绕 ThreadLocalMap 展开的。ThreadLocalMap 的源码相对比较复杂,我们从以下三个方面进行讨论。
1:基本结构
ThreadLocalMap 是 ThreadLocal 的内部类,没有实现 Map 接口,用独立的方式实现了 Map 的功能,其内部的 Entry 也是独立实现。
存储结果 Entry
Spring 中使用
5.3:线程实现方式
继承 Thread 类,重写 run 方法(其实 Thread 类本身也实现了 Runnable 接口)
继承 Thread 类实现线程,扩展性不强,因为 Java 类只能继承一个
测试:
class MyThread extends Thread{
public void run(){
}
}
public class TestThread{
public static void main(String[] args){
MyThread thread = new MyThread();//创建用户线程对象
thread.start();//启动用户线程
thread.run();//主线程调用用户线程对象的run()方法
}
}
方法
//当前线程可转让cpu控制权,让别的就绪状态线程运行(切换)
public static Thread.yield()
//暂停一段时间
public static Thread.sleep()
//在一个线程中调用other.join(),将等待other执行完后才继续本线程。
public join()
//后两个函数皆可以被打断
public interrupte()
实现 Runnable 接口,重写 run 方法
当使用 Thread(Runnable thread)方式创建线程对象时,须为该方法传递一个实现了 Runnable 接口的对象,这样创建的线程将调用实现 Runnable 接口的对象的 run()方法
public class TestThread{
public static void main(String[] args){
Mythread mt = new Mythread();
Thread t = new Thread(mt);//创建用户线程
t.start();//启动用户线程
}
}
class Mythread implements Runnable{
public void run(){
}
}
实现 Callable 接口,重写 call 方法(有返回值)
使用线程池(有返回值)
上所有的多线程代码都是通过运行 Thread 的 start()方法来运行的。因此,不管是继承 Thread 类还是实现 Runnable 接口来实现多线程,最终还是通过 Thread 的对象的 API 来控制线程的
说明
1:start 和 run 方法有什么不同?
t.start()才会启动一个线程,通过调用 Thread 类的 start()方法来启动一个线程,这时此线程处于就绪(可运行)状态,并没有运行,一旦得到 cpu 时间片,就开始执行 run()方法,这里方法 run()称为线程体,它包含了要执行的这个线程的内容,Run 方法运行结束,此线程随即终止。
t.run()只是普通的方法调用,所以是顺序执行的。
5.6:线程池
作用:避免频繁地创建和销毁线程,达到线程对象的重用。另外,使用线程池还可以根据项目灵活地控制并发的数目。
ThreadPoolExecutor 类是线程池中最核心的一个类,它提供了四个构造方法。
1,参数
corePoolSize 核心线程数量
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize,即使有其他空闲线程能够执行新来的任务,也会继续创建线程;如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的 prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
workQueue 阻塞队列
用来保存等待被执行的任务的阻塞队列. 在 JDK 中提供了如下阻塞队列:
(1) ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
(2)LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
(3)SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
(4) priorityBlockingQuene:具有优先级的无界阻塞队列
maximumPoolSize 最大线程数 线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize;当阻塞队列是无界队列,则 maximumPoolSize 则不起作用,因为无法提交至核心线程池的线程会一直持续地放入 workQueue.
keepAliveTime 线程空闲时的存活时间
线程空闲时的存活时间,即当线程没有任务执行时,该线程继续存活的时间;默认情况下,该参数只在线程数大于 corePoolSize 时才有用,超过这个时间的空闲线程将被终止;
unit
keepAliveTime 的单位
有7种取值,在TimeUnit类中有7种静态属性
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
threadFactory 线程工厂
创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为 DefaultThreadFactory
*handler 当拒绝处理任务时的策略
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了 4 种策略:
AbortPolicy:直接抛出异常,默认策略;关键业务,并发量高德时候抛出异常能够及时发现
CallerRunsPolicy:用调用者所在的线程来执行任务;
DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
DiscardPolicy:直接丢弃任务,主要用于无关紧要的任务,博客阅读量
RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
2:构造方法
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
3:其他创建线程池的方法
- Executors.newFixedThreadPool(int nThreads) :创建一个拥有 i 个线程的线程池
- 执行长期的任务,性能好很多
- 创建一个定长线程池,可控制线程数最大并发数,超出的线程会在队列中等待。
- 缺点:由于阻塞队列无限大,可能造成栈溢出,导致程序崩溃。
- Executors.newSingleThreadExecutor:创建一个只有 1 个线程的 单线程池
- 一个任务一个任务执行的场景
- 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行
- Executors.newCacheThreadPool(); 创建一个可扩容的线程池
- 执行很多短期异步的小程序或者负载教轻的服务器
- 创建一个可缓存线程池,如果线程长度超过处理需要,可灵活回收空闲线程,如无可回收,则新建新线程
- 处理大量短时间工作任务的线程池,他会缓存线程并重用,无缓存线程时,就会创建新线程,闲置超过 60 秒则会被移出缓存,其内部使用 SynchronousQueue 作为工作队列;
- Executors.newScheduledThreadPool(int corePoolSize):线程池支持定时以及周期性执行任务,创建一个 corePoolSize 为传入参数,最大线程数为整形的最大数的线程池
- newSingleThreadScheduledExecutor():创建单线程池,返回 ScheduledExecutorService,可以进行定时或周期性的工作调度;new ScheduledThreadPool(int corePoolSize):创建一个定长的线程池,可以进行定时或周期性的工作调度,区别在于单一工作线程还是多个工作线程
- new WorkStealingPool(int parallelism): Java 8 才加入这个创建方法,其内部会构建 ForkJoinPool,利用 Work-Stealing 算法,并行地处理任务,不保证处理顺序;
ThreadPoolExecutor():是最原始的线程池创建,上面 1-3 创建方式都是对 ThreadPoolExecutor 的封装。
4:方法
execute()
提交任务,交给线程池执行
执行流程:
1、如果线程池当前线程数量少于 corePoolSize,则 addWorker(command, true)创建新 worker 线程,如果创建成功返回,没创建成功则 2
addWorker(command, true)失败的原因可能是:线程池已经 shutdown,shutdown 的线程池不再接收新任务
- workerCountOf(c) < corePoolSize 判断后,由于并发,别的线程先创建了 worker 线程,导致 workerCount>=corePoolSize
2、如果线程池中线程还在 running 状态,尝试将 task 加入 workQueue 阻塞队列中,如果加入成功,进行 double-check,如果加入失败(可能是队列已满),则执行后续步骤;
double-check 主要目的是:判断刚加入 workQueue 阻塞队列的 task 是否能被执行
- 如果线程池已经不是 running 状态了,应该拒绝添加新任务,从 workQueue 中删除任务
- 如果线程池是运行状态,或者从 workQueue 中删除任务失败(刚好有一个线程执行完毕,并消耗了这个任务),确保还有线程执行任务(只要有一个就够了)
3、如果线程池不是 running 状态 或者 无法入队列,尝试开启新线程,扩容至 maxPoolSize,如果 addWork(command, false)失败了,拒绝当前 command。
submit()
提交任务,能够返回执行结果 execute + Future
shutdown()
关闭线程池,等待任务都执行完
执行流程:
1、上锁,mainLock 是线程池的主锁,是可重入锁,当要操作 workers set 这个保持线程的 HashSet 时,需要先获取 mainLock,还有当要处理 largestPoolSize、completedTaskCount 这类统计数据时需要先获取 mainLock
2、判断调用者是否有权限 shutdown 线程池
3、使用 CAS 操作将线程池状态设置为 shutdown,shutdown 之后将不再接收新任务
4、中断所有空闲线程 interruptIdleWorkers()
5、onShutdown(),ScheduledThreadPoolExecutor 中实现了这个方法,可以在 shutdown()时做一些处理
6、解锁
7、尝试终止线程池 tryTerminate()
shutdownNow():关闭线程池,不等待任务执行完
getTaskCount():线程池已执行和未执行的任务总数
getCompletedTaskCount():已完成的任务数量
getPoolSize():线程池当前的线程数量
getActiveCount():当前线程池中正在执行任务的线程数量
5:原理:
ThreadPoolExecutor 执行 execute()流程:
当一个任务提交至线程池之后:
1.线程池首先判断核心线程池里的线程是否已经满了。如果不是,则创建一个新的工作线程来执行任务。否则进入 2.
2:判断工作队列是否已经满了,倘若还没有满,将线程放入工作队列。否则进入 3.
3.判断线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行。如果线程池满了,则交给饱和策略来处理任务。
源码实现:
Executor:最顶级接口,提供了execute()方法将任务提交和任务执行分离。当你把一个Runnable任务提交给Executor后,如何执行任务要看它的实现类。
ExecutorService:增加了对线程池中任务生命周期的管理,可强制取消正在执行的任务,拒绝再接受任务。提供了submit()方法来扩展Executor.execute(),使任务执行有返回值。
AbstractExecutorService:ExecutorService接口的默认实现,线程池的大部分功能已在这个类中被编写。
ThreadPoolExecutor:线程池最核心的一个类,继承了AbstractExecutorService,完整的实现了一个线程池。
线程池的重要变量主要两个:runState(线程池运行状态:运行还是关闭)和workerCount(线程池中的线程数目),在 JDK1.8 中,这两个变量被放入了一个线程安全的 int 类型变量中
6:自定义线程池
实际开发中,上诉线程池我们一个都不用,都是自己自定义的
- 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程
- 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题,如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题
- 线程池不允许使用 Executors 去创建,而是通过 ThreadToolExecutors 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
- Executors 返回的线程池对象弊端如下:
- FixedThreadPool 和 SingleThreadPool:
- 运行的请求队列长度为:Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM
- CacheThreadPool 和 ScheduledThreadPool
- 运行的请求队列长度为:Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM
- FixedThreadPool 和 SingleThreadPool:
- Executors 返回的线程池对象弊端如下:
线程池的合理参数:
生产环境中如何配置 corePoolSize 和 maximumPoolSize。这个是根据具体业务来配置的
(1)高并发、任务执行时间短的业务,线程池线程数可以设置为 CPU 核数+1,减少线程上下文的切换
(2)并发不高、任务执行时间长的业务要区分开看:分为 CPU 密集型和 IO 密集型。性质不同的任务可用使用不同规模的线程池分开处理:
- CPU 密集型:尽可能少的线程,Ncpu+1
- IO 密集型:尽可能多的线程, Ncpu*2,比如数据库连接池
参考公式:CPU 核数 / (1 - 阻塞系数) 阻塞系数在 0.8 ~ 0.9 左右
例如:8 核 CPU:8/ (1 - 0.9) = 80 个线程数
混合型:CPU 密集型的任务与 IO 密集型任务的执行时间差别较小,拆分为两个线程池;否则没有必要拆分。
3:状态
running:这是最正常的状态,接受新的任务,处理等待队列中的任务。
- shutdown:不接受新的任务提交,但是会继续处理等待队列中的任务。
- stop:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程。
- tidying:所有的任务都销毁了,workCount 为 0,线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()。
- terminated:terminated()方法结束后,线程池的状态就会变成这个。
. 在 Java 程序中怎么保证多线程的运行安全?
方法一:使用安全类,比如 Java. util. concurrent 下的类。
方法二:使用自动锁 synchronized。
方法三:使用手动锁 Lock。
4;关闭方式
Shutdown shutdownNow tryTerminate 清空工作队列,终止线程池中各个线程,销毁线程池
Wait 方法:只能在同步代码块中调用,wait 会释放掉对象锁,等待 nitify 唤醒
Notify 方法:
6:线程池中遇到的问题
(3):Java 线程池是如何保证核心线程不被销毁的?
Q:线程池是通过队列的 take 方法来阻塞核心线程 Worker 的 run 方法,保证核心线程不会因执行完 run 方法而被系统终止。
(1)如果你提交任务时,线程池队列已满,这时会发生什么?
如果你使用的 LinkedBlockingQueue,也就是无界队列的话,没关系,继续添加任务到阻塞队列中等待执行,因为 LinkedBlockingQueue 可以近乎认为是一个无穷大的队列,可以无限存放任务;
如果你使用的是有界队列比方说 ArrayBlockingQueue 的话,任务首先会被添加到 ArrayBlockingQueue 中,ArrayBlockingQueue 满了,则会使用拒绝策略 RejectedExecutionHandler 处理满了的任务,默认是 AbortPolicy。
(2)高并发、任务执行时间短的业务怎样使用线程池?并发不高、任务执行时间长的业务怎样使用线程池?并发高、业务执行时间长的业务怎样使用线程池?这是我在并发编程网上看到的一个问题:
1)高并发、任务执行时间短的业务,线程池线程数可以设置为 CPU 核数+1,减少线程上下文的切换 2)并发不高、任务执行时间长的业务要区分开看: a)假如是业务时间长集中在 IO 操作上,也就是 IO 密集型的任务,因为 IO 操作并不占用 CPU,所以不要让所有的 CPU 闲下来,可以加大线程池中的线程数目,让 CPU 处理更多的业务 b)假如是业务时间长集中在计算操作上,也就是计算密集型任务,这个就没办法了,和(1)一样吧,线程池中的线程数设置得少一些,减少线程上下文的切换 3)并发高、业务执行时间长,解决这种类型任务的关键不在于线程池而在于整体架构的设计,看看这些业务里面某些数据是否能做缓存是第一步,增加服务器是第二步,至于线程池的设置,设置参考 2)。最后,业务执行时间长的问题,也可能需要分析一下,看看能不能使用中间件对任务进行拆分和解耦。
7:注意解决
1:使用线程池在流量突发期间能够平滑地服务降级
2:不要在有界线程池中执行相互依赖的任务。当线程池中正在执行的线程阻塞在依赖于线程池中其他任务的完成上。
解决:扩大线程池的线程数,以容纳更多的任务。但决定一个线程池合数的大小可能是困难甚至不可能的。另一个方法是将线程池的队列改成无界,但是由于系统资源有限,无界也只是容纳尽量多的任务。
3:确保提交到线程池的任务可以中断。
4:确保线程池中执行任务不能悄无声息的失败。线程池中的所有任务必须提供机制,如果它们异常终止,则需要通知应用程序。任务恢复或清除操作可以通过重写 java.util.concurrent.ThreadPoolExecutor
类的 afterExecute()
钩子来执行。
5:确保在使用线程池时重新初始化ThreadLocal变量。
5.4:线程间通信
- 锁机制:包括互斥锁,条件变量,读写锁
- 互斥锁提供了以排他方式防止数据结构被并发修改的问题
- 读写锁运行多线程同时读共享数据,而对写操作是互斥的
- 条件变量可以以原子方式阻塞进程,知道某个特定条件为真为止。对条件的测试是在互斥锁的保护下进行的,条件变量始终与互斥锁一起使用。
- 信号量机制(Semaphore):包括无名线程信号量和命名线程信号量
- 信号机制(Signal):类似进程间的信号处理,线程间的通信目的主要是用于线程同步,所以线程没有像进程通信中的用于数据交换的通信机制。
线程间交换数据:
ExecutorService service = Executors.newCachedThreadPool();
Exchanger exchanger = new Exchanger();
service.execute(new Runnable() {
public void run() {
try {
String sendData = "id:" + Thread.currentThread().getId() + ",data:两个黄鹂鸣翠柳,一行白鹭上青天;";
System.out.println(Thread.currentThread().getName() + "准备把数据" + sendData + "拿出来交换");
String receiveData = (String) exchanger.exchange(sendData);
System.out.println(Thread.currentThread().getName() + "换回来的数据为" + receiveData);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
5.5:线程安全
线程同步
当我们使用多个线程访问同一资源的时候,且多个线程中对资源有写的操作,就容易出现线程安全问题。
Java 中提供了同步机制(synchronized)来解决
- 同步代码块。
- 同步方法:
- 锁机制。
同步代码块
synchronized(同步锁){
需要同步操作的代码
}
作用在非静态方法上:非静态方法是只能提供类的实例进行调用,所以实际上就是对调用方法的对象加锁,俗称对象锁
作用在静态方法上:静态方法是可以通过类名直接调用,所以实际上就是对调用方法的类加锁,俗称类锁
作用在代码块:根据传入的是类对象或类实例判断加锁方式同步方法
同步方法:使用 synchronized 修饰的方法,就叫做同步方法,保证 A 线程执行该方法的时候,其他线程只能在方法外等着。
public synchronized void method(){
可能会产生线程安全问题的代码
}
Sleep()和 wait()的区别:
sleep() 时间到会自动恢复;调用 sleep 不会释放对象锁。InterruptedException 异常。
wait() 是 Object 的方法,导致本线程放弃对象锁,释放所持有的对象的 lock。可以使用 notify()/notifyAll()直接唤醒。Notify 唤醒一个处于等待状态的线程,并不能确切唤醒某一个等待的线程,而是由 JVM 确定唤醒那个线程。
反对使用 stop()因为它不安全
Suspend()容易发生死锁调用 suspend()的时候,目标线程会停下来,但却仍然持有在这之前获得的锁定。此时,其他任何线程都不能访问锁定的资源,除非被”挂起”的线程恢复运行。对任何线程来说,如果它们想恢复目标线程,同时又试图使用任何一个锁定的资源,就会造成死锁。
Yield():线程让步,暂停当前正在执行的线程,把机会让给优先级相同或更高的线程
Join():锁相关
Lock 能完成 synchronized 所实现的所有功能。
synchronized 会自动释放锁,而 Lock 一定要求程序员手工释放,并且必须在 finally 从句中释放。
1:乐观锁与悲观锁
悲观锁:认为自己在使用数据的时候一定会有别的线程来修改数据。
Java 中,synchronized 关键字和 Lock 的实现类都是悲观锁。
悲观锁适合写操作多的场景,先加锁可以保证写操作时数据正确。
乐观锁适合读操作多的场景,不加锁的特点能够使其读操作的性能大幅提升。
乐观锁:自己在使用数据时不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时候去判断之前有没有别的线程更新了这个数据。使用 CAS 算法
CAS 算法(比较与交换),会导致 ABA 问题,可以加时间戳
2:自旋锁 VS 适应性自旋锁
如果同步代码块的内容过于简单,线程挂起切换的时间比线程执行的时间还要长,就得不尝失,如果物理机器有多个处理器,能够让两个或以上的线程同时并行执行,我们就可以让后面那个请求锁的线程不放弃 CPU 的执行时间,看看持有锁的线程是否很快就会释放锁。
而为了让当前线程“稍等一下”,我们需让当前线程进行自旋,如果在自旋完成后前面锁定同步资源的线程已经释放了锁,那么当前线程就可以不必阻塞而是直接获取同步资源,从而避免切换线程的开销。这就是自旋锁。
如果锁被占用的时间很短,自旋等待的效果就会非常好。反之,如果锁被占用的时间很长,那么自旋的线程只会白浪费处理器资源。
如果自旋超过了限定次数(默认是 10 次,可以使用-XX:PreBlockSpin 来更改)没有成功获得锁,就应当挂起线程。
实现原理也是 CAS
自适应意味着自旋的时间(次数)不再固定,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也是很有可能再次成功,进而它将允许自旋等待持续相对更长的时间。如果对于某个锁,自旋很少成功获得过,那在以后尝试获取这个锁时将可能省略掉自旋过程,直接阻塞线程,避免浪费处理器资源。
为什么线程切换浪费资源?
第一:因为 CPU 运行状态分为用户态和内核态。线程切换状态会使 CPU 运行状态从用户态转换到内核态。
第二:每个线程在运行时的指令是被放在 CPU 的寄存器中的,如果切换内存状态,需要先把本线程的代码和变量写入内存。这样经常切换会耗费时间。
3:无锁 VS 偏向锁 VS 轻量级锁 VS 重量级锁
什么 Synchronized 能实现线程同步?
在回答这个问题之前我们需要了解两个重要的概念:“Java对象头”、“Monitor”。
Java 对象头
synchronized 是悲观锁,在操作同步资源之前需要给同步资源先加锁,这把锁就是存在 Java 对象头里的,而 Java 对象头又是什么呢?
我们以 Hotspot 虚拟机为例,Hotspot 的对象头主要包括两部分数据:Mark Word(标记字段)、Klass Pointer(类型指针)。
Mark Word:默认存储对象的 HashCode,分代年龄和锁标志位信息。这些信息都是与对象自身定义无关的数据,所以 Mark Word 被设计成一个非固定的数据结构以便在极小的空间内存存储尽量多的数据。它会根据对象的状态复用自己的存储空间,也就是说在运行期间 Mark Word 里存储的数据会随着锁标志位的变化而变化。
Klass Point:对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例。
Monitor
Monitor 可以理解为一个同步工具或一种同步机制,通常被描述为一个对象。每一个 Java 对象就有一把看不见的锁,称为内部锁或者 Monitor 锁。
Monitor 是线程私有的数据结构,每一个线程都有一个可用 monitor record 列表,同时还有一个全局的可用列表。每一个被锁住的对象都会和一个 monitor 关联,同时 monitor 中有一个 Owner 字段存放拥有该锁的线程的唯一标识,表示该锁被这个线程占用。
现在话题回到 synchronized,synchronized 通过 Monitor 来实现线程同步,Monitor 是依赖于底层的操作系统的 Mutex Lock(互斥锁)来实现的线程同步。
如同我们在自旋锁中提到的“阻塞或唤醒一个 Java 线程需要操作系统切换 CPU 状态来完成,这种状态转换需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长”。这种方式就是 synchronized 最初实现同步的方式,这就是 JDK 6 之前 synchronized 效率低的原因。这种依赖于操作系统 Mutex Lock 所实现的锁我们称之为“重量级锁”,JDK 6 中为了减少获得锁和释放锁带来的性能消耗,引入了“偏向锁”和“轻量级锁”。
所以目前锁一共有 4 种状态,级别从低到高依次是:无锁、偏向锁、轻量级锁和重量级锁。锁状态只能升级不能降级。
无锁
无锁没有对资源进行锁定,所有的线程都能访问并修改同一个资源,但同时只有一个线程能修改成功。
无锁的特点就是修改操作在循环内进行,线程会不断的尝试修改共享资源。如果没有冲突就修改成功并退出,否则就会继续循环尝试。如果有多个线程修改同一个值,必定会有一个线程能修改成功,而其他修改失败的线程会不断重试直到修改成功。上面我们介绍的 CAS 原理及应用即是无锁的实现。无锁无法全面代替有锁,但无锁在某些场合下的性能是非常高的。
偏向锁
偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁,降低获取锁的代价。
当一个线程访问同步代码块并获取锁时,会在 Mark Word 里存储锁偏向的线程 ID。在线程进入和退出同步块时不再通过 CAS 操作来加锁和解锁,而是检测 Mark Word 里是否存储着指向当前线程的偏向锁。引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行路径,因为轻量级锁的获取及释放依赖多次 CAS 原子指令,而偏向锁只需要在置换 ThreadID 的时候依赖一次 CAS 原子指令即可。
偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动释放偏向锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态。撤销偏向锁后恢复到无锁(标志位为“01”)或轻量级锁(标志位为“00”)的状态。
偏向锁在 JDK 6 及以后的 JVM 里是默认启用的。可以通过 JVM 参数关闭偏向锁:-XX:-UseBiasedLocking=false,关闭之后程序默认会进入轻量级锁状态。
轻量级锁
是指当锁是偏向锁的时候,被另外的线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能。
在代码进入同步块的时候,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的 Mark Word 的拷贝,然后拷贝对象头中的 Mark Word 复制到锁记录中。
拷贝成功后,虚拟机将使用 CAS 操作尝试将对象的 Mark Word 更新为指向 Lock Record 的指针,并将 Lock Record 里的 owner 指针指向对象的 Mark Word。
如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象 Mark Word 的锁标志位设置为“00”,表示此对象处于轻量级锁定状态。
如果轻量级锁的更新操作失败了,虚拟机首先会检查对象的 Mark Word 是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行,否则说明多个线程竞争锁。
若当前只有一个等待线程,则该线程通过自旋进行等待。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁升级为重量级锁。
重量级锁
升级为重量级锁时,锁标志的状态值变为“10”,此时 MarkWord 中存储的是指向重量级锁的指针,此时等待锁的线程都会进入阻塞状态。
偏向锁通过对比 Mark Word 解决加锁问题,避免执行 CAS 操作。而轻量级锁是通过用 CAS 操作和自旋来解决加锁问题,避免线程阻塞和唤醒而影响性能。重量级锁是将除了拥有锁的线程以外的线程都阻塞。
锁升级
一开始无锁,当有一个线程 A 过来的时候,
偏向锁加锁(线程 A):CAS 设置 markword 成功,markwork 设置为 【tid:A|1 | 01】;
如果接下来线程 A 再来加锁,
- 如果期间没有其他线程尝试获取锁,markword 没有变,就直接加锁成功,不需要 CAS 或 Monitor 操作,所以偏向锁加锁非常快。
- 如果期间有其他线程尝试加锁,偏向锁那时就已经被撤销,根据当时是无锁[0 | 0 | 01],轻量级锁[lock record ptr | 00]或者重量级锁[monitor ptr | 10]操作。
当又有线程 B 来加锁:
B 发现 markword 是 A 的偏向锁,但是由于偏向锁解锁不会修改 markword,如果线程 A 已经没有了,VM 就要撤销 A 的偏向锁,变成无锁;如果 A 还有,就在线程 A 中保存 lock record ptr,锁升级为轻量级锁。
轻量级锁加锁:CAS 设置 markword,若当前只有一个等待线程,则该线程通过自旋进行等待。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁升级为重量级锁。
重量级锁加 monitor 锁
4:公平锁与非公平锁
公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才能获得锁。
非公平锁是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待。但如果此时锁刚好可用,那么这个线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先获取锁的场景。有可能造成优先级翻转,或者饥饿的线程(也就是某个线程一直得不到锁)
并发包中 ReentrantLock 的创建可以指定析构函数的 boolean 类型来得到公平锁或者非公平锁,默认是非公平锁
/*
创建一个可重入锁,true 表示公平锁,false 表示非公平锁。默认非公平锁
*/
Lock lock = new ReentrantLock(true);
因为非公平锁的优点在于吞吐量比公平锁大,对于synchronized而言,也是一种非公平锁
在 ReentrantLock 中一句用于实现公平锁和非公平锁
public ReentrantLock(boolean fair){
sync = fair ? FairSync() : new NonfairSync();
}
非公锁实现方式就是:首先获取到当前线程,判断当前锁的状态是否为 0,如果是,说明当前锁没有被其他线程占有,则利用 CAS 操作将锁的状态从 0 置为 1 成功后,将锁的持有者置为当前线程。
公平锁的实现,就是在非公平锁的实现上,加了一层判断 hasQueuedPredecessors(),该方法的大概意思是判断是否有线程等待的时间比当前线程等待时间还要久,如果有返回 true,则当前线程获取锁失败,如果没有返回 false,当前线程获取到锁,也就是判断当前线程是否是等待队列的队头元素
5:可重入锁和非可重入锁
可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提锁对象得是同一个对象或者 class),不会因为之前已经获取过还没释放而阻塞。
Java 中 ReentrantLock 和 synchronized 都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。
为什么可重入锁就可以在嵌套调用时可以自动获得锁呢?
有多个人在排队打水,此时管理员允许锁和同一个人的多个水桶绑定。这个人用多个水桶打水时,第一个水桶和锁绑定并打完水之后,第二个水桶也可以直接和锁绑定并开始打水,所有的水桶都打完水之后打水人才会将锁还给管理员。这个人的所有打水流程都能够成功执行,后续等待的人也能够打到水。这就是可重入锁。
源码分析重入锁 ReentrantLock 以及非可重入锁 NonReentrantLock
ReentrantLock 和 NonReentrantLock 都继承父类 AQS,其父类 AQS 中维护了一个同步状态 status 来计数重入次数,status 初始值为 0。
当线程尝试获取锁时,可重入锁先尝试获取并更新 status 值,如果 status == 0 表示没有其他线程在执行同步代码,则把 status 置为 1,当前线程开始执行。如果 status != 0,则判断当前线程是否是获取到这个锁的线程,如果是的话执行 status+1,且当前线程可以再次获取锁。而非可重入锁是直接去获取并尝试更新当前 status 的值,如果 status != 0 的话会导致其获取锁失败,当前线程阻塞。
释放锁时,可重入锁同样先获取当前 status 的值,在当前线程是持有锁的线程的前提下。如果 status-1 == 0,则表示当前线程所有重复获取锁的操作都已经执行完毕,然后该线程才会真正释放锁。而非可重入锁则是在确定当前线程是持有锁的线程之后,直接将 status 置为 0,将锁释放。
6:独享锁(排它锁)与共享锁
独占锁:指该锁一次只能被一个线程所持有。对 ReentrantLock 和 Synchronized 而言都是独占锁,锁操作失败会将该线程睡眠,等待锁释放时被唤醒。
共享锁:指该锁可以被多个线程锁持有
对 ReentrantReadWriteLock 读锁是共享,写锁是独占。写的时候只能一个人写,但是读的时候,可以多个人同时读
读-读:能共存
读-写:不能共存
写-写:不能共存
当我们在进行写操作的时候,就需要转换成写锁
// 创建一个写锁
rwLock.writeLock().lock();
// 写锁 释放
rwLock.writeLock().unlock();
当们在进行读操作的时候,在转换成读锁
// 创建一个读锁
rwLock.readLock().lock();
// 读锁 释放
rwLock.readLock().unlock();
这里的读锁和写锁的区别在于,写锁一次只能一个线程进入,执行写操作,而读锁是多个线程能够同时进入,进行读取的操作
获取锁的过程:
- 当线程调用 acquireShared()申请获取锁资源时,如果成功,则进入临界区。
- 当获取锁失败时,则创建一个共享类型的节点并进入一个 FIFO 等待队列,然后被挂起等待唤醒。
- 当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待。
释放锁过程:
- 当线程调用 releaseShared()进行锁资源释放时,如果释放成功,则唤醒队列中等待的节点,如果有的话。
跟独占锁相比,共享锁的主要特征在于当一个在等待队列中的共享节点成功获取到锁以后,要依次唤醒后面所有可以跟它一起共享当前锁资源的节点,毫无疑问,这些节点必须也是在等待共享锁(这是大前提,如果等待的是独占锁,那前面已经有一个共享节点获取锁了,它肯定是获取不到的)。当共享锁被释放的时候,可以用读写锁为例进行思考,当一个读锁被释放,此时不论是读锁还是写锁都是可以竞争资源的。
死锁
原因:系统资源分配不足;进程推进顺序不合适;资源分配不当;
条件:互斥,请求保持,不剥夺,循环等待
AQS
AbstractQueuedSynchronizer 抽象队列同步器
ReentrantLock、CountDownLatch、CycleBarrier 底层都是通过 AQS 来实现的
AQS 的核心思想:如果被请求的共享资源空闲,则将当前请求的资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态,如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及唤醒时锁分配的机制,这个 AQS 是用 CLH 队列锁实现的,即将暂时获取不到的锁的线程加入到队列中。CLH 队列是一个虚拟的双向队列,虚拟的双向队列即不存在队列的实例,仅存在节点之间的关联关系。
AtomicInteger state,具体含义由子类实现,子类必须实现更改方法,必须是一个内部类。
有新进程来竞争该资源,拿不到就要添加的队列尾部,
AQS 是将每一条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node),来实现锁的分配
AQS 就是基于 CLH 队列,用 volatile 修饰共享变量 state,线程通过 CAS 去改变状态符,成功则获取锁成功,失败则进入等待队列,同时等待被唤醒。
注意:AQS 是自旋锁,在等待唤醒的时候,经常会使用自旋的方式,不断的尝试获取锁,直到被其它线程获取成功
实现了 AQS 的锁有:自旋锁、互斥锁、读写锁、条件变量、信号量、栅栏都是 AQS 的衍生物,具体实现如下
如上图所示,AQS 维护了一个 volatile int state 的变量 和 一个 FIFO 线程等待队列,多线程争用资源被阻塞的时候,就会进入这个队列中。state 就是共享资源,其访问方式有如下三种:
- getState()
- setState()
- compareAndSetState()
AQS 定义了两种资源共享方式
- Exclusive:独占,只有一个线程能执行,如 ReentrantLock
- Share:共享,多个线程可以同时执行,如 Semaphore、CountDownLatch、ReadWriteLock、CycleBarrier
不同的自定义同步器争用共享资源的方式也不同
ReentrantLock
以 ReentrantLock(可重入独占式锁)为例,state 初始化为 0,表示未锁定状态,A 线程 lock()时,会调用 tryAcquire()独占锁,并将 state + 1,之后其它线程在想通过 tryAcquire 的时候就会失败,知道 A 线程 unlock() 到 state = 0 为止,其它线程才有机会获取到该锁。A 释放锁之前,自己也是可以重复获取此锁(state 累加),这就是可重入的概念。
注意:获取多少次锁就需要释放多少次锁,保证 state 是能够回到 0
CountDownLatch
以 CountDownLatch 为例,任务分 N 个子线程执行,state 就初始化为 N,N 个线程并行执行,每个线程执行完之后 countDown() 一次,state 就会 CAS 减 1,当 N 子线程全部执行完毕,state = 0,hui unpark() 主调动线程,主调用线程就会从 await()函数返回,继续之后的动作。
一般来说,自定义同步器要么独占方式,要么共享方式,他们也需要实现 tryAcquire 和 tryRelease、 tryAcquireShared 和 tryReleaseShared 中的一种即可。但 AQS 也支持自定义同步器实现独占和共享两种方式,比如 ReentrantLockReadWriteLock。
- acquire() 和 acquireShared() 两种方式下,线程在等待队列中都是忽略中断的
- acquireInterruptibly() 和 acquireSharedInterruptibly() 是支持响应中断的
同步器一般包含两种方法,一种是 acquire,另一种是 release。acquire 操作阻塞调用的线程,直到或除非同步状态允许其继续执行。而 release 操作则是通过某种方式改变同步状态,使得一或多个被 acquire 阻塞的线程继续执行。
可中断
final void lock() {
if (compareAndSetState(0, 1))<br /> setExclusiveOwnerThread(Thread.currentThread());<br /> else<br /> acquire(1);<br /> }<br /> public final void acquire(int arg) {<br /> if (!tryAcquire(arg) &&<br /> acquireQueued(addWaiter(Node.EXCLUSIVE), arg))<br /> selfInterrupt();<br /> }<br />通过 lockInterruptibly()方法获取某个锁,如果不能获取到,只有进行等待的情况下,是可以响应中断的,而用 synchronized 修饰的话,当一个线程处于等待某个锁的状态,是无法被中断的,只有一只等待下去。<br />打断之后 node 状态的变化(state 变为 cancelled)
5.6:线程协作
当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调。
join()
在线程中调用另一个线程的 join() 方法,会将当前线程挂起,而不是忙等待,直到目标线程结束。最后能够保证 a 线程的输出先于 b 线程的输出。
wait() notify() notifyAll()
调用 wait() 使得线程等待某个条件满足,线程在等待时会被挂起,当其他线程的运行使得这个条件满足时,其它线程会调用 notify() 或者 notifyAll() 来唤醒挂起的线程。
它们都属于 Object 的一部分,而不属于 Thread。
只能用在同步方法或者同步控制块中使用,否则会在运行时抛出 IllegalMonitorStateException。这是因为设计者为了避免使用出现 lost wake up 问题而搞出来的。 (初始的时候 count 等于 0,这个时候消费者检查 count 的值,发现 count 小于等于 0 的条件成立;就在这个时候,发生了上下文切换,生产者进来了,噼噼啪啪一顿操作,把两个步骤都执行完了,也就是发出了通知,准备唤醒一个线程。这个时候消费者刚决定睡觉,还没睡呢,所以这个通知就会被丢掉。紧接着,消费者就睡过去了……没有来唤醒的了,造成死锁)
使用 wait() 挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁。
wait() 和 sleep() 的区别
- wait() 是 Object 的方法,而 sleep() 是 Thread 的静态方法;
- wait() 会释放锁,sleep() 不会。
await() signal() signalAll()
java.util.concurrent 类库中提供了 Condition 类来实现线程之间的协调,可以在 Condition 上调用 await() 方法使线程等待,其它线程调用 signal() 或 signalAll() 方法唤醒等待的线程。
相比于 wait() 这种等待方式,await() 可以指定等待的条件,因此更加灵活。5.7:concurrent 并发包
JUC 包增加了并发编程中常用的工具类,用于定义类似于线程的自定义子 系统,包括线程池、异步 IO 和轻量级任务框架。提供可调的、灵活的线程池。还提供了设计用于多线程上下文中的 Collection 实现等。并发容器
JUC 包下的容器类分为两部分,一部分是并发集合类,一部分是并发队列类,
1:ConcurrentLinkedQueue
一个基于连接节点的无界线程安全的队列,删除节点是将 item 设置为 null, 队列迭代时跳过 item 为 null 节点,所以不允许 null 入队,添加等都是使用 CAS 算法
synchronized
JDK 早期,synchronized 叫做重量级锁, 因为申请锁资源必须通过 kernel, 系统调用,后来调整为锁升级的过程:无锁 - 偏向锁 - 轻量级锁 (自旋锁,自适应自旋)- 重量级锁
synchronized:保证在同一时刻,只有一个线程可以执行某个方法或某个代码块,同时 synchronized 可以保证一个线程的变化可见(可见性),即可以代替 volatile。悲观锁,非公平锁(底层调用 mutex 锁,内核提供的这个锁并不保证公平)
可以修饰代码块,方法,静态方法,类
注意:用 synchronized 修饰不同类型的方法时,由于获取的对象头中的 monitor 对象可能不同,也可能会出现线程安全问题。因为:每个对象都存在一个 monitor 与之关联,对象与 monitor 之间的关系存在多种实现方式。
修饰实例方法:对当前实例加锁,进入方法需要获得当前实例的锁
修饰静态方法:对当前类对象加锁,进入静态方法需要获得当前类对象的锁
修饰代码块:对指定对象进行加锁,进入代码块需要获得指定对象的锁
volatile
Volatile 是 Java 虚拟机提供的轻量级的同步机制(三大特性)
- 保证可见性
- 不保证原子性
- 禁止指令重排
实现机制:相比于没加 volatile 关键字,汇编代码会加入一个 lock 前缀指令,相当于一个内存屏障(禁止处理器指令重排,而 vloatile 是禁止编译器重排序)。此时
(1)会将当前处理器缓存行的数据立即写会系统主存
(2)这个写回内存的操作会引起在其他 cpu 里缓存了该内存地址的数据无效(MES 协议)
线程 2 将 initFlag 的值 store 到主内存时要通过总线,cpu 总线嗅探机制监听到 initFlag 值被修改,线程 1 的 initFlag 失效,线程 1 需要重新 read initFlag 的值。
如果大量使用 volatile,由于 Volatile 的 MESI 缓存一致性协议,需要不断的从主内存嗅探和 cas 不断循环,无效交互会导致总线带宽达到峰值。造成总线风暴。
Q:synchronized 和 volatile 的区别是什么?
- volatile 是变量修饰符;synchronized 是修饰类、方法、代码段。
- volatile 仅能实现变量的修改可见性,不能保证原子性;而 synchronized 则可以保证变量的修改可见性和原子性。
- volatile 不会造成线程的阻塞;synchronized 可能会造成线程的阻塞。
Q:synchronized 和 lock 有什么区别?
- synchronized 属于 JVM 层面的,Lock 是 API 层面的
- synchronized 不需要手动释放锁,Lock 需要主动释放,否则可能出现死锁
- synchronized 不可中断,除非抛出异常或则正常运行完成,ReetrantLock 可以中断,可以设置超时方法。
重排序:
指令重排序包括:编译器优化重排,指令级并行重排,内存系统读写重排
指令:
store:将 cpu 缓存的数据刷新到主存中
load:将主存中的数据拷贝进 cpu
内存屏障种类:
volatile 写实现内存屏障“
对 volatile 变量进行写操作是,会在写操作后加入一条 store 屏障指令,将工作内存中的共享变量值刷新会到主内存。
原子类
对于 Java 中的运算操作,例如自增或自减,若没有进行额外的同步操作,在多线程环境下就是线程不安全的。num++解析为 num=num+1,明显,这个操作不具备原子性,多线程并发共享这个变量时必然会出现问题。就算加上 volatile 也不能保证其原子性。只有将其声明为原子类
其底层是使用的 CAS 算法,Unsafe
原子类一览:将普通变量升级为原子变量,主要是 AtomicIntegerFieldUpdater,在高并发情况下,LongAdder(累加器)比 AtomicLong 原子操作效率更高,LongAdder 累加器是 java8 新加入的
2:原子变量与 CAS 算法
标量原子变量类 AtomicInteger,AtomicLong 和 AtomicBoolean 类分别支持对原始数据类型 int,long 和 boolean 的操作。当引用变量需要以原子方式更新时,AtomicReference 类用于处理引用数据类型。
原子数组类 有三个类称为 AtomicIntegerArray,AtomicLongArray 和 AtomicReferenceArray,它们表示一个 int,long 和引用类型的数组,其元素可以进行原子性更新。
CAS
CAS(比较交换)是一种无锁非阻塞的算法实现,包含三个操作数:内存位置(V)、预期原值(A)和新值(B)
CAS 会导致 ABA 问题,线程 1 准备用 CAS 将变量的值由 A 替换为 B,在此之前,线程 2 将变量的值由 A 替换为 C,又由 C 替换为 A,然后线程 1 执行 CAS 时发现变量的值仍然为 A,所以 CAS 成功。但实际上这时的现场已经和最初不同了,尽管 CAS 成功,但可能存在潜藏的问题。
解决办法(版本号 AtomicStampedReference),基础类型简单值不需要版本号
Unsafe 类
AQS 等也是用来 CAS 算法,unsafe 类是 CAS 的核心类,原子等无锁操作,自旋操作都是 unsafe 类,Java 无法直接访问底层操作系统,而是通过本地 native 方法来访问,尽管如此,JVM 还是开了一个后门:Unsafe 它提供了硬件级别的原子操作。在底层调用汇编指令cmpxchg指令,这是一条汇编指令,所以 CPU 一次通过,是原子操作。
但是它设置了限制,不让上层开发者使用,可以通过反射进行获取,
// 对于使用不安全的操作,一个比较推荐的语法:
class MyTrustedClass {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private long myCountAddress = …;
public int getCount() { return unsafe.getByte(myCountAddress); }
方法声明为 native
单例的,
申请内存:All
3:ConcurrentHashMap
见前
4:CountDownLatch(闭锁)
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
CountDownLatch 最重要的方法是 countDown()——倒数 和 await(),前者主要是倒数一次,后者是等待倒数到 0,如果没有到达 0,就只有阻塞等待了。
5:Lock 同步锁
6:Condition 控制线程通信
###
8:线程 8 锁
AbstractQueuedSynchronizer (AQS)
提供了一个基于 FIFO 队列
6.1 ConcurrentHashMap
6.3 Condition
6.4 CopyOnWriteArrayList
CopyOnWriteArrayList 是一个线程安全、并且在读操作时无锁的 ArrayList
6.5 CopyOnWriteArraySet
CopyOnWriteArraySet 基于 CopyOnWriteArrayList 实现,其唯一的不同是在 add 时调用的是 CopyOnWriteArrayList 的 addIfAbsent 方法。保证了无重复元素,但在 add 时每次都要进行数组的遍历,因此性能会略低于上个。
6.6 ArrayBlockingQueue
**6.7 ThreadPoolExecutor
ReentrantLock
ReenTrantLock 的实现是一种自旋锁,通过循环调用 CAS 操作来实现加锁。它的性能比较好也是因为避免了使线程进入内核态的阻塞状态。想尽办法避免线程进入内核的阻塞状态是我们去分析和理解锁设计的关键钥匙。
与 synchronized 的区别?
相同:都是可重入锁,阻塞的,
1:synchronized 是 java 关键字,是 JVM 层面的,而 ReentrantLock 是 API 层面的。
2:ReenTrantLock 可以指定是公平锁还是非公平锁。而 synchronized 只能是非公平锁。
3:ReenTrantLock 提供了一个 Condition(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像 synchronized 要么随机唤醒一个线程要么唤醒全部线程。
4:ReenTrantLock 提供了一种能够中断等待锁的线程的机制,通过 lock.lockInterruptibly()来实现这个机制。synchronized 是不可中断的,一个线程获取不到锁就一直等着。
5:ReentantLock 相比于 Synchronized 可以更方便的获取锁,可以操作读写锁,可以唤醒指定线程等等,
6:Synchronized 适合于并发竞争低的情况,因为 Synchronized 的锁升级如果最终升级为重量级锁在使用的过程中是没有办法降级的(GC 必须),意味着每次都要和 cpu 去请求锁资源,而 ReentrantLock 主要是提供了阻塞的能力,通过在高并发下线程的挂起,来减少竞争,提高并发能力
ReetrantReadWriteLock 读写锁
Read 的时候是共享锁,Write 的时候是排它锁(互斥锁),默认使用非公平方式获取锁,可重入锁
锁降级:从写锁变成读锁,支持
锁升级:从读锁变成写锁,不支持,会产生死锁。
StampedLock
也是一种读写锁,提供两种读模式:乐观锁和悲观锁。
乐观读:允许读的过程中也可以获取写锁后写入,
1,CountDownLatch
使一个线程等待其他线程各自执行完毕后再执行。
是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为 0 时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。
CountDownLatch .await();//当前线程阻塞,等待其他线程完成
CountDownLatch .countDown();//当前子线程结束,通知等待线程可以减一
2,CyclicBarrier
与 CountDownLatch 相反,这个类是为了帮助猿友们方便的实现多个线程一起启动的场景,就像赛跑一样,只要大家都准备好了,那就开始一起冲。也就是做加法。
CyclicBarrier 的字面意思就是可循环(cyclic)使用的屏障(Barrier)。它要求做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,线程进入屏障通过 CyclicBarrier 的 await 方法。
barrier.await();//通知主可以加一了
区别:
CountDownLatch 和 CyclicBarrier 都能够实现线程之间的等待,只不过它们侧重点不同:
- CountDownLatch 一般用于一个或多个线程,等待其他线程执行完任务后,再才执行
- CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行 另外,CountDownLatch 是减计数,计数减为 0 后不能重用;而 CyclicBarrier 是加计数,可置 0 后复用。
CountDownLatch: 计数器自己可以在任意地方减 1 countDown(), await()方法在线程外调用,当计数器为 0 的时候才可以执行线程外的后面的代码
CyclicBarrier:在线程内调用 await(),用于控制线程内的后面的代码的执行
CountDownLatch 用来控制线程外的代码,用于阻塞父线程的代码,CyclicBarrier 用于控制线程内的代码,用于阻塞当前线程的代码
ConditionObject 通知
synchronized 控制同步的时候,可以配合 Object 的 wait(),notify(),notifyAll()系列方法实现等待/通知模式,而 Lock,它提供了条件 Condition 接口,配合 await(),signal(),signalAll()等方法也可以实现等待/通知机制。ConditionObject 实现了 Condition 接口,给 AQS 提供条件变量的支持。
一个 Condition 包含一个等待队列,Condition 拥有节点(firstWaiter)和尾节点(lastWaiter),当前线程调用 Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。
调用 Condition 的 signal 方法,将会唤醒在等待队列中等待时间最久的节点(首节点—),在唤醒节点之前,会将节点移动同步队列中。
3,Semaphore 信号量
信号量主要用于两个目的
- 一个是用于共享资源的互斥使用
-
4,Exchanger
5:FutureTask
在介绍 Callable 时我们知道它可以有返回值,返回值通过 Future
进行封装。FutureTask 实现了 RunnableFuture 接口,该接口继承自 Runnable 和 Future 接口,这使得 FutureTask 既可以当做一个任务执行,也可以有返回值。
Future:未来会产生的结果,异步执行。
future.get();还是阻塞的,虽然那时异步的,但是这还是阻塞,根本没啥用。
谷歌的 guava 类库使用监听者模式,监听一个装饰者的线程池,于是 JDK 抄了一个 CompleteListener6:BlockingQueue
java.util.concurrent.BlockingQueue 接口有以下阻塞队列的实现:
FIFO 队列 :LinkedBlockingQueue、ArrayBlockingQueue(固定长度)
- 优先级队列 :PriorityBlockingQueue
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列
- SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列
- 生产一个,消费一个,不存储元素,不消费不生产
- LinkedTransferQueue:由链表结构组成的无界阻塞队列
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列
提供了阻塞的 take() 和 put() 方法:如果队列为空 take() 将阻塞,直到队列中有内容;如果队列为满 put() 将阻塞,直到队列有空闲位置
如果提交任务是,线程池队列已满,这时会发生什么?
1、如果使用的是无界队列 LinkedBlockingQueue,也就是无界队列的话,没关系,继续添加任务到阻塞队列中等待执行,因为 LinkedBlockingQueue 可以近乎认为是一个无穷大的队列,可以无限存放任务
2、如果使用的是有界队列比如 ArrayBlockingQueue,任务首先会被添加到 ArrayBlockingQueue 中,ArrayBlockingQueue 满了,会根据 maximumPoolSize 的值增加线程数量,如果增加了线程数量还是处理不过来,ArrayBlockingQueue 继续满,那么则会使用拒绝策略,RejectedExecutionHandler 处理满了的任务,默认是 AbortPolicy
7:ForkJoin
主要用于并行计算中,和 MapReduce 原理类似,都是把大的计算任务拆分成多个小任务并行计算。