一:终止线程的设计模式

问题:一个线程如何安全的终止另一个线程

stop()的问题:如果线程锁住共享资源,被杀死后没有机会释放锁,其他线程也无法获取锁
System.exit()的问题: 会让整个程序都停止

正确姿势:Java的中断机制

Two-phase Termination模式,两段式

第一阶段:线程T1向T2发送终止指令
第二阶段:线程T2响应终止指令

Java中,终止任务的前提:线程进入Runnable状态,—-调用 interrupt(), 休眠线程——-> Runnable
优雅的讲Runnable —>终止:让线程执行完run 方法,
一般方法:设置标志位,检查标志位,判断是否返回run方法

注意点:
一:仅检查终止标志位不够,还要判断线程是否处于休眠态
二:正确合理的处理异常, 我们需要自定义线程终止标志位用于终止线程

使用场景:
1:安全的终止线程,需要释放该释放的资源
2:确保终止处理逻辑在线程结束之前一定会执行时,可以使用该方法

二:避免共享的设计模式

Immutability模式 CorpOnWrite模式,Thread-Specific Storage模式

  • Immutability 模式属性的不可变性
  • 读写模式注意拷贝的性能问题
  • ThreadSpecific Storage注意异步执行问题

Immutability 模式

多线程同事读写一个共享比那里存在并发
这种就是把共享变量变成不可变性
对象一旦被创建就不会再发发生变化。

如何实现:
类中定义final ,并只存在读方法,,但是被继承后有可能会被改,类也被定义成fianl

JDK 中很多是不可变的类,如String Long,Integer Double ,这些对象的安全性靠不可变性保证。
不可变的三点要求:类和 属性都是final ,所有方法均只读

注意实现

  1. 对象的所有属性都是final,并保障不可变性
  2. 不可变对象也需要政正常发布

要确定不可变性的边界,是否要求属性对象也是具备不可变性

  1. class Foo{
  2. int age=0;
  3. int name="abc";
  4. }
  5. final class Bar {
  6. final Foo foo;
  7. void setAge(int a){
  8. foo.age=a;
  9. }
  10. }
  11. barfinal的,但是还是有set方法

可变对象虽然是线程安全的,但是并不意味着引用这些不可变对象的对象就是线程安全的。

  1. //Foo线程安全
  2. final class Foo{
  3. final int age=0;
  4. final String name="abc";
  5. }
  6. //Bar线程不安全
  7. class Bar {
  8. Foo foo;
  9. void setFoo(Foo f){
  10. this.foo=f;
  11. }
  12. }

Copy-on-Write模式

String的 replace() 方法,并不会修改原字符串的内容,而是复制一个新的字符串,也是一种读写

String Interge Long 都基于Copy on Write方案实现
确定:消耗内存
如果读多写少可以考虑

应用场景:
CopyOnWriteArrayList
CopyOnWriteArraySet
无锁集合,
Unix的操作系统创建的挨批 fork

Thread-Specific Storage 模式——没有共享就没有伤害

本地线程变量,本地线程的变量副本,

三: 多线程版本的if模式

Guarded Suspension模式

守护线程
就是守护-挂起,一个任务在等待其他任务的返回
多线程版本的if?

应用场景:
多线程环境下多个线程访问相同实例资源,从实例资源获取资源并处理
实例资源需要管理自身拥有的资源,并对线程的请求作出判断

代码实例:

  1. public class GuardedObject<T> {
  2. //结果
  3. private T obj;
  4. //获取结果
  5. public T get(){
  6. synchronized (this){
  7. //没有结果等待 防止虚假唤醒
  8. while (obj==null){
  9. try {
  10. this.wait();
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. return obj;
  16. }
  17. }
  18. //产生结果
  19. public void complete(T obj){
  20. synchronized (this){
  21. //获取到结果,给obj赋值
  22. this.obj = obj;
  23. //唤醒等待结果的线程
  24. this.notifyAll();
  25. }
  26. }
  27. }

Balking模式

如果一个线程做了,那么当前线程就不做了
典型的就是单例模式
同 Guarded Suspension 存在守护条件,如有条件不足就中断

常见的场景
sychronized 轻量级锁膨胀,只有一个线程膨胀获取到Monitor对象
单例模式
服务组件的初始化

如果实现Blanking模式
1:锁机制 sychronized reentrantLock
2:cas
3:共享变量 volatile
编辑器的自动保存功能?

  1. boolean changed=false;
  2. // 自动存盘操作
  3. void autoSave(){
  4. synchronized(this){
  5. if (!changed) {
  6. return;
  7. }
  8. changed = false;
  9. }
  10. // 执行存盘操作
  11. // 省略且实现
  12. this.execSave();
  13. }
  14. // 编辑操作
  15. void edit(){
  16. // 省略编辑逻辑
  17. ......
  18. change();
  19. }
  20. // 改变状态
  21. void change(){
  22. synchronized(this){
  23. changed = true;
  24. }
  25. }

d单词初始化

  1. boolean inited = false;
  2. synchronized void init(){
  3. if(inited){
  4. return;
  5. }
  6. //省略doInit的实现
  7. doInit();
  8. inited=true;
  9. }

四:多线程分工

Thread-Per-Message 模式

注意线程的创建,销毁,是否会造成OOM

为每个任务分配一个独立的线程
应用场景:网络编程服务端的实现,

  1. final ServerSocketChannel ssc= ServerSocketChannel.open().bind(new InetSocketAddress(8080));
  2. //处理请求
  3. try {
  4. while (true) {
  5. // 接收请求
  6. SocketChannel sc = ssc.accept();
  7. // 每个请求都创建一个线程
  8. new Thread(()->{
  9. try {
  10. // 读Socket
  11. ByteBuffer rb = ByteBuffer.allocateDirect(1024);
  12. sc.read(rb);
  13. //模拟处理请求
  14. Thread.sleep(2000);
  15. // 写Socket
  16. ByteBuffer wb = (ByteBuffer)rb.flip();
  17. sc.write(wb);
  18. // 关闭Socket
  19. sc.close();
  20. }catch(Exception e){
  21. throw new UncheckedIOException(e);
  22. }
  23. }).start();
  24. }
  25. } finally {
  26. ssc.close();
  27. }

缺陷:比较耗时,内存占用大,为每个请求创建一个线程不适合高并发

Worker Thread模式

注意死锁问题,提交任务之前不要有依赖性

避免重复创建线程

  1. ExecutorService es = Executors.newFixedThreadPool(200);
  2. final ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080));
  3. //处理请求
  4. try {
  5. while (true) {
  6. // 接收请求
  7. SocketChannel sc = ssc.accept();
  8. // 将请求处理任务提交给线程池
  9. es.execute(()->{
  10. try {
  11. // 读Socket
  12. ByteBuffer rb = ByteBuffer.allocateDirect(1024);
  13. sc.read(rb);
  14. //模拟处理请求
  15. Thread.sleep(2000);
  16. // 写Socket
  17. ByteBuffer wb =
  18. (ByteBuffer)rb.flip();
  19. sc.write(wb);
  20. // 关闭Socket
  21. sc.close();
  22. }catch(Exception e){
  23. throw new UncheckedIOException(e);
  24. }
  25. });
  26. }
  27. } finally {
  28. ssc.close();
  29. es.shutdown()

场景:线程池

生产者-消费者

可以直接用线程池实现
image.png
解耦生产者,消费者

  1. public class Test {
  2. public static void main(String[] args) {
  3. // 生产者线程池
  4. ExecutorService producerThreads = Executors.newFixedThreadPool(3);
  5. // 消费者线程池
  6. ExecutorService consumerThreads = Executors.newFixedThreadPool(2);
  7. // 任务队列,长度为10
  8. ArrayBlockingQueue<Task> taskQueue = new ArrayBlockingQueue<Task>(10);
  9. // 生产者提交任务
  10. producerThreads.submit(() -> {
  11. try {
  12. taskQueue.put(new Task("任务"));
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. });
  17. // 消费者处理任务
  18. consumerThreads.submit(() -> {
  19. try {
  20. Task task = taskQueue.take();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. });
  25. }
  26. static class Task {
  27. // 任务名称
  28. private String taskName;
  29. public Task(String taskName) {
  30. this.taskName = taskName;
  31. }
  32. }
  33. }

优点:
支持异步

消除生成者与消费者直接的速度差异

过饱问题:
生产者速度大于消费速度 ,任务不断挤压