1. 源码分析
1.1. 线程状态的枚举类源码
public enum State {
//新建
NEW,
//运行
RUNNABLE,
//阻塞
BLOCKED,
//等待(一直等)
WAITING,
//超时等待(等一段时间,超时就不等了)
TIMED_WAITING,
//终止
TERMINATED;
}
1.2. ReentrantLock
构造方法可以产生公平锁和非公平锁,空参构造直接创建非公平锁。
公平锁:公平,先来后到
非公平锁:不公平,可以插队。(默认)
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
1.3. HashSet 底层原理
HashSet底层直接创建了一个HashMap,只是在利用HashMap的key,value填入了一个空的Object对象。
/**
* Constructs a new, empty set; the backing <tt>HashMap</tt> instance has
* default initial capacity (16) and load factor (0.75).
*/
public HashSet() {
map = new HashMap<>();
}
// Dummy value to associate with an Object in the backing Map
private static final Object PRESENT = new Object();
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
1.4. 线程池
实现线程池的三个方法,本质都是调用了ThreadPoolExecutor()方法
// 单线程线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//固定数量线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//缓存线程池,数量可多可少
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
2. 案例分析
2.1. 线程有哪些创建方法?
方式1:定义类继承Thread
public class Test {
public static void main(String[] args) {
MyThread mt1 = new MyThread();
MyThread mt2 = new MyThread();
mt1.start();
mt2.start();
}
}
class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println(this.getName() + " " + i);
}
}
}
方法2:定义类实现Runnable,将该类对象作为参数传入Thread方法
public class TestRunnable {
public static void main(String[] args) {
HisThread hisThread = new HisThread();
Thread thread1 = new Thread(hisThread);
Thread thread2 = new Thread(hisThread);
thread1.start();
thread2.start();
}
}
class HisThread implements Runnable {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println(i);
}
}
}
方式3:定义类实现Callable,将该类对象作为参数传入FutureTask生成futureTask对象,将FutureTask对象作为参数传入Thread。
Callable可以有返回值,可以抛出异常,重写call()方法
details:有缓存、结果可能需要等待,有阻塞
package multithreading;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class TestCallable {
public static void main(String[] args) throws ExecutionException, InterruptedException {
HerThread herThread = new HerThread();
FutureTask<Integer> futureTask = new FutureTask<>(herThread);//适配类
Thread thread1 = new Thread(futureTask);
Thread thread2 = new Thread(futureTask);//结果会被缓存
thread1.start();
thread2.start();
Integer res = futureTask.get();//这个get方法可能会产生阻塞,把它放到最后或者使用异步通信来处理
System.out.println(res);
}
static class HerThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("call()");
return 1024;
}
}
}
方式4:创建线程池。
2.2. 多线程与单线程传输多个文件
package multithreading;
import java.io.*;
public class TransferFiles {
public static void main(String[] args) throws IOException {
File[] files = new File[18];
for (int i = 1; i <19 ; i++) {
files[i-1] = new File("resourceFile/1"+" ("+i+")"+".mp4");
}
// oneThreadTransfer(files);
multithreadingTransfer(files);
}
//单线程传输三个视频
public static void oneThreadTransfer(File[] files) throws IOException {
long start = System.currentTimeMillis();
int len;
byte[] b = new byte[1024 << 3];
for (File file : files) {
FileInputStream fis = new FileInputStream(file);
FileOutputStream fos = new FileOutputStream("resultFile/" + file.getName());
while((len = fis.read(b)) != -1) {
fos.write(b,0,len);
}
fos.close();
fis.close();
}
long end = System.currentTimeMillis();
System.out.println("单线程:" + (end-start));
}
//给每个视频开一个线程
public static void multithreadingTransfer(File[] files) {
for (File file : files) {
new Thread(new Thread1(file)).start();
}
}
}
class Thread1 implements Runnable {
private File file;
public Thread1(File file) {
this.file = file;
}
@Override
public void run() {
long start = System.currentTimeMillis();
FileInputStream fis = null;
FileOutputStream fos = null;
try {
int len;
byte[] b = new byte[1024 << 3];
fis = new FileInputStream(file);
fos = new FileOutputStream("resultFile/" + file.getName());
while((len = fis.read(b)) != -1) {
fos.write(b,0,len);
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
fos.close();
fis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
System.out.println("start : " + start + " end : " + end);
System.out.println(end - start);
}
}
2.3. 生产者和消费者问题
2.3.1. 问题代码:使用if会存在虚假唤醒问题。在哪行进行wait操作就会在哪行被唤醒
package multithreading;
public class ProducerAndConsumer {
static class Data {
private int number = 0;
public synchronized void increment() throws InterruptedException {
if (number != 0) {
this.wait();
}
++number;
System.out.println(Thread.currentThread().getName()+"=>" + number);
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException {
if (number == 0) {
this.wait();
}
--number;
System.out.println(Thread.currentThread().getName()+"=>" + number);
this.notifyAll();
}
}
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"producer1").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"consumer1").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"producer2").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"consumer2").start();
}
}
👇👇👇👇👇👇👇👇👇👇以下内容来自 JDK1.8 官方文档👇👇👇👇👇👇👇👇👇👇
2.3.2. 使用while替换if,可以解决问题:
package multithreading;
public class ProducerAndConsumer {
static class Data {
private int number = 0;
public synchronized void increment() throws InterruptedException {
while (number != 0) {
this.wait();
}
++number;
System.out.println(Thread.currentThread().getName()+"=>" + number);
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException {
while (number == 0) {
this.wait();
}
--number;
System.out.println(Thread.currentThread().getName()+"=>" + number);
this.notifyAll();
}
}
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"producer1").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"consumer1").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"producer2").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"consumer2").start();
}
}
2.3.3. 使用lock、Condition代替synchronized等
使用Condition类的await() 和 notifyAll() 替代Object类wait() 和 signalAll() 【Condition condition = lock.newCondition();//创建同步监视器】 Condition 可以精准的通知和唤醒线程
package multithreading;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerAndConsumer {
static class Data {
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();//创建同步监视器
public void increment() {
lock.lock();
try {
while (number != 0) {
condition.await();
}
++number;
System.out.println(Thread.currentThread().getName()+"=>" + number);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
while (number == 0) {
condition.await();
}
--number;
System.out.println(Thread.currentThread().getName()+"=>" + number);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
data.increment();
}
},"producer1").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
data.decrement();
}
},"consumer1").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
data.increment();
}
},"producer2").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
data.decrement();
}
},"consumer2").start();
}
}
2.4. 八锁问题
被锁的分别是对象本身和Class模版。
- 对普通方法(非静态方法)使用synchronized会锁住对象本身,该对象其它添加synchronized锁的非静态方法都会争抢这个锁,但对象间不会构成争抢。
- 对静态方法使用synchronized会锁住整个Class模版(这个类的所有对象),所有该类对象中添加了synchronized锁的静态方法都会争抢这把锁,但不会和任何对象中的任何添加了synchronized锁的费静态方法构成竞争。
- 相当于静态方法被锁上的是门,门只有一个,所以所有对象的被锁静态方法都在争抢这个门;
- 而非静态方法被锁上的是窗户,窗户有很多个,所以每个对象内竞争一个窗户;对象间是不同窗户,不构成竞争;同时,窗户和门又不构成竞争,即静态方法和非静态方法之间不构成竞争。
接下来用案例来说明。
1.对象内非静态方法同步
package multithreading.lock8;
import java.util.concurrent.TimeUnit;
public class Test1 {
static class Action{
public synchronized void goDoor() {
try {
TimeUnit.SECONDS.sleep(4);//报锁而眠
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("走大门");
}
public synchronized void goWindow() {
System.out.println("翻窗户");
}
}
public static void main(String[] args) throws InterruptedException {
Action action = new Action();
new Thread(action::goDoor).start();
TimeUnit.SECONDS.sleep(1);
new Thread(action::goWindow).start();
}
}
2.对象间非静态方法同步
package multithreading.lock8;
import java.util.concurrent.TimeUnit;
public class Test1 {
static class Action{
public synchronized void goDoor() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("走大门");
}
public synchronized void goWindow() {
System.out.println("翻窗户");
}
}
public static void main(String[] args) throws InterruptedException {
Action action1 = new Action();
Action action2 = new Action();
new Thread(action1::goDoor).start();
TimeUnit.SECONDS.sleep(1);
new Thread(action2::goWindow).start();
}
}
3.对象内静态非静态测试(对象间测试与之结果一致)
窗户哥没有因为大门报锁而眠就陷入等待。
package multithreading.lock8;
import java.util.concurrent.TimeUnit;
public class Test1 {
public static void main(String[] args) throws InterruptedException {
Action action1 = new Action();
new Thread(() -> {
action1.goDoor();
}).start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
action1.goWindow();
}).start();
}
}
class Action{
public static synchronized void goDoor() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("走大门");
}
public synchronized void goWindow() {
System.out.println("翻窗户");
}
}
4.对象间静态方法同步(对象内测试与之结果一致)
package multithreading.lock8;
import java.util.concurrent.TimeUnit;
public class Test1 {
public static void main(String[] args) throws InterruptedException {
Action action1 = new Action();
Action action2 = new Action();
new Thread(() -> {
action1.goDoor();
}).start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
action2.goWindow();
}).start();
}
}
class Action{
public static synchronized void goDoor() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("走大门");
}
public static synchronized void goWindow() {
System.out.println("翻窗户");
}
}
2.5. List类的线程安全问题
1.ArrayList 写操作的不安全问题,会产生 java.util.ConcurrentModificationException
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public class TestList {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,4));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
解决办法:
1.List
list = new Vector<>(); 2.List list = Collections.synchronizedList(new ArrayList<>()); 3.List list = new CopyOnWriteArrayList<>();
CopyOnWrite 指写入时复制,COW 计算机程序设计领域的一种优化策略;
多线程调用List时,写入会覆盖,容易造成数据问题,所以写入时要避免覆盖;
读写分离;
CopyOnWrite 底层使用的是Lock 而不是 synchronized,所以效率比Vector要高(Vector底层使用的是synchronized);
2.Set多线程写入不安全问题
解决办法:
1.Set
set = Collections.synchronizedSet(new HashSet<>()); 2.CopyOnWriteArraySet set = new CopyOnWriteArraySet<>();
3.Map多线程写入不安全问题
解决办法:
ConcurrentHashMap
2.6. 常用辅助类
名称 | 作用 |
---|---|
CountDownLatch | 线程数量减法计数器,等计数器归零后再向下执行,不归零就会阻塞 |
CyclicBarrier | 线程数量加法计数器,不达到设定值就会阻塞 |
Semaphore | 线程数量限制器,达到限定数量就会阻塞 |
1.CountDownLatch
import java.util.concurrent.CountDownLatch;
public class TestCountDownLatch {
static int num = 0;
public static void main(String[] args) throws InterruptedException {
//减法计数器,初始值为4
CountDownLatch countDownLatch = new CountDownLatch(4);
for (int i = 1; i <= 3; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " go out");
num++;
countDownLatch.countDown();
},String.valueOf(i)).start();
}
countDownLatch.await();// 等到计数器归零,然后再向下执行,如果不归零就会在这里阻塞
System.out.println("Close Door");
System.out.println(num);
}
}
2.TestCyclicBarrier
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class TestCyclicBarrier {
public static void main(String[] args) {
//加法计数器,不达到设定值就会阻塞
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙");
});
for (int i = 1; i <= 6; i++) {
final int temp = i;
// lambda表示的类是无法获取外界变量的,但如果是final类型的则可以
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+" 收集了第 "+temp+" 颗龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
3.Semaphore
semaphore.acquire():获得,使信号量-1,如果达到设定上限(信号量剩余为0),等待,直到被释放 semaphore.release():释放,会将当前的信号量释放+1,然后唤醒等待的线程
作用:多个共享资源互斥地使用;并发限流,控制最大的线程数。
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class TestSemaphore {
public static void main(String[] args) {
//线程数量:停车位,限流
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
//acquire()获得
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+" 抢到了车位");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+" 离开了车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
2.7. 读写锁
ReadWriteLock 写入的时候只有一个线程写,读取的时候所有线程都可以读
独占锁——写锁 一次只能被一个线程占有
共享锁——读锁 多个线程可以同时占有
读-读 可以共存
读-写 不能共存
写-写 不能共存
import java.util.HashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TestReadWriteLock {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 0; i < 5; i++) {
final int temp = i;
new Thread(() -> {
myCache.put(temp+"",null);
},String.valueOf(i)).start();
}
for (int i = 0; i < 5; i++) {
final int temp = i;
new Thread(() -> {
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
class MyCache {
private volatile HashMap<String,Object> hm = new HashMap<>();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// private Lock lock = new ReentrantLock(); 类似于定义Lock
//写操作,写入的时候希望同时只有一个线程写
public void put(String key , Object obj) {
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"写入"+key);
hm.put(key,obj);
System.out.println(Thread.currentThread().getName()+"写入成功!");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
//读操作,所有人都可以读
public void get(String key) {
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"读取"+key);
hm.get(key);
System.out.println(Thread.currentThread().getName()+"读取成功!");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
2.8. 阻塞队列
1.ArrayBlockingQueue
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(n); 阻塞情况: 1.队列为空时,从中取值 2.队列已满时,向其放值
方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞,等待(一直阻塞) | 超时等待 |
---|---|---|---|---|
添加 | add | offer() | put | offer(element,timeout,timeunit) |
移除 | remove | poll() | take | poll(element,timeout,timeunit) |
查看队首元素 | element | peek() |
2.SynchronousQueue —— 存放一个元素后必须要先取出
package multithreading.queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class TestBlockingQueue {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+ " put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+ " put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+ " put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+" => "+blockingQueue.take());
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+" => "+blockingQueue.take());
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+" => "+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}
2.9. JMM&volatile
JMM
问题:线程不能立即知道主内存中的值被修改过了
import java.util.concurrent.TimeUnit;
public class TestVolatile {
private static int num = 0;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{ while(num == 0) {} }).start();
TimeUnit.SECONDS.sleep(1);
num = 1;
System.out.println(num);
}
}
volatile
1、保证可见性
2、不保证原子性
3、禁止指令重排
保证可见性: 使用volatile 可以及时更新工作内存中的变量值
public class TestVolatile {
// 不加 volatile 程序就会死循环
// 加 volatile 保证可见性
private static volatile int num = 0;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
while(num == 0) {}
}).start();
TimeUnit.SECONDS.sleep(1);
num = 1;
System.out.println(num);
}
}
不保证原子性
原子性:不可分割
线程A在执行任务的时候,不能被打扰,也不能被分割。要么全部成功,要么全部失败
public class TestVolatile {
// 加 volatile 不能保证原子性
private static volatile int num = 0;
private static void add() {
num++; //不是一个原子性操作
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int j = 0; j < 2000; j++) {
add();
}
}).start();
}
while(Thread.activeCount()>2){ //如果活跃线程数大于2
Thread.yield();
}
System.out.println(num); //得到的结果小于 40000
}
}
使用java.util.concurrent.atomic 原子类的方法,可以替代synchronized实现原子操作
import java.util.concurrent.atomic.AtomicInteger;
public class TestVolatile {
// 加 volatile 不能保证原子性
// 原子类的 Integer
private static volatile AtomicInteger num = new AtomicInteger();
private static void add() {
//num++; //不是一个原子性操作
num.getAndIncrement(); // AtomicInteger + 1 方法 , CAS
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int j = 0; j < 2000; j++) {
add();
}
}).start();
}
while(Thread.activeCount()>2){ //如果活跃线程数大于2
Thread.yield();
}
System.out.println(num);
}
}
指令重排
写的程序,计算机并不是按照我们写的执行,而是进行重排
源代码——>编译器优化的重排——>指令并行也可能会重排——>内存系统也会重排——>执行
int x = 0; /1
int y = 2; /2
x = x + 5; /3
y = x * x; /4
我们所希望的执行顺序是:1234 ,但执行的时候会变成 2134 1324
a ,b,x,y 四个值都为0;
我们的逻辑如下👇:得到的结果是:x=0;y=0
线程A | 线程B |
---|---|
x=a | y=b |
b=1 | a=2 |
但可能会因为指令重排导致如下结果:x=2;y=1;
线程A | 线程B |
---|---|
x=a | y=b |
b=1 | a=2 |
volatile可以避免指令重排:
内存屏障。CPU指令。作用:
1.保证特定的操作的执行顺序
2.可以保证某些变量的内存可见性(利用这些特性volatile实现了可见性)
volatile可以保持可见性,不能保证原子性,由于内存屏障,可以保证避免指令重排的现象产生。
**
volatile可见性的应用:
单例模式之 DCL懒汉式
2.10. 单例设计模式(5种创建方式)
- 饿汉式——线程安全的单例设计模式,但是非常占用资源
public class Hungry {
private Hungry() {}
private static int[] arr1 = new int[1024*1024];
private static int[] arr2 = new int[1024*1024];
private static int[] arr3 = new int[1024*1024];
private static int[] arr4 = new int[1024*1024];
private final static Hungry HUNGRY = new Hungry();
public static Hungry getInstance() {
return HUNGRY;
}
}
- 懒汉式——线程不安全的单例设计模式,优点是占用资源少
略
- DCL懒汉式——线程相对安全的单例设计模式
双重检测模式+volatile禁止指令重排
第一重检测:静态方法getInstance() 检测是否创建过对象
第二重检测:静态方法getInstance() 中同步代码块(锁懒汉式的class模版),检测是否创建过对象
volatile 修饰命名的静态对象
另外,DCL懒汉式无法防止利用反射获取构造方法来破坏单例。
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
public class Lazy {
private static boolean flag = false;
private Lazy() {
synchronized (Lazy.class){
if (!flag) {
flag = true;
}else {
throw new RuntimeException("不要试图用反射破坏单例");
}
}
System.out.println(Thread.currentThread().getName()+" ok");
}
private volatile static Lazy lazy;
//双重检测模式的 懒汉式单例 DCL懒汉式
public static Lazy getInstance() {
if (lazy == null) {
synchronized (Lazy.class){
if (lazy == null) {
lazy = new Lazy();// 不是一个原子性操作
}
}
}
return lazy;
}
/*
1.分配内存空间
2.执行构造方法,初始化对象
3.把这个对象指向这个空间
顺序 123
顺序 132 -> A线程先占用内存空间,再创建对象
B线程 会因为对象已经指向内存空间,而认为 lazy != null,而此时lazy还没有完成构造,
所以为了防止指令重排,一定要加volatile
*/
public static void main(String[] args) throws Exception {
//获取空参构造器
Constructor<Lazy> declaredConstructor = Lazy.class.getDeclaredConstructor(null);
//破坏私有
declaredConstructor.setAccessible(true);
//获取flag参数
Field flag = Lazy.class.getDeclaredField("flag");
//破坏私有
flag.setAccessible(true);
//创建第一个Lazy对象
Lazy lazy1 = declaredConstructor.newInstance();
//设置静态变量flag为false
flag.set(lazy1,false);
//flag为false就破坏了单例模式,故可以创建新的Lazy对象了
Lazy lazy2 = declaredConstructor.newInstance();
System.out.println(lazy1);
System.out.println(lazy2);
}
}
- 静态内部类的方式 —— 线程相对安全,延迟加载
依旧会被反射破坏单例
public class StaticInner {
private StaticInner(){}
private static StaticInner getInstance(){
return InnerClass.staticInner;
}
private static class InnerClass {
private static StaticInner staticInner = new StaticInner();
}
}
5.使用枚举类的方式——线程安全,不会被反射破坏单例模式
public enum EnumSingle {
INSTANCE;
public EnumSingle getInstance(){
return INSTANCE;
}
}
class Test{
public static void main(String[] args) {
EnumSingle instance1 = EnumSingle.INSTANCE;
EnumSingle instance2 = EnumSingle.INSTANCE;
System.out.println(instance1);
System.out.println(instance2);
}
}
2.11. CAS
CAS:compareAndSet 比较并交换——比较当前内存中的值和主内存中的值,如果这个值是期望的,那么执行,如果不是就一直循环。public final boolean compareAndSet(int expect, int update)
缺点:
1.循环会耗时
2.一次性只能保证一个共享变量的原子性
3.会存在ABA问题
import java.util.concurrent.atomic.AtomicInteger;
public class TestCAS {
public static void main(String[] args) {
//CAS compareAndSet:比较并交换
AtomicInteger atomicInteger = new AtomicInteger(2020);
//期望、更新
// public final boolean compareAndSet(int expect, int update)
// 如果我们期望的值达到了,那么就更新,否则,就不更新,CAS 是CPU的并发原语!
atomicInteger.compareAndSet(2020,2021);
System.out.println(atomicInteger.get());
atomicInteger.compareAndSet(2020,2021);
System.out.println(atomicInteger.get());
}
}
ABA问题:线程B的期望值是x=1,线程a将x由1改为2后,又将x由2改为1,B不知道x发生过改变,所以会继续进行原先的操作。
为解决ABA问题,使用原子引用(带版本号的源自操作),引入Stamp版本戳:
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp)
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;
public class TestABA {
public static void main(String[] args) {
// AtomicStampedReference 注意,如果泛型是一个包装类,注意对象的引用问题
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1,1);
new Thread(() -> {
System.out.println("a1=> " + atomicStampedReference.getStamp());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicStampedReference.compareAndSet(1, 2,
atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
System.out.println("a2=> " + atomicStampedReference.getStamp());
System.out.println(atomicStampedReference.compareAndSet(2, 1,
atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
System.out.println("a3=> " + atomicStampedReference.getStamp());
},"a").start();
new Thread(()->{
int stamp = atomicStampedReference.getStamp();
System.out.println("b1=> " + atomicStampedReference.getStamp());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicStampedReference.compareAndSet(1, 6,
stamp, stamp + 1));
System.out.println("b1=> " + atomicStampedReference.getStamp());
},"b").start();
}
}
Unsafe 类
Java无法操作内存,但可以调用c++ native,c++可以操作内存,Java的后门,可以通过这个类操作。
AtomicInteger.java 👇👇👇👇👇
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
Unsafe.class 👇👇👇👇👇
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
//自旋锁
do {
// 获取内存地址中的值
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
3. 面试题
3.1. 什么是线程和进程?
进程:它表示运行中的程序。在操作系统中能够独立运行,并作为资源分配的基本单位。系统运行一个程序就是一个进程从创建、运行到消亡的过程。
线程:是比进程更小的执行单位,能够完成进程中的一个功能,也被称为轻量级进程。一个进程在其执行的过程中可以产生多个线程。
进程与线程的不同点:同类的多个线程共享进程的堆和方法区资源,但每个线程有自己的程序计数器、虚拟机栈和本地方法栈,所以系统在产生一个线程,或是在各个线程之间作切换工作时,负担要比进程小得多。
3.2. Java可以开启线程吗?
开不了,只能通过native方法start0(),让底层的C++去操作硬件。
3.3. 并行与并发
并发指的是多个任务交替进行,并行则是指真正意义上的“同时进行”
并发编程的本质:充分利用CPU的资源
如果系统内只有一个CPU,使用多线程时,在真实系统环境下不能并行,只能通过切换时间片的方式交替进行,从而并发执行任务。真正的并行只能出现在拥有多个CPU的系统中。
3.4. 线程的生命周期和状态
初始状态(被创建)、运行状态、阻塞状态、等待状态、超时等待状态、终止状态
序号 | 状态名称 | 说明 |
---|---|---|
1 | NEW | 初始状态,线程被创建,但还没有调用start()方法 |
2 | RUNNABLE | 运行状态,Java线程将操作系统中的就绪和运行两种状态统称为运行状态 |
3 | BLOCKED | 阻塞状态,表示线程阻塞于锁 |
4 | WAITING | 等待状态,表示当前线程需要等待其他线程做出一些特定的动作(通知或中断) |
5 | TIME_WAITING | 超时等待状态,它在指定时间后自行返回 |
6 | TERMINATED | 终止状态,表示当前线程已经执行完毕 |
这里还有状态切换条件要写
3.5. sleep和wait的异同
相同点:两者都可以暂停线程的执行,都会让线程进入等待状态。
不同点: | sleep | wait |
---|---|---|
1.来自不同类 | sleep()是来自Thread类的静态方法,作用于当前线程 | wait()是来自Object类的实例方法,作用于对象本身 |
2.锁的释放不同 | sleep时不会释放锁 | wait时会释放锁 |
3.使用的范围不同 | sleep可以在任何地方使用 | wait必须在同步代码块中使用 |
3.6. synchronized关键字
synchronized关键字可以保证被它修饰的方法或者代码块在任意时刻只能有一个线程执行。
synchronized关键字最主要的三种使用方式:修饰实例方法、修饰静态方法、修饰代码块。
- 对于普通同步方法,锁是当前实例对象。
- 对于静态同步方法,锁是当前类的Class对象。
- 对于同步代码块,锁是synchronized括号里配置的对象。
当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。
synchronized在JVM里是怎么实现的?
synchronized 同步语句块的实现使用的是 monitorenter 和 monitorexit 指令,其中 monitorenter 指令指向同步代码块的开始位置,monitorexit 指令则指明同步代码块的结束位置。 当执行 monitorenter 指令时,线程试图获取锁也就是获取 monitor的持有权。当计数器为0则可以成功获取,获取后将锁计数器设为1也就是加1。相应的在执行 monitorexit 指令后,将锁计数器设为0,表明锁被释放。如果获取对象锁失败,那当前线程就要阻塞等待,直到锁被另外一个线程释放为止。
(monitor对象存在于每个Java对象的对象头中,synchronized 锁便是通过这种方式获取锁的,也是为什么Java中任意对象可以作为锁的原因)
synchronized用的锁是存在哪里的?
synchronized用到的锁是存在Java对象头里的。
3.7. Synchronized 和 Lock 的区别
Synchronized | Lock |
---|---|
是内置Java关键字 | 是一个Java类 |
无法判断获取锁的状态 | 可以判断是否获取到锁(tryLock()方法) |
会自动释放锁,出现异常的时候会自动释放锁 | 必须要手动释放锁,如果不释放会产生死锁,所以需要在finally中进行 lock.unlock() |
没有获得锁的线程进入等待,等待的线程会一直等待下去,不能响应中断 | 不一定会等待下去 |
可重入锁,不可以中断,非公平 | 可重入锁,可以判断锁,默认非公平(可以设置) |
适合锁少量的代码同步问题 | 适合锁大量的同步代码 |
Lock可以提高多个线程进行读操作的效率。 如果竞争不激烈,两者性能差不多;大量线程竞争时,Lock的性能要远优于synchronized。 |
3.8. 为什么要调用start()方法执行run()方法,而不是直接调用run()方法?
- new 一个 Thread,线程进入初始状态;
- 调用 start()方法,会调用底层native方法start0(),启动一个线程并使线程进入了就绪状态,当分配到时间片后就可以开始运行了。
- start() 会执行线程的相应准备工作,然后自动执行 run() 方法的内容,这是真正的多线程工作。
- 而直接执行 run() 方法,会把 run 方法当成一个 main 线程下的普通方法去执行,并不会在某个线程中执行它,所以这并不是多线程工作。
总结:调用start()方法可以启动线程,使线程进入就绪状态,而run()方法只是Thread的一个普通方法调用,是在主线程中执行的。
3.9. 线程池(重点)
线程池:3大方法、7大参数、4种拒绝策略
池化技术:事先准备好一些资源,要用拿走,不用还回。
线程池的好处:
1.降低资源消耗;
2.提高响应速度;
3.方便管理
线程复用、可以控制最大并发数、管理线程
线程池:三大方法,底层都是调用ThreadPoolExecutor()方法
在自己使用线程池的时候一定要通过ThreadPoolExecutor()方法创建,而不是直接调用这三个方法!
1 | newSingleThreadExecutor() |
---|---|
2 | newFixedThreadPool(n) |
3 | newCachedThreadPool() |
线程池:七大参数,ThreadPoolExecutor()方法的7个参数
1 | int corePoolSize | 核心线程池大小 |
---|---|---|
2 | int maximumPoolSize | 最大核心线程池大小 |
3 | long keepAliveTime | 超时未调用释放 |
4 | TimeUnit unit | 超时单位 |
5 | BlockingQueue workQueue | 阻塞队列 |
6 | ThreadFactor threadFactory | 线程工厂 |
7 | RejectedExecutionHandler handler | 拒绝策略 |
线程池:四种拒绝策略,RejectedExecutionHandler 接口的四个实现类
1 | ThreadPoolExecutor.AbortPolicy | 超过线程池的最大承载,不处理并且抛出异常 |
---|---|---|
2 | ThreadPoolExecutor.CallerRunsPolicy | 哪个线程递交过来的,由哪个线程处理 |
3 | ThreadPoolExecutor.DiscardPolicy | 阻塞队列满了,丢掉任务,不抛出异常 |
4 | ThreadPoolExecutor.DiscardOldestPolicy | 阻塞队列满了,尝试和最早线程竞争,失败丢弃,不抛出异常 |
了解:IO密集型,CPU密集型(调优)
池的最大大小该如何设置:
1.CPU 密集型,CPU是几核就设置为几,可以保持CPU的效率最高(Runtime.getRuntime().availableProcessores())
2.IO 密集型 ,一般设置的线程数要大于程序中十分消耗IO的线程数量
3.10. JMM
什么是JMM?
JMM:java内存模型,只是个概念
关于JMM的一些同步的约定:
1.线程解锁前,必须把共享变量立刻刷回主存;
2.线程加锁前,必须读取主存中的最新值到工作内存中;
3.加锁和解锁是同一把锁
8种操作:
内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)
- lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
- unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
- read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
- load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
- use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
- assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
- store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
- write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中
JMM对这八种指令的使用,制定了如下规则:
- 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
- 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
- 不允许一个线程将没有assign的数据从工作内存同步回主内存
- 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作
- 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
- 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
- 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
- 对一个变量进行unlock操作之前,必须把此变量同步回主内存
3.11. volatile
Volatile 是 Java虚拟机提供轻量级的同步机制
1、保证可见性——工作内存需要知道主内存中的值发生了变化
2、不保证原子性——线程执行任务的时候不能被打扰,不能被分割
3、禁止指令重排
3.12. 各种锁
1.公平锁、非公平锁
公平锁:公平,先来后到,不可以插队
非公平锁:不公平,可以插队。(默认)
2.可重入锁
可重入锁(递归锁,好像所有锁都是可重入锁),Lock锁必须配对(lock和unlock成对)否则会出现死锁
3.自旋锁
spin lock
3.13. 死锁
产生:线程各自占有锁的同时去争抢对方占有的锁。
如何排查:在Terminal中用 jps -l 查看进程;然后使用 jstack 进程号 来查看该进程是否有死锁。
出现问题怎么办?
1.看日志
2.看堆栈信息
4.补充内容
4.1. 函数式接口
只有一个方法的接口(@FunctionalInterface)
简化编程模型,在新版本的底层大量应用
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
foreach()的参数为函数式接口,foreach(消费者类型的函数式接口)
1.Function
import java.util.function.Function;
public class TestFunction {
public static void main(String[] args) {
Function<String,String> function = (str) -> str;
System.out.println(function.apply("12341234"));
}
}
2.断定型接口 Predicate,有一个输入参数,返回值只能是boolean值
import java.util.function.Predicate;
public class TestPredicate {
public static void main(String[] args) {
Predicate<String> predicate = String::isEmpty;
System.out.println(predicate.test("sss"));
}
}
3.消费性接口 Consumer 只有输入,没有返回值
import java.util.function.Consumer;
public class TestConsumer {
public static void main(String[] args) {
Consumer<String> consumer = System.out::println;
consumer.accept("asfdas");
}
}
4.供给型接口 Supply 没有参数,只有返回值
import java.util.function.Supplier;
public class TestSupplier {
public static void main(String[] args) {
Supplier<String> supplier = () -> "1234";
System.out.println(supplier.get());
}
}
4.2. 流式计算
package multithreading.stream;
import java.util.Comparator;
import java.util.stream.Stream;
public class TestStream {
/**
* 现在有5个用户,筛选:
* 1.ID必须是偶数
* 2.年龄必须大于23岁
* 3.用户名转为大写字母
* 4.用户名字母倒着排序
* 5.只输出一个用户
*/
public static void main(String[] args) {
User u1 = new User(1,"a",21);
User u2 = new User(2,"b",22);
User u3 = new User(3,"c",23);
User u4 = new User(4,"d",24);
User u5 = new User(6,"e",25);
//Lambda表达式,链式编程,函数式接口,Stream流式计算
Stream.of(u1,u2,u3,u4,u5)
.filter(u->u.getId()%2==0)
.filter(u->u.getAge()>23)
.map(u->u.getName().toUpperCase())
.sorted(Comparator.reverseOrder())
.limit(1)
.forEach(System.out::println);
}
}
class User{
private int id;
private String name ;
private int age;
public User(int id, String name, int age) {
this.id = id;
this.name = name;
this.age = age;
}
public int getId() {
return id;
}
public String getName() {
return name;
}
public int getAge() {
return age;
}
public void setName(String name) {
this.name = name;
}
}
ForkJoin 工作窃取
维护的是双端队列,B的工作做完了,从A里面偷一个来执行
流式并行计算
package multithreading.stream;
import java.util.stream.LongStream;
public class TestStreamSum {
public static void main(String[] args) {
test3();
}
public static void test1() {
long start = System.currentTimeMillis();
long sum = 0L;
for (long i = 0; i <= 100_0000_0000L; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println( sum + " 时间: " + (end-start));
}
public static void test3() {
long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0L, 100_0000_0000L).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println( sum + " 时间: " + (end-start));
}
}
4.3. 异步回调
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class TestAsync {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 有返回值的 supplyAsync 异步回调
// ajax,成功和失败的回调
// 返回的是错误信息
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName()+ "supplyAsync => Integer");
return 1024;
});
System.out.println(completableFuture.whenComplete((x1, x2) -> {
System.out.println("x1: " + x1); // 正常返回结果,错误返回null
System.out.println("x2: " + x2); // 错误信息,无错返回null
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 233;
}).get());
}
}