线程与进程:一个线程就是在进程中的一个单一的顺序控制流,因此,单个进程可以拥有多个并发执行的任务,但是你的程序使得每个任务都好像有其自己得CPU一样。其底层机制是切分CPU时间

线程模型:简化了在单一程序中同时交织在一起的多个操作的处理。在使用线程时,CPU将轮流给每个任务分配其占用时间。每个任务都觉得自己一直占用CPU,但事实上CPU时间是划分成片段分配给了所有的任务。

线程驱动的任务由Runnable接口来提供。Runnable只是定义了线程的任务而已,并不是实现线程的方式。run方法也并无特殊之处,它不会产生任何内在的线程能力。

ex:定义任务

  1. public class LiftOff implements Runnable{
  2. protected int countDown = 10;
  3. private static int taskCount = 0;
  4. private final int id = taskCount++;
  5. public LiftOff(){}
  6. public LiftOff(int countDown){
  7. this.countDown = countDown;
  8. }
  9. public String status() {
  10. return "#" + id + "(" +
  11. (countDown > 0 ? countDown : "Liftoff!") + "), ";
  12. }
  13. @Override
  14. public void run() {
  15. System.out.println("LiftOff当前线程为: "+Thread.currentThread().getName());
  16. while (countDown-- > 0){
  17. System.out.print(status());
  18. Thread.yield();
  19. }
  20. }
  21. }

shutdownNow与showdown的区别
shutdownNow:会引发中断异常,但此时任务依旧在运行;可以在中断异常程序处理中加上你特定的处理 ( 比如,发生中断异常时候,return 退出程序)
showdown:任务完成之后才会关闭

对使用线程池来说:

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. import java.util.concurrent.TimeUnit;
  4. public class LiftOff implements Runnable{
  5. protected int countDown = 10;
  6. private static int taskCount = 0;
  7. private final int id = taskCount++;
  8. public LiftOff(){}
  9. public LiftOff(int countDown){
  10. this.countDown = countDown;
  11. }
  12. public String status() {
  13. return "#" + id + "(" +
  14. (countDown > 0 ? countDown : "Liftoff!") + "), ";
  15. }
  16. @Override
  17. public void run() {
  18. System.out.println("LiftOff当前线程为: "+Thread.currentThread().getName());
  19. while (countDown-- > 0){
  20. System.out.print(status());
  21. Thread.yield();
  22. try {
  23. TimeUnit.SECONDS.sleep(1);
  24. }catch (InterruptedException e){
  25. System.out.println("线程被外部打断了"+e.getMessage());
  26. return;
  27. }
  28. }
  29. }
  30. @Override
  31. public String toString() {
  32. return "LiftOff{" +
  33. "id=" + id +
  34. '}';
  35. }
  36. public static void main(String[] args) {
  37. ExecutorService executorService = Executors.newCachedThreadPool();
  38. executorService.execute(new LiftOff());
  39. executorService.shutdownNow();
  40. }
  41. }

多线程下访问一个资源可能会出现的问题:countDown资源在其他线程处理的时候可能被访问到。

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. import java.util.concurrent.TimeUnit;
  4. public class LiftOff implements Runnable{
  5. protected int countDown = 10;
  6. private static int taskCount = 0;
  7. private final int id = taskCount++;
  8. public LiftOff(){}
  9. public LiftOff(int countDown){
  10. this.countDown = countDown;
  11. }
  12. public String status() {
  13. return "#" + id + "(" +
  14. (countDown > 0 ? countDown : "Liftoff!") + "), ";
  15. }
  16. @Override
  17. public void run() {
  18. System.out.println("LiftOff当前线程为: "+Thread.currentThread().getName());
  19. while (countDown-- > 0){
  20. System.out.print(status());
  21. Thread.yield();
  22. try {
  23. TimeUnit.SECONDS.sleep(1);
  24. }catch (InterruptedException e){
  25. System.out.println("线程被外部打断了"+e.getMessage());
  26. return;
  27. }
  28. }
  29. }
  30. @Override
  31. public String toString() {
  32. return "LiftOff{" +
  33. "id=" + id +
  34. '}';
  35. }
  36. public static void main(String[] args) {
  37. LiftOff liftOff = new LiftOff();
  38. ExecutorService executorService = Executors.newCachedThreadPool();
  39. for (int i = 0; i < 5; i++) {
  40. executorService.execute(liftOff);
  41. }
  42. executorService.shutdown();
  43. }
  44. }
  45. //~ output
  46. #0(7), #0(8), #0(9), #0(5), #0(6), #0(3), #0(2), #0(3), #0(Liftoff!), #0(1),

由以下示例扩展对数据表跑批的任务
image.png

  1. import java.util.Random;
  2. import java.util.concurrent.TimeUnit;
  3. public class LiftOffExt1 implements Runnable{
  4. Random random = new Random(47);
  5. private String name;
  6. public LiftOffExt1(){}
  7. public LiftOffExt1(String name){
  8. this.name = name;
  9. }
  10. @Override
  11. public void run() {
  12. System.out.println("对"+name+"表开始进行跑批任务");
  13. try {
  14. TimeUnit.SECONDS.sleep(random.nextInt(10));
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. System.out.println(name+"表跑批任务结束");
  19. }
  20. //待优化 ->参考设计模式进行优化
  21. public static void main(String[] args) {
  22. for (int i = 1; i <= 5; i++) {
  23. switch (i){
  24. case 1:
  25. new Thread(new LiftOffExt1("user")).start();
  26. break;
  27. case 2:
  28. new Thread(new LiftOffExt1("book")).start();
  29. break;
  30. case 3:
  31. new Thread(new LiftOffExt1("cpu")).start();
  32. break;
  33. case 4:
  34. new Thread(new LiftOffExt1("note")).start();
  35. break;
  36. case 5:
  37. new Thread(new LiftOffExt1("mobile")).start();
  38. break;
  39. }
  40. }
  41. }
  42. }

Executor管理线程:允许管理异步任务的执行,而无需显示的管理线程的生命周期

留意各个方法的对应关系,

//从ThreadPoolExecutor构造器参数中分析各个执行器静态方法的区别
Executors.newCachedThreadPool();
 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
 }



 Executors.newFixedThreadPool(5);
 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

 Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

也可以通过ThreadPoolExecutor来手动构建自己的线程池

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

ThreadPoolExecutor属性说明

1、corePoolSize:核心线程数
        * 核心线程会一直存活,及时没有任务需要执行
        * 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
        * 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭

    2、queueCapacity:任务队列容量(阻塞队列)
        * 当核心线程数达到最大时,新任务会放在队列中排队等待执行

    3、maxPoolSize:最大线程数
        * 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
        * 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常

    4、 keepAliveTime:线程空闲时间
        * 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
        * 如果allowCoreThreadTimeout=true,则会直到线程数量=0

    5、allowCoreThreadTimeout:允许核心线程超时
    6、rejectedExecutionHandler:任务拒绝处理器
        * 两种情况会拒绝处理任务:
            - 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
            - 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
        * 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常
        * ThreadPoolExecutor类有几个内部实现类来处理这类情况:
            - AbortPolicy 丢弃任务,抛运行时异常
            - CallerRunsPolicy 执行任务
            - DiscardPolicy 忽视,什么都不会发生
            - DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
        * 实现RejectedExecutionHandler接口,可自定义处理器

ThreadPoolExecutor执行顺序:

线程池按以下行为执行任务
    1. 当线程数小于核心线程数时,创建线程。
    2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
    3. 当线程数大于等于核心线程数,且任务队列已满
        -1 若线程数小于最大线程数,创建线程
        -2 若线程数等于最大线程数,抛出异常,拒绝任务

ThreadPoolExecutor属性一般设置:

通常核心线程数可以设为CPU数量+1,而最大线程数可以设为CPU的数量*2+1。
获取CPU数量的方法为:
Runtime.getRuntime().availableProcessors();

Executor管理线程:允许管理异步任务的执行,而无需显示的管理线程的生命周期

留意各个方法的对应关系

//从ThreadPoolExecutor构造器参数中分析各个执行器静态方法的区别
Executors.newCachedThreadPool();
 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
 }



 Executors.newFixedThreadPool(5);
 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

 Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

也可以通过ThreadPoolExecutor来手动构建自己的线程池

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

从任务中产生返回值


import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;

class TaskResult implements Callable<String> {
    Random random = new Random(47);
    private  int id;
    public  TaskResult(int id){
        this.id = id;
    }

    @Override
    public String call() throws Exception {
        try {
            //模拟操作的耗时
            TimeUnit.SECONDS.sleep(random.nextInt(50));
        }catch (InterruptedException e){
            System.out.println("任务被中断了");
        }
        return "TaskResult "+id;
    }
}

public class CallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            futures.add( executorService.submit(new TaskResult(i)));//不会阻塞主线程
        }

        for (Future<String> future : futures) {
            if(future.isDone()) {//isDone是非阻塞调用
                System.out.println(future.get());//阻塞式的调用
            }
        }
        executorService.shutdownNow();//全部执行完成后才会shutdown,但并不阻塞当前线程的执行
        System.out.println("关闭任务后代码继续执行");
    }
}

模拟一个学校收取费用的示例

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;

class TaskResult2 implements Callable<String> {
    Random random = new Random();
    private  int collegeNo;
    public  TaskResult2(int collegeNo){
        this.collegeNo = collegeNo;
    }

    /**
     * 前提 student表百万的数据
     * group上有个索引
     * select sum(money) from student where college = "一"
     * select sum(money) from student;  //直接查询
     * select sum(money) from student where college = "二"
     * select sum(money) from student where college = "三"
     * select sum(money) from student where college = "四"
     *
     *  ----------------------------------
     *  /  id   name     college       money
     *  /  1    AA        一           8
     *  /  2    BB        二           9
     *  /  3    CC        三           7
     *  /  4    DD        四           6
     *  .....................
     *
     * @return
     * @throws Exception
     */
    @Override
    public String call() throws Exception {
        //select sum(money) from student where college = collegeNo;
        final int randomMoney = random.nextInt(1000);
        TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
        System.out.println(randomMoney);
        return collegeNo+ "==" +randomMoney;
    }
}

public class CallableDemo2App {
    public static void main(String[] args) {

        Integer totalMoney = 0;

        List<Future<String>> futures = new ArrayList<>();
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for(int i = 0;i < 5;i++) {
            Future<String> res = executorService.submit(new TaskResult2(i));
            futures.add(res);
        }

        for (Future<String> future : futures) {
            try {
                final String s = future.get();
                String money = s.split("==")[1];
                totalMoney += Integer.valueOf(money);
            }catch (InterruptedException | ExecutionException e){
                System.out.println("获取结果被中断了"+e.getMessage());
            }
        }
        System.out.println("一共获取的钱数为: "+totalMoney);
        executorService.shutdown();

    }
}

无法捕获从线程中逃逸的异常

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExceptionThread implements Runnable{
    @Override
    public void run() {
        throw new RuntimeException("线程异常");
    }

    public static void testExec(){
        //new Thread(new ExceptionThread()).start();
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            executorService.execute(new ExceptionThread());
        }catch (RuntimeException e){
            System.out.println(e.getMessage());
        }
        executorService.shutdown();
    }

    public static void main(String[] args) {
       testExec();
    }
}


package com.thinking.in.java.course.chapter21;

import java.util.concurrent.*;

public class NaiveExceptionHandling {
    public static void main(String[] args) {
        try {
            ExecutorService exec =  Executors.newCachedThreadPool();
            exec.execute(new ExceptionThread());
        } catch (RuntimeException ue) {
            // This statement will NOT execute!
            System.out.println("Exception has been handled!");
        }
    }
}

image.png

注意:无法获取异常栈的信息,只能处理异常

package com.thinking.in.java.course.chapter21;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class ExceptionThread2 implements Runnable{
    @Override
    public void run() {
        Thread t = Thread.currentThread();
        System.out.println("    run() by "+t);
        System.out.println("    eh = "+t.getUncaughtExceptionHandler());
        throw new RuntimeException();
    }
}

class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler{
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        System.out.println("    异常信息被caught "+e);
        //e.printStackTrace();
    }
}

class HandlerThreadFactory implements ThreadFactory{
    @Override
    public Thread newThread(Runnable r) {
        System.out.println(this + " Creating new Thread");
        Thread t = new Thread(r);
        System.out.println("    created thread "+t);
        t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
        System.out.println("    eh= "+t.getUncaughtExceptionHandler());
        return t;
    }
}


public class CaptureUncaughtException {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool(new HandlerThreadFactory());
        service.execute(new ExceptionThread2());
        service.shutdown();
    }
}