线程与进程:一个线程就是在进程中的一个单一的顺序控制流,因此,单个进程可以拥有多个并发执行的任务,但是你的程序使得每个任务都好像有其自己得CPU一样。其底层机制是切分CPU时间
线程模型:简化了在单一程序中同时交织在一起的多个操作的处理。在使用线程时,CPU将轮流给每个任务分配其占用时间。每个任务都觉得自己一直占用CPU,但事实上CPU时间是划分成片段分配给了所有的任务。
线程驱动的任务由Runnable接口来提供。Runnable只是定义了线程的任务而已,并不是实现线程的方式。run方法也并无特殊之处,它不会产生任何内在的线程能力。
ex:定义任务
public class LiftOff implements Runnable{
protected int countDown = 10;
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff(){}
public LiftOff(int countDown){
this.countDown = countDown;
}
public String status() {
return "#" + id + "(" +
(countDown > 0 ? countDown : "Liftoff!") + "), ";
}
@Override
public void run() {
System.out.println("LiftOff当前线程为: "+Thread.currentThread().getName());
while (countDown-- > 0){
System.out.print(status());
Thread.yield();
}
}
}
shutdownNow与showdown的区别
shutdownNow:会引发中断异常,但此时任务依旧在运行;可以在中断异常程序处理中加上你特定的处理 ( 比如,发生中断异常时候,return 退出程序)
showdown:任务完成之后才会关闭
对使用线程池来说:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class LiftOff implements Runnable{
protected int countDown = 10;
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff(){}
public LiftOff(int countDown){
this.countDown = countDown;
}
public String status() {
return "#" + id + "(" +
(countDown > 0 ? countDown : "Liftoff!") + "), ";
}
@Override
public void run() {
System.out.println("LiftOff当前线程为: "+Thread.currentThread().getName());
while (countDown-- > 0){
System.out.print(status());
Thread.yield();
try {
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException e){
System.out.println("线程被外部打断了"+e.getMessage());
return;
}
}
}
@Override
public String toString() {
return "LiftOff{" +
"id=" + id +
'}';
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(new LiftOff());
executorService.shutdownNow();
}
}
多线程下访问一个资源可能会出现的问题:countDown资源在其他线程处理的时候可能被访问到。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class LiftOff implements Runnable{
protected int countDown = 10;
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff(){}
public LiftOff(int countDown){
this.countDown = countDown;
}
public String status() {
return "#" + id + "(" +
(countDown > 0 ? countDown : "Liftoff!") + "), ";
}
@Override
public void run() {
System.out.println("LiftOff当前线程为: "+Thread.currentThread().getName());
while (countDown-- > 0){
System.out.print(status());
Thread.yield();
try {
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException e){
System.out.println("线程被外部打断了"+e.getMessage());
return;
}
}
}
@Override
public String toString() {
return "LiftOff{" +
"id=" + id +
'}';
}
public static void main(String[] args) {
LiftOff liftOff = new LiftOff();
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
executorService.execute(liftOff);
}
executorService.shutdown();
}
}
//~ output
#0(7), #0(8), #0(9), #0(5), #0(6), #0(3), #0(2), #0(3), #0(Liftoff!), #0(1),
由以下示例扩展对数据表跑批的任务
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class LiftOffExt1 implements Runnable{
Random random = new Random(47);
private String name;
public LiftOffExt1(){}
public LiftOffExt1(String name){
this.name = name;
}
@Override
public void run() {
System.out.println("对"+name+"表开始进行跑批任务");
try {
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name+"表跑批任务结束");
}
//待优化 ->参考设计模式进行优化
public static void main(String[] args) {
for (int i = 1; i <= 5; i++) {
switch (i){
case 1:
new Thread(new LiftOffExt1("user")).start();
break;
case 2:
new Thread(new LiftOffExt1("book")).start();
break;
case 3:
new Thread(new LiftOffExt1("cpu")).start();
break;
case 4:
new Thread(new LiftOffExt1("note")).start();
break;
case 5:
new Thread(new LiftOffExt1("mobile")).start();
break;
}
}
}
}
Executor管理线程:允许管理异步任务的执行,而无需显示的管理线程的生命周期
留意各个方法的对应关系,
//从ThreadPoolExecutor构造器参数中分析各个执行器静态方法的区别
Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Executors.newFixedThreadPool(5);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
也可以通过ThreadPoolExecutor来手动构建自己的线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
ThreadPoolExecutor属性说明
1、corePoolSize:核心线程数
* 核心线程会一直存活,及时没有任务需要执行
* 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
* 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
2、queueCapacity:任务队列容量(阻塞队列)
* 当核心线程数达到最大时,新任务会放在队列中排队等待执行
3、maxPoolSize:最大线程数
* 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
* 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
4、 keepAliveTime:线程空闲时间
* 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
* 如果allowCoreThreadTimeout=true,则会直到线程数量=0
5、allowCoreThreadTimeout:允许核心线程超时
6、rejectedExecutionHandler:任务拒绝处理器
* 两种情况会拒绝处理任务:
- 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
- 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
* 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常
* ThreadPoolExecutor类有几个内部实现类来处理这类情况:
- AbortPolicy 丢弃任务,抛运行时异常
- CallerRunsPolicy 执行任务
- DiscardPolicy 忽视,什么都不会发生
- DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
* 实现RejectedExecutionHandler接口,可自定义处理器
ThreadPoolExecutor执行顺序:
线程池按以下行为执行任务
1. 当线程数小于核心线程数时,创建线程。
2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
3. 当线程数大于等于核心线程数,且任务队列已满
-1 若线程数小于最大线程数,创建线程
-2 若线程数等于最大线程数,抛出异常,拒绝任务
ThreadPoolExecutor属性一般设置:
通常核心线程数可以设为CPU数量+1,而最大线程数可以设为CPU的数量*2+1。
获取CPU数量的方法为:
Runtime.getRuntime().availableProcessors();
Executor管理线程:允许管理异步任务的执行,而无需显示的管理线程的生命周期
留意各个方法的对应关系
//从ThreadPoolExecutor构造器参数中分析各个执行器静态方法的区别
Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Executors.newFixedThreadPool(5);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
也可以通过ThreadPoolExecutor来手动构建自己的线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
从任务中产生返回值
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
class TaskResult implements Callable<String> {
Random random = new Random(47);
private int id;
public TaskResult(int id){
this.id = id;
}
@Override
public String call() throws Exception {
try {
//模拟操作的耗时
TimeUnit.SECONDS.sleep(random.nextInt(50));
}catch (InterruptedException e){
System.out.println("任务被中断了");
}
return "TaskResult "+id;
}
}
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
futures.add( executorService.submit(new TaskResult(i)));//不会阻塞主线程
}
for (Future<String> future : futures) {
if(future.isDone()) {//isDone是非阻塞调用
System.out.println(future.get());//阻塞式的调用
}
}
executorService.shutdownNow();//全部执行完成后才会shutdown,但并不阻塞当前线程的执行
System.out.println("关闭任务后代码继续执行");
}
}
模拟一个学校收取费用的示例
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
class TaskResult2 implements Callable<String> {
Random random = new Random();
private int collegeNo;
public TaskResult2(int collegeNo){
this.collegeNo = collegeNo;
}
/**
* 前提 student表百万的数据
* group上有个索引
* select sum(money) from student where college = "一"
* select sum(money) from student; //直接查询
* select sum(money) from student where college = "二"
* select sum(money) from student where college = "三"
* select sum(money) from student where college = "四"
*
* ----------------------------------
* / id name college money
* / 1 AA 一 8
* / 2 BB 二 9
* / 3 CC 三 7
* / 4 DD 四 6
* .....................
*
* @return
* @throws Exception
*/
@Override
public String call() throws Exception {
//select sum(money) from student where college = collegeNo;
final int randomMoney = random.nextInt(1000);
TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
System.out.println(randomMoney);
return collegeNo+ "==" +randomMoney;
}
}
public class CallableDemo2App {
public static void main(String[] args) {
Integer totalMoney = 0;
List<Future<String>> futures = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(5);
for(int i = 0;i < 5;i++) {
Future<String> res = executorService.submit(new TaskResult2(i));
futures.add(res);
}
for (Future<String> future : futures) {
try {
final String s = future.get();
String money = s.split("==")[1];
totalMoney += Integer.valueOf(money);
}catch (InterruptedException | ExecutionException e){
System.out.println("获取结果被中断了"+e.getMessage());
}
}
System.out.println("一共获取的钱数为: "+totalMoney);
executorService.shutdown();
}
}
无法捕获从线程中逃逸的异常
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExceptionThread implements Runnable{
@Override
public void run() {
throw new RuntimeException("线程异常");
}
public static void testExec(){
//new Thread(new ExceptionThread()).start();
ExecutorService executorService = Executors.newCachedThreadPool();
try {
executorService.execute(new ExceptionThread());
}catch (RuntimeException e){
System.out.println(e.getMessage());
}
executorService.shutdown();
}
public static void main(String[] args) {
testExec();
}
}
package com.thinking.in.java.course.chapter21;
import java.util.concurrent.*;
public class NaiveExceptionHandling {
public static void main(String[] args) {
try {
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new ExceptionThread());
} catch (RuntimeException ue) {
// This statement will NOT execute!
System.out.println("Exception has been handled!");
}
}
}
注意:无法获取异常栈的信息,只能处理异常
package com.thinking.in.java.course.chapter21;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
class ExceptionThread2 implements Runnable{
@Override
public void run() {
Thread t = Thread.currentThread();
System.out.println(" run() by "+t);
System.out.println(" eh = "+t.getUncaughtExceptionHandler());
throw new RuntimeException();
}
}
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler{
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println(" 异常信息被caught "+e);
//e.printStackTrace();
}
}
class HandlerThreadFactory implements ThreadFactory{
@Override
public Thread newThread(Runnable r) {
System.out.println(this + " Creating new Thread");
Thread t = new Thread(r);
System.out.println(" created thread "+t);
t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
System.out.println(" eh= "+t.getUncaughtExceptionHandler());
return t;
}
}
public class CaptureUncaughtException {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool(new HandlerThreadFactory());
service.execute(new ExceptionThread2());
service.shutdown();
}
}