1 线程基础
1.1 概念区分
1.1.1 进程与线程
进程是操作系统进行资源分配和调度的基本单位 ,是指一个内存中运行的应用程序。
线程是处理器任务调度、执行的基本单位,一个进程可以分为一到多个线程。线程之间通信一般使用wait/notify 机制、Volatile 内存共享等。
1.1.2 并发与并行
并发指交替执行,任务之间是互相抢占资源的(单核心)。
并行指同时执行,任务之间不互相抢占资源的(多核心)。
1.2 创建线程的方法
1.2.1 继承Thread类
定义Thread类的子类,重写Thread类的run方法,设置线程任务(线程开启要做什么),创建该子类的对象,调用start方法
public class MyThread extends Thread{//继承Thread类
public void run(){
//重写run方法
}
}
public class Main {
public static void main(String[] args){
new MyThread().start();//创建并启动线程
}
}
1.2.2 实现runnable接口
实现runnable接口可以避免单继承的局限性增强了程序的扩展性,解耦
创建Runnable接口的实现类,重写run方法,设置线程任务(线程开启要做什么)
创建Thread类对象,传递Runnable接口的实现类对象,调用start方法,开启新的线程并执行run方法
public class MyThread2 implements Runnable {//实现Runnable接口
public void run(){
//重写run方法
}
}
public class Main {
public static void main(String[] args){
//创建并启动线程
MyThread2 myThread=new MyThread2();
Thread thread=new Thread(myThread);
thread().start();
//或者 new Thread(new MyThread2()).start();
}
}
1.2.3 实现Callable接口
实现 Callable 接口。 相较于实现 Runnable 接口的方式,方法可以有返回值,并且可以抛出异常。
执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。 FutureTask 是 Future 接口的实现类
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 在 FutureTask 中传入 Callable 的实现类
FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 666;
}
});
// 把 futureTask 放入线程中
new Thread(futureTask).start();
// 获取结果
Integer res = futureTask.get();
System.out.println(res);
}
}
1.3 线程分类
java中的线程分为用户线程和守护线程
程序中的所有的用户线程结束之后,不管守护线程处于什么状态,java虚拟机都会自动退出
调用线程的实例方法setDaemon()来设置线程是否是守护线程,setDaemon()方法必须在线程的start()方法之前调用,在后面调用会报异常,并且不起效。
1.4 线程状态
1.5 线程方法
1.5.1 常用方法
---Thread类
start() //启动线程
run() //执行线程运行时代码
sleep() //使线程进入即使等待状态,时间一到自动唤醒
yield() //调用此方法的线程进行让步,使同优先级的线程更容易执行,不是一定的
join() //在当前线程中调用另一个线程的join()方法,则当前线程转入阻塞状态,直到另一个进程运行结束,当前线程再由阻塞转为就绪状态
setPriority() //设置线程的运行优先级
getPriority() //获取线程的运行优先级
interrupt() //可以请求中断一个线程,但实际上并不是真的中断了线程,只是标识了一个中断状态
isInterrupted() //目标线程通过检测获取中断状态,再根据状态决定是否中断线程
---Object类
wait() //使对象线程等待
notify() //随机唤醒一个线程
notifyall() //唤醒全部
1.5.2 wait和sleep的区别
- wait 会释放锁,sleep 不会释放;
- wait 必须在同步代码块中,sleep没限制;
- wait 无异常,sleep有异常;
1.6 线程安全与同步
1.6.1 ThreadLocal的原理
//TODO TransmittableThreadLocal
ThreadLocal叫做线程变量,意思是ThreadLocal中填充的变量只属于当前线程,该变量对其他线程而言是隔离的。
- 每个线程维护着一个ThreadLocalMap的引用,ThreadLocalMap是ThreadLocal的内部类,用Entry来进行存储;
调用ThreadLocal的set()方法时,实际上就是往ThreadLocalMap设置值,key是ThreadLocal对象,value是传递进来的对象 ,调用ThreadLocal的get()方法时,实际上就是往ThreadLocalMap获取值ThreadLocal对象的值。
1.6.2 ThreadLocal的使用
```java private static ThreadLocal tl = new ThreadLocal<>();
public static void main(String[] args) throws Exception {
tl.set(1);
System.out.println(String.format("当前线程名称: %s, main方法内获取线程内数据为: %s",
Thread.currentThread().getName(), tl.get()));
fc();
new Thread(()->{
tl.set(2); //在子线程里设置上下文内容为2
fc();
}).start();
Thread.sleep(1000L); //保证下面fc执行一定在上面异步代码之后执行
fc(); //继续在主线程内执行,验证上面那一步是否对主线程上下文内容造成影响
}
private static void fc() {
System.out.println(String.format("当前线程名称: %s, fc方法内获取线程内数据为: %s", Thread.currentThread().getName(), tl.get()));
}
==================================================================================== 当前线程名称: main, main方法内获取线程内数据为: 1 当前线程名称: main, fc方法内获取线程内数据为: 1 当前线程名称: Thread-0, fc方法内获取线程内数据为: 2 当前线程名称: main, fc方法内获取线程内数据为: 1
<a name="8ffdd52e"></a>
#### 1.6.2 使用不可变对象
- 不可变对象天生就是线程安全的,如String类。
- 使用Final关键字修饰或Collection类的unmodifiableXXX相关方法,可以创建不可变对象。
<a name="bad57054"></a>
#### 1.6.3 关键字Synchronized
- Synchronized修饰的方法,锁是当前实例对象
- Synchronized修饰的静态方法,锁是当前类的Class对象
- Synchronized修饰的代码块,锁是Synchronized括号里配置的对象,**可以实现类级锁也可以实现对象锁**
```java
public class SynClass {
public static void main(String[] args) {
Object lock = new Object();
synchronized(SynClass.class){//实现类级锁
//TODO
}
synchronized(this){//实现当前实例对象的对象锁
//TODO
}
synchronized(lock){//实现某个指定对象的对象锁
//TODO
}
}
}
1.6.4 类锁lock
(1)经典使用
Lock 是一个接口,两个直接实现类:ReentrantLock, ReentrantReadWriteLock。
Lock l = ...;
l.lock(); // 上锁
try {
// access the resource protected by this lock
} finally {
l.unlock(); // 解锁
}
(2)ReentrantLock(重入锁)
ReentrantLock可重入锁,默认是非公平锁。
Lock对象中可以创建多个Condition监视器类(lock.newCondition),使用监视其中的await()、signal()方法可以实现选择性通知线程的等待/唤醒机制。
(3)ReetrantReadWriteLock(读写锁)
ReetrantReadWriteLock也是可重入锁,默认是非公平锁。
包含一个ReadLock (共享锁,AQS中的共享式)和 一个 WriteLock(排他锁,AQS中的独占式) ,读锁与读锁不互斥;读锁与写锁,写锁与写锁互斥
public class ReadWriteLockDemo {
private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
private final Lock rlock = rwlock.readLock();
private final Lock wlock = rwlock.writeLock();
Java 8引入了新的读写锁,StampedLock提供了乐观读锁,可取代ReadWriteLock以进一步提升并发性能;
(4)synchronized、lock的区别
- 原始构成
synchronized时关键字属于jvm
Lock是具体类,是api层面的锁(java.util.) - 使用方法
sychronized不需要用户取手动释放锁,当synchronized代码执行完后系统会自动让线程释放对锁的占用
ReentrantLock则需要用户去手动释放锁若没有主动释放锁,就有可能导致出现死锁现象,需要lock()和unlock()方法配合try/finally语句块来完成 - 等待是否可中断
synchronized不可中断,除非抛出异常或者正常运行完成
ReentrantLock可中断,设置超时方法tryLock(long timeout, TimeUnit unit),或者lockInterruptibly()放代码块中,调用interrupt()方法可中断。 - 加锁是否公平
synchronized非公平锁
ReentrantLock两者都可以,默认公平锁,构造方法可以传入boolean值,true为公平锁,false为非公平锁 - 锁绑定多个条件Condition
synchronized没有
ReentrantLock用来实现分组唤醒需要要唤醒的线程们,可以精确唤醒,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程。
2.线程池
2.1 创建线程池
Java标准库提供了ExecutorService
接口表示线程池,常用实现类有:
- FixedThreadPool:线程数固定的线程池;容易堆积大量请求引起OOM;
- CachedThreadPool:线程数根据任务动态调整的线程池;同上;
- SingleThreadExecutor:仅单线程执行的线程池。创建大量的线程容易造成资源耗尽引起OOM;
默认线程池的实现类底层实现基于ThreadPoolExecutor类,但创建线程池时需强制使用ThreadPoolExecutor类。//使用方法 public class Main { public static void main(String[] args) { // 创建一个固定大小的线程池: ExecutorService es = Executors.newFixedThreadPool(4); for (int i = 0; i < 6; i++) { es.submit(new Task("" + i)); } // 关闭线程池: es.shutdown(); } }
2.2 ThreadPoolExecutor
2.2.1 七大参数
public ThreadPoolExecutor(
//线程池基本大小,默认启动线程的个数,当任务数超过此大小时,未处理任务会进入任务队列进行等待;
int corePoolSize,
//线程池最大大小,已创建的线程数小于maximumPoolSize,则线程池会创建新的线程来执行任务;
int maximumPoolSize,
//线程存活保持时间,当线程池中线程数大于核心线程数时,线程的空闲时间如果超过线程存活时间,那么这个线程就会被销毁,直到线程池中的线程数小于等于核心线程数。
long keepAliveTime,
//keepAliveTime 的时间单位
TimeUnit unit,
// 任务队列,用于传输和保存等待执行任务的阻塞队列,因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换。
BlockingQueue<Runnable> workQueue,
// 线程工厂,一般使用默认
ThreadFactory threadFactory,
// 拒绝策略,一般使用默认
RejectedExecutionHandler handler) {}
2.2.2 四种拒绝策略
new ThreadPoolExecutor.AbortPolicy() // 默认,拒绝新任务的处理,抛出异常
new ThreadPoolExecutor.CallerRunsPolicy() // 直接由任务提交者执行这个任务
new ThreadPoolExecutor.DiscardPolicy() //丢弃新任务,不会抛出异常
new ThreadPoolExecutor.DiscardOldestPolicy() //将丢弃最早的未处理的任务
2.4 ForkJoin
2.4.1 ForkJoin简介
- ForkJoin可以把一个大任务分拆成小任务并行执行,最后合并结果得到最终结果。
- 基于“分治”的算法,fork()分解任务,join()收集数据。
- 适合的是计算密集型的任务。
- 工作窃取,允许一个线程完成了可以做的事情,从仍然繁忙的其他线程窃取任务,从而提高效率。
2.4.2 ForkJoin使用方法
创建一个ForkJoin的任务需要继承RecursiveAction和RecursiveTask并且实现compute方法。
使用ForkJoinPooll.execute(ForkJoinTask task) 进行执行。public class ForkJoinDemo extends RecursiveTask<Long> { private Long start; // 1 private Long end; // 1990900000 private Long temp = 10000L; // 临界值 public ForkJoinDemo(Long start, Long end) { this.start = start; this.end = end; } // 计算方法 @Override protected Long compute() { if ((end-start)<temp){ Long sum = 0L; for (Long i = start; i <= end; i++) { sum += i; }return sum; }else { // forkjoin 递归 long middle = (start + end) / 2; // 中间值 ForkJoinDemo task1 = new ForkJoinDemo(start, middle); task1.fork(); // 拆分任务,把任务压入线程队列 ForkJoinDemo task2 = new ForkJoinDemo(middle+1, end); task2.fork(); // 拆分任务,把任务压入线程队列 return task1.join() + task2.join(); } } } public static void test() throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L); ForkJoinTask<Long> submit = forkJoinPool.submit(task); // 提交任务 Long sum = submit.get(); long end = System.currentTimeMillis(); System.out.println("sum="+sum+" 时间:"+(end-start)); }
2.5 Future
2.5.1 Future
当我们提交一个Callable
任务后,我们会同时获得一个Future
对象,然后,我们在主线程某个时刻调用Future
对象的get()
方法,就可以获得异步执行的结果。在调用get()
时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()
会阻塞,直到任务完成后才返回结果。
get()
:获取结果(可能会等待)get(long timeout, TimeUnit unit)
:获取结果,但只等待指定的时间;cancel(boolean mayInterruptIfRunning)
:取消当前任务;isDone()
:判断任务是否已完成。
(1)子线程返回类型String,那主线程如何获取子线程中执行结果?
class Task implements Callable<String> {
public String call() throws Exception {
// 模拟子线程处理任务....
return longTimeCalculation();
}
}
(2)主线程通过 future.get()获取子线程中返回的数据
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞
2.5.2 CompletableFuture
CompletableFuture
是实现了Future
的接口实现类、 在之前Future
也可以完成异步处理任务的能力, 但是获取其结果的时候(future.get())是阻塞式获取结果的。
CompletableFuture使用的是链式编程,常用方法有
- 转换(
thenCompose
) - 组合(
thenCombine
) - 消费(
thenAccept
) - 运行(
thenRun
)。 - 带返回的消费(
thenApplyAsync
)@Test public void test2() throws Exception { CompletableFuture<Object> completableFuture = null; for (int i = 0; i < 3; i++) { int j = i; long timeMillis = System.currentTimeMillis(); completableFuture = CompletableFuture.supplyAsync(new Supplier<Object>() { @SneakyThrows @Override public Object get() { System.out.println("线程名称:" + Thread.currentThread().getName()); if (j == 1) { //等于1让线程执行耗时8秒 System.out.println("进行了线程休眠"); TimeUnit.SECONDS.sleep(8); } System.out.println("耗时"+(System.currentTimeMillis()-timeMillis)); return "返回CompletableFuture参数" + j; } }); completableFuture.thenAccept((str)->{ System.out.println(str); }); //System.out.println(completableFuture.get()); } // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭: TimeUnit.MINUTES.sleep(1); }
3.JMM内存模型
3.1 概念
JMM,全名为Java Memory Model,即Java内存模型。它是一组概念规范。每条线程拥有各自的工作内存,工作内存中的变量是主内存中的一份拷贝,线程对变量的读取和写入,然后刷回到主存。
JMM的三个特征
- 原子性:原子性指的是一个操作是不可分割,不可中断的,一个线程在执行时不会被其他线程干扰。
- 可见性:可见性指当一个线程修改共享变量的值,其他线程能够立即知道被修改了。
- 有序性:Java内存模型中,允许编译器和处理器对指令进行重排序(happens-before原则),多线程下重排序会影响程序的正确运行。
3.2 实现
3.2.1 Volatile 关键字
Volatile关键字是轻量级的同步机制,可以禁止指令重排,保证可见性,不保证原子性。
- 禁止指令重排:JVM底层volatile是采用内存屏障,保证特定的操作的执行顺序。
- 保证可见性:当一个线程修改了这个共享变量的值,volatile 保证了新值能立即同步到主内存,其他线程每次使用前都会重新换取新值。
- 不保证原子性:当一个线程正在自身的工作内存修改某个变量,还没有完成赋值,另外一个线程也同时获取了该变量,也会造成数据的丢失。
使用场景
4 并发
4.1 并发基础
4.1.1 CAS
CAS(Compare And Swap),即比较并交换。CAS操作包含三个操作数——内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。底层是sun.misc.Unsafe类的compareAndSwapInt()方法,compareAndSwapInt是借助C来调用CPU底层指令实现的。
应用java.util.concurrent.atomic
包下的类大多是使用CAS操作来实现的。以AtomicInteger作为示例。
AtomicInteger atomic = new AtomicInteger(123);
int expectValue = 124;
int updateValue = 321;
boolean flag = atomic.compareAndSet(expectValue,updateValue);
System.out.println(flag);
System.out.println(atomic.get());
缺陷
自旋CAS(不成功,就一直循环执行,直到成功)
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
一次性只能保证一个共享变量的原子性
可以使用原子引用AtomicReference类来保证引用对象之间的原子性,可以把多个变量放在一个对象里来进行CAS操作。AtomicInteger是对整数的封装,而AtomicReference则对应普通的对象引用。- ABA问题,线程1准备用CAS将变量的值由A替换为C,在此之前,线程2将变量的值由A替换为B,又由B替换为A,然后线程1执行CAS时发现变量的值仍然为A,所以CAS成功。可以使用AtomicStampedReference防止ABA,该类多维护了一个状态值。因此,即使对象值被反复读写,写回原值,只要状态值发生变化,就能防止不恰当的写入。
4.1.2 AQS
AQS,全称:AbstractQueuedSynchronizer,内部维护着FIFO双向队列
,即CLH同步队列。AQS依赖它来完成同步状态的管理(voliate修饰的state
,用于标志是否持有锁)。即AQS是基于CLH队列,用volatile修饰共享变量state,线程通过CAS去改变状态符,成功则获取锁成功,失败阻塞(park)则进入等待队列,等待被唤醒(unpark)。
同步状态的获取与释放
由于一个共享资源同一时间可以被一条线程持有,也可以被多个线程持有,因此AQS中存在两种模式,独占模式
和共享模式
。
独占模式是共享状态值state每次只能由一条线程持有,其他线程如果需要获取,则需要阻塞。如
ReentrantLock
。获取锁:acquire
/*独占式获取同步状态*/ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } }
释放锁:release
共享模式是共享状态值state每次可以由多个线程持有,如
CountDownLatch
和Semaphore
。获取锁:acquireShared
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) // 获取失败,自旋获取同步状态 doAcquireShared(arg); }
释放锁:doReleaseShared
线程阻塞和唤醒
使用LockSupport的park和unpark,而LockSupport是Unsafe的封装,真正是使用Unsafe的park和unpark来对操作系统的线程进行阻塞和唤醒。
public class LockSupportTest {
public static void main(String[] args) {
Thread parkThread = new Thread(new ParkThread());
parkThread.start();
System.out.println("开始线程唤醒");
LockSupport.unpark(parkThread);
System.out.println("结束线程唤醒");
}
static class ParkThread implements Runnable{
@Override
public void run() {
System.out.println("开始线程阻塞");
LockSupport.park();
System.out.println("结束线程阻塞");
}
}
}
与wait和notify的区别
wait和notify都是Object中的方法,在调用这两个方法前必须先获得锁对象,但是park不需要获取某个对象的锁就可以锁住线程。notify只能随机选择一个线程唤醒,无法唤醒指定的线程,unpark却可以唤醒一个指定的线程。
4.2 原子操作类
4.3 并发工具类
4.3.1 CountDownLatch(倒计时)
一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行
// 计数器
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 总数是6,必须要执行任务的时候,再使用!
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" Go out");
countDownLatch.countDown(); // 数量-1
},String.valueOf(i)).start();
}countDownLatch.await();// 等待计数器归零,然后再向下执行
System.out.println("Close Door");
}
}
4.3.2 CyclicBarrier(加法计数器)
N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
public class CyclicBarrierDemo {
public static void main(String[] args) {
// 召唤龙珠的线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙成功!");
});
for (int i = 1; i <=7 ; i++) {
final int temp = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"收集"+temp+"个龙珠");
try {
cyclicBarrier.await(); // 等待
} catch (InterruptedException e){
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
4.3.3 Semaphore(信号量)
public class SemaphoreDemo {
public static void main(String[] args) {
// 线程数量:停车位! 限流!
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
try {
semaphore.acquire();// acquire() 得到
System.out.println(Thread.currentThread().getName()+"抢到车 位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开车 位");
}
catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // release() 释放
}
},String.valueOf(i)).start();
}
}
}
4.4 并发集合类
4.4.1 ConcurrentHashMap
ConcurrentHashMap是线程安全版本的HashMap。采用 CAS 和 synchronized 来保证并发安全。
4.4.2 CopyOnWriteArrayList
线程安全的 List,在读多写少的场合性能非常好,远远好于 Vector。
4.4.3 ConcurrentLinkedQueue
4.4.4 ConcurrentSkipListMap
4.4.5 ArrayBlcokingQueue
基于数组实现的有界的阻塞队列。初始化的时候必须设定上限,之后无法改变。内部以FIFO的顺序对元素进行存储
4.4.6 LinkedBlokingQueue
基于单向链表实现的阻塞队列,如果不指定容量,默认为Integer.MAX_VALUE,也就是无界队列。先进先出的顺序。支持多线程并发操作
4.4.7 PriorityBlockingQueue
一个支持优先级的无界阻塞队列。默认情况下元素 采取自然顺序升序排列。
4.4.8 DelayQueue
4.4.9 SynchronousQueue
即SynchronousQueue不存储任何元素,容量为0。每一次插入操作,必须等待其他线程的取出操作。而每一个取出操作也必须等待其他线程的插入操作。