创建线程的方式

方法一:继承 Thread 创建线程

  1. public class CreateThread {
  2. public static void main(String[] args) {
  3. Thread myThread = new MyThread();
  4. // 启动线程
  5. myThread.start();
  6. }
  7. }
  8. class MyThread extends Thread {
  9. @Override
  10. public void run() {
  11. System.out.println("my thread running");
  12. }
  13. }

方法二:使用 Runnable 配合 Thread

推荐使用

  1. public class CreateThread2 {
  2. private static class MyRunnable implements Runnable {
  3. @Override
  4. public void run() {
  5. System.out.println("my runnable running");
  6. }
  7. }
  8. public static void main(String[] args) {
  9. MyRunnable myRunnable = new MyRunnable();
  10. Thread thread = new Thread(myRunnable, "my thread name");
  11. thread.start();
  12. }
  13. }

方法三:使用 FutureTask 配合 Thread

使用 FutureTask 可以用泛型指定线程的返回值类型(Runnable 的 run() 没有返回值)

  1. public class UseFutureTask {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. FutureTask<String> futureTask = new FutureTask<>(new MyCall());
  4. Thread thread = new Thread(futureTask);
  5. thread.start();
  6. // 获得线程运行后的返回值
  7. System.out.println(futureTask.get());
  8. }
  9. }
  10. class MyCall implements Callable<String> {
  11. @Override
  12. public String call() throws Exception {
  13. return "hello world";
  14. }
  15. }

源码

Thread 的源码

理解 Runnable 的原理

  1. public class Thread implements Runnable {
  2. /* What will be run. */
  3. private Runnable target;
  4. @Override
  5. public void run() {
  6. if (target != null) {
  7. target.run();
  8. }
  9. }
  10. }

FutureTask 的源码

  1. public class FutureTask<V> implements RunnableFuture<V> {
  2. /** The underlying callable; nulled out after running */
  3. /** 底层可调用对象:运行后无效 */
  4. private Callable<V> callable;
  5. public FutureTask(Callable<V> callable) {
  6. if (callable == null)
  7. throw new NullPointerException();
  8. this.callable = callable;
  9. this.state = NEW; // ensure visibility of callable
  10. // 确保可调用对象的可见性
  11. }
  12. public void run() {
  13. if (state != NEW ||
  14. !UNSAFE.compareAndSwapObject(this, runnerOffset,
  15. null, Thread.currentThread()))
  16. return;
  17. try {
  18. Callable<V> c = callable;
  19. if (c != null && state == NEW) {
  20. V result;
  21. boolean ran;
  22. try {
  23. result = c.call();
  24. ran = true;
  25. } catch (Throwable ex) {
  26. result = null;
  27. ran = false;
  28. setException(ex);
  29. }
  30. if (ran)
  31. set(result);
  32. }
  33. } finally {
  34. // runner must be non-null until state is settled to
  35. // prevent concurrent calls to run()
  36. runner = null;
  37. // state must be re-read after nulling runner to prevent
  38. // leaked interrupts
  39. int s = state;
  40. if (s >= INTERRUPTING)
  41. handlePossibleCancellationInterrupt(s);
  42. }
  43. }
  44. }

线程的常用方法

方法 功能说明 注意
start() 启动一个新线程,在新的线程中运行 run() 中的代码
run() 新线程启动后会调用的方法
join() 等待线程运行结束再继续
join(long n) 等待线程运行结束,最多等待 n 毫秒
getId() 获取线程长整型的 id,id 唯一
getName() 获取线程名
setName(String) 修改线程名
getPriority() 获取线程优先级
setPriority(int) 修改线程优先级 java中规定线程优先级是1~10 的整数
较大的优先级能提高该线程被 CPU 调度的机率
getState() 获取线程状态
isInterrupted() 判断线程是否被打断,不会清除打断标记
isAlive() 线程是否还存活
interrupt() 打断线程
interrupted() 判断当前线程是否被打断,会清除打断标记
方法是 static 的
currentThread() 获取当前正在执行的线程
方法是 static 的
sleep(long n) 让当前执行的线程休眠 n 毫秒,休眠时让出 cpu 的时间片给其它线程
方法是 static 的
yield() 提示线程调度器 让出当前线程对 CPU的使用,主要是为了测试和调试
方法是 static 的

start() & run()

  • start() 只是让线程进入就绪,里面代码不一定立刻运行
  • 每个线程对象的 start() 只能调用一次,如果调用多次会抛出 IllegalThreadStateException
  • 如果在构造 Thread 对象时传递了 Runnable 参数,则线程启动后会调用 Runnable 中的 run 方法,否则默认不执行任何操作。但可以创建 Thread 的子类对象, 来覆盖默认行为
  • 直接调用 run(),是在主线程中执行了 run(),没有启动新的线程
  • 使用 start() 是启动新的线程,通过新的线程间接执行 run() 中的代码

    sleep() & yield()

    sleep (使线程阻塞)

  • 调用 sleep() 会让当前线程从 Running (运行) 进入 Timed Waiting 状态 (阻塞)。并让出 CPU 的使用权

  • 其它线程可以使用 interrupt() 打断正在睡眠的线程,这时 sleep() 会抛出 InterruptedException
  • 睡眠结束后的线程未必会立刻得到执行
  • 如果持有对象锁,则调用 sleep() 时,并不会释放对象锁
  • 建议用 TimeUnit 的 sleep() 代替 Thread 的 sleep() 来获得更好的可读性

    1. Long timeout = 1L;
    2. try {
    3. TimeUnit.SECONDS.sleep(timeout);
    4. } catch (InterruptedException e) {
    5. e.printStackTrace();
    6. }

    yield (让出当前线程)

  • 调用 yield() 会让当前线程从 Running (运行) 进入 Runnable 状态 (就绪),然后调度执行其它线程

  • 具体的调度实现依赖于操作系统的任务调度器

    interrupt()

  • 如果被打断线程正在 sleep,wait,join 会导致被打断的线程抛出 InterruptedException。并清除打断标记,即执行 isInterrupted() 结果为 false

  • 如果被打断线程正在运行,则会设置打断标记,即执行 isInterrupted() 结果为 true
  • park 的线程被打断,也会设置打断标记 ( LockSupport.part() )
  • 正常运行的线程在被打断后,不会停止,会继续执行。如果要让线程在被打断后停下来,需要使用打断标记来判断

    1. while(true) {
    2. if(Thread.currentThread().isInterrupted()) {
    3. break;
    4. }
    5. }

    查看进程 & 线程的方法

    windows

  • 任务管理器可以查看进程和线程数,也可以用来杀死进程

  • tasklist 查看进程 taskkill 杀死进程

linux

  • ps -fe 查看所有进程
  • ps -fT -p <pid> 查看某个进程 (PID) 的所有线程
  • kill 杀死进程
  • top 按大写 H 切换是否显示线程
  • top -H -p 查看某个进程 (PID) 的所有线程

Java

  • jps 命令查看所有 Java 进程
  • jstack 查看某个 Java 进程 (PID) 的所有线程状态
  • jconsole 来查看某个 Java 进程中线程的运行情况 (图形界面)

jconsole 远程监控配置

  • 需要以如下方式运行你的 java 类

    1. java -Djava.rmi.server.hostname=`ip地址` -Dcom.sun.management.jmxremote -
    2. Dcom.sun.management.jmxremote.port=`连接端口` -Dcom.sun.management.jmxremote.ssl=是否安全连接 -
    3. Dcom.sun.management.jmxremote.authenticate=是否认证 java
  • 修改 /etc/hosts 文件将 127.0.0.1 映射至主机名

如果要认证访问,还需要做如下步骤

  • 复制 jmxremote.password 文件
  • 修改 jmxremote.password 和 jmxremote.access 文件的权限为 600 即文件所有者可读写
  • 连接时填入 controlRole (用户名),R&D (密码)

    线程的状态

    操作系统的五种状态

Java API 的六种状态

线程状态的转换

  1. new –> runnable
  • 调用 t.start()
  1. runnable <–> waiting
  • 情况1: t 线程用 synchronized(obj) 获取了对象锁后
    • 调用 obj.wait(),t 线程从 runnable –> waiting
    • 调用 obj.notify()、obj.notifyAll() , t.interrupt() 时
      • 竞争锁成功,t 线程从 waiting –> runnable
      • 竞争锁失败,t 线程从 waiting –> blocked
  • 情况2:
    • 当前线程调用其他线程的 t.join(),当前线程从 runnable –> waiting(注意是当前线程在 t 线程锁对象的监视器上等待)
    • t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 waiting –> runnable
  1. runnable <–> timed_waiting
  • 情况1:t 线程用 synchronized(obj) 获取了对象锁后
    • 调用 obj.wait(long n) 方法时,t 线程从 runnable –> timed_waiting
    • t 线程等待时间超过了 n 毫秒,或调用 obj.notify()、obj.notifyAll()、t.interrupt() 时
      • 竞争锁成功,t 线程从 timed_waiting –> runnable
      • 竞争锁失败,t 线程从 timed_waiting –> blocked
  • 情况2
    • 当前线程调用其他线程的 t.join(long n),当前线程从 runnable –> timed_waiting(注意是当前线程在 t 线程锁对象的监视器上等待)
    • 当前线程等待时间超过了 n 毫秒,或 t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 timed_waiting –> runnable
  • 情况3
    • 当前线程调用 Thread.sleep(long n) ,当前线程从 runnable –> timed_waiting
    • 当前线程等待时间超过了 n 毫秒,当前线程从 timed_waiting –> runnable
  • 情况4
    • 当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 时,当前线程从 runnable –> timed_waiting
    • 调用 LockSupport.unpark(目标线程) 或调用了线程的 interrupt() ,或是等待超时,会让目标线程从 timed_waiting –> runnable
  1. runnable <–> blocked
    • t 线程用 synchronized(obj) 获取了对象锁时,如果竞争失败,从 runnable –> blocked
    • 持对象锁线程的同步代码块执行完毕,会唤醒该对象上所有 blocked 的线程重新竞争,如果其中 t 线程竞争 成功,则该线程从 blocked –> runnable ,其它失败的线程仍然是 blocked
  2. runnable <–> terminated
    • 当前线程所有代码运行完毕,进入 terminated

      线程池

      线程池是一种池化技术,类似的还有数据库连接池、HTTP 连接池等。
      池化的思想主要是:减少每次获取和结束资源的消耗,提高对资源的利用率。
      线程池维护着多个线程,还可以维护一些基本统计信息,例如已完成任务的数量。
      使用线程池的好处
  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗
  • 提高响应速度:当任务到达时,可以不需要等待线程创建就能立即执行
  • 提高线程的可管理性:使用线程池可以进行线程的统一分配,监控和调优。

图片.png下面学习 ThreadPoolExecutor

线程池状态

状态名称 高3位的值(二进制) 描述
running 111 (-1) 接收新任务,同时处理任务队列中的任务
shutdowm 000 (0) 不接受新任务,但是处理任务队列中的任务
stop 001 (1) 中断正在执行的任务,同时抛弃阻塞队列中的任务
tidying 010 (2) 任务执行完毕,活动线程为 0,即将进入终结阶段
terminated 011 (3) 终结状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
这些信息存储在一个原子变量 ctl 中,目的是:将线程池状态 与 线程个数合二为一,这样就可以用一次 CAS 原子操作进行赋值

  1. // c 为旧值, ctlOf 返回结果为新值
  2. ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
  3. // rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
  4. private static int ctlOf(int rs, int wc) { return rs | wc; }

构造方法

  1. public ThreadPoolExecutor(int corePoolSize, // 核心线程数
  2. int maximumPoolSize, // 最大线程数
  3. long keepAliveTime, // 救急线程空闲时的生存时间
  4. TimeUnit unit, // 救急线程的时间单位
  5. BlockingQueue<Runnable> workQueue, // 阻塞队列(存放任务)
  6. ThreadFactory threadFactory, // 线程工厂(可以给线程取名字)
  7. RejectedExecutionHandler handler) // 拒绝策略
  8. {}

补充说明

  • 阻塞队列的种类:
    • 有界阻塞队列 ArrayBlockingQueue
    • 无界阻塞队列 LinkedBlockingQueue
    • 最多只有一个同步元素的 SynchronousQueue
    • 优先队列 PriorityBlockingQueue
  • jdk 提供了 4 种实现拒绝策略
    • AbortPolicy:让调用者抛出 RejectedExecutionException 异常,这是默认策略
    • CallerRunsPolicy:将任务分给调用线程来执行
    • DiscardPolicy:放弃本次任务
    • DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之
  • 其他著名框架实现的拒接策略

    • Dubbo 的实现:在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方 便定位问题
    • Netty 的实现:创建一个新线程来执行任务
    • ActiveMQ 的实现:带超时等待 (60s) 尝试放入队列
    • PinPoint 的实现:使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略

      工作方式

  • 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务

  • 当线程数达到 核心线程数,并没有线程空闲,这时再加入任务,新加的任务会被加入 阻塞队列 排队,直到有空闲的线程
  • 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 个线程来救急,救急线程用完以后,超过生存时间后会被释放
  • 如果线程到达 最大线程数,但仍然有新任务,这时会执行拒绝策略

    创建线程池的工厂

    根据构造方法,JDK 的 Executors 类中提供了众多工厂方法来创建各种用途的线程池
    newFixedThreadPool

    1. public static ExecutorService newFixedThreadPool(int nThreads) {
    2. return new ThreadPoolExecutor(nThreads, nThreads,
    3. 0L, TimeUnit.MILLISECONDS,
    4. new LinkedBlockingQueue<Runnable>());
    5. }

    特点

  • 核心线程数 == 最大线程数

  • 即没有救急线程被创建,因此无需超时时间
  • 阻塞队列是无界,可以放任意数量的任务
  • 适用于任务量已知,相对耗时的任务

newCachedThreadPool

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

特点

  • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着全部都是救急线程(60s 后可以回收)
  • 救急线程可以无限创建
  • 整个线程池表现为:线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。
  • 适合任务数比较密集,但每个任务执行时间较短的情况
  • 队列采用了 SynchronousQueue 实现
    • 特点是:它没有容量,没有线程来取是放不进去的,只有当线程取任务时,才会将任务放入该阻塞队列中(一手交钱、一手交货)

newSingleThreadExecutor

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

使用场景:希望多个任务排队执行。当任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程不会被释放。
和 单线程 及 newFixedThreadPool 的区别

  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止,那么没有任何补救措施。而线程池还会新建一个线程,保证池的正常工作
  • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
  • FinalizableDelegatedExecutorService 类应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
  • Executors.newFixedThreadPool(1) 线程个数初始时为 1,以后还可以修改。对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改
    1. // 强转为ThreadPoolExecutor
    2. ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    3. // 改变核心线程数
    4. threadPool.setCorePoolSize(2);

    提交任务

    ```java // 执行任务 void execute(Runnable command);

// 提交任务,用返回值 Future 获得任务执行结果

Future submit(Callable task);

// 提交 tasks 中所有任务

List> invokeAll(Collection<? extends Callable> tasks);

// 提交 tasks 中所有任务,带超时时间

List> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit);

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消

T invokeAny(Collection<? extends Callable> tasks);

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间

T invokeAny(Collection<? extends Callable> tasks, long timeout, TimeUnit unit);

  1. <a name="rLECl"></a>
  2. ## 关闭线程池
  3. ```java
  4. // 线程池状态变为 shutdowm,此方法不会阻塞调用线程的执行
  5. void shutdown();
  6. // 线程池状态变为 stop,会将队列中的任务返回
  7. List<Runnable> shutdownNow();
  8. // 不在 running 状态的线程池,此方法就返回 true
  9. boolean isShutdown();
  10. // 线程池状态是否是 terminated
  11. boolean isTerminated();
  12. // 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 terminated 后做些事情,可以利用此方法等待
  13. boolean awaitTermination(long timeout, TimeUnit unit);

任务调度线程池

在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能。
Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务
Timer 测试

  1. public static void main(String[] args) {
  2. Timer timer = new Timer();
  3. TimerTask task1 = new TimerTask() {
  4. @Override
  5. public void run() {
  6. log.debug("task 1");
  7. Thread.sleep(2000);
  8. }
  9. };
  10. TimerTask task2 = new TimerTask() {
  11. @Override
  12. public void run() {
  13. log.debug("task 2");
  14. }
  15. };
  16. // 使用 timer 添加两个任务,希望它们都在 1s 后执行。
  17. // 但由于 timer 内只有一个线程来顺序执行队列中的任务,
  18. // 因此『任务1』的延时,影响了『任务2』的执行
  19. timer.schedule(task1, 1000);
  20. timer.schedule(task2, 1000);
  21. }
  22. // 输出结果
  23. 20:46:09.444 c.TestTimer [main] - start...
  24. 20:46:10.447 c.TestTimer [Timer-0] - task 1
  25. 20:46:12.448 c.TestTimer [Timer-0] - task 2

ScheduledExecutorService 测试

  1. public static void main(String[] args) {
  2. ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
  3. // 添加两个任务,希望它们都在 1s 后执行
  4. pool.schedule(() -> {
  5. System.out.println("任务1,执行时间:" + new Date());
  6. Thread.sleep(2000);
  7. }, 1000, TimeUnit.MILLISECONDS);
  8. pool.schedule(() -> {
  9. System.out.println("任务2,执行时间:" + new Date());
  10. }, 1000, TimeUnit.MILLISECONDS);
  11. // 间隔是 上一个任务开始 <-> 延时 <-> 下一个任务开始
  12. pool.scheduleAtFixedRate();
  13. // 间隔是 上一个任务结束 <-> 延时 <-> 下一个任务开始
  14. pool.scheduleWithFixedDelay();
  15. }
  16. // 输出结果
  17. 任务1,执行时间:Thu Jan 03 12:45:17 CST 2019
  18. 任务2,执行时间:Thu Jan 03 12:45:17 CST 2019

整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务

Fork / Join 分支合并

Fork / Join 是 JDK7 加入的线程池实现,它体现的是一种分治思想。
适用于能够进行任务拆分的 cpu 密集型运算,任务拆分是:将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。
Fork / Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。
Fork / Join 默认会创建与 cpu 核心数大小相同的线程池。


提交给 Fork / Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值)