取消和关闭
要做到安全、快速、可靠地停止任务或线程并不容易。
Java没有提供任何机制来安全地强迫线程停止手头的工作。
java提供了:
中断:一个协作机制,是一个线程能够要求另一个线程停止当前的工作。
一个线程给另一个线程发送信号,通知它在方便或者可能的情况下停止工作。
当要求它们停止时,它们首先会清除当前进程中的任务,然后再终止。(因为任务代码本身比发出取消请求的代码更明确应该清除什么)
6.1任务取消
当外部代码能够在活动自然完成之前,把它更改为完成状态,那么这个活动被称为可取消的。
一个可取消的任务必须拥有取消策略,这个策略详细说明关于取消的“how”、“when”、“what”:
- how:其他代码如何请求取消该任务
- when:任务在什么时候检查取消的请求是否到达
- what:响应取消请求的任务中应有的行为。
示例:
@ThreadSafe
public class PrimeGenerator implements Runnable {
@GuardedBy(this)
private final List<BigInteger> primes = new ArrayList<BigInteger>();
private volatile boolean cancelled;
public void run() {
BigInteger p = BigInteger.ONE;
//when:考虑在什么时候检查取消请求
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
//how:其他请求该如何取消任务
//what:取消任务中的行为
public void cancel() {
cancelled = true;
}
public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
}
取消应用场景:
- 用户请求的取消:例如程序界面上的“关闭”按钮,或者通过管理接口请求取消。
- 限时活动:一个应用程序需要在有限时间内执行任务,超时就取消
- 应用程序事件:一个应用程序对问题空间进行分解搜索,当一个任务发现了解决方案,其他仍在工作的搜索就会被取消。
- 错误:当任务遭遇错误,那么所有的任务都会被取消,不过可以记录它们当前的状态,稍后重新启动。
- 关闭:当一个程序或者服务关闭时,必须对正在处理和等待处理的工作进行一些操作。(优雅的关闭)
6.1.1中断
在PrimeGenerator中,取消机制不是立刻发生的,如果我们在其中使用了一个阻塞方法,比如BlockingQueue.put,那么就可能永远不会检查到取消标志。
示例:
//不要这样做,当队列阻塞时,是检测不到取消标记的
public class PrimeGenerator implements Runnable {
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled = false;
public PrimeGenerator(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
//这是可阻塞的
queue.put(p = p.nextProbablePrime());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void cancel() {
cancelled = true;
}
}
每一个线程都有一个boolean类型的中断状态。在中断时:这个中断状态被设置为true。
Thread包含其他用于中断线程的方法,以及请求线程中断状态的方法:
interrupt:中断目标线程
isInterrupted:返回目标线程的中断状态
静态的interrupted:清除当前线程中断状态,并返回它之前的值。
调用interrupt并不意味着必然停止目标线程正在进行的工作,它仅仅传递了请求中断的消息。
示例:
public class PrimeGenerator extends Thread {
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled;
public PrimeGenerator(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run(){
BigInteger p = BigInteger.ONE;
try {
while (!Thread.currentThread().isInterrupted()){
queue.put(p = p.nextProbablePrime());
}
}catch (InterruptedException e){
}
}
public void cancel(){interrupt();}
}
6.1.2中断策略
正如取消需要制订取消策略一样,中断也应该制定中断策略。
中断策略:
- 当发现中断请求时,应该做什么
- 哪些工作单元对于中断来说是原子操作
- 在多快的时间里响应中断。
区分任务和线程对中断的反应是很重要的。
一个单一的中断请求可能有一个或一个以上预期的接收者——在线程池中中断一个工作者线程,意味着取消当前任务,并关闭工作线程。
任务不会再自己拥有的线程中执行:它们借用属于服务的线程。代码如果并不是线程的所有者,就应该保存中断状态,这样所有者的代码才能够最终对其起到作用。
例如:大多数可阻塞的库函数,都是仅仅抛出InterruptedException作为中断响应。因为它们不可能自己运行再一个线程中,它们会尽可能的为异常信息让路,把它传递给调用者。
当检查到中断请求时,任务不需要立即放弃所有事情——它可以选择推迟。但是需要记得它已经被请求过中断了,完成当前正在进行的事情,然后抛出InterruptedException或者指明中断。
线程应该只能够被线程的所有者中断,所有者可以把线程的中断策略信息封装到一个合适的取消机制中。
因为每一个线程都有其自己的中断策略,所以你不应该中断线程,除非你知道中断对这个线程意味着什么。
6.1.3响应中断
当你调用可中断的阻塞函数时,比如 Thread.sleep 或者BlockingQueue.put,有两种处理InterruptedException的实用策略:
- 传递异常,使你的方法也成为可中断的阻塞方法 (throws InterruptedException)
- 或者保存中断状态,上层调用栈中的代码能够对其进行处理。
如果你不想或不能传递InterruptedException,需要寻找另一种方式保存中断请求,通常是再次调用interrupt来恢复中断状态
只有实现了线程中断策略的代码才可以接收中断请求,通用目的的任务和库的代码绝不应该接收中断请求.
//有一些活动不支持取消,却仍可能调用中断的阻塞方法,那么它们必须在循环中调用这些方法,当发现中断后重新尝试
//在本地保存中断状态,并在返回前恢复状态
public Task getNextTask(BlockingQueue<Task> queue){
boolean interrupted = false;
try{
while(true){
try{
return queue.take();
}catch(InterruptedException e){
interrupted = true;
//失败并重试
}
}
}finally{
if(interrupted){
Thread.currentThread().interrupt();
}
}
}
6.1.4通过Future取消
ExecutorService.Submit 会返回一个Future来描述任务。Future有一个**cancel方法**,它**需要一个boolean类型的参数**(mayInterruptIfRunning),它的**返回值表示取消尝试是否成功**(这仅仅是告诉你它是否**能够接收中断**,而不是任务是否检测并处理了中断)。
除非你知道线程的中断策略,否则你不应该中断线程。
任务执行线程是由标准的Executor实现创建的,它实现了一个中断策略,使得任务可以通过中断被取消,所以当他们在标准Executor中运行时,通过它们的Future来取消任务是安全的。
public static void timeRun(Runnable r, long timeout, TimeUnit unit,)
throws InterruptedException{
Future<?> task = taskExec.submit(r);
try{
task.get(timeout, unit);
}catch(TimeoutException e){
//下面任务会被取消
}catch(ExecutionException e){
//task中抛出的异常:重抛出
}finally{
//如果任务已经结束,是无害的
task.cancel(true);
}
}
当Future.get抛出 InterruptedException 或TimeoutException 时,如果你知道不再需要结果时,就可以调用Future.cancel来取消任务
6.1.5处理不可中断阻塞
很多可阻塞的库方法通过提前返回和抛出InterruptedException来实现对中断的响应,这使得构建可以响应取消的任务更加容易了。
但是,并不是所有的阻塞方法或阻塞机制都响应中断:如果一个线程是由于进行同步Socket I/O 或者等待获得内部锁而阻塞的,那么中断除了能够设置线程的中断状态以外,什么都不能改变。
- java.io中的同步Socker I/O。InputStream 和 OutputStream 中的read和write 方法都不响应中断,但是通过关闭底层的Socket,可以让read或write所阻塞的线程抛出一个SocketException。
- java.nio中的同步I/O。中断一个等待InterrputibleChannel的线程,会导致抛出ClosedByInterruptException,并关闭链路(其他线程在这条链路的阻塞,也会抛出该异常)。
- Selector的异步I/O。如果一个线程阻塞于Selector.select方法,close方法会导致它通过抛出ClosedSelectorException提前返回。
- 获得锁。如果一个线程在等待内部锁,那么如果不能确保它最终获得锁,是不能停止它的。然而,显示Lock类提供了lockInterruptibly方法,允许你等待一个锁,并仍然能够响应中断。
示例:
public class ReaderThread extends Thread{
private final Socket socket;
private final InputStream in;
public ReaderThread(Socker socket) throws IOException{
this.socket = socket;
this.in = socke.getInputStream();
}
public void interrupt(){
try{
socket.close();
}catch(IOExcpetion e){
}finally{
super.interrupt();
}
}
public void run(){
try{
byte[] buf = new byte[BUFSZ];
while(true){
int count = in.read(buf);
if(count < 0)break;
else if(count > 0) processBuffer(buf, count);
}
}catch(IOException e){
//允许线程退出
}
}
}
6.1.6用newTaskFor封装非标准取消
在ReaderThread中,可以使用newTaskFor钩子函数来改进用来封装非标准取消的方法。
newTaskFor钩子是一个工厂方法,创建Future来代表任务,它返回一个RunnableFuture,这是一个接口,它扩展了Future和Runnable(并由FutureTask实现)
示例:
public interface CancellableTask<T> extends Callable<T> {
void cancel();
RunnableFuture<T> newTask();
}
public abstract class SocketUsingTask<T> implements CancellableTask<T>{
@GuardedBy("this")private Socket socket;
protected synchronized void setSocket(Socket e){socket = e;}
@Override
public synchronized void cancel() {
try {
if (socket!=null)socket.close();
}catch (IOException e){}
}
//重写future的cancel方法,非标准取消,先取消socket连接,再中断线程
@Override
public RunnableFuture<T> newTask() {
return new FutureTask<T>(this){
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
try {
SocketUsingTask.this.cancel();
}finally {
return super.cancel(mayInterruptIfRunning);
}
}
};
}
}
public class CancellingExecutor extends ThreadPoolExecutor {
//.......
//如果传入的Callable参数是我们重写的类的子类,则使用我们自定义的newTask();
//否则,使用父类的newTaskFor();
@Override
protected<T>RunnableFuture<T> newTaskFor(Callable<T> callable){
if (callable instanceof CancellableTask)
return ((CancellableTask<T>) callable).newTask();
else
return super.newTaskFor(callable);
}
}
源码:
//ThreadPoolExecutor 继承于此类
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
//.......
}
6.2停止基于线程的服务
封装:你不应该操控某个线程——中断、改变优先级等等。除非你拥有这个线程。
拥有者:创建线程的类。
线程池拥有它的工作者线程,如果需要中断这些线程,那么应该由线程池来负责。
**应用程序拥有服务,服务拥有工作者线程,但是应用程序并不拥有工作者线程**。
所以,服务应该提供生命周期方法来关闭它自己,并关闭它所拥有的线程。
例如ExecutorService提供了shutdown和shutdownNow方法,其他线程持有的服务也应该提供类似的关闭机制。
对于线程持有的服务,只要服务的存在时间大于创建线程的方法存在的时间,那么就应该提供生命周期方法。
6.2.1关闭ExecutorService
ExecutorService提供了关闭的两种方法:使用shutdown、shutdownNow。
- shutdown:优雅的关闭,直到队列中的所有任务完成前,ExecutorService都不会关闭。
- shutdownNow:强制关闭,强制终结,速度会更快,但是风险大,因为任务很可能再执行到一半的时候被终结。
6.2.2致命药丸
另一种保证生产者和消费者服务关闭的方式是:**致命药丸**:**一个可识别的对象,置于队列中,意味着“当你得到它时,停止一切工作”**。
在**先进先出队列**中,**致命药丸保证了消费者完成队列关闭之前的所有工作**,因为早于致命药丸提交的工作都会在处理它之前就完成了。
示例:
public class IndexingService {
private static final File POISON = new File("");
private final IndexerThread consumer = new IndexerThread();
private final CrawlerThread producer = new CrawlerThread();
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;
//生产者线程
class CrawlerThread extends Thread {
@Override
public void run() {
try {
crawl(root);
} catch (InterruptedException e) {
} finally {
while (true) {
try {
queue.put(POISON);
break;
} catch (InterruptedException e) {
}
}
}
}
private void crawl(File root) throws InterruptedException {
}
}
//消费者线程
class IndexerThread extends Thread {
@Override
public void run() {
try {
while (true) {
//如果取出的对象为致命药丸对象,则停止
File file = queue.take();
if (file == POISON) break;
else indexFile(file);
}
} catch (InterruptedException e) {
}
}
private void indexFile(File file) {
}
}
public void stop() {
producer.interrupt();
}
public void auaitTermination() throws InterruptedException {
consumer.join();
}
}
这个方法同样也可以扩展为多个消费者使用,只要让生产者向队列置入N(消费者数量)个药丸。不过这在生产者和消费者的数量较大时难以处理。
致命药丸只有在无限队列中工作时,才是可靠的。
6.2.3示例:只执行一次的服务
如果一个方法需要处理一批任务,并在所有任务结束前不会返回,那么它可以通过使用私有的Executor来简化服务的生命周期管理。(在这种情况下,通常会用到invokeAll和invokeAny方法)。
//检查是否有新的邮件,当所有检查邮件的任务完成后,他会关闭Executor,并等待结束。
boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final AtomicBoolean =new AtomicBoolean(false);
try {
for (final String host : hosts) {
executorService.execute(() -> {
if (checkMail(host)) hasNewMail.set(true);
});
}
} finally {
executorService.shutdown();
executorService.awaitTermination(timeout, unit);
}
return hasNewMail.get();
}
6.2.4shutdownNow的局限性
当通过**shutdownNow强行关闭一个ExecutorService时**,它**试图取消正在进行的任务**,并**返回那些已经提交、但并没有开始的任务清单**,这样,这些任务可以被日志记录,或者存起来等待进一步处理。
但是,我们并**没有任何常规的方法**,用于**找出那些已经开始,却没有结束的任务**。
//通过封装ExecutorService并使用execute,来记录哪些任务是在关闭后取消的。
//TrackingExecutor存在竞争条件,可能会产生假阳性现象:识别出的被取消任务事实上可能已经结束,产生的原因是在任务执行的最后一条指令,以及线程池记录任务结束之间,线程池发生了关闭(任务也就结束了)。
public class TrackingExecutor extends AbstractExecutorService {
private final ExecutorService executorService;
private final Set<Runnable> tasksCancelledAtshutdown = Collections.synchronizedSet(new HashSet<Runnable>());
//.......
public List<Runnable> getCancelledTasks() {
if (!executorService.isTerminated())
throw new IllegalStateException("......");
return new ArrayList<Runnable>(tasksCancelledAtshutdown);
}
@Override
public void execute(Runnable runnable) {
executorService.execute(() -> {
try {
runnable.run();
} finally {
if (isShutdown() && Thread.currentThread().isInterrupted()) {
tasksCancelledAtshutdown.add(runnable);
}
}
});
}
//其他要实现的方法委托给executorService
}
使用TrackingExecutor
public abstract class WebCrawler {
private volatile TrackingExecutor executor;
@GuardedBy("this")
private final Set<URL> urlsToCrawl = new HashSet<URL>();
//......
public synchronized void start() {
executor = new TrackingExecutor(Executors.newCachedThreadPool());
for (URL url : urlsToCrawl) submitCrawlTask(url);
urlsToCrawl.clear();
}
public synchronized void stop() {
try {
saveUncrawled(executor.shutdownNow());
if (executor.awaitTermination(TIMEOUT, UNIT)) {
saveUncrawled(executor.getCancelledTasks());
}
} finally {
executor = null;
}
}
private void saveUncrawled(List<Runnable> shutdownNow) {
for (Runnable task : shutdownNow) {
urlsToCrawl.add(((CrawlTask) task).getPage());
}
}
private void submitCrawlTask(URL url) {
executor.execute(new CrawlTask(url));
}
protected abstract List<URL> processPage(URL url);
private class CrawlTask implements Runnable {
private final URL url;
private CrawlTask(URL url) {
this.url = url;
}
@Override
public void run() {
for (URL link : processPage(url)) {
if (Thread.currentThread().isInterrupted()) return;
submitCrawlTask(link);
}
}
public URL getPage() {
return url;
}
}
}
6.3处理反常的线程终止
在并发程序中线程的失败往往没那么明显。栈追踪可能会从控制台输出,但是没有人会去观察控制台,并且,**当线程失败的时候,应用程序可能看起来仍在工作**。
导致线程死亡的最主要原因是RuntimeException。它们通常是不能被捕获的,也不会顺着栈的调用传递,此时,默认的行为是在控制台打印栈追踪的信息,并终止线程。
**任何代码都可以抛出RuntimeException,无论何时,当你调用另一个方法的时候,你都要对它的行为保持怀疑**。
//典型线程池的工作者线程的构建
//如果任务抛出了一个未检查的异常,它将允许线程终结,但是首先会通知框架:线程已经终结
public void run(){
Throwable thrown = null;
try{
while(!isInterrupted())
runTask(getTaskFromWorkQueue());
}catch(THrowable e){
thrown = e;
}finally{
threadExited(this, thrown);
}
}
6.3.1未捕获异常的处理
线程的API同样提供了**UncaughtExceptionHandler的工具**,能够**监控到线程因未捕获的异常引起的“死亡”**。与我们主动解决未检查异常问题的方案**互为补充**。
当一个**线程因为未捕获异常而退出时**,**JVM会把这个事件报告给应用程序提供的UncaughtExceptionHandler**,**如果处理器不存在**,默认的行为是**向System.err打印出栈追踪信息**。(可以通过Thread.setUncaughtExceptionHandler为每一个线程设置,也可以使用setDefaultUncaughtExceptionHandler设置默认的)
源码:
//UncaughtExceptionHandler接口
public interface UncaughtExceptionHandler{
void uncaughtException(Thread t, THrowable e);
}
//UncaughtExceptionHandler将异常写入日志
public class UEHLogger implements Thread.UncaughtExceptionHandler{
public void uncaughtException(Thread t, Throwable e){
Logger logger = Logger.getAnonymousLogger();
logger.log(Level.SEVERE, t.getName(), e);
}
}
在一个长时间允许的应用程序中,所有的线程都要给未捕获异常设置一个处理器,这个处理器至少要将异常信息记入日志中。
只有通过execute提交的任务,才能将它抛出的异常送交给UncaughtExceptionHandler,而通过submit提交的任务,抛出的任何异常,无论是否受检查的,都被认为是任务返回状态的一部分。
如果一个以submit提交的任务以异常作为终结,这个异常会被Future.get重抛出,包装在ExecutionException中。
6.4JVM关闭
JVM既可以通过正常手段关闭,也可以强行关闭。
正常手段:
- 当最后一个正常线程(非精灵线程)终结时
- 调用了System.exit
- 通过使用其他平台相关手段(发送SIGING、或者键入Ctrl+C)
强行关闭:
- 调用Runtime.hait
- “杀死”JVM的操作系统进程被强行关闭(比如发送SIGKILL)
6.4.1关闭钩子
在正常的关闭中,JVM会首先启动所有已注册的Shutdown hook。
Shutdown hook :关闭钩子,是使用Runtime.addShutdownHook注册的尚未开始的线程。
JVM不会保证关闭钩子的开始顺序。
如果关闭应用程序线程时,它仍然在运行,它们接下来将会和关闭进程并发执行。
当所有关闭钩子结束时,如果runFinalizersOnExit 为true,JVM可以选择运行finalizer,之后停止。
JVM不会尝试停止或中断任何关闭时仍然在运行中的应用程序线程:它们在JVM最终终止时被强制退出。
如果关闭钩子或finalizer没有完成,那么正常的关闭进程“挂起”并且JVM必须强行关闭。
强行关闭:JVM不需要完成除了关闭JVM以外的任何事情:不会运行关闭钩子。
关闭钩子应该是线程安全的:它们在访问共享数据时必须使用同步,避免死锁。
关闭钩子的存在会延迟JVM的终止。
关闭钩子全部是并发执行的,所以会出现一个问题:关闭日志文件可能引起其他需要使用日志服务的关闭钩子的麻烦。
解决方案:对所有服务使用唯一关闭钩子,让它调用一系列关闭行为,而不是每个服务使用一个,避免了竞争条件的出现。
关闭钩子应用场景:
可以用于服务或应用程序的清理,比如删除临时文件,或者清除OS不能自动清除的资源。
示例:
//注册关闭钩子来停止日志服务
public void start(){
Runtime.getRuntime().addShutdownHook(new Thread){
public void run(){
try{LogService.this.stop();}
catch(InterruptedException e){}
}
}
}
6.4.2精灵线程
线程分为两种:普通线程、精灵线程
精灵线程:执行一些辅助工作,又不会阻碍JVM的关闭。(不会阻碍JVM关闭的线程)
JVM启动时创建的所有线程,除了主线程以外,其他的都是精灵线程(垃圾回收器和其他类似线程)。
当一个新的线程创建时,新线程继承了创建它的线程的后台状态。(也就是说主线程创建的都是普通线程)
普通线程和精灵线程的区别:
仅仅在退出时会有区别。当一个线程退出时,JVM 会检查一个运行中线程的详细清单,如果仅剩下精灵线程,它会发起正常的退出,当JVM停止时,所有仍然存在的精灵线程都会被抛弃(不会执行finally快、也不会释放栈),JVM直接退出。
6.4.3Finalizer
垃圾回收器回对哪些具有特殊finalize方法的对线进行特殊对待:在回收器获得它们后,finalize被调用,这样就能保证持久化资源可以被释放。
但是。finalizer在运行时不提供任何线程安全的保证,并且复杂的finalizer会带来对象巨大的性能开销。
所以,大多数情况下,使用finally块和显示close方法的结合来管理资源。
避免使用finalizer
总结
Java没有提供具有明显优势的机制来取消活动或者终结线程。
它提供了协作的中断机制,能够用来帮助取消,但是这取决于如何构建取消的协议,并是否能一致地使用该协议。
使用FutureTask和Executor框架可以简化构建可取消的任务和服务。
