JDK
并发
JUC的内容图如图所示,方便起见这里将synchronized和volatile也放在了这里
Atomic
提供原子性的更新基本类型、数组、引用、字段的功能,使用Unsafe的CAS操作实现。
Unsafe只提供了compareAndSwapObject、compareAndSwapInt、compareAndSwapLong三个方法,对于Booblean、char、float、double会先转换成int和long再进行操作。
对于数组更新,通过构造器将原数组传入到Atomic时,会将原数组复制一份,因此不会更改原数组。
对于字段更新,均为抽象类,所以需要静态方法newUpdater()创建一个更新器,设置要更新的类和属性,且属性必须使用public volatile修饰。
Lock
AQS
AbstractQueuedSynchronizer即队列同步器,其基本思想是获取锁失败时将线程封装成节点进入到自身的同步队列中等待,当持有锁的线程释放锁时会唤醒同步队列中的线程,使其重新尝试获取锁。
在AQS中,使用一个volatile修饰的state值代表锁的状态,state=0时代表无锁,state>0时代表有锁,state>1时代表重入。
基于AQS的锁的获取和释放流程如下:
- 尝试获取锁(子类提供实现,
CAS设置state=1),如果失败会检查是否是重入,如果是重入会增加state的值。 - 构建节点(分独占和共享两种类型),加入到同步队列中
- 在队列中阻塞前再尝试获取一次锁,如果没有获取到,
CAS设置前驱节点的waitStatus = Node.SIGNAL(只有前驱节点是该状态时,前驱节点才会唤醒后继节点) ,成功后线程陷入阻塞。如果当前队列为空,会先创建一个空的头节点(这个空的头节点其实就代表了当前持有锁的线程,因为在释放锁时会去唤醒头节点的后继节点)。 - 线程被唤醒时,从阻塞位置唤醒,重新进入循环,若前驱节点是头节点,则尝试获取锁,如果没有获取到或前驱不是头节点,则会再次进入阻塞状态。
- 持有锁的线程释放锁时,首先设置自己的
waitStatus = 0,代表已经唤醒其后继节点,然后唤醒其后继节点。
AQS中的try*方法均有子类提供实现,为单次获取/释放动作。当单次操作失败时会进入do*方法,对同步队列进行操作。
Condition
会创建一个新的等待队列,调用await的线程会被封装成Node放入等待队列中(调用的线程必定是持有锁的线程)。当signal时会将等待队列队首节点放入同步队列的队尾,然后唤醒该节点,让其加入到锁的竞争中。注意如果该节点前驱节点不是头节点,唤醒后仍会再次陷入阻塞。
ReentrantLock
基于AQS的独占锁,独占锁在获取锁后会设置独占线程,标识当前持有该锁的线程
公平
当同步队列中存在等待获取锁的线程时,新线程必须加入同步队列排队等待锁。
非公平
新线程可以与同步队列中等待的线程一起竞争锁,当新线程竞争失败时,加入到同步队列。处于同步队列中的线程必须按顺序被唤醒获取锁。因此非公平针对的是队列外和队列内之间的。
基于
ReentrantReadWriteLock
基于AQS的共享锁,写锁为独占锁,读锁为共享锁。当存在写锁时,读锁不可获取。当存在读锁时,写锁不可获取,但读锁可以获取。
使用state的高16位代表读锁个数,低16位代表写锁个数。
读写锁针对的主要是读锁和写锁以及写锁和写锁之间的互斥性
synchronized
一种独占锁,存在锁升级概念。锁升级是其优化的重要手段。
在JVM中,每个对象都对应着一个monitor,即互斥量。JVM通过字节码指令monitorenter和monitorexit来实现持有和释放monitor,持有了monitor即代表获取了对象的锁。编译过程会自动在同步代码块的头尾增加这两个指令
synchronized的实现基于对象头,是用来存储一些对象的额外数据的,这些数据与对象自身定义无关,因此为了节省空间,对象头里的内容是会动态改变的,在不同的情况下存储的内容不同。对象头中的MarkWord记录着对象的锁状态 ,根据锁竞争的情况,锁状态不同,对象头MarkWord中的内容也会不同。
| 存储内容 | 是否偏向 | 锁标志位 | 标志位对应状态说明 |
|---|---|---|---|
| 对象哈希码、对象分代年龄 | 0 | 01 | 无锁 |
| 指向栈中锁记录的指针 | 0 | 00 | 轻量级锁 |
| 指向互斥量(重量级锁)的指针 | 0 | 10 | 重量级锁 |
| 空 | 0 | 11 | GC标记,要被回收 |
| 偏向线程ID、偏向时间戳、对象分代年龄 | 1 | 01 | 偏向锁 |
加锁
当一个线程在获取锁时,会首先在自己的栈帧中开辟一块空间叫做LockRecord,然后将锁对象的MarkWord拷贝到这个LockRecord中,然后线程使用CAS修改对象MarkWord中的内容,如轻量级锁下会将其修改为线程自己的指针记录;如果修改成功,则成功持有锁;如果修改失败,说明当前有其他线程在一同进行锁的获取,发生了竞争且当前线程竞争失败
释放锁
线程将自己栈帧中的LockRecord复制回对象MarkWord中,如果成功,本次同步完成;如果失败,说明当前线程持有锁的过程中有其他线程进行了锁竞争,锁升级为了重量级锁,这时候当前线程释放锁的同时需要唤醒其他竞争线程
锁升级
我们现在假设有T1和T2两个线程进入代码同步块,锁对象为lock对象
- lock对象的
MarkWord此时为:lockHashCode | age | 0 | 01即无锁状态 T1,T2同时访问同步代码块,两个线程均在自己的栈中创建LockRecord并复制MarkWordT1使用CAS成功将MarkWord的内容修改成T1的指针地址,并将锁标志位置为00,即轻量级锁,此时MarkWord为T1 address | 00T2使用CAS修改MarkWord时,发现当前MarkWord中的内容为T1 address | 00,与预期值lockHashCode | age | 0 | 01不符,CAS失败;T2与T1在竞争锁时失败,T2进入自旋状态等待锁;T2自旋结束,再去访问MarkWord,发现锁标志仍为00,即锁仍由其他线程持有中,此时T2使用CAS将MarkWord修改为monitor address | 10,此时锁升级为重量级锁。同时T2进入阻塞状态,进入等待队列。- 此时有其他线程
T3来竞争锁,会直接被阻塞 T1将要释放锁,使用CAS将LockRecord复制回MarkWord,但发现当前MarkWord值为monitor address | 10与预期值T1 address | 00不符合,CAS失败;此时T1会将monitor address | 10修改为0 | 10释放锁并唤醒阻塞等待的线程T2被唤醒后重新进行锁的争夺T2与T3再竞争时,谁CAS将MarkWord由0 | 10替换为monitor address | 10,谁就持有锁,另一方会陷入阻塞状态;当释放锁时会修改回0 | 10释放锁,并唤醒阻塞等待的线程
要注意锁只能升级不能降级,因此当在第5步T2将锁升级为重量级锁后,lock对象的锁将永远处于重量级锁状态,不会再回到轻量级锁状态。
自旋锁
自旋锁是在获取锁失败时不立刻进入阻塞,而是仍持有CPU时间做一会无用功,再尝试获取锁。自旋锁仅存在于轻量级锁的周期中,与轻量级锁共生。当可以通过自旋获取到锁,锁不会升级,之后其他线程来获取锁也存在自旋状态;一旦锁升级为了重量级锁,自旋将不在,线程获取锁失败时会直接进入阻塞状态。
偏向锁
如果一个同步块只会被一个线程访问,每次进出代码块的加锁释放锁就是在浪费无用功,因此有了偏向的概念。
偏向锁开启的情况下,对象的MarkWord内容为0 | 0 | 0 | 1 | 10(无锁状态为0 | 0 | 0 | 01)
线程T1在获取对象的锁时,会先尝试获取偏向锁,对象的MarkWord会被T1修改为T1 ID | Epoch | age | 1 | 01,即偏向锁状态,偏向T1,T1下次进入同步块中,只需要检查一下MarkWord中是否为指向自身线程的偏向锁即可,如果是,则之后的操作将不使用任何同步操作。
如果T2进入同步代码块,发现对象的MarkWord的偏向线程不是自己,会尝试修改偏向为自己,即先判断偏向锁指定的线程是否还活着,如果没活则直接重偏向为自己;如果还活着将在系统安全点safepoint暂停T1线程,将锁标记为轻量级锁模式,并更新MarkWord指向T1的一条帧栈记录,偏向锁模式结束。T1释放锁时发现MarkWord不再为偏向模式且指针记录指向自己的帧栈记录,会知道存在了锁竞争升级为了轻量级锁,释放锁的同时会清除这条指针记录
轻量级锁的释放需要将LockRecord复制回对象头,偏向锁不同,直接释放掉自己栈中的LockRecord即可
volatile
使用volatile修饰的变量具备以下特性:
- 禁止编译时的指令重排序,保证有序性。
- 每次更新立即刷新到主存,每次访问必须去主存中获取,保证可见性。
- 无法保证原子性。
Util
CountDownLatch
其内部使用基于AQS的共享锁实现。在使用构造器创建new CountDownLatch(3)时,会设置state=3,即相当于持有了3把共享锁。在调用await()时,会尝试去获取共享锁,该方法实现为仅当state==0时才能获取成功,获取失败(即计数器未归零)时会构建线程节点丢到AQS的同步队列中阻塞等待。
每次调用countDown()方法时,都会释放一把共享锁,即让state--,当state==0时,会唤醒同步队列中等待的线程,线程苏醒后成功获取共享锁(此时state==0)从而进行后续动作。
CyclicBarrier
内存屏障使用了AQS以及Condition等待队列,在使用构造器创建new CyclicBarrier(index)时,线程调用await()时首先会获取到一个lock锁(没获取到的走锁竞争失败同步队列一套逻辑),然后进行index--操作,如果自减后不为0,则线程释放锁并进入等Condition等待队列中等待。
最后一个线程index--后归零,开始同步执行优先任务,优先任务执行完毕后,设置屏障为新年代标志本轮屏障结束。然后将等待队列中的线程节点移入同步队列中,并释放锁、唤醒同步队列首节点并返回。
首节点被唤醒后拿锁,检查到新年代开始知道本轮已经结束,释放锁并返回,唤醒后继节点。
Semaphore
内部同样基于AQS,在使用构造器创建new Semaphore(2)时,相当于设置了两把共享锁,每次acquire()时都会释放一把锁,如果锁没了就要进入同步队列阻塞等待。每次release(1)时都会再加一把共享锁,并唤醒同步队列的首节点。
可以看到这里的acquire()和release(1)对锁的操作模式反过来了,但是其基本原理是一致的。
Exchanger
Container
ConcurrentHashMap
线程安全的HashMap,基于CAS和synchronized保证线程安全。
JDK7中由于头插法导致的并发场景下产生闭环和数据丢失的问题,JDK8使用尾插法已经修复,但HashMap仍存在并发问题,即在并发插入元素时,当两个线程要插入的元素hash碰撞,会产生元素覆盖的情况,导致元素丢失(这种丢失与JDK7中由于头插法导致的丢失不同)。
针对并发,我们主要关注更新操作,如新增、删除、替换。
putVal
remove
replaceNode
transfer
helpTransfer
CopyOnWrite
对所有的更改操作均会先复制一份,在复制体上进行操作,然后再替换原容器。
CopyOnWriteArrayList
内部持有Object[]数组,所有修改操作均使用System.arraycopy方法复制数组,在复制数组上进行修改后再将原数组引用指向复制后的数组对象上。
CopyOnWriteArraySet
基于CopyOnWriteArrayList实现,内部持有一个CopyOnWriteArrayList实例,添加时调用其addIfAbsent()方法以保持元素的唯一性。
BlockingQueue
不支持NULL元素。消费者在获取元素时,如果队列为空,消费线程会在Condition等待队列中阻塞等待,当有元素入队时会唤醒消费者线程;在生产线程放入元素时,如果队列已满,则生产线程会在Condition等待队列中阻塞等待,当元素被取出时唤醒生产线程。(注意:Condition中的唤醒其实是将节点从等待队列中移到了同步队列中。)
ArrayBlockingQueue
线程安全有界阻塞队列,基于数组实现,添加删除操作使用一个ReentrantLock同步。内部维持一个putIndex、takeIndex、count分别代表放入索引、取出索引、元素个数。当索引达到数组长度时会置零从头重新开始。每次放入、取出元素时都会操作Count,因此可以简单的通过判断count==0 || count == capacity来决定是否阻塞。
LinkedBlockingQueue
线程安全无界阻塞队列,基于链表实现,添加删除操作使用两个ReentrantLock同步,分别为putLock和takeLock。使用两个锁进行同步操作的原因是基于链表构建节点需要额外的时间成本,分开加锁有利于性能提升。而ArrayBlockingQueue中没有额外时间成本,引入两个锁提升复杂度的同时不会带来性能的提升。
LinkedBlockingDeque
线程安全的双端阻塞队列,基于链表实现。
PriorityBlockingQueue
线程安全的无界阻塞队列,基于数组实现最小堆,元素必须实现Comparator接口,队首的元素为最小的元素,元素出队入队后会调整堆。
DelayQueue
线程安全的无界阻塞队列,基于优先级队列实现(非阻塞的优先级队列),元素必须实现Comparator接口。在消费时,队首元素的延时时间如果已经达到,则直接返回;若未达到,计算剩余时间并阻塞消费线程。
DelayedWorkQueue
ScheduledThreadPoolExecutor所提供的内部工作延迟队列
SynchronousQueue
LinkedTransferQueue
Future
RunnableFuture
FutureTask
使用Future包装Runnable和Callable任务,一共存在七种状态,四种状态变化流程。
NEW -> COMPLETING -> NORMALNEW -> COMPLETING -> EXCEPTIONALNEW -> CANCELLEDNEW -> INTERRUPTING -> INTERRUPTED
需要注意的是,COMPLETING状态实际上上任务已完成,但是结果只暂未设置到Future对象中的一个状态,这个状态非常短,可以通过如下代码看到:
protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}}
COMPLETING -> NORMAL的状态转变仅隔了中间一个赋值操作的时间。
因此对于FutureTask而言,任务在新建后,不存在中间正在计算的状态,COMPLETING实际上也已经完成了任务。
FutureTask使用Treiber Stack(无锁栈)维护阻塞线程,在set结果完成后的finishCompletion()中会移出并唤醒阻塞线程,并调用扩展子类方法done()
FutureTask同样时一个Runnable,复写了Run()如下:
public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}
可以看到FutureTask在Run()中进行了Future相关的设置,这也是线程池Submit()的实现方式,将Runnable包装为FutureTask返回并进行Run设置结果。
ListenableFutureTask
Spring提供的带回调函数的FutureTask,利用FutureTask在set结果完成后的finishCompletion()中调用扩展子类方法done(),在done()中根据成功/失败调用成功/失败监听器。
ScheduledFuture
ScheduledFutureTask
ScheduledThreadPoolExecutor的私有内部类,用于周期性重复执行的任务。利用FutureTask的runAndReset()方法,运行后不设置结果,将Future状态设置为NEW状态,然后计算下次运行的时间,并将任务重新放入线程池的工作队列中。
CompletableFuture
使用时可以自己指定线程池,如果不指定,则默认使用ForkJoinPool.commonPool线程池。
runAsync(Runnable);//开启一个异步任务supplyAsync(Supplier);//开启一个异步任务//---------------------------------------thenRun(Runnable);//前置结束后开启一个任务,无法获得前面任务的结果thenAccept(Consumer);//前置结束后开启一个任务,可以获得前面任务的结果thenApply(Function);//前置结束后开启一个任务,可以获得前面任务的结果并返回新结果whenComplete(BiConsumer);//完成时的消费行为thenCombine(CompletionStage,BiFunction);//前置和参数中CompletionStage 返回值传递给BiFunctionthenCompose(Function<? super T, ? extends CompletionStage);//前置返回值作为Function入参,注意这里Function的返回类型是一个CompletionStage//----------以上方法名后面加Async的重载方法提供异步操作allof(CompletableFuture...);anyof(CompletableFuture...);
ExecutorService
Thread
ThreadPoolExecutor
提供线程资源的管理,线程池的属性有
- coreSize:核心线程数,核心线程不会自动销毁,无任务时会保持空闲状态(
allowCoreThreadTimeOut设置后核心线程会超时销毁)。有任务时优先使用核心线程执行,若线程不够且核心线程数量没有达到该限制,则创建核心线程执行任务。 - maxSize:最大线程数量,当阻塞队列满了后,线程池会创建额外线程执行任务;该值代表了核心线程数+额外线程数的最大值。
- keepAliveTime:额外线程空闲时间最大值,超过该时间额外线程销毁。
- timeUnit:空闲时间的单位。
- BlockingQueue:当核心线程达到上限无法继续处理任务时,任务被投递到阻塞队列中。
- ThreadFactory:用于创建线程的工厂类。
- RejectHandler:当阻塞队列满了且达到最大线程数无法继续处理新任务时,指定的拒绝任务策略,常见的有:抛弃任务策略(分抛弃当前任务和最旧任务)、开启新线程执行策略、拒绝任务策略(抛异常)。
线程池的任务优先级为:创建核心线程执行-->放入阻塞队列-->创建额外线程执行-->拒绝策略
Worke
用于包装线程池工作线程,封装了Runnable以及Thread并继承了AQS实现了简单的互斥不可重入锁。Worker的作用主要是维持一系列中断状态。
线程池在接收新任务时,根据线程池当前状态决定是否要创建新Worker,Worker的创建是先通过CAS操作将worker数量成功加一 后新建线程封装为Worker,线程池使用一个HashSet存储所有持有的Worker对象,然后启动线程执行任务。Worker本身实现了Runnable,因此线程启动后执行任务时执行的时Worker自身的run()方法,而其run()方法交予外部runWorker()执行。
在新建Worker时,将Worker自身锁状态设置为了-1,而这种情况下时不允许中断的,只有当运行到runWorker()进行unlock()方法时,才能允许中断,如下:
void interruptIfStarted() {Thread t;//中断要求state>=0if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}Worker(Runnable firstTask) {//新建worker时被设置为了-1,因此无法中断setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);//在runWorker中进行了unlock,这里对state进行了置零,可以被中断了setState(0);return true;}
可见Worker可以进行中断控制,在开始进行任务之前不允许中断。
在正式执行任务时,会经过beforeExecute ->run -> afterExecute,其中前后两个都是用于扩展的。
Worker锁的状态只有0和1两种状态(初始时-1),因此是无法重入的。
ThreadPoolTaskExecutor
内部持有一个ThreadPoolExecutor,对其进行一层封装,提供了返回ListenableFutureTask的submit方法。
ScheduledThreadPoolExecutor
提供周期执行任务的功能,将任务封装为ScheduledFutureTask,执行完后计算下次执行时间并重新投入任务队列中。其任务队列基于延迟队列(即优先级队列,根据时间排序)。
ForkJoinPool
Collection
Map
HashMap
数据结构:数组+链表/红黑树,当链表长度达到8个转为红黑树,当长度降为6个转为链表。
NULL的hash为0,因此key和value均可以为NULL
HashSet
基于HashMap实现,value均为相同的占位对象,可以存放NULL,如果存放相同的元素,会返回false
HashTable
相当于加了synchronized的HashMap,性能低,线程安全。
WeakHashMap
WeakHashMap实现大部分与1.7版本的HashMap相同,但WeakHashMap中的Entry继承了WeakReference,key为弱引用。
弱引用对象在发生GC时会被回收,因此key被回收后会进入到ReferenceQueue中,注意这里回收的其实只是key,value没有被回收。WeakHashMap的get()、put()等方法都直接或间接调用了expungeStaleEntries()方法,这个方法会从ReferenceQueue中拿到被回收掉的key,然后去遍历清除对应的值,帮助GC回收value。
对于HashMap,当kye为null时其hash为0,固定位置为table[0],而在WeakHashMap中null会被包装成static object NULL_KEY = new Object(),会正常的求hash等操作,由于NULL_KEY是静态变量不在GC范围内,因此不会被回收,所以key为null的必须手动通过remove()来删除。
- 强引用:GC不回收
- 软引用:内存不足时回收
- 弱引用:垃圾回收时回收
- 虚引用:随时可能被回收
除强引用外后面三个均可与ReferenceQueue联合使用,当引用要被清理时会被放入这个队列中,然后可以从这个队列中拿到对应的值进行清理。虚引用必须与其结合使用,在放入队列中时可以做些额外处理,用来跟踪垃圾回收,虚引用无法用来访问对象。
ThreadLocalMap
作为ThreadMap的key,每个Thrad都持有一个ThreadMap,使用ThradLocal的弱引用包装作为key。由于只有key为弱引用,且没有使用ReferenceQueue,可能造成value无法回收导致内存泄漏。JDK8中在set()中会对Entry遍历,遇到key相同的或者key为NULL的,进行替换并返回(不用创建新的Entry),这样原来的value会失去引用从而可以被GC回收。
ThreadLocalMap的hash不是使用hashcode()计算,而是使用开放地址法,使用一个AtomicInteger累加一个魔值0x61c88647来获取下一个hashCode
List
ArrayList
数组实现,默认容量10,每次扩容一半。newCapacity = oldCapacity + (oldCapacity >> 1);
LinkedList
双向链表实现。
Vector
相当于加了synchronized的ArrayList,线程安全。
Queue
ConcurrentLinkedQueue
PriorityQueue
优先级队列,内部基于数组的最小堆,元素必须实现Comparator接口,队首元素为最小元素。
Stack
先入后出的一种数据结构。
Iterator
Iterable
实现该接口的可以使用foreach循环,该循环实际是使用迭代器进行迭代。
Iterator
- 内部维持一个游标来标明目前迭代的位置
- 根据游标与元素容量的大小比对来确定是否有下个元素
- 更新操作会同步修改操作数,如果是
foreach会根据操作数来禁止修改
IO
InputStream/OutputStream
Input是输入至程序中,Out是从程序中输出。
JDK提供了许多装饰器
BufferedInputStream:提供缓冲buff,可以一次性读取较多内容至buff中,提高IO性能FileInputStream:从文件中读取字节流ObjectInputStream:PipedInputStreamByteArrayInputStreamDataInputStreamFilterInputStream
Reader/Writer
面向字符,注意指定字符集编码,否则容易产生乱码。
BufferedReaderCharArrayReaderFileReaderInputStreamReaderPipedReaderStringReader
JVM
类加载
加载->验证->准备->解析(可能在初始化后)->初始化->使用->卸载
- 启动类加载器(Bootstrap ClassLoader)
使用C++实现,是虚拟机一部分。负责将存放在lib目录中的或者-Xbootclasspath参数指定的路径宏的合法类库加载到JVM内存中。该加载器无法被Java程序直接引用。 - 扩展类加载器(Extension ClassLoader)
负责加载lib\ext目录中的,或java.ext.dirs系统变量指定的路径中的所有类库,开发者可以直接使用该加载器。 - 应用程序类加载器(Application ClassLoader)
是ClassLoader的getSystemClassLoader返回值,又名系统类加载器,负责加载用户类路径(ClassPath)上所指定的类库,开发者可直接使用,一般为程序默认加载器。
类加载器的父子关系一般不以继承实现,而是使用组合实现。
双亲委派:若一个加载器收到类加载请求,先将请求委派给父类加载器完成,若父类加载器无法完成,则再自己加载。
该过程对程序的稳定性和安全性很重要,防止与系统类重名从而出现多个不同的类。
