知识结构
线程管理
- 启动与中断
- 实现Runable与继承Thread类启动线程
- Callable与Future可以返回结果的线程
- interrupt
- 线程组、异常处理器
- 线程状态机
- NEW
- RUNABLE
- WAITING/TIME_WAITING 阻塞在AQS锁,或者线程sleep
- BLOCKED 阻塞在MONITOR锁
- TERMINATED
- 线程属性
- 优先级
- 守护线程
- ThreadLocal**
线程同步
- 核心
- AQS
- volatile, 防止指令重排序(内存屏障)/其他线程立即可见(Lock指令)
- 锁
- 互斥锁
- ReentrantLock,由AQS实现的可重入互斥锁, Lock.condition()作为条件变量
- Synchronized,由JVM实现的可重入互斥锁,wait/notify作为条件变量,引入了分级锁优化机制进行优化
- 读写锁
- ReentrantReadWriteLock 悲观读写锁,AQS实现
- StampedLock 乐观读写锁,AQS实现
- 互斥锁
- 同步器
- LockSupport
- 调用unsafe.park()与unsafe.unpark()来阻塞与唤醒线程,会释放cpu
- Posix线程库pthread中的mutex,condition来实现的,保护了一个_counter的变量,当park时,这个变量被设置为0,当unpark时,这个变量被设置为1
- CyclicBarrier
- 同步屏障,由ReentrantLock与Condition来实现
- 阻塞一组线程直到该组线程全部到达屏障点
- CountDownLatch
- 同步屏障,由ReentrantLock与Condition来实现
- 允许一个或多个线程一直等待,直到其它线程完成它们的操作
- Semaphore
- 由AQS实现的PV原语,限制同一时刻访问资源的最大线程数目
- LockSupport
- 原子变量
- Atomic
线程池
- 核心
- 防止不断地创建与销毁线程,影响系统性能;控制线程数目
- 全部由类ThreadPoolExecutor实现,其中可配置线程数量,任务队列类型,拒绝策略
- 分类
- 预定执行线程池
- newCachedThreadPool
- newFixedThreadPool
- newSingleThreadExecutor
- 非预定执行线程池
- newScheduledThreadPool
- newSingleThreadScheduledExecutor
- 预定执行线程池
- 执行控制器
- 控制任务组
- 依次获取执行完的每个任务
- 等待所有任务全部执行完成
- 等待任意一个任务执行完成
- Fork-Join 多线程分治法
- CompletableFuture 事件驱动,为异步任务注册回调函数**
- 控制任务组
线程安全数据结构
- 哈希表
- ConcurrentHashMap 更新数据时锁定每一个哈希桶
- ConcurrentSkipListMap 有序哈希表,由跳表实现
- HashTable 由synchronized锁定读写操作
- 列表
- CopyOnWriteArrayList 数组列表,修改线程会对底层数组进行复制
- CopyOnWriteArraySet 集合,修改线程会对底层数组进行复制
- ConcurrentSkipListSet 有序集合,由跳表实现
- 阻塞队列
- ConcurrentLinkedQueue,CAS操作节点头部
- size() 是要遍历一遍集合,慎用
- ArrayBlockingQueue,构造时需要指定容量,并且可以指定公平性
- 一个可重入锁,和该锁的notEmpty和notFull两个条件变量
- LinkedBlockingQueue,队列容量没有边界,但是也可以选择最大容量
- 两个可重入锁,与两个锁分别对应的notEmpty和notFull两个条件变量
- SynchronousQueue 写数据会阻塞直到另一个线程读数据为止,反之亦然
- PriorityBlockingQueue,带优先级的队列,元素按照他们的优先级被移出
- DelayQueue
- TransferQueue,允许生产者线程等待,直到消费者可以接受一个元素
- ConcurrentLinkedQueue,CAS操作节点头部
多线程算法
- Arrays.parallelSort
- Arrays.parallelPrefix
- Arrays.parallelSetAll
启动与终止线程
启动线程
public class Foo implements Runnable {
@Override
public void run() {
System.out.println("Running!");
}
public static void main(String[] args) {
Foo foo = new Foo();
Thread thread1 = new Thread(foo, "Thread1");
Thread thread2 = new Thread(foo, "Thread2");
thread1.start();
thread2.start();
}
}
public class Bar extends Thread {
@Override
public void run() {
System.out.println("Running!");
}
public static void main(String[] args) {
Bar bar = new Bar();
bar.start();
}
}
java.Iang.Thread
- Thread(Runnable target ), 构造一个新线程, 用于调用给定目标的 run() 方法
- void start(), 启动这个线程, 将引发调用 mn() 方法。这个方法将立即返回, 并且新线程将并发运行
- 调用关联 Runnable 的 run 方法
java.Iang.Runnable
void run(),必须覆盖这个方法, 并在这个方法中提供所要执行的任务指令
中断线程
当线程的 run 方法执行方法体中最后一条语句后, 并经由执行 return 语句返冋时, 或者出现了在方法中没有捕获的异常时,线程将终止。在java新的版本中,没有可以强制终止线程的方法(stop方法被废弃,因为会损坏线程安全);推荐使用interrupt方法可以用来请求终止线程。
当对一个线程调用 interrupt 方法时,线程的中断状态将被置位,这是每一个线程都具有的 boolean 标志。线程都应该不时地检査这个标志, 以判断线程是否被中断。
java.Iang.Threadvoid interrupt(),向线程发送中断请求。线程的中断状态将被设置为 true。如果目前该线程被一个 sleep
调用阻塞,那么, InterruptedException 异常被抛出。
- static boolean interrupted(),测试当前线程(即正在执行这一命令的线程)是否被中断。注意,这是一个静态方法。这一调用会产生副作用—它将当前线程的中断状态重置为 false。
- boolean islnterrupted(),测试线程是否被终止。不像静态的中断方法,这一调用不改变线程的中断状态。
- static Thread currentThread(),返回代表当前执行线程的 Thread 对象。
线程状态
Java多线程系统采用抢占式调度,每个线程由系统来分配执行的时间;线程从出生到结束在以下状态中不断转换;当线程处于被阻塞或等待状态时,它暂时不活动。它不运行任何代码且消耗最少的资源。直到线程调度器重新激活它:
NEW
- 通过new()操作符创建
- 程序还没有开始运行程序中的代码。
RUNNABLE
- 调用start()方法,线程处于runnable状态
- 一个running状态的线程可能正在运行也可能在等待运行,这取决于操作系统的调度
- Thread.yield()?
BLOCKED
- 当一个线程试图获取一个内部对象的锁,而该锁被其他线程持有,则该线程进入阻塞状态。当所有其他线程释放该锁,并且线程调度器允许本线程持有它的时候,该线程将变成非阻塞状态。
WAITING
- 当线程等待另一个线程通知调度器一个条件时, 它自己进入等待状态
- Object.wait()
- Thread.join()
- 等待 java.util.concurrent 库中的 Lock 或 Condition
- LockSupport.park()
TIMED_WAITING
- 有几个方法有一个超时参数。调用它们导致线程进人计时等待(timed waiting) 状态。
- Thread.sleep
- Object.wait
- Lock.tryLock
TERMINATED
- 因为run方法正常退出而自然死亡
- 因为一个没有捕获的异常终止了 run 方法而意外死亡**
线程属性
优先级
每一个线程有一个优先级,一个线程继承它的父线程的优先级。可以使用setPriority方法提高或降低任何一个线程的优先级。
每当线程调度器有机会选择新线程时, 它首先选择具有较高优先级的线程。但是,线程优先级是高度依赖于系统的。当虚拟机依赖于宿主机平台的线程实现机制时, Java 线程的优先级被映射到宿主机平台的优先级上, 优先级个数也许更多,也许更少。谨慎的使用优先级,如果有几个高优先级的线程没有进入非活动状态,低优先级的线程可能永远也不能执行。
java.lang.Thread
- void setPriority(int newPriority),设置线程的优先级
- static int MIN_PRIORITY,线程的默认优先级,值为1
- static int NORM_PRIORITY,线程的默认优先级,值为5
- static int MAX_PRIORITY,线程的默认优先级,值为10
- static void yield(),导致当前执行线程处于让步状态。如果有其他的可运行线程具有至少与此线程同样高
的优先级,那么这些线程接下来会被调度。
守护线程
当我们在Java程序中创建一个线程,它就被称为用户线程。守护线程有一个非常明显的特征,当用户线程结束的时候deamon线程将会自动退出。一个守护线程创建的子线程依然是守护线程。
Daemon的作用是为其他线程的运行提供便利服务,守护线程最典型的应用就是 GC (垃圾回收器)。 使用Thread类的setDaemon(true)方法可以将线程设置为守护线程。
java.lang.Thread
- void setDaemon( boolean isDaemon )
- void join(),等待终止指定的线程
- void join(long millis),等待指定的线程死亡或者经过指定的毫秒数
异常处理器
线程的 run方法不能抛出任何受查异常, 但是非受査异常会导致线程终止。
但是,不需要任何 catch 子句来处理可以被传播的异常。相反, 就在线程死亡之前, 异常被传递到一个用于未捕获异常的处理器。我们可以通过实现 Thread.UncaughtExceptionHandler 接口来自定义异常处理器,通常使用默认的处理器就可以了。
ThreadGroup
如果不安装默认的处理器, 默认的处理器为空。 但是, 如果不为独立的线程安装处理器,此时的处理器就是该线程的 ThreadGroup 对象。线程组是一个可以统一管理的线程集合。默认情况下,创建的所有线程属于相同
的线程组, 但是, 我们也可以自己建立分组。ThreadGroup 类实现 Thread.UncaughtExceptionHandler 接口。 它的 uncaughtException 方法做如下操作:
- 如果该线程组有父线程组, 那么父线程组的 uncaughtException 方法被调用。
- 否则, 如果 Thread.getDefaultExceptionHandler 方法返回一个非空的处理器, 则调用该处理器。
- 否则, 如果 Throwable 是 ThreadDeath 的一个实例, 什么都不做。
- 否则,线程的名字以及 Throwable 的栈轨迹被输出到 System.err 上
java.lang.Thread
- static void setDefaultUncaughtExceptionHandler(Thread.UncaughtException,Handler handler)
- static Thread.UncaughtExceptionHandler getDefaultUncaughtExceptionHandler)设置或获取未捕获异常的默认处理器
- currentThread().getThreadGroup().list() 打印当前线程组中的线程
线程同步
在大多数实际的多线程应用中, 两个或两个以上的线程需要共享对同一数据的存取。这时候就需要一些机制来解决这样的竞争条件。
“同”字应是指协同、协助、互相配合。即当有一个线程在对内存进行操作时,其他线程都不可以对这个内存地址进行操作,直到该线程完成操作, 其他线程才能对该内存地址进行操作,而其他线程又处于等待状态。
可重入锁
ReentrantLock锁是可重入的,锁保持一个持有计数( holdcount ) 来跟踪对 lock 方法的嵌套调用。线程在每一次调用 lock 都要调用 unlock 来释放锁。因为可能想要保护需若干个操作来更新或检查共享对象的代码块。要确保这些操作完成后, 另一个线程才能使用相同对象。
下面结构,确保任何时刻只有一个线程进人临界区。一旦一个线程封锁了锁对象, 其他任何线程都无法通过lock 语句。
private Lock myLock = new ReentrantLock();
myLock.lock();
try {
critical section
} finally{
myLock.unlock();
}
java.util.concurrent.locks.ReentrantLock
- void lock(), 获取这个锁,如果锁同时被另一个线程拥有则发生阻塞。
- void unlock(), 释放这个锁
- ReentrantLock(boo1ean fair),构建一个带有公平策略的锁。一个公平锁偏爱等待时间最长的线程。但是,这公平的保证将大大降低性能。
条件变量
通常, 线程进入临界区,却发现在某一条件满足之后它才能执行。要使用一个条件对象来管理那些已经获得了一个锁但是却不能做有用工作的线程,一旦一个线程调用 await方法, 它进入该条件的等待集,并处于阻塞状态。
直到另一个线程调用同一条件上的 signalAll 或signal方法时为止,当这些线程从等待集当中移出时,它们再次成为可运行的,调度器将再次激活它们。同时, 它们将试图重新进获取锁,一旦锁成为可用的,它们中的某个将从 await 调用返回, 获得该锁并从被阻塞的地方继续执行。
package com.example.demo;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
public class ConditionTest {
private Lock lock;
private Condition condition;
public ConditionTest() {
lock = new ReentrantLock();
condition = lock.newCondition();
}
public void run(Integer index) {
try {
if (index == 0) {
Thread.sleep(4000);
}
lock.lock();
System.out.println(index.toString() + " get Lock");
if (index != 0) {
condition.await();
System.out.println(index.toString() + " is awake!");
} else {
condition.signalAll();
}
} catch (Exception e) {
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws Exception {
ConditionTest conditionTest = new ConditionTest();
for (int i = 0; i < 3; i++) {
class R implements Runnable {
private Integer index;
public R(Integer index) {
this.index = index;
}
@Override
public void run(){
conditionTest.run(index);
}
}
R r = new R(i);
Thread t = new Thread(r);
t.start();
}
}
}
java.util.concurrent.locks.Lock
- Condition newCondition() 返回一个与该锁相关的条件对象
java.util.concurrent.locks.Condition
- void await() , 进入该条件的等待集, 直到线程从等待集中移出或等待了指定的时间之后才解除阻塞。如果因为等待时间到了而返回就返回 false, 否 则 返 回 true。
- void signalAll(),解除该条件的等待集中的所有线程的阻塞状态
- void signal(),从该条件的等待集中随机地选择一个线程, 解除其阻塞状态
void awaitUninterruptibly( ),如果被中断并抛出InterruptedException异常
synchronized
Lock 和 Condition 对象
锁用来保护代码片段, 任何时刻只能有一个线程执行被保护的代码。
- 锁可以管理试图进入被保护代码段的线程。
- 锁可以拥有一个或多个相关的条件对象,每个条件对象管理那些已经进入被保护的代码段但还不能运行的线程。
synchronized 中的lock
synchronized无需手动获取锁和释放锁,发生异常会自动解锁,不会出现死锁
// 以实例对象作为锁
public synchronized void method() {
}
等价于
public void method() {
synchronized(this) {
}
}
// 以类对象作为锁,没有其他线程可以调用同一个类的这个或任何其他的同步静态方法
// 获取到的锁是属于类的锁(类的字节码文件对象)
public static synchronized method() {}
等价于
public void method() {
synchronized(this.getClass()) {
}
}
synchronized 中的Condition
内部对象锁只有一个相关条件。wait 方法添加一个线程到等待集中, notifyAll() /notify方法解除等待线程的阻塞状态。换句话说,调用 wait 或 notifyAll等价于intrinsicCondition.await(),intrinsicCondition.signalAll()
java.lang.Object
- void notifyAll(),解除那些在该对象上调用 wait 方法的线程的阻塞状态。该方法只能在同步方法或同步块内部调用。如果当前线程不是对象锁的持有者,该方法拋出一个 IllegalMonitorStateException异常。
- void notify().随机选择一个在该对象上调用 wait 方法的线程, 解除其阻塞状态。该方法只能在一个同步方法或同步块中调用。 如果当前线程不是对象锁的持有者, 该方法抛出一个IllegalMonitorStateException 异常。
- void wait(),导致线程进人等待状态直到它被通知。该方法只能在一个同步方法中调用。如果当前线程不是对象锁的持有者, 该方法抛出一个IllegalMonitorStateException 异常。
void wait(long millis),导致线程进入等待状态直到它被通知或者经过指定的时间。这些方法只能在一个同步方
法中调用客户端锁
有的时候synchronized关键字有以下使用方法,使用一个对象的锁来实现额外的原子操作, 实际上称为客户端锁定。
public void transfer(Vector<Double> accounts, int from, int to, int amount) { synchronized (accounts) { accounts.set(from, accounts.get(from) - amount): accounts.set(to, accounts.get(to) + amount); } }
原理
synchronized 关键字自动提供一个锁以及相关的条件,使用一种嵌人到 Java 语言内部的机制,java中每个对象都有一个内部锁,如果一个方法用 synchronized关键字声明, 线程必须获得内部的对象锁。
内部锁是有20世纪70年代提由Tony Hoare和Per Birth Hansen提出的监视器概念的简单实现。Monitor是 Java中用以实现线程之间的互斥与协作的主要手段,它可以看成是对象或者 Class的锁。每一个对象都有,也仅有一个 monitor。
进入区(Entrt Set):表示线程通过synchronized要求获取对象的锁。如果对象未被锁住,则迚入拥有者;否则则在进入区等待。一旦对象锁被其他线程释放,立即参与竞争。
- 拥有者(The Owner):表示某一线程成功竞争到对象锁。
- 等待区(Wait Set):表示线程通过对象的wait方法,释放对象的锁,并在等待区等待被唤醒。
volatile
volatile用来标记一个变量,来保证多线程中对该变量读写的线程安全。
- 保证线程可见性,当一条线程修改了这个变量的值,新值对于其他线程来说是立即可见的。在JVM中当一个线程修改了这个变量的值后强制将线程的工作内存刷新到主内存,并通知其他线程的工作内存此变量过期,重新从主内存中获取。
编译器可以改变指令执行的顺序以使吞吐量最大化,这会导致其他线程看对方代码会产生乱序,volatile使用内存屏障方式阻止这种场景的发生。
缺点
volatile不能确保以下场景线程安全,仍然需要通过加锁来保证原子性:
运算结果并不依赖变量的当前值,或者能够确保只有单一的线程修改变量的值,例如public voidflipDone() {done = !done}
- 变量不需要其他状态变量来参与不变约束。
```java
volatile的典型使用场景
volatile boolean shutdownRequested;
public void shutdown() { shutdownRequested = true; }
public void doWork() { while (!shutdownRequested) {
# do something
}
}
<a name="qci3P"></a>
## Atomic
提供了很多类使用了很高效的机器级指令(而不是使用锁)来保证其他操作的原子性。
- AtomicReference
<a name="kMSpV"></a>
## 死锁
一个典型的死锁程序:
```java
package com.example.demo.dao;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Lock;
public class Foo {
private Lock leftLock;
private Lock rightLock;
public Foo() {
leftLock = new ReentrantLock();
rightLock = new ReentrantLock();
}
public void run1() {
leftLock.lock();
try {
Thread.sleep(1000);
rightLock.lock();
try {
System.out.println("Running 1");
} finally {
rightLock.unlock();
}
} catch (Exception e) {
}
finally {
leftLock.unlock();
}
}
public void run2() {
rightLock.lock();
try {
Thread.sleep(1000);
leftLock.lock();
try {
System.out.println("Running 2");
}
finally {
leftLock.unlock();
}
} catch (Exception e) {
} finally {
rightLock.unlock();
}
}
public static void main(String[] args) throws Exception {
Foo foo = new Foo();
class R1 implements Runnable {
@Override
public void run() {
foo.run1();
}
}
class R2 implements Runnable {
@Override
public void run() {
foo.run2();
}
}
R1 r1 = new R1();
R2 r2 = new R2();
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
}
}
线程局部变量**
java.lang.ThreadLocal
值得注意的是:在线程池中如果在线程是被循环使用,使用ThreadLocal不及时清理,会获取之前线程的数据
class Test {
private static ThreadLocal<Integer> params = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 100;
}
};
private int get() {
return params.get();
}
private void set(Integer value) {
t.set(value);
}
}
锁的超时
线程在调用 lock 方法来获得另一个线程所持有的锁的时候,很可能发生阻塞。tryLock 方法试图申请一个锁, 在成功获得锁后返回 true, 否则, 立即返回false, 而且线程可以立即离开去做其他事情。
lock方法无法被中断,但是tryLock可以被中断并抛出InterruptedException异常,这一特性允许程序打破死锁。
java.util.concurrent.locks.Lock
- boolean tryLock(),尝试获得锁而没有发生阻塞,如果成功则返回真
- boolean tryLock(long time, Time unit),尝试获得锁阻塞时间不会超过给定的值,如果成功返回true
void lockInterruptibly(), 相当于一个超时时间为无限的死锁
读写锁
Java中内置的读写锁有以下特点:
读写锁允许读者读线程共享访问数据,但是对写锁的访问是互斥的
- 支持可重入。读线程在获取了读锁后还可以获取读锁;写线程在获取了写锁之后既可以再次获取写锁又可以获取读锁;
- 不允许一个线程从读锁升级为写锁,但是允许从写锁降级为读锁(如果当先线程是写锁的持有者,并保持获得写锁的状态,同时又获取到读锁,然后释放写锁的过程)
- 读锁不允许newConditon获取Condition接口,而写锁的newCondition接口实现方法同ReentrantLock。
java.util.concurrent.locks.ReentrantReadWriteLock
- Lock readLock(),得到一个可以被多个读操作公用的锁,但是会排斥所有写操作
Lock wrietLock(),得到一个写锁,排斥其他所有读操作和写操作
AQS
在java中Lock的子类都是通过AQS实现的,所谓AQS就是就是指java.util.concurrent.locks包中的AbstractQueuedSynchronizer,AQS提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架。
state状态
AbstractQueuedSynchronizer维护了一个volatile int类型的变量,用户表示当前同步状态。使用下面三种原子操作获取state
getState()
- setState()
- Unsafe.compareAndSetState()
阻塞队列
对于许多线程问题, 可以通过使用一个或多个队列以优雅且安全的方式将其形式化。生产者线程向队列插人元素, 消费者线程则取出它们。
当试图向队列添加元素而队列已满, 或是想从队列移出元素而队列为空的时候, 阻塞队列(blocking queue ) 导致线程阻塞。阻塞队列会自动地负载均衡,如果第一个线程集运行得比第二个慢, 第二个线程集在等待结果时会阻塞。阻塞队列不但可以实现线程安全,还可以自动地负载均衡。
向队列添加元素或者删除元素提供了多种方法,可以根据试图向满队列里添加或从空队列中移除元素的响应方式来分为3类
- put, take 如果队列慢或者空操作阻塞,向队列中添加或者删除元素
- add, remove 和 element 如果队列慢或者空操作抛出异常
- offer, poll 和 peek返回布尔值是否可以添加
线程安全的集合
锁机制
如果多线程要并发地修改一个数据机构,会很容易破坏这个数据结构;可以通过提供锁来来保护共享数据结构,但是选择线程安全的实现作为代替会更加容易,java中提供了了许多线程安全的数据结构,这些集合通过复杂的算法,通过允许使用并发的访问数据结构的不同部分来使竞争最小化。
- 集合返回弱一致性的迭代器, 当构建一个迭代器的时候, 它包含一个对当前数组的引用。如果数组后来被修改了,迭代器仍然引用旧数组,但是,集合的数组已经被替换了。因而,旧的迭代器拥有一致的(可能过时的)视图,访问它无须任何同步开销
自定义并发读写数,默认支持16线程同时读写,超过后会阻塞线程
ConcurrentHashMap
HashTable对全局进行加锁来实现线程安全,效率比较低,ConcurrentHashMap利用了很多精巧的设计来减少锁竞争带来的影响。
JDK 1.7采用的拆分锁的思想,为一段连续的桶分配一个ReentrantLock来提升并发效率。
JDK 1.8将分段锁的粒度细分到每个哈希桶上,使用synchronized与CAS对每一个桶进行更新操作
写时复制
每次对集合进行修改时,都会复制一份底层数组出来修改之后再替换原有数组;读操作仍然只用原有镜像进行读取数据。
CopyOnWriteArrayList
- CopyOnWriteArraySet
Callable 与 Future
继承Thread或者实现Runable接口来执行线程无法返回结果。Callable与Future接口提供了一种方案,使用线程来执行任务并获取结果。
由于Callable接口有返回结果,我们就要用到Future对象,执行完Callable任务后会返回Future对象,我们可以根据Future对象来获取任务执行的结果和状态。JDK中还提供了FutureTask包装类,将一个Callable对象包装成Runable和Future类,可以直接通过线程执行,并通过此对象查看返回的结果。
java.util.concurrent.Callable
- V call() 运行一个将产生结果的任务
java**.util.concurrent.Future<V>**
- V get()
- V get(long time, TimeUnit unit) 获取结果, 如果没有结果可用, 则阻塞直到真正得到结果超过指定的时间为止
- boolean cancel(boolean maylnterrupt)
- boolean isCancelled() 如果任务在完成前被取消了,则返回true
- boolean isDone()
java.util.concurrent.FutureTask
- FutureTask(Callable
task) - FutureTask(Runnable task, V result)
线程池
构建一个新的线程是有一定代价的, 因为涉及与操作系统的交互。如果程序中创建了大量的生命期很短的线程,应该使用线程池。另一个使用线程池的理由是减少并发线程的数目。创建大量线程会大大降低性能甚至使虚拟机崩溃。Executors提供了许多静态工厂方法来构建线程池
- newCachedThreadPool 必要时创建新线程,空闲线程会被保留 60 秒
- newFixedThreadPool 该池包含固定数量的线程,空闲线程会一直被保留
- newSingleThreadExecutor 只有一个线程的”池”, 该线程顺序执行每一个提交的任务
- newScheduledThreadPool 建一个大小无限的线程池,用于预定执行而构建的固定线程池
- newSingleThreadScheduledExecutor 用于预定执行而构建的单线程 “池”
ThreadPoolExecutor
Executor中提供的静态工厂方法都是调用ThreadPoolExecutor来实现的,一般来说当线程池核心数量不够时,新加入的任务会被存放在队列中,如果队列存满了,线程池会创建更多的线程,直到maximumPoolSize。如果还不足以处理新的任务,则面临一个丢弃策略,默认的丢弃策略是抛异常,线程池自动扩容。我们可以通过分析他的初始化函数来了解线程池的一些细节
- int corePoolSize,核心线程数
- maximumPoolSize,最大线程数
- keepAliveTime 当线程数大于核心线程时,空闲线程存活时间
- TimeUnit unit 时间的单位
- BlockingQueue
workQueue 任务队列,被添加到线程池中,但尚未被执行的任务;它可以是直接提交队列、有界队列、无界队列、优先队列 - ThreadFactory threadFactory 线程工厂,用于创建线程,一般用默认即可
- RejectedExecutionHandler handler 拒绝策略;当任务太多来不及处理时,如何拒绝任务
- CallerRunsPolicy :这个策略重试添加当前的任务,他会自动重复调用 execute() 方法,直到成功。
- AbortPolicy :对拒绝任务抛弃处理,并且抛出异常。
- DiscardPolicy :对拒绝任务直接无声抛弃,没有异常信息。
- DiscardOldestPolicy :对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。
非预定执行线程池
- 调用 Executors 类中静态的方法 newCachedThreadPool 或 newFixedThreadPool,newSingleThreadExecutor 。
- 调用 submit 提交 Runnable 或 Callable 对象,会返回一个Future对象
- 如果想要取消一个任务,那就要保存好返回的 Future对象
当不再提交任何任务时,对线程池对象调用shutdown,该线程池不再接受新的任务,当所有任务完成后,线程池中的线程死亡。
public class DemoApplication { public static void main(String[] args) { Runnable task = new Runnable() { @Override public void run() { System.out.println(System.nanoTime()); } }; ExecutorService pool = Executors.newCachedThreadPool(); Future result1 = pool.submit(task); Future result2 = pool.submit(task); } }
java.util.concurrent.Executors
- ExecutorService newCachedThreadPool() 返回一个带缓存的线程池, 该池在必要的时候创建线程, 在线程空闲 60 秒之后终止线程
- ExecutorService newFixedThreadPool(int threads) 返回一个线程池, 该池中的线程数由参数指定
- ExecutorService newSingleThreadExecutor() 返回一个执行器, 它在一个单个的线程中依次执行各个任务
java.util.concurrent.ExecutorService
- Future submit(Callable task)
- Future
submit(Runnable task, T result) - Future<?> submit(Runnable task) 提交指定的任务去执行
void shutdown() 关闭服务, 会先完成已经提交的任务而不再接收新的任务。
预定执行
ScheduledExecutorService 接口具有为预定执行任务而设计的方法。Executors 类的newScheduledThreadPool 和 newSingleThreadScheduledExecutor 方法将返回实现了 ScheduledExecutorService 接口的对象。可以预定 Runnable 或 Callable 在初始的延迟之后只运行一次,也可以预定一个 Runnable对象周期性地运行。
java.util.concurrent.ExecutorsScheduledExecutorService newScheduledThreadPool(int threads) 返回一个线程池, 它使用给定的线程数来调度任务
- ScheduledExecutorService newSingleThreadScheduledExecutor() 返回一个执行器, 它在一个单独线程中调度任务
java.util.concurrent.ScheduledExecutorService
- ScheduledFuture
schedule(Callable task , long time, Timellnit unit) - ScheduledFuture<?> schedule(Runnable task , long time , TimeUnit unit ) 预定在指定的时间之后执行任
- ScheduledFuture<?> scheduleAtFixedRate ( Runnable task , long initialDelay , long period, TimeUnit unit ) 预定在初始的延迟结束后, 周期性地运行给定的任务,如果任务阻塞会放入队列中
- ScheduledFuture<?> scheduleWithFixedDelay( Runnable task , longinitialDel y , long delay, TimeUnit unit)预定在初始的延迟结束后周期性地运行给定的任务, 在一次调用完成和下一次调用开始之间有长度为 delay 的延迟
控制任务组
任务控制组可以让我们一次性执行所有任务并有选择的获取其中的任务结果,我们可以等待一个最先返回任务的结果,也可以等在所有任务都完成。
java.util.concurrent.ExecutorCompletionService
- ExecutorCompletionService(Executor e) 构建一个执行器完成服务来收集给定执行器的结果
- Future
submit( Callable task ) - Future
submit(Runnable task , V result) 提交一个任务给底层的执行器 - Future
take() 移除下一个已完成的结果, 如果没有任何已完成的结果可用则阻塞 - Future
poll() - Future
poll(long time , TimeUnit unit) 移除下一个已完成的结果, 如果没有任何已完成结果可用则返回 null。 第二个方法将等待给定的时间
java.util.concurrent.ExecutorService
- T invokeAny(Collection
> tasks) - T invokeAny(Collection
> tasks , long timeout , TimeUnit unit) 执行给定的任务,返回其中一个任务的结果。若发生超时, 抛出一个Timeout Exception 异常 - List
> invokeAll (Collection > tasks) - List
> invokeAll ( Collection > tasks, long timeout, TimeUnit unit)执行给定的任务, 返回所有任务的结果。若发生超时, 拋出一个 TimecmtException 异常。
Fork-Join 框架
Fork-Join框架是分治法的体现,它将一个任务分解成两个部分,在分别以递归的方式处理这两部分直到处理完成。举个例子,假设想统计一个数组中有多少个元素满足某个特定的属性。可以将这个数组一分为二,分别对这两部分进行统计, 再将结果相加。Fork-join框架并不能保证这个任务的顺序,所以这个子任务必须是没有关联的。
fork-join 框架使用了一种有效的智能方法来平衡可用线程的工作负载,这种方法称为工作密取(work stealing)。
class Counter extends RecursiveTask<Integer> {
protected Integer compute() {
if (to - from < THRESHOLD) {
// solve problem directly
} else {
int mid = (from + to) / 2;
Counter first = new Counter(va1ues, from, mid, filter);
Counter second = new Counter(va1ues, mid, to, filter);
invokeAll(first, second):
return first.join() + second.join();
}
}
}
CompletableFuture
实现异步处理任务方法的控制器,相当于简单的事件驱动。
同步器
同步器是一系列用于同步线程的有用工具,大部分都是由AQS实现的;在一些场合下,可以更好的完成锁的工作。
Semaphore
限制访问资源的线程总数。 它允许多个线程在同一时刻访问同一资源,但是需要限制在同一时刻访问此资源的最大线程数目
CountDownLatch
当一个或多个线程需要等待直到指定数目的事件发生 允许线程集等待直到计数器减为 0。
-
CyclicBarrier
允许线程集等待直至其中预定数目的线程到达一个公共障栅,然后可以选择执行一个处理障栅的动作
考虑大量线程运行在一次计算的不同部分的情形。当所有部分都准备好时,需要把结果组合在一起。当一个线程完成了它的那部分任务后, 我们让它运行到障栅处。一旦所有的线程都到达了这个障栅,障栅就撤销, 线程就可以继续运行。
- CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier
- 是由ReentrantLock和Conditon来实现的,其背后的原理是每个线程都获取锁中条件变量的await()方法来等待,并有同步屏障委维护一个计数器来监视等待线程的数量,直到随后一个线程执行完毕,就使用signalAll()方法来通知线程执行。
Exchanger
当两个线程在同一个数据缓冲区的两个实例上工作的时候, 就可以使用交换器( Exchanger) 典型的情况是, 一个线程向缓冲区填人数据, 另一个线程消耗这些数据。当它们都完成以后,相互交换缓冲区。
LockSupport
LockSupport是一个线程阻塞工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,其阻塞线程的方法是park(),如果把一个线程比作一辆车的话,就是把这辆车停下来,而unpark()则可以使这台车重新启动。
LockSupport.park() 的实现原理是通过二元信号量做的阻塞,要注意的是,这个信号量最多只能加到1。我们也可以理解成获取释放许可证的场景。unpark()方法会释放一个许可证,park()方法则是获取许可证,如果当前没有许可证,则进入休眠状态,知道许可证被释放了才被唤醒。无论执行多少次unpark()方法,也最多只会有一个许可证。还有park()方法并不会抛出InterruptedException异常,在进程执行interrupt后线程会接着立即执行。
public class LockSupportDemo {
public static Object u = new Object();
static ChangeObjectThread t1 = new ChangeObjectThread("t1");
static ChangeObjectThread t2 = new ChangeObjectThread("t2");
public static class ChangeObjectThread extends Thread {
public ChangeObjectThread(String name) {
super(name);
}
@Override public void run() {
synchronized (u) {
System.out.println("in " + getName());
LockSupport.park();
if (Thread.currentThread().isInterrupted()) {
System.out.println("被中断了");
}
System.out.println("继续执行");
}
}
}
public static void main(String[] args) throws InterruptedException {
t1.start();
Thread.sleep(1000L);
t2.start();
Thread.sleep(3000L);
t1.interrupt();
LockSupport.unpark(t2);
t1.join();
t2.join();
}
}
参考:
synchronized实现:https://www.cnblogs.com/sevencutekk/archive/2019/09/21/11563367.html
JAVA的线程安全机制:https://www.jianshu.com/p/2c76c7101a2b
AQS实现细节:https://www.jianshu.com/p/282bdb57e343
https://www.cnblogs.com/waterystone/p/4920797.html
https://www.cnblogs.com/zaizhoumo/p/7749820.html
ad-hoc线程封闭 JSR-133
lockSupport的实现:https://cloud.tencent.com/developer/article/1198491
https://www.jianshu.com/p/f30ee2346f9f
ConcurrentHashMap的原理:http://ifeve.com/java-concurrent-hashmap-1/
http://ifeve.com/java-concurrent-hashmap-2/