为什么需要多线程
1.CPU/内存/IO的巨大性能差异
CPU的调度很快,但是内存/io的执行效率很慢,如果不适用多线程,CPU在调度一个任务以后,必须要等任务执行完毕,才能执行下一个任务,这样会带来时间的浪费。详细参见以下文章
https://zhuanlan.zhihu.com/p/58431253
2.多核心CPU的发展
双核CPU,4核CPU,8核CPU....
3.线程的本质是一个可以执行代码的工人
优点:多个执行流
缺点:
1.慢:切换上下文(切换到一个线程的时候,需要读取该线程之前执行的状态,进度。。。)
2.占用资源:每个线程有独立的方法栈
能不能让上下文切换尽可能少?
一个线程一直在CPU上面跑,在这个线程里面,再开一些工作单元,调度他们去执行不同的工作——-协程(用户态线程)
Thread是什么
Thread类的每一个实例代表一个JVM中的线程,start方法启动之后,且尚未结束
- Runnable/Callable都不是线程
- Thread.start之后,JVM中就增加了
一个工人/执行流
一套方法栈
- 不同的执行流的同步执行是一切线程问题的根源
多线程图解
会被那个线程执行? 不确定。
一个线程抛出的一场,不会被另一个线程catch住。不能跨方法栈抛异常
如果线程卡住,怎么排查
- terminal jps命令 查看线程id
- jstack + id 查询方法栈
每个线程方法执行的时候,会开启一个全新的栈帧
开启的栈帧里面的局部变量都是私有的,没有任何复制发生,只是开启了栈帧
Thread的底层模型
Thread类的每一个实例代表一个JVM中的线程
1.再linux上称为 轻量级进程,和线程五本质区别
2.再Windos上使用系统线程
linux上线程与进程区别(有无共享内存)
优点
缺点
- 占用资源多
- 上下文切换慢
- 不灵活,无法实现灵活的优先级
Thread的生命周期
Thread的类里面有个State的枚举类,里面的元素标注了线程的生命周期
ThreadLocal
- 同一个对象根据调用线程的不同返回不同的值
- 通常用于存储线程私有的值,方便后续流程使用
源码解读
自己实现ThreadLocal
使用场景
http 》 拦截器把cookie的信息存入threadLocal 》 后续的流程可以使用这个threadLocal里面存储的信息<br />threadLocal有一个map再存放数据,这个map存活在每个线程对象里面,在这个线程对象中,如果线程死掉,那么map里面的数据可以被回收掉。如果这个数据放在了ThreadLocal对象里面,会把ThreadLocal变的冗余
栈帧是啥?
不是变量,不是方法,是一块内存的区域,里边包含局部变量,操作数栈。
每调用一个方法,那么就会开启一个栈帧
什么是Runnable和Callable
只有Thread代表线程,那么Runnable呢?
-
什么是Callable
Runnable的限制
不能返回值
- 不能抛出checked exception
- runnable的run方法 void run 而Callable的run方法 T run() throw Exception
Callable解决了这些问题
线程中断方法 interrupt
如果自己没有能力处理终端,那马请重新设置中断标志位,使得其他人能够知道该线程被中断了。
Java Memory Model
java中,所有对象都再堆上,引用都在再栈上。堆上的变量是被所有线程共享的。
线程安全问题
什么是线程安全
当一个累再多线程环境下被使用仍能表现出正确的行为
方法栈
竞争条件
- 原子性
- 看上去勿看的程序再多线程环境下可能有问题
- 如何解决
- 不可变对象,比如String,有final修饰,每次都会返回一个深拷贝的值
- 各种锁,synchronized, lock
- 并发工具包,底层通常是CAS(Compare and Set) 能达到预期目的时,才会进行操作,否则不操作。
A线程先更新,B线程再更新前,先把当前的value拿出来,和自己已经拿到的value比一比,如果当前的值和B线程持有的值是一样的时候,才会进行更新操作,否则不执行更新操作
int/long > atomicInteger/Long<br /> array > atomicLongArray<br /> Object > atomicReference<br /> HahMap > ConcurrentHashMap<br /> ArrayList > CopyOnWriteArrayList<br /> TreeMap > ConcurrentSSkipListMap
- 祈求上天保佑
死锁
思索问题的预防,排查和解决
死锁实战:排查,避免和解决
什么是死锁
两个线程持有一份资源,并且再等待对方手里面的资源。会产生死锁
怎么写一个死锁
static Object lock1 = new Object();
static Object lock1 = new Object();
public static void main(String[] args){
new Thread(
() -> {
synchronized (lock2)}{
try{
Thread.sleep(500);
synchronized(lock1){
System.out.print("111")
}
} catch (InterruptedException e){
e.printStackTrace();
}
}
}
).start();
synchronized (lock1)}{
try{
Thread.sleep(500);
synchronized(lock2){
System.out.print("222")
}
} catch (InterruptedException e){
e.printStackTrace();
}
}
}
怎么解决死锁问题
- 两个命令: jps jstack
并且查看log,哪个线程locked的哪个锁,waiting哪个锁。慢慢查就行
- 所有的资源都以相同的顺序获得锁
- 实际上,再复杂程序中,这一点很难发现
java内存模型与volatile
上图表示,java的世界中其实有两个内存 ,一个是主内存,用来存放堆上面的变量。 还有一个是工作内存,是主内存上变量的一份拷贝,因为线程内和自己的工作内存交互速度要比和主内存交互的速度要快,所以工作内存上会持有一份主内存变量拷贝,并且会自动进行同步。
两个线程更新的变量,有可能是自己持有的副本变量。可能会发现,在一个线程里面设置成true了,但是另一个线程里面发现还是false。 因为线程的工作内存还没有与主内存进行同步。 怎么解决此问题?
static boolean cancelled = false;
public static void main(String[] args){
new Thread(() -> {
while(true){
if (cancelled){
// stop
break;
}
try {
THread.sleep(500);
} catch (InterruptedException e){
e.printStackTrack();
}
// 做一些定时器相关的工作
}
}).start();
// 主线程负责把flag置换成true
Thread.sleep(50000);
cancelled = true;
}
解决上述问题 volatile(易变的) volatile的保证
可见性,并非原子性
写入volatile变量会直接写入主内存
// 声明flag的时候加上volatile static volatile boolean cancelled = false;
从volatile变量读取会直接读取主内存
-
禁止指令重排
编译器和处理器都可能堆指令进行重拍,导致问题
编译的时候,一个线程代码执行的顺序可能会被改变,单线程情况下,不会产生问题。但是多线程的情况可能会出现问题
static boolean initializationFinished = false;
public static void main(String[] args){
init();
initializationFinished = true;
// 真正的执行顺序initializationFinished = true; 》》》 init();
new Thread(() -> {
while(true){
if(initializationFinished){
doSomething();
break;
}
try {
THread.sleep(500);
} catch (InterruptedException e){
e.printStackTrack();
}
}
}).start();
}
private static void doSomething(){
// todo
}
使用volatile可以产生内存屏障,这样就不会改变指令的顺序
有同步的时候无需volatile
- synchronized/lock/atomicInteger
解决多线程问题方案
同步 synchronized
协同 wait() notify() notifyAll()
什么是CAS
乐观锁:我现在追不到你,过一会再来问一下
- 自旋锁(spin lock)
就是上面提到的更新前,先判断预期值是不是和主内存的值一样。然后再更新
悲观锁:我现在追不到你,沮丧的自暴自弃(阻塞)了
CAS: Compare And Swap
潜在的ABA问题.
ABA问题
更新前,A被更新成B,又被更新成A。 那么更新将会被执行,那么会产生ABA问题
解决方案是时间戳。更新的时候,看时间戳,不光要看值,也要判断时间戳
synchronized详解
ConcurrentHashMap VS SynchronizedMap/HashTable
synchronized:
- java语言级的支持,1.6之后性能极大提高
- 字节码层面的实现:monitorenter/monitorexit
- 锁住的是什么? 任何一个对象
如果是一个synchronized块,那么锁住的就是括号里面的对象
如果是一个普通方法被synchronized修饰,那么被锁住的就是,该类的一个实例对象
如果是静态方法,那么就是类的class对象。因为被static修饰之后,方法是隶属与类的,不是某个具体对象
运用,下面的代码,可以实现,只有一个线程来调用doSomething方法吗?
static class MyThread extends Thread{
private synchronized void doSomething(){
}
@override
public void run(){
doSomething();
System.out.print(123)
}
}
public static void main() {
new MyThread().start();
new MyThread().start();
new MyThread().start();
new MyThread().start();
// 上面4个线程同时执行,可以实现,每个线程调用doSomething方法时,独占锁吗?
// 不能,因为synchronized修饰普通方法,那么被锁住的是,调用方法的实例对象,产生的效果是,每个线程锁住自己
// 这个锁是没有任何用处的
}
底层实现
对象头<br /> 无锁 > 偏向锁 > 轻量级锁 > 重量级锁
对象头
锁粗化(jvm自动做)
锁消除(对局部变量进行同步,没有意义,因为没有人和他竞争)
多线程协同
同时,随机执行的线程,如何让他们协同工作
Object类里面的wait notify notifyall 实现协同
java原生的线程协同机制
- 线程:一个工人
- 代码:一份说明书
- 同步对象:一个印章
- wait:拿到印章,把自己的名字加入等待队列,然后放回去
- notify:拿到印章,挑一个等待队列的人通知一下
-
生产者消费者模型
// 生产者消费者模型 public class Main{ public static void main(String[] args){ Container container = new Container(); Producer producer = new Producer(); Consumer consumer = new Consumer(); producer.start(); consumer.start(); producer.join(); consumer.join(); } static class Container{ volatile Object value; } static class Producer extends Thread { Container container; public Producer(Container container){ this.container = container; } @Override public void run(){ synchronized(container){ // 池子里面有东西,就不生产,释放资源,让消费者去消费 while(container.value != null){ try{ container.wait(); }catch(InterruptedException e){ e.printStackTrace(); } } // 池子里面没东西,就可以放心生产 int random = new Random().nextInt(); container.value = random; // 唤醒消费者 container.notify(); } } } static class Consumer extends Thread { Container container; public Consumer(Container container){ this.container = container; } synchronized(container){ // 池子里面没有东西,就不消费,释放资源,让生产者生产 while(container.value == null){ try{ container.wait(); }catch(InterruptedException e){ e.printStackTrace(); } } // 池子里面有东西,就可以放心消费 container.value = null; // 唤醒生产者 container.notify(); } } }
面试题:wait和sleep有上面区别?
wait会释放自己占有的资源,sleep不会释放
JUC包
为什么需要JUC包
- synchronized性能不高
- wait/ notify太原始了,难用
- 不够灵活
JUC的改进
- 提高了性能
- 提供了多种场景下更方便的实现
JUC包AtomicXXX
- AtomicInteger/AtomicBoolean/AtomicLong/AtomicReference
- 全部都是CAS,提高性能
- AtomicXXX的额外用途
Lock/Condition
Lock解决同步问题
Condition解决协同问题
- Lock/Conditon与synchronized/wait/notify机制的差别
- 锁的重入问题
更加灵活
- 同一个锁可以有多个条件
- 读写分离
- tryLock
-
Lock/Condition再次实现生产者消费者模型
```java public class ProducerConsumer { private static Lock lock = new ReentrantLock(); private static Condition queueEmpty = lock.newCondition(); private static Condition queueFull = lock.newCondition();
public static void main(String[] args) throws Exception{ Queue
queue = new LinkedList<>(); Thread producer = new Producer(queue); producer.start(); Thread consumer = new Consumer(queue); consumer.start();
producer.join(); consumer.join(); }
static class Producer extends Thread{ Queue
queue; public Producer(Queue
queue) { this.queue = queue
}
@override public void run(){
lock.lock(); try { while(!queue.isEmpty()){ queueEmpty.await(); } queue.add(1234); queueFull.signal(); } catch(InterruptedException e){ e.printStackTrace(); } finally{ lock.unlock(); }
} }
static class Consumer extends Thread{ Queue
queue; public Consumer(Queue
queue) { this.queue = queue
}
@override public void run(){
lock.lock(); try { while(queue.isEmpty()){ queueFull.await(); } queue.remove(); queueEmpty.signal(); } catch(InterruptedException e){ e.printStackTrace(); } finally{ lock.unlock(); }
} }
}
<a name="nxT5F"></a>
## 多个线程生成数字,主线程把他们的结果统计起来
```java
package singleton;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadCoordination {
public static void main(String[] args) throws InterruptedException {
Lock lock = new ReentrantLock();
Condition allThreadFinished = lock.newCondition();
AtomicInteger howManyThreadsRunning = new AtomicInteger(10);
ConcurrentHashMap<Integer, Integer> result = new ConcurrentHashMap<>();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int value = new Random().nextInt();
result.put(value, value);
lock.lock();
try{
howManyThreadsRunning.decrementAndGet();
allThreadFinished.signal();
} finally {
lock.unlock();
}
}).start();
}
// 等所有的线程执行完,主线程再打印出结果来
lock.lock();
try {
while (howManyThreadsRunning.get() > 0){
allThreadFinished.await();
}
}finally {
lock.unlock();
}
System.out.println(result);
}
}
CountDownLatch倒数闭锁
只有每个线程执行完了,这个锁才会被打开,主线程才能往下执行
package singleton;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadCoordination1 {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<Integer, Integer> result = new ConcurrentHashMap<>();
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int value = new Random().nextInt();
result.put(value, value);
latch.countDown();
}).start();
}
latch.await();
System.out.println(result);
}
}
CyclicBarrier 可以循环使用的屏障
等待各个线程执行(包括主线程),执行到全部线程遇到一个屏障,然后所有线程一起结束
package singleton;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
public class ThreadCoordination2 {
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
ConcurrentHashMap<Integer, Integer> result = new ConcurrentHashMap<>();
CyclicBarrier barrier = new CyclicBarrier(11); // 加上主线程 一个11个,每个线程遇到屏障以后,这个数字就会-1
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int value = new Random().nextInt();
result.put(value, value);
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
// 所有线程都遇到这个屏障后,大家一起结束
barrier.await();
System.out.println(result);
}
}
BlockingQueue/BlockingDeque
传统的集合框架的操作要么正常返回,要么丢出异常
BlockingQueue/BlockingDeque提供了一种【等待】的可能api
阻塞操作:put / take
使用BlockingQueue实现生产消费者模型
package singleton;
import java.util.Random;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
public class ProducerConsumer {
public static void main(String[] args) throws InterruptedException {
// 篮子里面始终保持3个生产物
LinkedBlockingDeque<Integer> queue = new LinkedBlockingDeque<>(3);
// 两个生产者 3个消费者
Producer producer = new Producer(queue);
Producer producer1 = new Producer(queue);
Consumer consumer = new Consumer(queue);
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
producer.start();
producer1.start();
consumer.start();
consumer1.start();
consumer2.start();
consumer.join();
consumer1.join();
consumer2.join();
producer.join();
producer1.join();
}
static class Producer extends Thread{
BlockingDeque<Integer> queue;
public Producer(BlockingDeque<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true){
try {
Thread.sleep(500);
queue.put(new Random().nextInt());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Consumer extends Thread{
BlockingDeque<Integer> queue;
public Consumer(BlockingDeque<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true){
try {
queue.take();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
Future/Executor service
- 一个多线程执行器框架,最常用的实现是线程池
他屏蔽了线程的细节,提供了并发执行任务机制
为什么需要线程池?
- 线程的代价太昂贵
- 我们应该避免使用Executors吗?
- Future代表一个[未来才会发生的事情]
- FUture本身是立即返回的
- get()会阻塞并返回执行结果,并抛出可能的异常
import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor;
public class ThreadCoordination4 { public static void main(String[] args) throws InterruptedException, BrokenBarrierException { Executor myExecutor = new MyExecutor(); myExecutor.execute(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(“1111”); });
myExecutor.execute(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("2222");
});
}
// 最原始的执行器框架
static class MyExecutor implements Executor{
@Override
public void execute(Runnable runnable) {
new Thread(runnable).start();
}
}
}
<a name="YO2gA"></a>
## Executors创建线程池,进行多线程操作
```java
package singleton;
import java.io.File;
import java.nio.file.Files;
import java.util.*;
import java.util.concurrent.*;
public class ThreadCoordination5 {
// 三个线程读取三个文件,统计他们的字符个数,然后把结果汇总
public static void main(String[] args) throws InterruptedException, ExecutionException {
List<File> files = Arrays.asList(
new File("1.txt"),
new File("2.txt"),
new File("3.txt")
);
// 用Executors创建一个线程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
List<Future<Map<String, Integer>>> futures = new ArrayList<>();
for (File file : files) {
// submit方法会立刻返回,是一个异步任务。不会阻塞,不会等到任务执行完。返回的东西是未来才会拿到结果
// 可能拿到结果,也可能拿到任务抛出的异常
Future<Map<String, Integer>> future = threadPool.submit(new WordCount(file));
futures.add(future);
}
Map<String, Integer> finalResult = new HashMap<>();
for (Future<Map<String, Integer>> future : futures) {
// 这个get方法可以拿到未来可能返回的结果,因为要拿到结果需要任务执行完,所以这个get方法可能会阻塞
Map<String, Integer> result = future.get();
System.out.println("result" );
// result放到finalResult中
finalResult.putAll(result);
}
// 关闭线程
threadPool.shuntdown();
}
static class WordCount implements Callable<Map<String, Integer>> {
File file;
public WordCount(File file) {
this.file = file;
}
@Override
public Map<String, Integer> call() throws Exception {
List<String> lines = Files.readAllLines(file.toPath());
Map<String, Integer> wordToCountMap = new HashMap<>();
for (String line : lines) {
String[] words = line.split("\\s+");
for (String word :
words) {
int count = wordToCountMap.getOrDefault(word, 0);
wordToCountMap.put(word, count+1);
}
}
return wordToCountMap;
}
}
}
线程池详解
- 自己是老板,使用线程池完成多线程协同和任务分解
- 线程池的参数们
- corePoolSize核心员工数量
- maximumPoolSize最大招募的员工数量
- keepAliveTime/unit员工闲下来多久之后炒掉他们
- workQueue订单队列
- threadFactory造人的工厂
- handler订单实在太多的处理策略
ForkJoin框架
- java7引入
- 分而治之策略的实例
package singleton;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.*;
import java.util.concurrent.*;
public class ThreadCoordination6 {
//ForkJoin框架
public static void main(String[] args) throws InterruptedException, ExecutionException {
List<File> files = Arrays.asList(
new File("src/singleton/1.txt"),
new File("src/singleton/2.txt"),
new File("src/singleton/3.txt")
);
ForkJoinPool forkJoinPool = new ForkJoinPool();
System.out.println("@@@@@@@@@@@@@@@@@@@@@");
System.out.println(forkJoinPool.submit(new WordCount(files)));
}
static class WordCount extends RecursiveTask<Map<String, Integer>> {
List<File> files;
public WordCount(List<File> files) {
this.files = files;
}
@Override
protected Map<String, Integer> compute() {
if (files.isEmpty()){
return Collections.emptyMap();
}
// 处理list的第一个文件
Map<String, Integer> wordCount = null;
// 递归调用compute方法,处理文件
Map<String, Integer> countOfRestFiles = null;
try {
wordCount = count(files.get(0));
countOfRestFiles = new WordCount(files.subList(1, files.size())).compute();
} catch (IOException e) {
e.printStackTrace();
}
// 把每一次执行的结果merge起来
return merge(wordCount, countOfRestFiles);
}
private Map<String, Integer> merge(Map<String, Integer> map1, Map<String, Integer> map2) {
Set<String> words = new HashSet<>(map1.keySet());
words.addAll(map2.keySet());
// 把每个单词出现次数加总起来求和
Map<String, Integer> result = new HashMap<>();
for (String word :
words) {
result.put(word, map1.getOrDefault(word, 0) + map2.getOrDefault(word, 0));
}
return result;
}
private Map<String, Integer> count(File file) throws IOException {
List<String> lines = Files.readAllLines(file.toPath());
Map<String, Integer> wordToCountMap = new HashMap<>();
for (String line : lines) {
String[] words = line.split("\\s+");
for (String word :
words) {
int count = wordToCountMap.getOrDefault(word, 0);
wordToCountMap.put(word, count+1);
}
}
return wordToCountMap;
}
}
}