创建线程的方式
方法一:继承 Thread 创建线程
public class CreateThread {public static void main(String[] args) {Thread myThread = new MyThread();// 启动线程myThread.start();}}class MyThread extends Thread {@Overridepublic void run() {System.out.println("my thread running");}}
方法二:使用 Runnable 配合 Thread
推荐使用
public class CreateThread2 {private static class MyRunnable implements Runnable {@Overridepublic void run() {System.out.println("my runnable running");}}public static void main(String[] args) {MyRunnable myRunnable = new MyRunnable();Thread thread = new Thread(myRunnable, "my thread name");thread.start();}}
方法三:使用 FutureTask 配合 Thread
使用 FutureTask 可以用泛型指定线程的返回值类型(Runnable 的 run() 没有返回值)
public class UseFutureTask {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> futureTask = new FutureTask<>(new MyCall());Thread thread = new Thread(futureTask);thread.start();// 获得线程运行后的返回值System.out.println(futureTask.get());}}class MyCall implements Callable<String> {@Overridepublic String call() throws Exception {return "hello world";}}
源码
Thread 的源码
理解 Runnable 的原理
public class Thread implements Runnable {/* What will be run. */private Runnable target;@Overridepublic void run() {if (target != null) {target.run();}}}
FutureTask 的源码
public class FutureTask<V> implements RunnableFuture<V> {/** The underlying callable; nulled out after running *//** 底层可调用对象:运行后无效 */private Callable<V> callable;public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; // ensure visibility of callable// 确保可调用对象的可见性}public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}}
线程的常用方法
| 方法 | 功能说明 | 注意 |
|---|---|---|
| start() | 启动一个新线程,在新的线程中运行 run() 中的代码 | |
| run() | 新线程启动后会调用的方法 | |
| join() | 等待线程运行结束再继续 | |
| join(long n) | 等待线程运行结束,最多等待 n 毫秒 | |
| getId() | 获取线程长整型的 id,id 唯一 | |
| getName() | 获取线程名 | |
| setName(String) | 修改线程名 | |
| getPriority() | 获取线程优先级 | |
| setPriority(int) | 修改线程优先级 | java中规定线程优先级是1~10 的整数 较大的优先级能提高该线程被 CPU 调度的机率 |
| getState() | 获取线程状态 | |
| isInterrupted() | 判断线程是否被打断,不会清除打断标记 | |
| isAlive() | 线程是否还存活 | |
| interrupt() | 打断线程 | |
| interrupted() | 判断当前线程是否被打断,会清除打断标记 方法是 static 的 |
|
| currentThread() | 获取当前正在执行的线程 方法是 static 的 |
|
| sleep(long n) | 让当前执行的线程休眠 n 毫秒,休眠时让出 cpu 的时间片给其它线程 方法是 static 的 |
|
| yield() | 提示线程调度器 让出当前线程对 CPU的使用,主要是为了测试和调试 方法是 static 的 |
start() & run()
- start() 只是让线程进入就绪,里面代码不一定立刻运行
- 每个线程对象的 start() 只能调用一次,如果调用多次会抛出 IllegalThreadStateException
- 如果在构造 Thread 对象时传递了 Runnable 参数,则线程启动后会调用 Runnable 中的 run 方法,否则默认不执行任何操作。但可以创建 Thread 的子类对象, 来覆盖默认行为
- 直接调用 run(),是在主线程中执行了 run(),没有启动新的线程
使用 start() 是启动新的线程,通过新的线程间接执行 run() 中的代码
sleep() & yield()
sleep (使线程阻塞)
调用 sleep() 会让当前线程从 Running (运行) 进入 Timed Waiting 状态 (阻塞)。并让出 CPU 的使用权
- 其它线程可以使用 interrupt() 打断正在睡眠的线程,这时 sleep() 会抛出 InterruptedException
- 睡眠结束后的线程未必会立刻得到执行
- 如果持有对象锁,则调用 sleep() 时,并不会释放对象锁
建议用 TimeUnit 的 sleep() 代替 Thread 的 sleep() 来获得更好的可读性
Long timeout = 1L;try {TimeUnit.SECONDS.sleep(timeout);} catch (InterruptedException e) {e.printStackTrace();}
yield (让出当前线程)
调用 yield() 会让当前线程从 Running (运行) 进入 Runnable 状态 (就绪),然后调度执行其它线程
-
interrupt()
如果被打断线程正在 sleep,wait,join 会导致被打断的线程抛出 InterruptedException。并清除打断标记,即执行 isInterrupted() 结果为 false
- 如果被打断线程正在运行,则会设置打断标记,即执行 isInterrupted() 结果为 true
- park 的线程被打断,也会设置打断标记 ( LockSupport.part() )
正常运行的线程在被打断后,不会停止,会继续执行。如果要让线程在被打断后停下来,需要使用打断标记来判断。
while(true) {if(Thread.currentThread().isInterrupted()) {break;}}
查看进程 & 线程的方法
windows
任务管理器可以查看进程和线程数,也可以用来杀死进程
tasklist查看进程taskkill杀死进程
linux
ps -fe查看所有进程ps -fT -p <pid>查看某个进程 (PID) 的所有线程kill杀死进程top按大写 H 切换是否显示线程top -H -p查看某个进程 (PID) 的所有线程
Java
- jps 命令查看所有 Java 进程
jstack查看某个 Java 进程 (PID) 的所有线程状态jconsole来查看某个 Java 进程中线程的运行情况 (图形界面)
jconsole 远程监控配置
需要以如下方式运行你的 java 类
java -Djava.rmi.server.hostname=`ip地址` -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=`连接端口` -Dcom.sun.management.jmxremote.ssl=是否安全连接 -Dcom.sun.management.jmxremote.authenticate=是否认证 java类
修改 /etc/hosts 文件将 127.0.0.1 映射至主机名
如果要认证访问,还需要做如下步骤
- 复制 jmxremote.password 文件
- 修改 jmxremote.password 和 jmxremote.access 文件的权限为 600 即文件所有者可读写
- 连接时填入 controlRole (用户名),R&D (密码)
线程的状态
操作系统的五种状态
Java API 的六种状态
线程状态的转换
- new –> runnable
- 调用 t.start()
- runnable <–> waiting
- 情况1: t 线程用 synchronized(obj) 获取了对象锁后
- 调用 obj.wait(),t 线程从 runnable –> waiting
- 调用 obj.notify()、obj.notifyAll() , t.interrupt() 时
- 竞争锁成功,t 线程从 waiting –> runnable
- 竞争锁失败,t 线程从 waiting –> blocked
- 情况2:
- 当前线程调用其他线程的 t.join(),当前线程从 runnable –> waiting(注意是当前线程在 t 线程锁对象的监视器上等待)
- t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 waiting –> runnable
- runnable <–> timed_waiting
- 情况1:t 线程用 synchronized(obj) 获取了对象锁后
- 调用 obj.wait(long n) 方法时,t 线程从 runnable –> timed_waiting
- t 线程等待时间超过了 n 毫秒,或调用 obj.notify()、obj.notifyAll()、t.interrupt() 时
- 竞争锁成功,t 线程从 timed_waiting –> runnable
- 竞争锁失败,t 线程从 timed_waiting –> blocked
- 情况2:
- 当前线程调用其他线程的 t.join(long n),当前线程从 runnable –> timed_waiting(注意是当前线程在 t 线程锁对象的监视器上等待)
- 当前线程等待时间超过了 n 毫秒,或 t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 timed_waiting –> runnable
- 情况3:
- 当前线程调用 Thread.sleep(long n) ,当前线程从 runnable –> timed_waiting
- 当前线程等待时间超过了 n 毫秒,当前线程从 timed_waiting –> runnable
- 情况4:
- 当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 时,当前线程从 runnable –> timed_waiting
- 调用 LockSupport.unpark(目标线程) 或调用了线程的 interrupt() ,或是等待超时,会让目标线程从 timed_waiting –> runnable
- runnable <–> blocked
- t 线程用 synchronized(obj) 获取了对象锁时,如果竞争失败,从 runnable –> blocked
- 持对象锁线程的同步代码块执行完毕,会唤醒该对象上所有 blocked 的线程重新竞争,如果其中 t 线程竞争 成功,则该线程从 blocked –> runnable ,其它失败的线程仍然是 blocked
- runnable <–> terminated
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗
- 提高响应速度:当任务到达时,可以不需要等待线程创建就能立即执行
- 提高线程的可管理性:使用线程池可以进行线程的统一分配,监控和调优。
线程池状态
| 状态名称 | 高3位的值(二进制) | 描述 |
|---|---|---|
| running | 111 (-1) | 接收新任务,同时处理任务队列中的任务 |
| shutdowm | 000 (0) | 不接受新任务,但是处理任务队列中的任务 |
| stop | 001 (1) | 中断正在执行的任务,同时抛弃阻塞队列中的任务 |
| tidying | 010 (2) | 任务执行完毕,活动线程为 0,即将进入终结阶段 |
| terminated | 011 (3) | 终结状态 |
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
这些信息存储在一个原子变量 ctl 中,目的是:将线程池状态 与 线程个数合二为一,这样就可以用一次 CAS 原子操作进行赋值
// c 为旧值, ctlOf 返回结果为新值ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们private static int ctlOf(int rs, int wc) { return rs | wc; }
构造方法
public ThreadPoolExecutor(int corePoolSize, // 核心线程数int maximumPoolSize, // 最大线程数long keepAliveTime, // 救急线程空闲时的生存时间TimeUnit unit, // 救急线程的时间单位BlockingQueue<Runnable> workQueue, // 阻塞队列(存放任务)ThreadFactory threadFactory, // 线程工厂(可以给线程取名字)RejectedExecutionHandler handler) // 拒绝策略{}
补充说明
- 阻塞队列的种类:
- 有界阻塞队列 ArrayBlockingQueue
- 无界阻塞队列 LinkedBlockingQueue
- 最多只有一个同步元素的 SynchronousQueue
- 优先队列 PriorityBlockingQueue
- jdk 提供了 4 种实现拒绝策略
- AbortPolicy:让调用者抛出 RejectedExecutionException 异常,这是默认策略
- CallerRunsPolicy:将任务分给调用线程来执行
- DiscardPolicy:放弃本次任务
- DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之
其他著名框架实现的拒接策略
线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
- 当线程数达到 核心线程数,并没有线程空闲,这时再加入任务,新加的任务会被加入 阻塞队列 排队,直到有空闲的线程
- 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 个线程来救急,救急线程用完以后,超过生存时间后会被释放
如果线程到达 最大线程数,但仍然有新任务,这时会执行拒绝策略
创建线程池的工厂
根据构造方法,JDK 的 Executors 类中提供了众多工厂方法来创建各种用途的线程池
newFixedThreadPoolpublic static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
特点
核心线程数 == 最大线程数
- 即没有救急线程被创建,因此无需超时时间
- 阻塞队列是无界,可以放任意数量的任务
- 适用于任务量已知,相对耗时的任务
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
特点
- 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着全部都是救急线程(60s 后可以回收)
- 救急线程可以无限创建
- 整个线程池表现为:线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。
- 适合任务数比较密集,但每个任务执行时间较短的情况
- 队列采用了 SynchronousQueue 实现
- 特点是:它没有容量,没有线程来取是放不进去的,只有当线程取任务时,才会将任务放入该阻塞队列中(一手交钱、一手交货)
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
使用场景:希望多个任务排队执行。当任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程不会被释放。
和 单线程 及 newFixedThreadPool 的区别
- 自己创建一个单线程串行执行任务,如果任务执行失败而终止,那么没有任何补救措施。而线程池还会新建一个线程,保证池的正常工作
Executors.newSingleThreadExecutor()线程个数始终为1,不能修改- FinalizableDelegatedExecutorService 类应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
Executors.newFixedThreadPool(1)线程个数初始时为 1,以后还可以修改。对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改// 强转为ThreadPoolExecutorThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);// 改变核心线程数threadPool.setCorePoolSize(2);
提交任务
```java // 执行任务 void execute(Runnable command);
// 提交任务,用返回值 Future 获得任务执行结果
// 提交 tasks 中所有任务
// 提交 tasks 中所有任务,带超时时间
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<a name="rLECl"></a>## 关闭线程池```java// 线程池状态变为 shutdowm,此方法不会阻塞调用线程的执行void shutdown();// 线程池状态变为 stop,会将队列中的任务返回List<Runnable> shutdownNow();// 不在 running 状态的线程池,此方法就返回 trueboolean isShutdown();// 线程池状态是否是 terminatedboolean isTerminated();// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 terminated 后做些事情,可以利用此方法等待boolean awaitTermination(long timeout, TimeUnit unit);
任务调度线程池
在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能。
Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务
Timer 测试
public static void main(String[] args) {Timer timer = new Timer();TimerTask task1 = new TimerTask() {@Overridepublic void run() {log.debug("task 1");Thread.sleep(2000);}};TimerTask task2 = new TimerTask() {@Overridepublic void run() {log.debug("task 2");}};// 使用 timer 添加两个任务,希望它们都在 1s 后执行。// 但由于 timer 内只有一个线程来顺序执行队列中的任务,// 因此『任务1』的延时,影响了『任务2』的执行timer.schedule(task1, 1000);timer.schedule(task2, 1000);}// 输出结果20:46:09.444 c.TestTimer [main] - start...20:46:10.447 c.TestTimer [Timer-0] - task 120:46:12.448 c.TestTimer [Timer-0] - task 2
ScheduledExecutorService 测试
public static void main(String[] args) {ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);// 添加两个任务,希望它们都在 1s 后执行pool.schedule(() -> {System.out.println("任务1,执行时间:" + new Date());Thread.sleep(2000);}, 1000, TimeUnit.MILLISECONDS);pool.schedule(() -> {System.out.println("任务2,执行时间:" + new Date());}, 1000, TimeUnit.MILLISECONDS);// 间隔是 上一个任务开始 <-> 延时 <-> 下一个任务开始pool.scheduleAtFixedRate();// 间隔是 上一个任务结束 <-> 延时 <-> 下一个任务开始pool.scheduleWithFixedDelay();}// 输出结果任务1,执行时间:Thu Jan 03 12:45:17 CST 2019任务2,执行时间:Thu Jan 03 12:45:17 CST 2019
整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务
Fork / Join 分支合并
Fork / Join 是 JDK7 加入的线程池实现,它体现的是一种分治思想。
适用于能够进行任务拆分的 cpu 密集型运算,任务拆分是:将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。
Fork / Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。
Fork / Join 默认会创建与 cpu 核心数大小相同的线程池。
提交给 Fork / Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值)
下面学习 ThreadPoolExecutor
