一 线程相关
Java并发,那么便要说到一个人:DougLee,JUC便出自他之手。
1、JUC是Java.util.concurrent的简称,是Java源码下的一个包,提供并发功能
2、在 Java1.5 后,JUC被加入了JDK源码中
3、线程操作资源类
1.1 为什么多线程如此重要
1)硬件方面:摩尔定律失效(摩尔定律:当价格不变时,集成电路上的可容纳元器件的数目约每隔18-24个月便会增加一倍,性能也会提升一倍。换言之,每一美元买到的电脑性能,将在每隔18-24个月之后将翻一倍。这个定律揭示了信息技术的进步速度),但是,从2003年开始,CPU主频已经不再开始翻倍,而是采用了多核而不是更快的主频。在主频不再提高而且核数不断增加的时候,想要程序变得更快,就需要用到并发并行编程。
2)软件方面:高并发系统,异步+回调等生产需求
1.2 JUC包结构
java.util.concurrent
:这个包下的主要是线程池,并发集合,及一些并发工具类,线程池围绕Executor框架构建java.util.concurrent.atomic
:主要是一些原子类,算是辅助工具类,主要实现依赖CASjava.util.concurrent.locks
:主要是一些与锁相关的类
二 JMM内存模型
- Java内存模型(Java Memory Model)本身是一种抽象的概念,是一种规范,并不是真实存在的,它描述的是一组规则或规范通过规范定制了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。
- 线程解锁前,必须把共享变量的值刷新回主内存。
- 线程加锁前,必须读取主内存的最新值到自己的工作内存。
- 加锁解锁是同一把锁。
- 由于JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方成为栈空间),工作内存是每个线程的私有数据区域,而Java内存模型中规定所有变量都存储在主内存,主内存是共享内存区域,所有线程都可访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝到自己的工作空间,然后对变量进行操作,操作完成再将变量写回主内存,不能直接操作主内存中的变量,各个线程中的工作内存储存着主内存中的变量副本拷贝,因此不同的线程无法访问对方的工作内存,此线程间的通讯(传值)必须通过主内存来完成。
- JMM遵循的特性:可见性,原子性,有序性
2.1 可见性
1)其意思就是多个线程中从主内存拷贝值到各自线程的工作内存,若有一个线程改了当前拷贝过来的这个值,并将修改好的值返回给了主内存,那么此时其他线程不知道这个线程已经把值改了,此时必须要有一种机制,只要有一个线程修改完自己的工作内存的值,并写回给主内存以后要及时通知其他线程,这样即使通知的这种情况,就是JMM内存模型里面第一个重要特性:俗称:可见性。
2)可以使用 volatile 关键字来保证共享变量的可见性
3)使用volatile保证了共享变量的可见性Demo:(面试题常问) ```java import java.util.concurrent.TimeUnit;
class MyData{
/**
* 没加volatile的时候,程序执行,只会打印前两句,而main线程则会一直判断data.i==0为true
* 因为它一直认为i的值没有改变。i不具有可见性。
* 当加上volatile,在子线程中修改了i的值,它会通知其他线程,把最新的值告知
* 所以main线程判断i!=0,就会跳出循环
*/
volatile int number = 0;
public void addTO60(){
this.number = 60;
}
}
/**
- 1.验证volatile的可见性
1.1 假如int number = 0; number变量之前没有添加volatile关键字修饰,没有可见性 */ public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t come in");
// 线程暂停3秒钟
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
}
// 3秒钟以后将把0改为1
data.addTo1();
System.out.println(Thread.currentThread().getName() + "\t updated number value:" + data.i);
}, "AAA").start();
// 第二个线程就是我们的main线程
while (data.number == 0) {
// main主线程就一直在这里等待循环,直到number不再等于零
}
// 若这句话打印出来了,说明main主线程感知到了number的值发生了变化,那么此时可见性就会被触发
System.out.println(Thread.currentThread().getName() +
"\t mission is over,main get number value:" + data.i); // 这个是main线程
} }
<a name="e4v3O"></a>
## 2.2 原子性
1)JAVA内存模型(JMM)要求保证原子性,但是volatile是不保证原子性的。<br />2)volatile不能保证原子性的案例演示
```java
public class VolatileTest1 {
public static void main(String[] args) {
Data1 data = new Data1();
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int j = 0; j < 10000; j++) {
data.add1();
}
}, i + "").start();
}
// 当存活线程为2时
while (Thread.activeCount() > 2) {
Thread.yield();
}
/**
* i的值不能在多线程操作中保证原子性
* 取值总是在 170000 - 200000 之间
*/
System.out.println("data == " + data.i);
}
}
class Data1 {
volatile int i = 0; // 使用 volatile 关键字修饰
public void add1() {
this.i += 1;
}
}
2.3 有序性
1)计算机在执行程序时,为了提高性能,编译器和处理器常常会做指令重排,一把分为以下3种
- 编译器优化的重排
- 指令并行的重排
- 内存系统的重排
2)单线程环境里面确保程序最终执行结果和代码顺序执行的结果一致
3)处理器在进行重新排序是必须要考虑指令之间的数据依赖性
4)多线程环境中线程交替执行,由于编译器优化重排的存在,两个线程使用的变量能否保持一致性是无法确定的,结果无法预测
public class Test1 {
public static void main(String[] args) {
new Test1().mySort();
/**
* 语句1234的执行顺序是怎样的
* 问题:在重排后,语句4会排到第一位来执行吗?
* 答:由于语句4需要使用到x,和其他语句存在着数据的依赖性,所以不可能排到第一位执行
*/
}
public void mySort() {
int x = 11; // 语句1
int y = 12; // 语句2
x = x + 5; // 语句3
y = x * x; // 语句4
}
}
public class ReSortSeqDemo {
private int a = 0;
private boolean flag = false;
public void m1() {
a = 1;
flag = true;
}
/**
* 在多线程环境下,由于编译器优化重排的存在,在m1方法中,a=1,flag=true,这俩个语句不知道哪一个会被先执行。
* 由于这俩个语句没有数据依赖性,所以多线程下语句的执行顺序不同就会可能出现不同的结果。
* 比如:有线程1进来执行m1,flag=true先执行,但是这时候线程2进来执行m2,判断flag=true
* 此时线程1的m1的a=1还没有执行,那么线程2的a=a+5先执行,然后m1的a=1再执行,
* 最后结果就是a=1
* 多线程下变量不能保证一致性,无法预测结果
*/
public void m2() {
if (flag) {
a = a + 5;
flag = false;
}
}
}
三 CompletableFuture
所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要取线程的计算结果。
3.1 Future和Callable接口
- Future接口定义了操作异步任务执行的一些方法,如获取异步任务的执行结果,取消任务的执行,判断任务是否被取消,判断任务执行是否完毕等。
- Callable接口中定义了需要有返回值的任务需要实现的方法。比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程之后开始执行任务之后,主线程就去干其他事情了,过了一会才去获取子线程的执行结果。
FutureTask案例
- 优点:有返回值
缺点:阻塞,get()方法会阻塞当前进程,直到FutureTask执行完毕。可以使用get(int, TimeUnit),定义阻塞时间。
public class FutureTaskTest {
public static void main(String[] args) throws Exception {
/**
* FutureTask的构造器中的形参为 Callable 对象
*/
FutureTask<String> task = new FutureTask<>(() -> {
System.out.println("I am a future task");
TimeUnit.SECONDS.sleep(2);
return "I done. from future task";
});
new Thread(task, "task1").start();
System.out.println("main thread running...");
// get()方法将阻塞主线程,直到子线程执行完毕,返回结果
//System.out.println(task.get());
// 阻塞 2 秒钟,不让其一直阻塞在这里
task.get(2, TimeUnit.SECONDS);
// 使用轮循的方式
while (true) {
if (task.isDone()) {
break;
} else {
System.out.println("task is still working");
}
}
System.out.println("main thread still running");
}
}
复杂场景:
- 应对Future的完成时间,完成了可以告诉当前线程,也就是回调通知
- 将两个异步计算合成一个异步计算,这两个异步计算互相独立,同时第二个又依赖第一个的结果
- 当Future集合中某个任务最快结束时,返回结果
-
3.2 CompletableFuture
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}
CompletionStage
CompletionStage代表异步计算过程中某一个阶段,一个阶段完成之后,可能会触发另外一个阶段
- 一个阶段的计算执行可以是一个Function,Comsumer或者Runnable
- 一个阶段的执行可能被单个阶段的完成触发,也可能是由多个阶段一起触发的
3.2.1 初级
CompletableFuture的四个生成对象的静态方法
public class CompletableFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
20, 1L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(20));
/**
* 无返回值,参数是一个 Runnable 对象
*/
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "---- running");
});
/**
* 无返回值,参数是 Runnable对象 和 线程池对象
*/
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "---- running");
}, threadPoolExecutor);
/**
* 有返回值,参数是一个
*/
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "---- running");
return "future3";
});
// 阻塞,因为这个也是使用了Future的方法
System.out.println(future3.get());
/**
* 有返回值,参数是一个
*/
CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "---- running");
return "future4";
}, threadPoolExecutor);
// 阻塞,这个是Future的方法
System.out.println(future4.get());
threadPoolExecutor.shutdown();
}
}
3.2.2 中级
三 生产者和消费者
面试的几大问题:单例模式、八大排序算法、生产者和消费者问题、死锁
3.1 synchronized版生产者消费者
/**
* synchronized 版的生产者和消费者问题
*/
public class Test1 {
/**
* 线程之间的通信问题:生产者和消费者问题
* 线程交替执行 A B 两个线程操作同一个变量 num
* A num+1
* B num-1
*/
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.incre();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decre();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
class Data {
// 共享资源
private int num = 0;
// 生产一个资源
public synchronized void incre() throws InterruptedException {
while (num > 0) { // 这里使用while代替if
this.wait();
}
num++;
this.notifyAll();
System.out.println(Thread.currentThread().getName() + " ==> " + num);
}
// 消费一个资源
public synchronized void decre() throws InterruptedException {
if (num <= 0) {
this.wait();
}
num--;
this.notifyAll();
System.out.println(Thread.currentThread().getName() + " ==> " + num);
}
}
上述的代码可能会出现的问题:当生产者和消费者都不止一个的时候,那么线程通知唤醒了所有的线程。
使用 while 代替 if
3.2 JUC版的生产者消费者
class Data2 {
// 共享资源
private int num = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
// 生产一个资源
public void incre() throws InterruptedException {
//condition.await(); 等待
//condition.signalAll(); 唤醒全部
lock.lock();
try {
while (num > 0) {
condition.await();
}
num++;
condition.signalAll();
System.out.println(Thread.currentThread().getName() + " ==> " + num);
} finally {
lock.unlock();
}
}
// 消费一个资源
public void decre() throws InterruptedException {
lock.lock();
try {
while (num <= 0) {
condition.await();
}
num--;
condition.signalAll();
System.out.println(Thread.currentThread().getName() + " ==> " + num);
} finally {
lock.unlock();
}
}
}
四 volatile关键字
4.1 实例
- 线程的共享变量,两个线程访问
public class ThreadDemo1 implements Runnable {
private boolean flag = false;
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
setFlag(true);
System.out.println("flag = " + flag);
}
public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
}
上面这种情况下,main主线程和threadDemo1 子线程两个同时访问共享变量 flag ,初始值为 false,两个线程得到的值都是false,但是子线程更新 flag 为 true 后,main线程中的这个值没有更新,仍然是 false,造成死循环。public class VolatileTest {
public static void main(String[] args) {
ThreadDemo1 threadDemo1 = new ThreadDemo1();
new Thread(threadDemo1).start();
while (true) {
// 当线程类的共享变量 flag=true 那么就打印,跳出循环
if (threadDemo1.isFlag()) {
System.out.println("==============");
break;
}
}
}
}
解决的方法有:
1)加锁(同步代码块,获取flag的值,刷新main线程中flag的值,同步主存的flag值。但是这种方法的效率会变得很慢)。
2)使用volatile 关键字修饰共享变量while (true) {
synchronized (threadDemo1) { // synchronzied 锁
if (threadDemo1.isFlag()) {
System.out.println("==============");
break;
}
}
}
private volatile boolean flag = false;
4.2 volatile 关键字
当多个线程操作共享变量的时候,可以保证内存中的数据是可见的。
Java 提供了一种稍弱的同步机制,即 volatile 变量,用来确保将变量的更新操作通知到其他线程。可以将 volatile 看做一个轻量级的锁,但是又与锁有些不同。
注意:
1)volatile 不具有“互斥性”。也即是说,多个线程可以同时操作共享变量
2)volatile 不能保证变量的“原子性”。
4.3 内存可见性
1)内存可见性(Memory Visibility)是指当某个线程正在使用对象状态而另一个线程在同时修改该状态,需要确保当一个线程修改了对象状态后,其他线程能够看到发生的状态变化。
2)可见性错误是指当读操作与写操作在不同的线程中执行时,我们无法确保执行读操作的线程能适时地看到其他线程写入的值,有时甚至是根本不可能的事情。
3)我们可以通过同步来保证对象被安全地发布。除此之外我们也可以使用一种更加轻量级的 volatile 变量。
五 八锁现象
八锁,就是指关于锁的8个问题,弄明白这个8个问题,也就是大概弄明白锁了
1)标准情况下,两个线程执行,先发短信?还是打电话?
2)如果sendSMS延迟4秒执行,先发短信?还是打电话?
/**
* 普通方法 synchonized 锁的对象是方法的调用者,也就是this
* 当只有一个phone对象时,先得到锁的方法会先执行,即是这个方法需要休眠,另一个方法也要等待
*/
public class Test1 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(() -> {
phone.sendSMS();
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone.call();
}, "B").start();
}
}
class Phone {
// 普通同步方法
public synchronized void sendSMS() {
// 休眠4秒
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
// 普通同步方法
public synchronized void call() {
System.out.println("打电话");
}
}
3)增加一个普通方法,先发短信?还是打电话?
4)两个对象,两个同步方法,先发短信?还是打电话?
public class Test2 {
public static void main(String[] args) {
Phone2 phone = new Phone2();
Phone2 phone2 = new Phone2();
new Thread(() -> {
phone.sendSMS();
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone2.call();
}, "B").start();
}
}
class Phone2 {
// 普通方法
public void call2() {
System.out.println("打电话");
}
// 普通同步方法
public synchronized void sendSMS() {
// 休眠4秒
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
// 普通同步方法
public synchronized void call() {
System.out.println("打电话");
}
}
5)两个静态的同步方法,一个对象,先发短信?还是打电话?
6)两个静态的同步方法,两个对象,先发短信?还是打电话?
/**
* 静态同步方法的synchronzied是Class,只有一个
*/
public class Test3 {
public static void main(String[] args) {
Phone3 phone = new Phone3();
Phone3 phone2 = new Phone3();
new Thread(() -> {
phone.sendSMS();
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone2.call();
}, "B").start();
}
}
class Phone3 {
// 静态同步方法
public static synchronized void sendSMS() {
// 休眠4秒
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
// 静态同步方法
public static synchronized void call() {
System.out.println("打电话");
}
}
7)一个静态同步方法,一个普通同步方法,一个对象,先发短信?还是打电话?
8)一个静态同步方法,一个普通同步方法,两个对象,先发短信?还是打电话?
/**
* 静态同步方法的synchronzied是Class,只有一个
*/
public class Test4 {
public static void main(String[] args) {
Phone4 phone = new Phone4();
Phone4 phone2 = new Phone4();
new Thread(() -> {
phone.sendSMS();
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
phone2.call();
}, "B").start();
}
}
class Phone4 {
// 静态同步方法
public static synchronized void sendSMS() {
// 休眠4秒
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
// 普通同步方法
public synchronized void call() {
System.out.println("打电话");
}
}
六 集合类不安全
集合的大多数实现类为了提高效率,都不是线程安全的。
7.1 ArrayList
在并发环境下 ArraList是不安全的,如果对list的写入操作有同时多个线程的话,就会出现异常:java.util.ConcurrentModificationException
可以有几种解决方案:
1)使用Vector 线程安全的集合
2)使用Collections.synchronizedList(new ArrayList<>())
3)使用CopyOnWriteArrayList类,JUC包下的线程并发工具类
public static void main(String[] args) {
// List<String> list = new ArrayList<>();
// List<String> list = new Vector<>();
// List<String> list = Collections.synchronizedList(new ArrayList<>());
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 5));
System.out.println(list);
}).start();
}
}
copyOnWrite 写入时复制,COW,计算机程序设计领域的一种优化策略。
// CopyOnWriteArrayList类的写入数据的方法的JDK8源码
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock(); // 解锁
}
}
7.2 HashSet
HashSet同样是不安全的,可能会出现异常:java.util.ConcurrentModificationException
public class HashSetTest {
public static void main(String[] args) {
// HashSet set = new HashSet();
// Set set = Collections.synchronizedSet(new HashSet<>());
Set set = new CopyOnWriteArraySet();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0, 5));
System.out.println(set);
}).start();
}
}
}