Guarded Suspension模式:等待唤醒机制的规范实现
Guarded Suspension 模式
保护性地暂停
下图就是 Guarded Suspension 模式的结构图,非常简单,一个对象 GuardedObject,内部有一个成员变量——受保护的对象,以及两个成员方法——get(Predicate p)和onChanged(T obj)方法。其中,对象 GuardedObject 就是我们前面提到的大堂经理,受保护对象就是餐厅里面的包间;受保护对象的 get() 方法对应的是我们的就餐,就餐的前提条件是包间已经收拾好了,参数 p 就是用来描述这个前提条件的;受保护对象的 onChanged() 方法对应的是服务员把包间收拾好了,通过 onChanged() 方法可以 fire 一个事件,而这个事件往往能改变前提条件 p 的计算结果。下图中,左侧的绿色线程就是需要就餐的顾客,而右侧的蓝色线程就是收拾包间的服务员。
class GuardedObject<T>{//受保护的对象T obj;final Lock lock =new ReentrantLock();final Condition done =lock.newCondition();final int timeout=1;//获取受保护对象T get(Predicate<T> p) {lock.lock();try {//MESA管程推荐写法while(!p.test(obj)){done.await(timeout,TimeUnit.SECONDS);}}catch(InterruptedException e){throw new RuntimeException(e);}finally{lock.unlock();}//返回非空的受保护对象return obj;}//事件通知方法void onChanged(T obj) {lock.lock();try {this.obj = obj;done.signalAll();} finally {lock.unlock();}}}
解决问题示例
//处理浏览器发来的请求Respond handleWebReq(){//创建一消息Message msg1 = newMessage("1","{...}");//发送消息send(msg1);//利用GuardedObject实现等待GuardedObject<Message> go=new GuardObjec<>();Message r = go.get(t->t != null);}void onMessage(Message msg){//如何找到匹配的go?GuardedObject<Message> go=???go.onChanged(msg);}
扩展 Guarded Suspension 模式
class GuardedObject<T>{//受保护的对象T obj;final Lock lock =new ReentrantLock();final Condition done =lock.newCondition();final int timeout=2;//保存所有GuardedObjectfinal static Map<Object, GuardedObject>gos=new ConcurrentHashMap<>();//静态方法创建GuardedObjectstatic <K> GuardedObjectcreate(K key){GuardedObject go=new GuardedObject();gos.put(key, go);return go;}static <K, T> voidfireEvent(K key, T obj){GuardedObject go=gos.remove(key);if (go != null){go.onChanged(obj);}}//获取受保护对象T get(Predicate<T> p) {lock.lock();try {//MESA管程推荐写法while(!p.test(obj)){done.await(timeout,TimeUnit.SECONDS);}}catch(InterruptedException e){throw new RuntimeException(e);}finally{lock.unlock();}//返回非空的受保护对象return obj;}//事件通知方法void onChanged(T obj) {lock.lock();try {this.obj = obj;done.signalAll();} finally {lock.unlock();}}}
解决问题
//处理浏览器发来的请求Respond handleWebReq(){int id=序号生成器.get();//创建一消息Message msg1 = newMessage(id,"{...}");//创建GuardedObject实例GuardedObject<Message> go=GuardedObject.create(id);//发送消息send(msg1);//等待MQ消息Message r = go.get(t->t != null);}void onMessage(Message msg){//唤醒等待的线程GuardedObject.fireEvent(msg.id, msg);}
Balking模式:再谈线程安全的单例模式
编辑器提供的自动保存功能。自动保存功能的实现逻辑一般都是隔一定时间自动执行存盘操作,存盘操作的前提是文件做过修改,如果文件没有执行过修改操作,就需要快速放弃存盘操作。
下面的示例代码将自动保存功能代码化了,很显然 AutoSaveEditor 这个类不是线程安全的,因为对共享变量 changed 的读写没有使用同步,那如何保证 AutoSaveEditor 的线程安全性呢?
class AutoSaveEditor{//文件是否被修改过boolean changed=false;//定时任务线程池ScheduledExecutorService ses =Executors.newSingleThreadScheduledExecutor();//定时执行自动保存void startAutoSave(){ses.scheduleWithFixedDelay(()->{autoSave();}, 5, 5, TimeUnit.SECONDS);}//自动存盘操作void autoSave(){if (!changed) {return;}changed = false;//执行存盘操作//省略且实现this.execSave();}//编辑操作void edit(){//省略编辑逻辑......changed = true;}}
Balking 模式的经典实现
boolean changed=false;//自动存盘操作void autoSave(){synchronized(this){if (!changed) {return;}changed = false;}//执行存盘操作//省略且实现this.execSave();}//编辑操作void edit(){//省略编辑逻辑......change();}//改变状态void change(){synchronized(this){changed = true;}}
用 volatile 实现 Balking 模式
使用 volatile 的前提是对原子性没有要求。
示例代码: volatile关键字只能保证可见性,无法保证原子性和互斥性。所以calc方法有可能被重复执行。
class Test{volatile boolean inited = false;int count = 0;void init(){if(inited){return;}inited = true;//计算count的值count = calc();}}
Balking 模式实现单次初始化
class InitTest{boolean inited = false;synchronized void init(){if(inited){return;}//省略doInit的实现doInit();inited=true;}}
单例
class Singleton{private staticSingleton singleton;//构造方法私有化private Singleton(){}//获取实例(单例)public synchronized staticSingleton getInstance(){if(singleton == null){singleton=new Singleton();}return singleton;}}
使用双重检查来优化性能
class Singleton{private static volatileSingleton singleton;//构造方法私有化private Singleton() {}//获取实例(单例)public static SingletongetInstance() {//第一次检查if(singleton==null){synchronize{Singleton.class){//获取锁后二次检查if(singleton==null){singleton=new Singleton();}}}return singleton;}}
总结
Balking 模式和 Guarded Suspension 模式从实现上看似乎没有多大的关系,Balking 模式只需要用互斥锁就能解决,而 Guarded Suspension 模式则要用到管程这种高级的并发原语;但是从应用的角度来看,它们解决的都是“线程安全的 if”语义,不同之处在于,Guarded Suspension 模式会等待 if 条件为真,而 Balking 模式不会等待。
Balking 模式的经典实现是使用互斥锁,你可以使用 Java 语言内置 synchronized,也可以使用 SDK 提供 Lock;如果你对互斥锁的性能不满意,可以尝试采用 volatile 方案,不过使用 volatile 方案需要你更加谨慎。
当然你也可以尝试使用双重检查方案来优化性能,双重检查中的第一次检查,完全是出于对性能的考量:避免执行加锁操作,因为加锁操作很耗时。而加锁之后的二次检查,则是出于对安全性负责。
Thread-Per-Message模式:最简单实用的分工方法
Thread-Per-Message 模式,简言之就是为每个任务分配一个独立的线程
用 Thread 实现 Thread-Per-Message 模式
示例代码:
final ServerSocketChannel ssc =ServerSocketChannel.open().bind(new InetSocketAddress(8080));//处理请求try {while (true) {// 接收请求SocketChannel sc = ssc.accept();// 每个请求都创建一个线程new Thread(()->{try {// 读SocketByteBuffer rb = ByteBuffer.allocateDirect(1024);sc.read(rb);//模拟处理请求Thread.sleep(2000);// 写SocketByteBuffer wb =(ByteBuffer)rb.flip();sc.write(wb);// 关闭Socketsc.close();}catch(Exception e){throw new UncheckedIOException(e);}}).start();}} finally {ssc.close();}
如果你熟悉网络编程,相信你一定会提出一个很尖锐的问题:上面这个 echo 服务的实现方案是不具备可行性的。原因在于 Java 中的线程是一个重量级的对象,创建成本很高,一方面创建线程比较耗时,另一方面线程占用的内存也比较大。所以,为每个请求创建一个新的线程并不适合高并发场景。
于是,你开始质疑 Thread-Per-Message 模式,而且开始重新思索解决方案,这时候很可能你会想到 Java 提供的线程池。你的这个思路没有问题,但是引入线程池难免会增加复杂度。
Java 语言里,Java 线程是和操作系统线程一一对应的,这种做法本质上是将 Java 线程的调度权完全委托给操作系统,而操作系统在这方面非常成熟,所以这种做法的好处是稳定、可靠,但是也继承了操作系统线程的缺点:创建成本高。
用 Fiber 实现 Thread-Per-Message 模式
OpenJDK 有个 Loom 项目,就是要解决 Java 语言的轻量级线程问题,在这个项目中,轻量级线程被叫做 Fiber。
示例代码:
final ServerSocketChannel ssc =ServerSocketChannel.open().bind(new InetSocketAddress(8080));//处理请求try{while (true) {// 接收请求final SocketChannel sc =serverSocketChannel.accept();Fiber.schedule(()->{try {// 读SocketByteBuffer rb = ByteBuffer.allocateDirect(1024);sc.read(rb);//模拟处理请求LockSupport.parkNanos(2000*1000000);// 写SocketByteBuffer wb =(ByteBuffer)rb.flip()sc.write(wb);// 关闭Socketsc.close();} catch(Exception e){throw new UncheckedIOException(e);}});}//while}finally{ssc.close();}
总结:
并发编程领域的分工问题,指的是如何高效地拆解任务并分配给线程
Worker Thread模式:如何避免重复创建线程?
Worker Thread 模式及其实现
Worker Thread 模式可以类比现实世界里车间的工作模式:车间里的工人,有活儿了,大家一起干,没活儿了就聊聊天等着。你可以参考下面的示意图来理解,Worker Thread 模式中 Worker Thread 对应到现实世界里,其实指的就是车间里的工人。不过这里需要注意的是,车间里的工人数量往往是确定的。这个方案就是 Java 语言提供的线程池。

正确地创建线程池
- 创建有界的队列来接收任务
- 清晰地指明拒绝策略
- 在实际工作中给线程赋予一个业务相关的名字。
示例代码:
ExecutorService es = new ThreadPoolExecutor(50, 500,60L, TimeUnit.SECONDS,//注意要创建有界队列new LinkedBlockingQueue<Runnable>(2000),//建议根据业务需求实现ThreadFactoryr->{return new Thread(r, "echo-"+ r.hashCode());},//建议根据业务需求实现RejectedExecutionHandlernew ThreadPoolExecutor.CallerRunsPolicy());
避免线程死锁
使用线程池过程中,还要注意一种线程死锁的场景。如果提交到相同线程池的任务不是相互独立的,而是有依赖关系的,那么就有可能导致线程死锁。实际工作中,我就亲历过这种线程死锁的场景。具体现象是应用每运行一段时间偶尔就会处于无响应的状态,监控数据看上去一切都正常,但是实际上已经不能正常工作了。
示例代码: 如果你执行下面的这段代码,会发现它永远执行不到最后一行。执行过程中没有任何异常,但是应用已经停止响应了。
//L1、L2阶段共用的线程池ExecutorService es = Executors.newFixedThreadPool(2);//L1阶段的闭锁CountDownLatch l1=new CountDownLatch(2);for (int i=0; i<2; i++){System.out.println("L1");//执行L1阶段任务es.execute(()->{//L2阶段的闭锁CountDownLatch l2=new CountDownLatch(2);//执行L2阶段子任务for (int j=0; j<2; j++){es.execute(()->{System.out.println("L2");l2.countDown();});}//等待L2阶段任务执行完l2.await();l1.countDown();});}//等着L1阶段任务执行完l1.await();System.out.println("end");
当应用出现类似问题时,首选的诊断方法是查看线程栈。下图是上面示例代码停止响应后的线程栈,你会发现线程池中的两个线程全部都阻塞在 l2.await(); 这行代码上了,也就是说,线程池里所有的线程都在等待 L2 阶段的任务执行完,那 L2 阶段的子任务什么时候能够执行完呢?永远都没那一天了,为什么呢?因为线程池里的线程都阻塞了,没有空闲的线程执行 L2 阶段的任务了。

原因找到了,那如何解决就简单了,最简单粗暴的办法就是将线程池的最大线程数调大,如果能够确定任务的数量不是非常多的话,这个办法也是可行的,否则这个办法就行不通了。其实这种问题通用的解决方案是为不同的任务创建不同的线程池。对于上面的这个应用,L1 阶段的任务和 L2 阶段的任务如果各自都有自己的线程池,就不会出现这种问题了。
最后再次强调一下:提交到相同线程池中的任务一定是相互独立的,否则就一定要慎重。
总结:
Worker Thread 模式和 Thread-Per-Message 模式的区别有哪些呢?
从现实世界的角度看,你委托代办人做事,往往是和代办人直接沟通的;对应到编程领域,其实现也是主线程直接创建了一个子线程,主子线程之间是可以直接通信的。而车间工人的工作方式则是完全围绕任务展开的,一个具体的任务被哪个工人执行,预先是无法知道的;对应到编程领域,则是主线程提交任务到线程池,但主线程并不关心任务被哪个线程执行。
