基础构件

AbstractQueuedSynchronizer

工作中几乎不会直接用到,但AQS是JUC很多构件底层都会用到的,粗读
image.png
image.png
继承的这个父类其实没有太多东西

数据结构
在数据结构上,AQS有两个,一个是Node的双向链表
image.png

一个是ConditionObject的单向链表,但ConditionObject的内容本质也是Node节点,还实现Condition接口
image.png

**


实现原理**
image.png
本质还是使用了底层的Unsafe工具来支持CAS

核心函数
**

独占模式下的几个核心函数
image.png
image.png
注释说的很明白,tryAcquire是一个空方法,交个子类来实现,比如ReentrLock等
image.png

image.png
image.png
为什么这里要两次compareAndSetTail呢?
image.png

这里可以看到底层还是Unsafe的使用

看下获取失败和中断处理
image.png

  1. 若前驱节点的等待状态为 SIGNAL,返回 true,表示当前线程可以休眠(park);
  2. 若前驱节点是取消状态 (ws > 0),则将其清理出队列,以此类推;
  3. 若前驱节点为 0 或 PROPAGATE,则将其设置为 SIGNAL 状态。

休眠使用了LockSupport
image.png

ReentrantLock

image.png
image.png
底层使用的AQS,提供一个Sync的抽象类,然后公平锁和非公平锁分别使用自己的方法,来看下非公平锁的lock方法
image.png
先尝试以 CAS 方式修改 state 的值,若修改成功,则表示成功获取到锁,将 owner 设为当前线程;否则就执行 AQS 中的 acquire 方法。
image.png

公平锁的主要差距是判断主队列中是否有其他线程在等待,当没有其他线程在排队时再去获取,否则获取失败
image.png
image.png

ReentrantReadWriteLock

适用于读写分离的场景,读锁共享,写锁独占,其实这些分离都是尽可能的让锁粒度降低,比较合适读多写少的场景。这里主要看下怎么实现读共享的
image.png
这个ReadWriteLock仅仅只是接口定义,没有什么东西
image.png
image.png

还是继承了AQS,然后公平锁和非公平锁都分别继承Sync

image.png

image.png
AQS 的状态state是32位(int 类型)的,辦成两份,读锁用高16位,表示持有读锁的线程数(sharedCount),写锁低16位,表示写锁的重入次数 (exclusiveCount)。状态值为 0 表示锁空闲,sharedCount不为 0 表示分配了读锁,exclusiveCount 不为 0 表示分配了写锁。

image.png
获取读锁时,需要检查是否有线程持有写锁,如果没有读锁则只增加数量,实现读锁共享
image.png
获取写锁时,c为0说明,既没有读锁也没有写锁。非0则两者必有其一或者两者都有。
w等于0则说明有读锁,如果w不等于0而当前线程和持有写锁的线程不同则说明写锁不能获得,这样便实现了写锁的独享。

CountDownLatch

image.png

image.png
image.png
这个类其实非常简单,继承自AQS,然后使用共享锁,初始化传入的数量就是可以countDown的数量

CyclicBarrier

使用了ReentrantLock实现同步,与 CountDownLatch 相比,可以把后者理解为“一次性(one-shot)”的,而是“可循环”
image.png
image.png
初始化时 parties 和 count 的值相同(由构造器 parties 参数传入),之后每有一个线程调用 await 方法 count 值就减 1,直至 count 为 0 时(若不为 0 则等待),执行传入的回调函数 barrierCommand(若不为空),然后唤醒所有线程,并将 count 重置为 parties,开始下一轮操作。
image.png
Generation 表示代数,每次屏障(barrier)破坏之前属于同一代,之后进入下一代。

Semaphore

更像是令牌桶,可以作为限流器使用,即限制访问某个资源的线程个数,比如用于限制连接池的连接数
image.png
还是继承自AQS,还是通过操作state的数量来处理,acquire减令牌,release恢复令牌数量

小结:可以看出来,大部分构件都直接继承AQS,然后通过state字段来实现持有的线程数量判断和锁判断

返回与传入

Thread

类,提供了功能让线程开始、等待、中断等一系列的操作
image.png

image.png
可以看到存储线程的是一个数组,开始前把线程加到数组中,失败后清理掉,那如果是异常了呢,这个线程不是一直在数组中了吗?
start0是一个本地方法,JVM内部的实现

image.png
run方法只是对Runnable接口做了调用

Runnable

image.png
一个接口定义,没有返回值的

Callable

image.png
带返回值

Future

image.png
提供了对Runnable和Callable任务的执行结果进行取消、查询是否完成、获取结果、设置结果等操作。

FutureTask

image.pngimage.png
具备了Runnable和Future的双重特性,但线程本质的特性跟明显,当我们在使用线程池本质使用的就是FutureTask,这个内部类做了很多的增加,而不仅仅只是一个接口。

线程池

ThreadPoolExecutor

image.png

image.png
高 3 位表示线程池的运行状态,低 29 位表示线程池中线程的数量,ctl 表示线程池的运行状态和线程池内部的线程数量。

线程的状态及转换

image.png

  1. RUNNING: 接受新的任务,并且处理任务队列中的任务;
  2. SHUTDOWN: 不接受新的任务,但处理任务队列中的任务;
  3. STOP: 不接受新的任务,不处理任务队列中的任务,并且中断正在进行的任务;
  4. TIDYING: 所有的任务都已终结,工作线程的数量为 0;
  5. TERMINATED: 执行 terminated() 方法后进入该状态,terminated() 方法默认实现为空。

image.png

任务提交流程

继承自AbstractExecutorService的submit方法提交任务,不管是Runnable还是Callble都会转换FutureTask,而核心函数就是execute,这个方法交个各自子类来实现。
image.png

image.png
这里可以看到有多次ctl.get()的操作,因为存在并发的可能,其大致流程如下图

image.png

image.png
addWorker函数用于新建一个线程,这个函数分为两部分功能一是增加workCount数量,然后才新建线程
image.png

image.png
新建的线程使用Worker进行了封装,同步使用了ReentrantLock,这里的步骤也比较简单,新建一个worker放到set中,添加成功就启动线程,失败则进行回退。

image.png
这里可以看到worker也继承自AQS,大致框架跟lock有点类似。

image.png
image.png
image.png

ScheduledThreadPoolExecutor

阿里内部是不允许使用Timer和TimerTask的,代码扫描和CR的时候会过不去
image.png
image.png

image.png

image.png
内部实现了一个延时队列,基于数组实现
image.png
image.png
通过period字段来控制是一次性还是周期性的

image.png

image.png
不管一次性还是周期性都会入队,然后执行,区别在于会不会设置下一次的执行时间
image.png

Executor、ExecutorService、ScheduledExecutorService、AbstractExecutorService

image.png
Executor
image.png
就一个简单的接口

ExecutorService
image.png

增加了一堆的方法,非常重要的是核心的接口有了返回值

ScheduledExecutorService
image.png
增加了定时执行的方法

AbstractExecutorService
image.png
**
这个是核心类
image.png
几个submit方法做的是,从Runnable、Callable到FutureTask的转换,然后调用子类实现的execute方法

invoeAll
image.png
处理这里是超时和任何一个处理有异常都会导致全部任务被cancel掉
image.png
取消就是做中断处理,因此我们看到invokeAll方法只有InterruptedException异常,而不是超时异常。
在应用中调用完invokeAll之后,我们可以循环取List,然后判断isDone方法是不是true决定后继的流程。
image.png


invokeAny
image.png

使用了ecs,本质ecs内部就是一个阻塞队列,这里把它当成增强的阻塞队列来看即可
image.png**

CompletionService

image.png
接口定义非常简单,提交和获取方法,非常适用于异步IO的场景。

ExecutorCompletionService

image.png
内部维护了一个阻塞队列用于存放Future

image.png

默认是用LinkedBlockingQueue,注意这里进行了executor的转换,本质还是使用了aes的功能
image.png
其实内部没有太多东西,主要我们不需要自己维护一个阻塞队列,和异步没有太多关系。


容器

ArrayBlockingQueue

image.png

image.png
image.png
image.png
注意这里每次入队都会signal
image.png
带超时参数的offer是可中断的,在队满时notFull会等待

image.png
image.png
这是为了处理并发情况下的iterator?
image.png

LinkedBlockingQueue

image.png

image.png
image.png
注意这里只有写锁的获取,同时这里的conditon的操作也更加细化了,避免每次都进行唤醒

image.png
image.png
image.png

PriorityBlockingQueue

image.png

image.png
add/put/带超时的offer都是基于offer函数
image.png
优先级队列的元素添加是不阻塞的?而读是阻塞的?
image.png

image.png
image.png

CopyOnWriteArrayList


image.png
声明极其耗内存

image.png

image.pngimage.png
写操作都是上了锁的,每次写操作都会进行拷贝复制
image.png

ConcurrentHashMap


工具

LockSupport

image.png
image.png

ForkJoinPool

Doug Lea的paper 《A Java Fork/Join Framework》
中文翻译版
image.png

Arrays.parallelSort使用了该线程池工具
image.png

image.png

分治、合并是核心思路

CompletableFuture

使用举例

ThreadLocal

原子类

AtomicInteger

image.png

image.png
image.png
没法看到底层源码,但是很明显是CAS的套路

AtomicLong

image.png

剩下的AtomicBoolean、AtomicIntegerArray、AtomicLongArray、AtomicReference、AtomicReferenceArray的实现原理都差不多,大概瞟一眼就可以了。

AtomicIntegerFieldUpdater

image.png
image.png

image.png

AtomicLongFieldUpdater

image.png
image.png
image.png

LongAdder

image.png

核心思路
image.png

image.png
image.png

image.png
image.png
image.png
image.png
不用过于关注这里的细节
image.png

AtomicMarkableReference、AtomicStampedReference

image.png
image.png
image.png
image.png