1. 线程池
1.1 ThreadPoolExecutor
- Running:运行状态。能处理新提交的任务,也能处理阻塞队列中的任务
- Shutdown:关闭状态。不能接受新提交的任务,但是能够继续处理阻塞队列中已经保存的任务。
- Stop:不接收新的任务,也不能处理队列中已经保存的任务
- Tidying:如果所有的任务都已经终止,线程池会进入该状态
- Terminated:Tidying 状态调用
terminated()
进入该状态。
ThreadPool 提供的方法
- execute ():提交任务,交给线程池执行
- submit ():提交任务,能够返回执行结果 execute+ Future
- shutdown ():关闭线程池,等待任务都执行完
shutdownNow() :关闭线程池,不等待任务执行完
getTaskCount ():线程池已执行和未执行的任务总数
- getCompletedTaskCount ():已完成的任务数量
- getPoolSize ():线程池当前的线程数量
- getActiveCount ():当前线程池中正在执行任务的线程数量
- getQueue():返回执行程序使用的任务队列
- isTerminating():如果此执行程序正在终止,则返回true
- isTerminated()
生产上的一个使用实例
/**
* 同步录播课章节顺序
*
* @return
* @throws InterruptedException
*/
@ResponseBody
@RequestMapping(value = "/synchroCourseVideoChapter", method = RequestMethod.GET)
public String synchroCourseVideoChapter() throws InterruptedException {
log.info("#############videoChapter:同步排序章节开始");
String status;
ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(100);
ClassModule module = new ClassModule();
module.setPageSize(100);
module.setDelFlag(0);
module.setTeachMethod("TEACH_METHOD_VIDEO");
int page = 1;
for (; ; page++) {
module.setPage(page);
List<ClassModule> classModules = classModuleServiceImpl.findClassModuleByPage(module);
if (classModules.isEmpty() || classModules.size() == 0) {
log.info("#############videoChapter:暂无章节可以处理");
break;
}
for (ClassModule classModule : classModules)
executorService.execute(new CourseVideoChapterThread(classModule.getId()));
// 获取任务队列的长度
int size = executorService.getQueue().size();
try {
// 判断任务队列长度,计算休眠时间
Thread.sleep((size / 10) == 0 ? (size / 10) : 20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 关闭线程池 启动一次顺序关闭,执行以前提交的任务,但不接受新任务
executorService.shutdown();
for (; ; ) {
// 用死循环去判断执行程序是否已经终止,来更改状态
if (executorService.isTerminated()) {
// 所有的子线程都结束了
status = JsonMsg.SUCCESS;
log.info("#############videoChapter:更新录播课章节顺序处理完成");
break;
}
Thread.sleep(1000);
}
return status;
}
线程池类图
1.2 Executor 接口
- Executors.newCachedThreadPool:创建可缓存的线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- Executors.newFixedThreadPool:创建定长线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- Executors.newScheduledThreadPool:创建定长线程池,并且支持定时和周期性的执行
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
Demo:
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.schedule(new Runnable() {
@Override
public void run() {
log.warn("schedule run");
}
}, 3, TimeUnit.SECONDS);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
log.warn("schedule run");
}
}, 1, 3, TimeUnit.SECONDS);
// executorService.shutdown();
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
log.warn("timer run");
}
}, new Date(), 5 * 1000);
}
- Executors.newSingleThreadExecutor:创建单线程化的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
1.3 线程池的合理配置
- CPU 密集型任务,就需要尽量压榨 CPU,参考值可以设为 NCPU+1 (NCPU:CPU数量)
- CPU 密集型:指的是系统的硬盘、内存性能相对 CPU 要好很多(硬盘、内存 > CPU)
- IO 密集型任务,参考值可以设置为 2*NCPU
- 系统的 CPU 性能相对硬盘、内存要好很多 (CPU > 硬盘、内存)
2. 多线程并发最佳实战
- 使用本地变量
- 使用不可变类
- 最小化锁的作用域范围:S = 1/(1-a + a/n) (阿木达尔定律)
- a:串行计算部分所占比例
- n:并行处理的节点个数
- 当 a=0 时,没有串行只有并行,最大加速比 s=n;当 a=1 时,最小加速比 s=1;当n→∞时,极限加速比
s→ 1/a
,这也就是加速比的上限。例如,若串行代码占整个代码的25%,则并行处理的总体性能不可能超过4。这一公式已被学术界所接受,并被称做「阿姆达尔定律」(Amdahl law)。
- 使用线程池的 Executor,而不是直接 new Thread 执行
- 宁可使用同步也不要使用线程的 wait 和 notify
- 使用 Blocking Queue 实现生产-消费模式
- 使用并发集合而不是加了锁的同步集合
- 使用 Semaphore 刨建有界的访向
- 宁可使用同步代码块,也不使用同步的方法
- 避免使用静态变量
3. Spring 与线程安全
- Spring bean:singleton、prototype
- 提供了 scope 属性来表示该 bean 的作用域,它是这个 bean 的生命周期
- spring 默认 scope 为 singleton,没生成一个 bean,默认是单例对象,该对象的生命周期与 spring 容器是一致的,只是在第一次被注入时才会创建。
- scope 为 prototype 时,bean 被定义为在每次注入时都被创建。(频繁创建,会严重影响性能)
- 无状态对象
- 我们交于 spring 管理的对象都会无状态对象,不会因为多线程导致状态被破坏。
- 无状态对象是自身没有状态的对象,包括 dto、vo、service、dao、controller 等
spring 根本就没有对 bean 的多线程安全问题做出保证和措施,对于每个 bean 的线程安全问题,根本原因是因为每个 bean 自身设计没有在 bean 中声明任何有状态的实例变量或类变量。如果必须加入有状态的实例变量或者类变量,让类变成有状态的对象时,就需要使用 ThreadLocal
把变量变成线程私有的;如果 bean 的实例变量或者类变量需要在多个线程之间共享,那么我们就只能使用 synchronized、lock、CAS
等这些来实现线程同步的方法了。
4. HashMap 和 ConcurrentHashMap
单线程下 HashMap 的 rehash:
多线程并发下的 rehash:
横线上边是第一个线程,执行一半,下边的第二个线程开始执行,并且执行完了 rehash,接下来线程1被唤醒:
key = 5 和 key = 9 出现循环列表,访问时会 rehash 会出现死循环。
1.7 里的 ConcurrentHashMap:
1.8 里的 ConcurrentHashMap:
**