一:终止线程的设计模式
问题:一个线程如何安全的终止另一个线程
stop()的问题:如果线程锁住共享资源,被杀死后没有机会释放锁,其他线程也无法获取锁
System.exit()的问题: 会让整个程序都停止
Two-phase Termination模式,两段式
第一阶段:线程T1向T2发送终止指令
第二阶段:线程T2响应终止指令
Java中,终止任务的前提:线程进入Runnable状态,—-调用 interrupt(), 休眠线程——-> Runnable
优雅的讲Runnable —>终止:让线程执行完run 方法,
一般方法:设置标志位,检查标志位,判断是否返回run方法
注意点:
一:仅检查终止标志位不够,还要判断线程是否处于休眠态
二:正确合理的处理异常, 我们需要自定义线程终止标志位用于终止线程
使用场景:
1:安全的终止线程,需要释放该释放的资源
2:确保终止处理逻辑在线程结束之前一定会执行时,可以使用该方法
二:避免共享的设计模式
Immutability模式 CorpOnWrite模式,Thread-Specific Storage模式
- Immutability 模式属性的不可变性
- 读写模式注意拷贝的性能问题
- ThreadSpecific Storage注意异步执行问题
Immutability 模式
多线程同事读写一个共享比那里存在并发
这种就是把共享变量变成不可变性
对象一旦被创建就不会再发发生变化。
如何实现:
类中定义final ,并只存在读方法,,但是被继承后有可能会被改,类也被定义成fianl
JDK 中很多是不可变的类,如String Long,Integer Double ,这些对象的安全性靠不可变性保证。
不可变的三点要求:类和 属性都是final ,所有方法均只读
注意实现
- 对象的所有属性都是final,并保障不可变性
- 不可变对象也需要政正常发布
要确定不可变性的边界,是否要求属性对象也是具备不可变性
class Foo{
int age=0;
int name="abc";
}
final class Bar {
final Foo foo;
void setAge(int a){
foo.age=a;
}
}
bar是final的,但是还是有set方法
可变对象虽然是线程安全的,但是并不意味着引用这些不可变对象的对象就是线程安全的。
//Foo线程安全
final class Foo{
final int age=0;
final String name="abc";
}
//Bar线程不安全
class Bar {
Foo foo;
void setFoo(Foo f){
this.foo=f;
}
}
Copy-on-Write模式
String的 replace() 方法,并不会修改原字符串的内容,而是复制一个新的字符串,也是一种读写
String Interge Long 都基于Copy on Write方案实现
确定:消耗内存
如果读多写少可以考虑
应用场景:
CopyOnWriteArrayList
CopyOnWriteArraySet
无锁集合,
Unix的操作系统创建的挨批 fork
Thread-Specific Storage 模式——没有共享就没有伤害
本地线程变量,本地线程的变量副本,
三: 多线程版本的if模式
Guarded Suspension模式
守护线程
就是守护-挂起,一个任务在等待其他任务的返回
多线程版本的if?
应用场景:
多线程环境下多个线程访问相同实例资源,从实例资源获取资源并处理
实例资源需要管理自身拥有的资源,并对线程的请求作出判断
代码实例:
public class GuardedObject<T> {
//结果
private T obj;
//获取结果
public T get(){
synchronized (this){
//没有结果等待 防止虚假唤醒
while (obj==null){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return obj;
}
}
//产生结果
public void complete(T obj){
synchronized (this){
//获取到结果,给obj赋值
this.obj = obj;
//唤醒等待结果的线程
this.notifyAll();
}
}
}
Balking模式
如果一个线程做了,那么当前线程就不做了
典型的就是单例模式
同 Guarded Suspension 存在守护条件,如有条件不足就中断
常见的场景
sychronized 轻量级锁膨胀,只有一个线程膨胀获取到Monitor对象
单例模式
服务组件的初始化
如果实现Blanking模式
1:锁机制 sychronized reentrantLock
2:cas
3:共享变量 volatile
编辑器的自动保存功能?
boolean changed=false;
// 自动存盘操作
void autoSave(){
synchronized(this){
if (!changed) {
return;
}
changed = false;
}
// 执行存盘操作
// 省略且实现
this.execSave();
}
// 编辑操作
void edit(){
// 省略编辑逻辑
......
change();
}
// 改变状态
void change(){
synchronized(this){
changed = true;
}
}
d单词初始化
boolean inited = false;
synchronized void init(){
if(inited){
return;
}
//省略doInit的实现
doInit();
inited=true;
}
四:多线程分工
Thread-Per-Message 模式
注意线程的创建,销毁,是否会造成OOM
为每个任务分配一个独立的线程
应用场景:网络编程服务端的实现,
final ServerSocketChannel ssc= ServerSocketChannel.open().bind(new InetSocketAddress(8080));
//处理请求
try {
while (true) {
// 接收请求
SocketChannel sc = ssc.accept();
// 每个请求都创建一个线程
new Thread(()->{
try {
// 读Socket
ByteBuffer rb = ByteBuffer.allocateDirect(1024);
sc.read(rb);
//模拟处理请求
Thread.sleep(2000);
// 写Socket
ByteBuffer wb = (ByteBuffer)rb.flip();
sc.write(wb);
// 关闭Socket
sc.close();
}catch(Exception e){
throw new UncheckedIOException(e);
}
}).start();
}
} finally {
ssc.close();
}
缺陷:比较耗时,内存占用大,为每个请求创建一个线程不适合高并发
Worker Thread模式
注意死锁问题,提交任务之前不要有依赖性
避免重复创建线程
ExecutorService es = Executors.newFixedThreadPool(200);
final ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080));
//处理请求
try {
while (true) {
// 接收请求
SocketChannel sc = ssc.accept();
// 将请求处理任务提交给线程池
es.execute(()->{
try {
// 读Socket
ByteBuffer rb = ByteBuffer.allocateDirect(1024);
sc.read(rb);
//模拟处理请求
Thread.sleep(2000);
// 写Socket
ByteBuffer wb =
(ByteBuffer)rb.flip();
sc.write(wb);
// 关闭Socket
sc.close();
}catch(Exception e){
throw new UncheckedIOException(e);
}
});
}
} finally {
ssc.close();
es.shutdown()
场景:线程池
生产者-消费者
可以直接用线程池实现
解耦生产者,消费者
public class Test {
public static void main(String[] args) {
// 生产者线程池
ExecutorService producerThreads = Executors.newFixedThreadPool(3);
// 消费者线程池
ExecutorService consumerThreads = Executors.newFixedThreadPool(2);
// 任务队列,长度为10
ArrayBlockingQueue<Task> taskQueue = new ArrayBlockingQueue<Task>(10);
// 生产者提交任务
producerThreads.submit(() -> {
try {
taskQueue.put(new Task("任务"));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 消费者处理任务
consumerThreads.submit(() -> {
try {
Task task = taskQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
static class Task {
// 任务名称
private String taskName;
public Task(String taskName) {
this.taskName = taskName;
}
}
}
优点:
支持异步
消除生成者与消费者直接的速度差异
过饱问题:
生产者速度大于消费速度 ,任务不断挤压