取消和关闭

要做到安全、快速、可靠停止任务或线程并不容易。

Java没有提供任何机制来安全地强迫线程停止手头的工作。

java提供了:

中断:一个协作机制,是一个线程能够要求另一个线程停止当前的工作。

  1. 一个线程给另一个线程发送信号,通知它在方便或者可能的情况下停止工作。

要求它们停止时,它们首先会清除当前进程中的任务,然后再终止。(因为任务代码本身比发出取消请求的代码更明确应该清除什么)

6.1任务取消

外部代码能够在活动自然完成之前把它更改为完成状态,那么这个活动被称为可取消的。

一个可取消的任务必须拥有取消策略,这个策略详细说明关于取消的“how”、“when”、“what”:

  • how:其他代码如何请求取消该任务
  • when:任务在什么时候检查取消的请求是否到达
  • what:响应取消请求的任务中应有的行为

示例

@ThreadSafe
public class PrimeGenerator implements Runnable {

    @GuardedBy(this)
    private final List<BigInteger> primes = new ArrayList<BigInteger>();

    private volatile boolean cancelled;

    public void run() {
        BigInteger p = BigInteger.ONE;
        //when:考虑在什么时候检查取消请求
        while (!cancelled) {
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }

    //how:其他请求该如何取消任务
    //what:取消任务中的行为
    public void cancel() {
        cancelled = true;
    }

    public synchronized List<BigInteger> get() {
        return new ArrayList<BigInteger>(primes);
    }
}

取消应用场景

  • 用户请求的取消:例如程序界面上的“关闭”按钮,或者通过管理接口请求取消。
  • 限时活动:一个应用程序需要在有限时间内执行任务,超时就取消
  • 应用程序事件:一个应用程序对问题空间进行分解搜索,当一个任务发现了解决方案,其他仍在工作的搜索就会被取消。
  • 错误:当任务遭遇错误,那么所有的任务都会被取消,不过可以记录它们当前的状态,稍后重新启动。
  • 关闭:当一个程序或者服务关闭时,必须对正在处理和等待处理的工作进行一些操作。(优雅的关闭)

6.1.1中断

在PrimeGenerator中,取消机制不是立刻发生的,如果我们在其中使用了一个阻塞方法,比如BlockingQueue.put,那么就可能永远不会检查到取消标志

示例

//不要这样做,当队列阻塞时,是检测不到取消标记的
public class PrimeGenerator implements Runnable {
    private final BlockingQueue<BigInteger> queue;
    private volatile boolean cancelled = false;

    public PrimeGenerator(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!cancelled) {
                //这是可阻塞的
                queue.put(p = p.nextProbablePrime());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void cancel() {
        cancelled = true;
    }
}

每一个线程都有一个boolean类型的中断状态。在中断时:这个中断状态被设置为true。

Thread包含其他用于中断线程的方法,以及请求线程中断状态的方法:

interrupt:中断目标线程

isInterrupted:返回目标线程的中断状态

静态的interrupted:清除当前线程中断状态,并返回它之前的值。

调用interrupt并不意味着必然停止目标线程正在进行的工作,它仅仅传递了请求中断的消息。

示例

public class PrimeGenerator extends Thread {

    private final BlockingQueue<BigInteger> queue;

    private volatile boolean cancelled;

    public PrimeGenerator(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run(){
        BigInteger p = BigInteger.ONE;
        try {
            while (!Thread.currentThread().isInterrupted()){
                queue.put(p = p.nextProbablePrime());
            }
        }catch (InterruptedException e){

        }
    }

    public void cancel(){interrupt();}
}

6.1.2中断策略

正如取消需要制订取消策略一样,中断也应该制定中断策略。

中断策略:

  • 当发现中断请求时,应该做什么
  • 哪些工作单元对于中断来说是原子操作
  • 在多快的时间里响应中断。

区分任务和线程对中断的反应是很重要的。

一个单一的中断请求可能有一个或一个以上预期的接收者——在线程池中中断一个工作者线程,意味着取消当前任务,并关闭工作线程。

任务不会再自己拥有的线程中执行:它们借用属于服务的线程。代码如果并不是线程的所有者,就应该保存中断状态,这样所有者的代码才能够最终对其起到作用。

例如:大多数可阻塞的库函数,都是仅仅抛出InterruptedException作为中断响应。因为它们不可能自己运行再一个线程中,它们会尽可能的为异常信息让路,把它传递给调用者。

检查到中断请求时,任务不需要立即放弃所有事情——它可以选择推迟。但是需要记得它已经被请求过中断了,完成当前正在进行的事情,然后抛出InterruptedException或者指明中断

线程应该只能够被线程的所有者中断,所有者可以把线程的中断策略信息封装到一个合适的取消机制中。
因为每一个线程都有其自己的中断策略,所以你不应该中断线程,除非你知道中断对这个线程意味着什么。

6.1.3响应中断

当你调用可中断的阻塞函数时,比如 Thread.sleep 或者BlockingQueue.put,有两种处理InterruptedException的实用策略:

  • 传递异常,使你的方法也成为可中断的阻塞方法 (throws InterruptedException)
  • 或者保存中断状态,上层调用栈中的代码能够对其进行处理。

如果你不想或不能传递InterruptedException,需要寻找另一种方式保存中断请求,通常是再次调用interrupt来恢复中断状态

只有实现了线程中断策略的代码才可以接收中断请求,通用目的的任务和库的代码绝不应该接收中断请求.
//有一些活动不支持取消,却仍可能调用中断的阻塞方法,那么它们必须在循环中调用这些方法,当发现中断后重新尝试
//在本地保存中断状态,并在返回前恢复状态
public Task getNextTask(BlockingQueue<Task> queue){
    boolean interrupted = false;
    try{
        while(true){
            try{
                return queue.take();
            }catch(InterruptedException e){
                interrupted = true;
                //失败并重试
            }
        }
    }finally{
        if(interrupted){
            Thread.currentThread().interrupt();
        }
    }
}

6.1.4通过Future取消

ExecutorService.Submit 会返回一个Future来描述任务。Future有一个**cancel方法**,它**需要一个boolean类型的参数**(mayInterruptIfRunning),它的**返回值表示取消尝试是否成功**(这仅仅是告诉你它是否**能够接收中断**,而不是任务是否检测并处理了中断)。

除非你知道线程的中断策略,否则你不应该中断线程。

任务执行线程是由标准的Executor实现创建的,它实现了一个中断策略,使得任务可以通过中断被取消,所以当他们在标准Executor中运行时,通过它们的Future来取消任务是安全的。
public static void timeRun(Runnable r, long timeout, TimeUnit unit,)
throws InterruptedException{
    Future<?> task = taskExec.submit(r);
    try{
        task.get(timeout, unit);
    }catch(TimeoutException e){
        //下面任务会被取消
    }catch(ExecutionException e){
        //task中抛出的异常:重抛出
    }finally{
        //如果任务已经结束,是无害的
        task.cancel(true);
    }
}
 当Future.get抛出 InterruptedException 或TimeoutException 时,如果你知道不再需要结果时,就可以调用Future.cancel来取消任务

6.1.5处理不可中断阻塞

很多可阻塞的库方法通过提前返回和抛出InterruptedException来实现对中断的响应,这使得构建可以响应取消的任务更加容易了。

但是,并不是所有的阻塞方法或阻塞机制都响应中断:如果一个线程是由于进行同步Socket I/O 或者等待获得内部锁而阻塞的,那么中断除了能够设置线程的中断状态以外,什么都不能改变。
  • java.io中的同步Socker I/O。InputStream 和 OutputStream 中的read和write 方法都不响应中断,但是通过关闭底层的Socket,可以让read或write所阻塞的线程抛出一个SocketException
  • java.nio中的同步I/O。中断一个等待InterrputibleChannel的线程,会导致抛出ClosedByInterruptException,并关闭链路(其他线程在这条链路的阻塞,也会抛出该异常)。
  • Selector的异步I/O。如果一个线程阻塞于Selector.select方法close方法会导致它通过抛出ClosedSelectorException提前返回
  • 获得锁。如果一个线程在等待内部锁,那么如果不能确保它最终获得锁,是不能停止它的。然而,显示Lock类提供了lockInterruptibly方法,允许你等待一个锁,并仍然能够响应中断

示例:

public class ReaderThread extends Thread{
    private final Socket socket;
    private final InputStream in;

    public ReaderThread(Socker socket) throws IOException{
        this.socket = socket;
        this.in = socke.getInputStream();
    }

    public void interrupt(){
        try{
            socket.close();
        }catch(IOExcpetion e){

        }finally{
            super.interrupt();
        }
    }

    public void run(){
        try{
            byte[] buf = new byte[BUFSZ];
            while(true){
                int count = in.read(buf);
                if(count < 0)break;
                else if(count > 0) processBuffer(buf, count);
            }
        }catch(IOException e){
            //允许线程退出
        }
    }
}

6.1.6用newTaskFor封装非标准取消

在ReaderThread中,可以使用newTaskFor钩子函数来改进用来封装非标准取消的方法。

newTaskFor钩子是一个工厂方法,创建Future来代表任务,它返回一个RunnableFuture,这是一个接口,它扩展了Future和Runnable(并由FutureTask实现)

示例:

public interface CancellableTask<T> extends Callable<T> {
    void cancel();
    RunnableFuture<T> newTask();
}


public abstract class SocketUsingTask<T> implements CancellableTask<T>{

    @GuardedBy("this")private Socket socket;

    protected synchronized void setSocket(Socket e){socket = e;}

    @Override
    public synchronized void cancel() {
        try {
            if (socket!=null)socket.close();
        }catch (IOException e){}
    }

    //重写future的cancel方法,非标准取消,先取消socket连接,再中断线程
    @Override
    public RunnableFuture<T> newTask() {
        return new FutureTask<T>(this){
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                try {
                    SocketUsingTask.this.cancel();
                }finally {
                    return super.cancel(mayInterruptIfRunning);
                }
            }
        };
    }
}



public class CancellingExecutor extends ThreadPoolExecutor {
    //.......

    //如果传入的Callable参数是我们重写的类的子类,则使用我们自定义的newTask();
    //否则,使用父类的newTaskFor();
    @Override
    protected<T>RunnableFuture<T> newTaskFor(Callable<T> callable){
        if (callable instanceof CancellableTask)
            return ((CancellableTask<T>) callable).newTask();
        else
            return super.newTaskFor(callable);
    }
}

源码:

//ThreadPoolExecutor 继承于此类
public abstract class AbstractExecutorService implements ExecutorService {

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }


    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    //.......
}

6.2停止基于线程的服务

封装:你不应该操控某个线程——中断、改变优先级等等。除非你拥有这个线程。

拥有者:创建线程的类。

线程池拥有它的工作者线程,如果需要中断这些线程,那么应该由线程池来负责。

**应用程序拥有服务,服务拥有工作者线程,但是应用程序并不拥有工作者线程**。

所以,服务应该提供生命周期方法来关闭它自己,并关闭它所拥有的线程。

例如ExecutorService提供了shutdown和shutdownNow方法,其他线程持有的服务也应该提供类似的关闭机制。
对于线程持有的服务,只要服务的存在时间大于创建线程的方法存在的时间,那么就应该提供生命周期方法。

6.2.1关闭ExecutorService

ExecutorService提供了关闭的两种方法:使用shutdown、shutdownNow。
  • shutdown:优雅的关闭,直到队列中的所有任务完成前,ExecutorService都不会关闭。
  • shutdownNow:强制关闭,强制终结,速度会更快,但是风险大,因为任务很可能再执行到一半的时候被终结。

6.2.2致命药丸

另一种保证生产者和消费者服务关闭的方式是:**致命药丸**:**一个可识别的对象,置于队列中,意味着“当你得到它时,停止一切工作”**。

在**先进先出队列**中,**致命药丸保证了消费者完成队列关闭之前的所有工作**,因为早于致命药丸提交的工作都会在处理它之前就完成了。

示例:

public class IndexingService {
    private static final File POISON = new File("");
    private final IndexerThread consumer = new IndexerThread();
    private final CrawlerThread producer = new CrawlerThread();
    private final BlockingQueue<File> queue;
    private final FileFilter fileFilter;
    private final File root;


    //生产者线程
    class CrawlerThread extends Thread {
        @Override
        public void run() {
            try {
                crawl(root);
            } catch (InterruptedException e) {
            } finally {
                while (true) {
                    try {
                        queue.put(POISON);
                        break;
                    } catch (InterruptedException e) {

                    }
                }
            }
        }

        private void crawl(File root) throws InterruptedException {
        }
    }

    //消费者线程
    class IndexerThread extends Thread {
        @Override
        public void run() {
            try {
                while (true) {
                    //如果取出的对象为致命药丸对象,则停止
                    File file = queue.take();
                    if (file == POISON) break;
                    else indexFile(file);
                }
            } catch (InterruptedException e) {
            }
        }

        private void indexFile(File file) {
        }
    }

    public void stop() {
        producer.interrupt();
    }

    public void auaitTermination() throws InterruptedException {
        consumer.join();
    }
}

这个方法同样也可以扩展为多个消费者使用,只要让生产者向队列置入N(消费者数量)个药丸。不过这在生产者和消费者的数量较大时难以处理

致命药丸只有在无限队列中工作时,才是可靠的

6.2.3示例:只执行一次的服务

如果一个方法需要处理一批任务,并在所有任务结束前不会返回,那么它可以通过使用私有的Executor来简化服务的生命周期管理。(在这种情况下,通常会用到invokeAll和invokeAny方法)。
//检查是否有新的邮件,当所有检查邮件的任务完成后,他会关闭Executor,并等待结束。
boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final AtomicBoolean =new AtomicBoolean(false);
        try {
            for (final String host : hosts) {
                executorService.execute(() -> {
                    if (checkMail(host)) hasNewMail.set(true);
                });
            }
        } finally {
            executorService.shutdown();
            executorService.awaitTermination(timeout, unit);
        }
        return hasNewMail.get();
    }

6.2.4shutdownNow的局限性

当通过**shutdownNow强行关闭一个ExecutorService时**,它**试图取消正在进行的任务**,并**返回那些已经提交、但并没有开始的任务清单**,这样,这些任务可以被日志记录,或者存起来等待进一步处理。

但是,我们并**没有任何常规的方法**,用于**找出那些已经开始,却没有结束的任务**。

//通过封装ExecutorService并使用execute,来记录哪些任务是在关闭后取消的。

//TrackingExecutor存在竞争条件,可能会产生假阳性现象识别出的被取消任务事实上可能已经结束,产生的原因是在任务执行的最后一条指令,以及线程池记录任务结束之间线程池发生了关闭(任务也就结束了)

public class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService executorService;
    private final Set<Runnable> tasksCancelledAtshutdown = Collections.synchronizedSet(new HashSet<Runnable>());

    //.......
    public List<Runnable> getCancelledTasks() {
        if (!executorService.isTerminated())
            throw new IllegalStateException("......");
        return new ArrayList<Runnable>(tasksCancelledAtshutdown);
    }

    @Override
    public void execute(Runnable runnable) {
        executorService.execute(() -> {
            try {
                runnable.run();
            } finally {
                if (isShutdown() && Thread.currentThread().isInterrupted()) {
                    tasksCancelledAtshutdown.add(runnable);
                }
            }
        });
    }


    //其他要实现的方法委托给executorService
}

使用TrackingExecutor

public abstract class WebCrawler {
    private volatile TrackingExecutor executor;
    @GuardedBy("this")
    private final Set<URL> urlsToCrawl = new HashSet<URL>();

    //......
    public synchronized void start() {
        executor = new TrackingExecutor(Executors.newCachedThreadPool());
        for (URL url : urlsToCrawl) submitCrawlTask(url);
        urlsToCrawl.clear();
    }

    public synchronized void stop() {
        try {
            saveUncrawled(executor.shutdownNow());
            if (executor.awaitTermination(TIMEOUT, UNIT)) {
                saveUncrawled(executor.getCancelledTasks());
            }
        } finally {
            executor = null;
        }
    }

    private void saveUncrawled(List<Runnable> shutdownNow) {
        for (Runnable task : shutdownNow) {
            urlsToCrawl.add(((CrawlTask) task).getPage());
        }
    }

    private void submitCrawlTask(URL url) {
        executor.execute(new CrawlTask(url));
    }

    protected abstract List<URL> processPage(URL url);

    private class CrawlTask implements Runnable {
        private final URL url;

        private CrawlTask(URL url) {
            this.url = url;
        }

        @Override
        public void run() {
            for (URL link : processPage(url)) {
                if (Thread.currentThread().isInterrupted()) return;
                submitCrawlTask(link);
            }
        }

        public URL getPage() {
            return url;
        }
    }
}

6.3处理反常的线程终止

在并发程序中线程的失败往往没那么明显。栈追踪可能会从控制台输出,但是没有人会去观察控制台,并且,**当线程失败的时候,应用程序可能看起来仍在工作**。

导致线程死亡的最主要原因是RuntimeException。它们通常是不能被捕获的,也不会顺着栈的调用传递,此时,默认的行为是在控制台打印栈追踪的信息,并终止线程。

**任何代码都可以抛出RuntimeException,无论何时,当你调用另一个方法的时候,你都要对它的行为保持怀疑**。
//典型线程池的工作者线程的构建
//如果任务抛出了一个未检查的异常,它将允许线程终结,但是首先会通知框架:线程已经终结
public void run(){
    Throwable thrown = null;
    try{
        while(!isInterrupted())
            runTask(getTaskFromWorkQueue());
    }catch(THrowable e){
        thrown = e;
    }finally{
        threadExited(this, thrown);
    }
}

6.3.1未捕获异常的处理

线程的API同样提供了**UncaughtExceptionHandler的工具**,能够**监控到线程因未捕获的异常引起的“死亡”**。与我们主动解决未检查异常问题的方案**互为补充**。

当一个**线程因为未捕获异常而退出时**,**JVM会把这个事件报告给应用程序提供的UncaughtExceptionHandler**,**如果处理器不存在**,默认的行为是**向System.err打印出栈追踪信息**。(可以通过Thread.setUncaughtExceptionHandler为每一个线程设置,也可以使用setDefaultUncaughtExceptionHandler设置默认的)

源码:

//UncaughtExceptionHandler接口
public interface UncaughtExceptionHandler{
    void uncaughtException(Thread t, THrowable e);
}
//UncaughtExceptionHandler将异常写入日志
public class UEHLogger implements Thread.UncaughtExceptionHandler{
    public void uncaughtException(Thread t, Throwable e){
        Logger logger = Logger.getAnonymousLogger();
        logger.log(Level.SEVERE, t.getName(), e);
    }
}
在一个长时间允许的应用程序中,所有的线程都要给未捕获异常设置一个处理器,这个处理器至少要将异常信息记入日志中。

只有通过execute提交的任务,才能将它抛出的异常送交给UncaughtExceptionHandler,而通过submit提交的任务,抛出的任何异常,无论是否受检查的,都被认为是任务返回状态的一部分。

如果一个以submit提交的任务以异常作为终结,这个异常会被Future.get重抛出,包装在ExecutionException中

6.4JVM关闭

JVM既可以通过正常手段关闭,也可以强行关闭。

正常手段:

  • 当最后一个正常线程(非精灵线程)终结时
  • 调用了System.exit
  • 通过使用其他平台相关手段(发送SIGING、或者键入Ctrl+C)

强行关闭:

  • 调用Runtime.hait
  • “杀死”JVM的操作系统进程被强行关闭(比如发送SIGKILL)

6.4.1关闭钩子

正常的关闭中JVM会首先启动所有已注册的Shutdown hook

Shutdown hook关闭钩子,是使用Runtime.addShutdownHook注册的尚未开始的线程

JVM不会保证关闭钩子的开始顺序

如果关闭应用程序线程时,它仍然在运行,它们接下来将会和关闭进程并发执行

所有关闭钩子结束时,如果runFinalizersOnExit 为trueJVM可以选择运行finalizer,之后停止。

JVM不会尝试停止或中断任何关闭时仍然在运行中的应用程序线程:它们在JVM最终终止时被强制退出

如果关闭钩子或finalizer没有完成,那么正常的关闭进程“挂起”并且JVM必须强行关闭

强行关闭:JVM不需要完成除了关闭JVM以外的任何事情:不会运行关闭钩子。

关闭钩子应该是线程安全的:它们在访问共享数据时必须使用同步,避免死锁。

关闭钩子的存在会延迟JVM的终止。

关闭钩子全部是并发执行的,所以会出现一个问题:关闭日志文件可能引起其他需要使用日志服务的关闭钩子的麻烦。

解决方案:对所有服务使用唯一关闭钩子,让它调用一系列关闭行为,而不是每个服务使用一个,避免了竞争条件的出现。

关闭钩子应用场景:

可以用于服务或应用程序的清理,比如删除临时文件,或者清除OS不能自动清除的资源。

示例:

//注册关闭钩子来停止日志服务
public void start(){
    Runtime.getRuntime().addShutdownHook(new Thread){
        public void run(){
            try{LogService.this.stop();}
            catch(InterruptedException e){}
        }
    }
}

6.4.2精灵线程

线程分为两种:普通线程、精灵线程

精灵线程:执行一些辅助工作,又不会阻碍JVM的关闭。(不会阻碍JVM关闭的线程)

JVM启动时创建的所有线程除了主线程以外其他的都是精灵线程(垃圾回收器和其他类似线程)

当一个新的线程创建时,新线程继承了创建它的线程的后台状态。(也就是说主线程创建的都是普通线程)

普通线程和精灵线程的区别:

仅仅在退出时会有区别。当一个线程退出时,JVM 会检查一个运行中线程的详细清单,如果仅剩下精灵线程,它会发起正常的退出,当JVM停止时所有仍然存在的精灵线程都会被抛弃(不会执行finally快、也不会释放栈),JVM直接退出

6.4.3Finalizer

垃圾回收器回对哪些具有特殊finalize方法的对线进行特殊对待:在回收器获得它们后,finalize被调用,这样就能保证持久化资源可以被释放。

但是。finalizer在运行时不提供任何线程安全的保证,并且复杂的finalizer会带来对象巨大的性能开销。

所以,大多数情况下,使用finally块和显示close方法的结合来管理资源。
避免使用finalizer

总结

Java没有提供具有明显优势的机制来取消活动或者终结线程。

它提供了协作的中断机制,能够用来帮助取消,但是这取决于如何构建取消的协议,并是否能一致地使用该协议。

使用FutureTask和Executor框架可以简化构建可取消的任务和服务。