一、请谈谈你对volatile的理解。
1、定义:volatile是JVM提供的轻量级的同步机制
- 保证可见性
- 不保证原子性
- 禁止指令重排(保证有序性)
2、JMM内存模型之可见性
JMM(Java内存模型Java Memory Model,简称JMM)本身是一种抽象的概念并不真实存在,它描述的是一组规则或规范,通过这组规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。
JMM关于同步的规定:
- 线程解锁前,必须把共享变量的值刷新回主内存
- 线程加锁前,必须读取主内存的最新值到自己的工作内存
- 加锁解锁是同一把锁
由于JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方称为栈空间),工作内存是每个线程的私有数据区域,而Java内存模型中规定所有变量都存储在主内存,主内存是共享内存区域,所有线程都可以访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝的自己的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,不能直接操作主内存中的变量,各个线程中的工作内存中存储着主内存中的变量副本拷贝,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成,其简要访问过程如下图:
可见性
通过前面对JMM的介绍,我们知道各个线程对主内存中共享变量的操作都是各个线程各自拷贝到自己的工作内存进行操作后再写回到主内存中的。
这就可能存在一个线程AAA修改了共享变量X的值但还未写回主内存时,另外一个线程BBB又对主内存中同一个共享变量X进行操作,但此时A线程工作内存中共享变量x对线程B来说并不可见,这种工作内存与主内存同步延迟现象就造成了可见性问题
[
](https://blog.csdn.net/u011863024/article/details/114684428)
3、可见性代码验证说明
import java.util.concurrent.TimeUnit;
/**
* 假设是主物理内存
*/
class MyData {
//volatile int number = 0;
int number = 0;
public void addTo60() {
this.number = 60;
}
}
/**
* 验证volatile的可见性
* 1. 假设int number = 0, number变量之前没有添加volatile关键字修饰
*/
public class VolatileDemo {
public static void main(String args []) {
// 资源类
MyData myData = new MyData();
// AAA线程 实现了Runnable接口的,lambda表达式
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t come in");
// 线程睡眠3秒,假设在进行运算
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 修改number的值
myData.addTo60();
// 输出修改后的值
System.out.println(Thread.currentThread().getName() + "\t update number value:" + myData.number);
}, "AAA").start();
// main线程就一直在这里等待循环,直到number的值不等于零
while(myData.number == 0) {}
// 按道理这个值是不可能打印出来的,因为主线程运行的时候,number的值为0,所以一直在循环
// 如果能输出这句话,说明AAA线程在睡眠3秒后,更新的number的值,重新写入到主内存,并被main线程感知到了
System.out.println(Thread.currentThread().getName() + "\t mission is over");
}
}
由于没有volatile修饰MyData类的成员变量number,main线程将会卡在while(myData.number == 0) {},不能正常结束。若想正确结束,用volatile修饰MyData类的成员变量number吧。
4、volatile 不保证原子性
原子性定义:不可分割,完整性,也即某个线程正在做某个具体业务时,中间不可以被加塞或者被分割。需要整体完整要么同时成功,要么同时失败。
(1)不保证原子性案例演示
class MyData2 {
/**
* volatile 修饰的关键字,是为了增加 主线程和线程之间的可见性,只要有一个线程修改了内存中的值,其它线程也能马上感知
*/
volatile int number = 0;
public void addPlusPlus() {
number ++;
}
}
public class VolatileAtomicityDemo {
public static void main(String[] args) {
MyData2 myData = new MyData2();
// 创建10个线程,线程里面进行1000次循环
for (int i = 0; i < 20; i++) {
new Thread(() -> {
// 里面
for (int j = 0; j < 1000; j++) {
myData.addPlusPlus();
}
}, String.valueOf(i)).start();
}
// 需要等待上面20个线程都计算完成后,在用main线程取得最终的结果值
// 这里判断线程数是否大于2,为什么是2?因为默认是有两个线程的,一个main线程,一个gc线程
while(Thread.activeCount() > 2) {
// yield表示不执行
Thread.yield();
}
// 查看最终的值
// 假设volatile保证原子性,那么输出的值应该为: 20 * 1000 = 20000
System.out.println(Thread.currentThread().getName() + "\t finally number value: " + myData.number);
}
}
(2)不保证原子性理论解析
number++在多线程下是非线程安全的。
我们可以将代码编译成字节码,可看出number++被编译成3条指令。
假设我们没有加 synchronized那么第一步就可能存在着,三个线程同时通过getfield命令,拿到主存中的 n值,然后三个线程,各自在自己的工作内存中进行加1操作,但他们并发进行 iadd 命令的时候,因为只能一个进行写,所以其它操作会被挂起,假设1线程,先进行了写操作,在写完后,volatile的可见性,应该需要告诉其它两个线程,主内存的值已经被修改了,但是因为太快了,其它两个线程,陆续执行 iadd命令,进行写入操作,这就造成了其他线程没有接受到主内存n的改变,从而覆盖了原来的值,出现写丢失,这样也就让最终的结果少于20000。
(3)不保证原子性问题解决
可加synchronized解决,但它是重量级同步机制,性能上有所顾虑。
如何不加synchronized解决number++在多线程下是非线程安全的问题?使用AtomicInteger。
import java.util.concurrent.atomic.AtomicInteger;
class MyData2 {
/**
* volatile 修饰的关键字,是为了增加 主线程和线程之间的可见性,只要有一个线程修改了内存中的值,其它线程也能马上感知
*/
volatile int number = 0;
AtomicInteger number2 = new AtomicInteger();
public void addPlusPlus() {
number ++;
}
public void addPlusPlus2() {
number2.getAndIncrement();
}
}
public class VolatileAtomicityDemo {
public static void main(String[] args) {
MyData2 myData = new MyData2();
// 创建10个线程,线程里面进行1000次循环
for (int i = 0; i < 20; i++) {
new Thread(() -> {
// 里面
for (int j = 0; j < 1000; j++) {
myData.addPlusPlus();
myData.addPlusPlus2();
}
}, String.valueOf(i)).start();
}
// 需要等待上面20个线程都计算完成后,在用main线程取得最终的结果值
// 这里判断线程数是否大于2,为什么是2?因为默认是有两个线程的,一个main线程,一个gc线程
while(Thread.activeCount() > 2) {
// yield表示不执行
Thread.yield();
}
// 查看最终的值
// 假设volatile保证原子性,那么输出的值应该为: 20 * 1000 = 20000
System.out.println(Thread.currentThread().getName() + "\t finally number value: " + myData.number);
System.out.println(Thread.currentThread().getName() + "\t finally number2 value: " + myData.number2);
}
}
输出结果为:
main finally number value: 18766
main finally number2 value: 20000
5、volatile指令重排
计算机在执行程序时,为了提高性能,编译器和处理器的常常会对指令做重排,一般分以下3种:
单线程环境里面确保程序最终执行结果和代码顺序执行的结果一致。
处理器在进行重排序时必须要考虑指令之间的数据依赖性
多线程环境中线程交替执行,由于编译器优化重排的存在,两个线程中使用的变量能否保证一致性是无法确定的,结果无法预测。
(1)禁止指令重排小总结
volatile实现禁止指令重排优化,从而避免多线程环境下程序出现乱序执行的现象。
先了解一个概念,内存屏障(Memory Barrier)又称内存栅栏,是一个CPU指令,它的作用有两个:
- 保证特定操作的执行顺序,
- 保证某些变量的内存可见性(利用该特性实现volatile的内存可见性)。
由于编译器和处理器都能执行指令重排优化。如果在指令间插入一条Memory Barrier则会告诉编译器和CPU,不管什么指令都不能和这条Memory Barrier指令重排序,也就是说通过插入内存屏障禁止在内存屏障前后的指令执行重排序优化。内存屏障另外一个作用是强制刷出各种CPU的缓存数据,因此任何CPU上的线程都能读取到这些数据的最新版本。
对volatile变量进行写操作时,会在写操作后加入一条store屏障指令,将工作内存中的共享变量值刷新回到主内存。
对Volatile变量进行读操作时,会在读操作前加入一条load屏障指令,从主内存中读取共享变量。
(2)线程安全性获得保证
- 工作内存与主内存同步延迟现象导致的可见性问题 - 可以使用synchronized或volatile关键字解决,它们都可以使一个线程修改后的变量立即对其他线程可见。
- 对于指令重排导致的可见性问题和有序性问题 - 可以利用volatile关键字解决,因为volatile的另外一个作用就是禁止重排序优化。
6、单例模式volatile分析
DCL(Double Check Lock 双端检锁机制)
public class SingletonDemo{
private SingletonDemo(){}
private volatile static SingletonDemo instance = null;
public static SingletonDemo getInstance() {
if(instance == null) {
synchronized(SingletonDemo.class){
if(instance == null){
instance = new SingletonDemo();
}
}
}
return instance;
}
}
解析:
原因在于某一个线程执行到第一次检测,读取到的instance不为null时,instance的引用对象可能没有完成初始化。instance = new SingletonDemo();可以分为以下3步完成(伪代码):
memory = allocate(); //1.分配对象内存空间
instance(memory); //2.初始化对象
instance = memory; //3.设置instance指向刚分配的内存地址,此时instance != null
步骤2和步骤3不存在数据依赖关系,而且无论重排前还是重排后程序的执行结果在单线程中并没有改变,因此这种重排优化是允许的。
memory = allocate(); //1.分配对象内存空间
instance = memory;//3.设置instance指向刚分配的内存地址,此时instance! =null,但是对象还没有初始化完成!
instance(memory);//2.初始化对象
但是指令重排只会保证串行语义的执行的一致性(单线程),但并不会关心多线程间的语义一致性。
所以当一条线程访问instance不为null时,由于instance实例未必已初始化完成,也就造成了线程安全问题。
二、CAS解析(Compare And Set)
1、示例程序
public class CASDemo{
public static void main(string[] args){
AtomicInteger atomicInteger = new AtomicInteger(5);// mian do thing. . . . ..
System.out.println(atomicInteger.compareAndSet(5, 2019)+"\t current data: "+atomicInteger.get());
System.out.println(atomicInteger.compareAndset(5, 1024)+"\t current data: "+atomicInteger.get());
}
}
输出结果:
true 2019
false 2019
2、底层原理(上)
CAS底层原理知道吗?如果知道,请谈谈你对UnSafe的理解!
atomiclnteger.getAndIncrement();源码
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
/**
* Creates a new AtomicInteger with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicInteger(int initialValue) {
value = initialValue;
}
/**
* Creates a new AtomicInteger with initial value {@code 0}.
*/
public AtomicInteger() {
}
...
/**
* Atomically increments by one the current value.
*
* @return the previous value
*/
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
...
}
(1)UnSafe
1、Unsafe:是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。Unsafe类存在于sun.misc包中,其内部方法操作可以像C的指针一样直接操作内存,因为Java中CAS操作的执行依赖于Unsafe类的方法。
注意:**Unsafe类中的所有方法都是native修饰的,也就是说Unsafe类中的方法都直接调用操作系统底层资源执行相应任务。
2、变量valueOffset,表示该变量值在内存中的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的。
3、变量value**用volatile修饰,保证了多线程之间的内存可见性。
(2)CAS是什么
CAS的全称为Compare-And-Swap,它是一条CPU并发原语。
它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。
CAS并发原语体现在JAVA语言中就是sun.misc.Unsafe类中的各个方法。调用UnSafe类中的CAS方法,JVM会帮我们实现出CAS汇编指令。这是一种完全依赖于硬件的功能,通过它实现了原子操作。再次强调,由于CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。(原子性)
3、底层原理(下)
4、CAS缺点
(1)循环时间长开销很大
// ursafe.getAndAddInt
public final int getAndAddInt(Object var1, long var2, int var4){
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
}while(!this.compareAndSwapInt(varl, var2, var5,var5 + var4));
return var5;
}
我们可以看到getAndAddInt方法执行时,有个do while,如果CAS失败,会一直进行尝试。如果CAS长时间一直不成功,可能会给CPU带来很大的开销。
(2)只能保证一个共享变量的原子操作
当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是,对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁来保证原子性。
三、ABA问题
CAS会导致“ABA问题”
CAS算法实现一个重要前提需要取出内存中某时刻的数据并在当下时刻比较并替换,那么在这个时间差类会导致数据的变化。
比如说一个线程one从内存位置V中取出A,这时候另一个线程two也从内存中取出A,并且线程two进行了一些操作将值变成了B,然后线程two又将V位置的数据变成A,这时候线程one进行CAS操作发现内存中仍然是A,然后线程one操作成功。
尽管线程one的CAS操作成功,但是不代表这个过程就是没有问题的。
[
](https://blog.csdn.net/u011863024/article/details/114684428)
1、AtomicReference原子引用
import java.util.concurrent.atomic.AtomicReference;
class User{
String userName;
int age;
public User(String userName, int age) {
this.userName = userName;
this.age = age;
}
@Override
public String toString() {
return String.format("User [userName=%s, age=%s]", userName, age);
}
}
public class AtomicReferenceDemo {
public static void main(String[] args){
User z3 = new User( "z3",22);
User li4 = new User("li4" ,25);
AtomicReference<User> atomicReference = new AtomicReference<>();
atomicReference.set(z3);
System.out.println(atomicReference.compareAndSet(z3, li4)+"\t"+atomicReference.get().toString());
System.out.println(atomicReference.compareAndSet(z3, li4)+"\t"+atomicReference.get().toString());
}
}
输出结果
true User [userName=li4, age=25]
false User [userName=li4, age=25]
2、AtomicStampedReference版本号原子引用
原子引用 + 新增一种机制,那就是修改版本号(类似时间戳),它用来解决ABA问题。
3、ABA问题解决
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
public class ABADemo {
/**
* 普通的原子引用包装类
*/
static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);
// 传递两个值,一个是初始值,一个是初始版本号
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1);
public static void main(String[] args) {
System.out.println("============以下是ABA问题的产生==========");
new Thread(() -> {
// 把100 改成 101 然后在改成100,也就是ABA
atomicReference.compareAndSet(100, 101);
atomicReference.compareAndSet(101, 100);
}, "t1").start();
new Thread(() -> {
try {
// 睡眠一秒,保证t1线程,完成了ABA操作
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 把100 改成 101 然后在改成100,也就是ABA
System.out.println(atomicReference.compareAndSet(100, 2019) + "\t" + atomicReference.get());
}, "t2").start();
/
try {
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
e.printStackTrace();
}
/
System.out.println("============以下是ABA问题的解决==========");
new Thread(() -> {
// 获取版本号
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t 第一次版本号" + stamp);
// 暂停t3一秒钟
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 传入4个值,期望值,更新值,期望版本号,更新版本号
atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(),
atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t 第二次版本号" + atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(),
atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t 第三次版本号" + atomicStampedReference.getStamp());
}, "t3").start();
new Thread(() -> {
// 获取版本号
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t 第一次版本号" + stamp);
// 暂停t4 3秒钟,保证t3线程也进行一次ABA问题
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean result = atomicStampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
System.out.println(Thread.currentThread().getName() + "\t 修改成功否:" + result + "\t 当前最新实际版本号:"
+ atomicStampedReference.getStamp());
System.out.println(Thread.currentThread().getName() + "\t 当前实际最新值" + atomicStampedReference.getReference());
}, "t4").start();
}
}
输出结果
============以下是ABA问题的产生==========
true 2019
============以下是ABA问题的解决==========
t3 第一次版本号1
t4 第一次版本号1
t3 第二次版本号2
t3 第三次版本号3
t4 修改成功否:false 当前最新实际版本号:3
t4 当前实际最新值100
四、CopyOnWriteArrayList/CopyOnWriteArraySet/ConcurrencyMap解决多线程安全问题。
五、理解公平锁、非公平锁、可重入锁、递归锁、自旋锁。手写自旋锁!
1、公平锁与非公平锁
(1)定义
- 公平锁是指多个线程按照申请锁的顺序来获取锁,类似排队打饭,先来后到。
- 非公平锁是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后中请的线程比先中请的线程优先获取锁。在高并发的情况下,有可能会造成优先级反转或者饥饿现象
(2)区别
- 公平锁:就是很公平,在并发环境中,每个线程在获取锁时会先查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,就占有锁,否则就会加入到等待队列中,以后会按照FIFO的规则从队列中取到自己。
- 非公平锁:比较粗鲁,上来就直接尝试占有锁,如果尝试失败,就再采用类似公平锁那种方式。
[
](https://blog.csdn.net/u011863024/article/details/114684428)
(3)题外话
Java ReentrantLock而言,通过构造函数指定该锁是否是公平锁,默认是非公平锁。
非公平锁的优点在于吞吐量比公平锁大。
对于Synchronized而言,也是一种非公平锁。
2、可重入锁和递归锁理论知识
(1)定义
可重入锁(也叫做递归锁):指的是同一线程外层函数获得锁之后,内层递归函数仍然能获取该锁的代码,在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。也即是说,线程可以进入任何一个它已经拥有的锁所同步着的代码块。ReentrantLock/synchronized就是一个典型的可重入锁。
[
](https://blog.csdn.net/u011863024/article/details/114684428)
(2)代码验证
Synchronized验证
class Phone {
public synchronized void sendSMS() throws Exception{
System.out.println(Thread.currentThread().getName() + "\t invoked sendSMS()");
// 在同步方法中,调用另外一个同步方法
sendEmail();
}
public synchronized void sendEmail() throws Exception{
System.out.println(Thread.currentThread().getId() + "\t invoked sendEmail()");
}
}
public class SynchronizedReentrantLockDemo {
public static void main(String[] args) {
Phone phone = new Phone();
// 两个线程操作资源列
new Thread(() -> {
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}, "t2").start();
}
}
输出结果
t1 invoked sendSMS()
11 invoked sendEmail()
t2 invoked sendSMS()
12 invoked sendEmail()
ReentrantLock验证
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Phone2 implements Runnable{
Lock lock = new ReentrantLock();
/**
* set进去的时候,就加锁,调用set方法的时候,能否访问另外一个加锁的set方法
*/
public void getLock() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t get Lock");
setLock();
} finally {
lock.unlock();
}
}
public void setLock() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "\t set Lock");
} finally {
lock.unlock();
}
}
@Override
public void run() {
getLock();
}
}
public class ReentrantLockDemo {
public static void main(String[] args) {
Phone2 phone = new Phone2();
/**
* 因为Phone实现了Runnable接口
*/
Thread t3 = new Thread(phone, "t3");
Thread t4 = new Thread(phone, "t4");
t3.start();
t4.start();
}
}
输出结果
t3 get Lock
t3 set Lock
t4 get Lock
t4 set Lock
3、自旋锁理论知识
(1)定义
是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU。
(2)代码验证
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class SpinLockDemo {
// 现在的泛型装的是Thread,原子引用线程
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void myLock() {
// 获取当前进来的线程
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t come in ");
// 开始自旋,期望值是null,更新值是当前线程,如果是null,则更新为当前线程,否者自旋
while(!atomicReference.compareAndSet(null, thread)) {
//摸鱼
}
}
public void myUnLock() {
// 获取当前进来的线程
Thread thread = Thread.currentThread();
// 自己用完了后,把atomicReference变成null
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName() + "\t invoked myUnlock()");
}
public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();
// 启动t1线程,开始操作
new Thread(() -> {
// 开始占有锁
spinLockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 开始释放锁
spinLockDemo.myUnLock();
}, "t1").start();
// 让main线程暂停1秒,使得t1线程,先执行
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 1秒后,启动t2线程,开始占用这个锁
new Thread(() -> {
// 开始占有锁
spinLockDemo.myLock();
// 开始释放锁
spinLockDemo.myUnLock();
}, "t2").start();
}
}
输出结果
t1 come in
t2 come in
t1 invoked myUnlock()
t2 invoked myUnlock()
4、读写锁理论知识
(1)定义
- 独占锁:指该锁一次只能被一个线程所持有。对ReentrantLock和Synchronized而言都是独占锁
- 共享锁:指该锁可被多个线程所持有。
多个线程同时读一个资源类没有任何问题,所以为了满足并发量,读取共享资源应该可以同时进行。但是,如果有一个线程想去写共享资源来,就不应该再有其它线程可以对该资源进行读或写。
对ReentrantReadWriteLock其读锁是共享锁,其写锁是独占锁。
读锁的共享锁可保证并发读是非常高效的,读写,写读,写写的过程是互斥的。
[
](https://blog.csdn.net/u011863024/article/details/114684428)
(2)代码验证
package com.lun.concurrency;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class MyCache2 {
private volatile Map<String, Object> map = new HashMap<>();
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public void put(String key, Object value) {
// 创建一个写锁
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t 正在写入:" + key);
try {
// 模拟网络拥堵,延迟0.3秒
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "\t 写入完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 写锁 释放
rwLock.writeLock().unlock();
}
}
public void get(String key) {
// 读锁
rwLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t 正在读取:");
try {
// 模拟网络拥堵,延迟0.3秒
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object value = map.get(key);
System.out.println(Thread.currentThread().getName() + "\t 读取完成:" + value);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 读锁释放
rwLock.readLock().unlock();
}
}
public void clean() {
map.clear();
}
}
public class ReadWriteWithLockDemo {
public static void main(String[] args) {
MyCache2 myCache = new MyCache2();
// 线程操作资源类,5个线程写
for (int i = 1; i <= 5; i++) {
// lambda表达式内部必须是final
final int tempInt = i;
new Thread(() -> {
myCache.put(tempInt + "", tempInt + "");
}, String.valueOf(i)).start();
}
// 线程操作资源类, 5个线程读
for (int i = 1; i <= 5; i++) {
// lambda表达式内部必须是final
final int tempInt = i;
new Thread(() -> {
myCache.get(tempInt + "");
}, String.valueOf(i)).start();
}
}
}
输出结果
1 正在写入:1
1 写入完成
2 正在写入:2
2 写入完成
3 正在写入:3
3 写入完成
5 正在写入:5
5 写入完成
4 正在写入:4
4 写入完成
2 正在读取:
3 正在读取:
1 正在读取:
5 正在读取:
4 正在读取:
3 读取完成:3
2 读取完成:2
1 读取完成:1
5 读取完成:5
4 读取完成:4
六、CountDownLatch、CyclicBarrier、Semaphore 理解
1、CountDownLatch(向下递减)
(1)作用:让一线程阻塞直到另一些线程完成一系列操作才被唤醒。
(2)CountDownLatch主要有两个方法(await(),countDown())。
(3)说明:
当一个或多个线程调用await()时,调用线程会被阻塞。其它线程调用countDown()会将计数器减1(调用countDown方法的线程不会阻塞),当计数器的值变为零时,因调用await方法被阻塞的线程会被唤醒,继续执行。
(4)案例1
假设一个自习室里有7个人,其中有一个是班长,班长的主要职责就是在其它6个同学走了后,关灯,锁教室门,然后走人,因此班长是需要最后一个走的,那么有什么方法能够控制班长这个线程是最后一个执行,而其它线程是随机执行的
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 计数器
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 上完自习,离开教室");
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "\t 班长最后关门");
}
}
输出结果:
0 上完自习,离开教室
6 上完自习,离开教室
4 上完自习,离开教室
5 上完自习,离开教室
3 上完自习,离开教室
1 上完自习,离开教室
2 上完自习,离开教室
main 班长最后关门
(5)案例2:枚举+CountDownLatch
import java.util.Objects;
public enum CountryEnum {
ONE(1, "齐"), TWO(2, "楚"), THREE(3, "燕"), FOUR(4, "赵"), FIVE(5, "魏"), SIX(6, "韩");
private Integer retcode;
private String retMessage;
CountryEnum(Integer retcode, String retMessage) {
this.retcode = retcode;
this.retMessage = retMessage;
}
public static CountryEnum forEach_countryEnum(int index) {
CountryEnum[] myArray = CountryEnum.values();
for(CountryEnum ce : myArray) {
if(Objects.equals(index, ce.getRetcode())) {
return ce;
}
}
return null;
}
public Integer getRetcode() {
return retcode;
}
public void setRetcode(Integer retcode) {
this.retcode = retcode;
}
public String getRetMessage() {
return retMessage;
}
public void setRetMessage(String retMessage) {
this.retMessage = retMessage;
}
}
import java.util.concurrent.CountDownLatch;
public class UnifySixCountriesDemo {
public static void main(String[] args) throws InterruptedException {
// 计数器
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "国被灭了!");
countDownLatch.countDown();
}, CountryEnum.forEach_countryEnum(i).getRetMessage()).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + " 秦国统一中原。");
}
}
输出结果:
齐国被灭了!
燕国被灭了!
楚国被灭了!
魏国被灭了!
韩国被灭了!
赵国被灭了!
main 秦国统一中原。
2、CyclicBarrier
(1)定义
CyclicBarrier的字面意思就是可循环(Cyclic)使用的屏障(Barrier)。它要求做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await方法。
CyclicBarrier与CountDownLatch的区别:CyclicBarrier可重复多次,而CountDownLatch只能是一次。
(2)案例
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class SummonTheDragonDemo {
public static void main(String[] args) {
/**
* 定义一个循环屏障,参数1:需要累加的值,参数2 需要执行的方法
*/
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("召唤神龙");
});
for (int i = 1; i <= 7; i++) {
final Integer tempInt = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 收集到 第" + tempInt + "颗龙珠");
try {
// 先到的被阻塞,等全部线程完成后,才能执行方法
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
输出结果
2 收集到 第2颗龙珠
6 收集到 第6颗龙珠
1 收集到 第1颗龙珠
7 收集到 第7颗龙珠
5 收集到 第5颗龙珠
4 收集到 第4颗龙珠
3 收集到 第3颗龙珠
召唤神龙
3、Semaphore信号量
(1)作用
- 用于多个共享资源的互斥使用
- 用于并发线程数的控制。
正常的锁(concurrency.locks或synchronized锁)在任何时刻都只允许一个任务访问一项资源,而 Semaphore允许n个任务同时访问这个资源。
(2)案例:模拟抢车位场景
假设一共有6辆车,3个停车位
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
/**
* 初始化一个信号量为3,默认是false 非公平锁, 模拟3个停车位
*/
Semaphore semaphore = new Semaphore(3, false);
// 模拟6部车
for (int i = 0; i < 6; i++) {
new Thread(() -> {
try {
// 代表一辆车,已经占用了该车位
semaphore.acquire(); // 抢占
System.out.println(Thread.currentThread().getName() + "\t 抢到车位");
// 每个车停3秒
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t 离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放停车位
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
输出结果:
1 抢到车位
2 抢到车位
0 抢到车位
0 离开车位
2 离开车位
1 离开车位
5 抢到车位
4 抢到车位
3 抢到车位
5 离开车位
4 离开车位
3 离开车位
七、阻塞队列
1、阻塞队列理论
- 阻塞队列有没有好的一面
- 不得不阻塞,你如何管理
2、阻塞队列接口结构和实现类
(1)定义
顾名思义,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下图所示:
线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素。
当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。
当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。
试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。
同样试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程从列中移除一个或者多个元素或者完全清空队列后使队列重新变得空闲起来并后续新增。
(2)为什么用?有什么好处?
在多线程领域:所谓阻塞,在某些情况下余挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。
为什么需要BlockingQueue
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。
在Concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
(3)架构介绍
(4)种类分析
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列。
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
- DelayQueue:使用优先级队列实现延迟无界阻塞队列。
- SynchronousQueue:不存储元素的阻塞队列。
- 没有容量
- 不存储元素
- 每一个put必须等待一个take,否则不能继续添加元素,反之亦然
- LinkedTransferQueue:由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列。
[
](https://blog.csdn.net/u011863024/article/details/114684428)
(5)BlockingQueue的核心方法
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | reemove(e) | poll() | take() | poll(time, unit ) |
检查 | element() | peek() | 不可用 | 不可用 |
性质 | 说明 |
---|---|
抛出异常 | 当阻塞队列满时:在往队列中add插入元素会抛出 IIIegalStateException:Queue full 当阻塞队列空时:再往队列中remove移除元素,会抛出NoSuchException |
特殊性 | 插入方法,成功true,失败false 移除方法:成功返回出队列元素,队列没有就返回空 |
一直阻塞 | 当阻塞队列满时,生产者继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出。 当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用。 |
超时退出 | 当阻塞队列满时,队里会阻塞生产者线程一定时间,超过限时后生产者线程会退出 |
(6)SynchronousQueueDemo
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put A ");
blockingQueue.put("A");
System.out.println(Thread.currentThread().getName() + "\t put B ");
blockingQueue.put("B");
System.out.println(Thread.currentThread().getName() + "\t put C ");
blockingQueue.put("C");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
try {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take A ");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take B ");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take C ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
}
}
3、Synchronized和Lock有什么区别?
- synchronized属于JVM层面,属于java的关键字;Lock是具体类(java.util.concurrent.locks.Lock)是api层面的锁
- monitorenter(底层是通过monitor对象来完成,其实wait/notify等方法也依赖于monitor对象 只能在同步块或者方法中才能调用 wait/ notify等方法)
- 使用方法:
- synchronized:不需要用户去手动释放锁,当synchronized代码执行后,系统会自动让线程释放对锁的占用。
- ReentrantLock:则需要用户去手动释放锁,若没有主动释放锁,就有可能出现死锁的现象,需要lock() 和 unlock() 配置try catch语句来完成
- 等待是否中断
- synchronized:不可中断,除非抛出异常或者正常运行完成。
- ReentrantLock:可中断,可以设置超时方法
- 设置超时方法,trylock(long timeout, TimeUnit unit)
- lockInterrupible() 放代码块中,调用interrupt() 方法可以中断
- 加锁是否公平
- synchronized:非公平锁
- ReentrantLock:默认非公平锁,构造函数可以传递boolean值,true为公平锁,false为非公平锁
- 锁绑定多个条件Condition
- synchronized:没有,要么随机,要么全部唤醒
- ReentrantLock:用来实现分组唤醒需要唤醒的线程,可以精确唤醒,而不是像synchronized那样,要么随机,要么全部唤醒
4、锁绑定多个条件Condition Demo
案例:
多线程之间按顺序调用,实现 A-> B -> C 三个线程启动,要求如下:
AA打印5次,BB打印10次,CC打印15次
紧接着
AA打印5次,BB打印10次,CC打印15次
…
来10轮
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class ShareResource {
// A 1 B 2 c 3
private int number = 1;
// 创建一个重入锁
private Lock lock = new ReentrantLock();
// 这三个相当于备用钥匙
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void print5() {
lock.lock();
try {
// 判断
while(number != 1) {
// 不等于1,需要等待
condition1.await();
}
// 干活
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "\t " + number + "\t" + i);
}
// 唤醒 (干完活后,需要通知B线程执行)
number = 2;
// 通知2号去干活了
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print10() {
lock.lock();
try {
// 判断
while(number != 2) {
// 不等于1,需要等待
condition2.await();
}
// 干活
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + "\t " + number + "\t" + i);
}
// 唤醒 (干完活后,需要通知C线程执行)
number = 3;
// 通知2号去干活了
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print15() {
lock.lock();
try {
// 判断
while(number != 3) {
// 不等于1,需要等待
condition3.await();
}
// 干活
for (int i = 0; i < 15; i++) {
System.out.println(Thread.currentThread().getName() + "\t " + number + "\t" + i);
}
// 唤醒 (干完活后,需要通知C线程执行)
number = 1;
// 通知1号去干活了
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class SynchronizedAndReentrantLockDemo {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
int num = 10;
new Thread(() -> {
for (int i = 0; i < num; i++) {
shareResource.print5();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < num; i++) {
shareResource.print10();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < num; i++) {
shareResource.print15();
}
}, "C").start();
}
}
5、生产者消费者阻塞队列版案例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class MyResource {
// 默认开启,进行生产消费
// 这里用到了volatile是为了保持数据的可见性,也就是当TLAG修改时,要马上通知其它线程进行修改
private volatile boolean FLAG = true;
// 使用原子包装类,而不用number++
private AtomicInteger atomicInteger = new AtomicInteger();
// 这里不能为了满足条件,而实例化一个具体的SynchronousBlockingQueue
BlockingQueue<String> blockingQueue = null;
// 而应该采用依赖注入里面的,构造注入方法传入
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
// 查询出传入的class是什么
System.out.println(blockingQueue.getClass().getName());
}
public void myProducer() throws Exception{
String data = null;
boolean retValue;
// 多线程环境的判断,一定要使用while进行,防止出现虚假唤醒
// 当FLAG为true的时候,开始生产
while(FLAG) {
data = atomicInteger.incrementAndGet() + "";
// 2秒存入1个data
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if(retValue) {
System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data + "成功" );
} else {
System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data + "失败" );
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "\t 停止生产,表示FLAG=false,生产介绍");
}
public void myConsumer() throws Exception{
String retValue;
// 多线程环境的判断,一定要使用while进行,防止出现虚假唤醒
// 当FLAG为true的时候,开始生产
while(FLAG) {
// 2秒存入1个data
retValue = blockingQueue.poll(2L, TimeUnit.SECONDS);
if(retValue != null && retValue != "") {
System.out.println(Thread.currentThread().getName() + "\t 消费队列:" + retValue + "成功" );
} else {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t 消费失败,队列中已为空,退出" );
// 退出消费队列
return;
}
}
}
/**
* 停止生产的判断
*/
public void stop() {
this.FLAG = false;
}
}
public class ProducerConsumerWithBlockingQueueDemo {
public static void main(String[] args) {
// 传入具体的实现类, ArrayBlockingQueue
MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 生产线程启动\n\n");
try {
myResource.myProducer();
System.out.println("\n");
} catch (Exception e) {
e.printStackTrace();
}
}, "producer").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "consumer").start();
// 5秒后,停止生产和消费
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("\n\n5秒中后,生产和消费线程停止,线程结束");
myResource.stop();
}
}
6、Callable接口
Callable接口,是一种让线程执行完成后,能够返回结果的。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName() + " come in Callable");
TimeUnit.SECONDS.sleep(2);
return 1024;
}
}
public class CallableDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread());
new Thread(futureTask, "A").start();
new Thread(futureTask, "B").start();//多个线程执行 一个FutureTask的时候,只会计算一次
// 输出FutureTask的返回值
System.out.println("result FutureTask " + futureTask.get());
}
}
八、线程池
1、定义、优点
线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。
使用及其优势。
它的主要特点为:线程复用,控制最大并发数,管理线程。
优点:
- 降低资源消耗。通过重复利用己创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
[
](https://blog.csdn.net/u011863024/article/details/114684428)
2、线程池的3个常用方式
Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类。
(1)Executors.newScheduledThreadPool()
(2)Executors.newWorkStealingPool(int) - Java8新增,使用目前机器上可用的处理器作为它的并行级别
(3)Executors.newSingleThreadExecutor()
特点:创建一个单线程化的线程池,他只会用唯一的工作线程来执行任务,保证所有的任务按照指定顺序执行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
(4)Executors.newFixedThreadPool(int)
特点:创建一个定长线程池,可控制线程最大并发数,超出的线程会在线程中等待
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
(5)Executors.newCachedThreadPool()
特点:创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。(当线程空闲超过60s,就销毁线程!)
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
3、线程池七大参数介绍
public class ThreadPoolExecutor extends AbstractExecutorService {
...
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
...
}
- corePoolSize:线程池中的常驻核心线程数
- 在创建了线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务,近似理解为今日当值线程。
- 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
- maximumPoolSize:线程池能够容纳同时执行的最大线程数,此值必须大于等于1
- keepAliveTime:多余的空闲线程的存活时间。
- 当前线程池数量超过corePoolSize时,当空闲时间达到keepAliveTime值时,多余空闲线程会被销毁直到只剩下corePoolSize个线程为止
- unit:keepAliveTime的单位。
- workQueue:任务队列,被提交但尚未被执行的任务。
- threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程一般用默认的即可。
- handler:拒绝策略,表示当队列满了并且工作线程大于等于线程池的最大线程数( maximumPoolSize)。
4、底层工作原理
- 在创建了线程池后,等待提交过来的任务请求。
- 当调用execute()方法添加一个请求任务时,线程池会做如下判断:
- 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
- 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
- 如果这时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
- 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行。
- 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程池会判断:
- 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉,所以线程池的所有任务完成后它最终会收缩到corePoolSize的大小。
5、线程池的4种拒绝策略
定义:等待队列也已经排满了,再也塞不下新任务了同时,线程池中的max线程也达到了,无法继续为新任务服务。
这时候我们就需要拒绝策略机制合理的处理这个问题。
JDK拒绝策略:
- AbortPolicy(默认):直接抛出 RejectedExecutionException异常阻止系统正常运知。
- CallerRunsPolicy:”调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
- DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。
- DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种方案。
以上内置拒绝策略均实现了RejectedExecutionHandler接口
6、线程池实际使用(自定义)
(1)定义
3.【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。 说明:线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。 如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
4.【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors 返回的线程池对象的弊端如下: 1) FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。 2) CachedThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。 阿里巴巴《Java 开发手册》
(2)案例(手写和改造拒绝策略)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyThreadPoolExecutorDemo {
public static void doSomething(ExecutorService executorService, int numOfRequest) {
try {
System.out.println(((ThreadPoolExecutor)executorService).getRejectedExecutionHandler().getClass() + ":");
TimeUnit.SECONDS.sleep(1);
for (int i = 0; i < numOfRequest; i++) {
final int tempInt = i;
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 给用户:" + tempInt + " 办理业务");
});
}
TimeUnit.SECONDS.sleep(1);
System.out.println("\n\n");
} catch (Exception e) {
System.err.println(e);
} finally {
executorService.shutdown();
}
}
public static ExecutorService newMyThreadPoolExecutor(int corePoolSize,
int maximumPoolSize, int blockingQueueSize, RejectedExecutionHandler handler){
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
1,//keepAliveTime
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(blockingQueueSize),
Executors.defaultThreadFactory(),
handler);
}
public static void main(String[] args) {
doSomething(newMyThreadPoolExecutor(2, 5, 3, new ThreadPoolExecutor.AbortPolicy()), 10);
doSomething(newMyThreadPoolExecutor(2, 5, 3, new ThreadPoolExecutor.CallerRunsPolicy()), 20);
doSomething(newMyThreadPoolExecutor(2, 5, 3, new ThreadPoolExecutor.DiscardOldestPolicy()), 10);
doSomething(newMyThreadPoolExecutor(2, 5, 3, new ThreadPoolExecutor.DiscardPolicy()), 10);
}
}
(3)配置合理的线程数(两种策略)
CPU密集型
CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。
CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程),
而在单核CPU上,无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些。
CPU密集型任务配置尽可能少的线程数量:
一般公式:(CPU核数+1)个线程的线程池
IO密集型
由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如CPU核数 * 2。
IO密集型,即该任务需要大量的IO,即大量的阻塞。
在单线程上运行IO密集型的任务会导致浪费大量的CPU运算能力浪费在等待。
所以在IO密集型任务中使用多线程可以大大的加速程序运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。
IO密集型时,大部分线程都阻塞,故需要多配置线程数:
参考公式:CPU核数/ (1-阻塞系数)
阻塞系数在0.8~0.9之间
比如8核CPU:8/(1-0.9)=80个线程数
九、死锁编码及定位分析
1、定义
死锁是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干涉那它们都将无法推进下去,如果系统资源充足,进程的资源请求都能够碍到满足,死锁出现的可能性就很低,否则就会因争夺有限的资源而陷入死锁。
2、产生死锁主要原因
- 系统资源不足
- 进程运行推进的顺序不合适
- 资源分配不当
3、发生死锁的四个条件
- 互斥条件,线程使用的资源至少有一个不能共享的。
- 至少有一个线程必须持有一个资源且正在等待获取一个当前被别的线程持有的资源。
- 资源不能被抢占。
- 循环等待。
4、如何解决死锁问题
破坏发生死锁的四个条件其中之一即可。
5、产生死锁的代码
package com.lun.concurrency;
import java.util.concurrent.TimeUnit;
class MyTask implements Runnable{
private Object resourceA, resourceB;
public MyTask(Object resourceA, Object resourceB) {
this.resourceA = resourceA;
this.resourceB = resourceB;
}
@Override
public void run() {
synchronized (resourceA) {
System.out.println(String.format("%s 自己持有%s,尝试持有%s",//
Thread.currentThread().getName(), resourceA, resourceB));
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (resourceB) {
System.out.println(String.format("%s 同时持有%s,%s",//
Thread.currentThread().getName(), resourceA, resourceB));
}
}
}
}
public class DeadLockDemo {
public static void main(String[] args) {
Object resourceA = new Object();
Object resourceB = new Object();
new Thread(new MyTask(resourceA, resourceB),"Thread A").start();
new Thread(new MyTask(resourceB, resourceA),"Thread B").start();
}
}
6、查看是否死锁工具
- jps命令定位进程号
- jstack找到死锁查看 ```bash C:\Users\abc>jps -l 11968 com.lun.concurrency.DeadLockDemo 6100 jdk.jcmd/sun.tools.jps.Jps 6204 Eclipse
C:\Users\abc>jstack 11968 2021-03-09 02:42:46 Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.251-b08 mixed mode):
“DestroyJavaVM” #13 prio=5 os_prio=0 tid=0x00000000004de800 nid=0x2524 waiting on condition [0 x0000000000000000] java.lang.Thread.State: RUNNABLE
“Thread B” #12 prio=5 os_prio=0 tid=0x000000001e0a5800 nid=0x6bc waiting for monitor entry [0x 000000001efae000] java.lang.Thread.State: BLOCKED (on object monitor) at com.lun.concurrency.MyTask.run(DeadLockDemo.java:27)
- waiting to lock <0x000000076b431d80> (a java.lang.Object)
- locked <0x000000076b431d90> (a java.lang.Object)
at java.lang.Thread.run(Thread.java:748)
“Thread A” #11 prio=5 os_prio=0 tid=0x000000001e0a4800 nid=0x650 waiting for monitor entry [0x 000000001eeae000] java.lang.Thread.State: BLOCKED (on object monitor) at com.lun.concurrency.MyTask.run(DeadLockDemo.java:27)
- waiting to lock <0x000000076b431d90> (a java.lang.Object)
- locked <0x000000076b431d80> (a java.lang.Object)
at java.lang.Thread.run(Thread.java:748)
“Service Thread” #10 daemon prio=9 os_prio=0 tid=0x000000001e034000 nid=0x2fb8 runnable [0x000 0000000000000] java.lang.Thread.State: RUNNABLE
“C1 CompilerThread3” #9 daemon prio=9 os_prio=2 tid=0x000000001dffa000 nid=0x26e8 waiting on c ondition [0x0000000000000000] java.lang.Thread.State: RUNNABLE
“C2 CompilerThread2” #8 daemon prio=9 os_prio=2 tid=0x000000001dff6000 nid=0x484 waiting on co ndition [0x0000000000000000] java.lang.Thread.State: RUNNABLE
“C2 CompilerThread1” #7 daemon prio=9 os_prio=2 tid=0x000000001dfe0800 nid=0x35c8 waiting on c ondition [0x0000000000000000] java.lang.Thread.State: RUNNABLE
“C2 CompilerThread0” #6 daemon prio=9 os_prio=2 tid=0x000000001dfde800 nid=0x3b7c waiting on c ondition [0x0000000000000000] java.lang.Thread.State: RUNNABLE
“Attach Listener” #5 daemon prio=5 os_prio=2 tid=0x000000001dfdd000 nid=0x3834 waiting on cond ition [0x0000000000000000] java.lang.Thread.State: RUNNABLE
“Signal Dispatcher” #4 daemon prio=9 os_prio=2 tid=0x000000001dfdb000 nid=0x214 runnable [0x00 00000000000000] java.lang.Thread.State: RUNNABLE
“Finalizer” #3 daemon prio=8 os_prio=1 tid=0x000000001df70800 nid=0x2650 in Object.wait() [0x0 00000001e54f000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)
- waiting on <0x000000076b388ee0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
- locked <0x000000076b388ee0> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:216)
“Reference Handler” #2 daemon prio=10 os_prio=2 tid=0x000000001c17d000 nid=0x1680 in Object.wa it() [0x000000001e44f000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method)
- waiting on <0x000000076b386c00> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x000000076b386c00> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)
“VM Thread” os_prio=2 tid=0x000000001c178000 nid=0x3958 runnable
“GC task thread#0 (ParallelGC)” os_prio=0 tid=0x0000000002667800 nid=0xd3c runnable
“GC task thread#1 (ParallelGC)” os_prio=0 tid=0x0000000002669000 nid=0x297c runnable
“GC task thread#2 (ParallelGC)” os_prio=0 tid=0x000000000266a800 nid=0x2fd0 runnable
“GC task thread#3 (ParallelGC)” os_prio=0 tid=0x000000000266c000 nid=0x1c90 runnable
“GC task thread#4 (ParallelGC)” os_prio=0 tid=0x000000000266f800 nid=0x3614 runnable
“GC task thread#5 (ParallelGC)” os_prio=0 tid=0x0000000002670800 nid=0x298c runnable
“GC task thread#6 (ParallelGC)” os_prio=0 tid=0x0000000002674000 nid=0x2b40 runnable
“GC task thread#7 (ParallelGC)” os_prio=0 tid=0x0000000002675000 nid=0x25f4 runnable
“VM Periodic Task Thread” os_prio=2 tid=0x000000001e097000 nid=0xd54 waiting on condition
JNI global references: 5
Found one Java-level deadlock:
“Thread B”: waiting to lock monitor 0x000000001e105dc8 (object 0x000000076b431d80, a java.lang.Object), which is held by “Thread A” “Thread A”: waiting to lock monitor 0x000000001c181828 (object 0x000000076b431d90, a java.lang.Object), which is held by “Thread B”
Java stack information for the threads listed above:
“Thread B”: at com.lun.concurrency.MyTask.run(DeadLockDemo.java:27)
- waiting to lock <0x000000076b431d80> (a java.lang.Object)
- locked <0x000000076b431d90> (a java.lang.Object)
at java.lang.Thread.run(Thread.java:748)
“Thread A”: at com.lun.concurrency.MyTask.run(DeadLockDemo.java:27)
- waiting to lock <0x000000076b431d90> (a java.lang.Object)
- locked <0x000000076b431d80> (a java.lang.Object)
at java.lang.Thread.run(Thread.java:748)
Found 1 deadlock.
C:\Users\abc> ```