Guarded Suspension模式:等待唤醒机制的规范实现

Guarded Suspension 模式

保护性地暂停
下图就是 Guarded Suspension 模式的结构图,非常简单,一个对象 GuardedObject,内部有一个成员变量——受保护的对象,以及两个成员方法——get(Predicate p)和onChanged(T obj)方法。其中,对象 GuardedObject 就是我们前面提到的大堂经理,受保护对象就是餐厅里面的包间;受保护对象的 get() 方法对应的是我们的就餐,就餐的前提条件是包间已经收拾好了,参数 p 就是用来描述这个前提条件的;受保护对象的 onChanged() 方法对应的是服务员把包间收拾好了,通过 onChanged() 方法可以 fire 一个事件,而这个事件往往能改变前提条件 p 的计算结果。下图中,左侧的绿色线程就是需要就餐的顾客,而右侧的蓝色线程就是收拾包间的服务员。
并发设计模式(二) - 图1

  1. class GuardedObject<T>{
  2. //受保护的对象
  3. T obj;
  4. final Lock lock =
  5. new ReentrantLock();
  6. final Condition done =
  7. lock.newCondition();
  8. final int timeout=1;
  9. //获取受保护对象
  10. T get(Predicate<T> p) {
  11. lock.lock();
  12. try {
  13. //MESA管程推荐写法
  14. while(!p.test(obj)){
  15. done.await(timeout,
  16. TimeUnit.SECONDS);
  17. }
  18. }catch(InterruptedException e){
  19. throw new RuntimeException(e);
  20. }finally{
  21. lock.unlock();
  22. }
  23. //返回非空的受保护对象
  24. return obj;
  25. }
  26. //事件通知方法
  27. void onChanged(T obj) {
  28. lock.lock();
  29. try {
  30. this.obj = obj;
  31. done.signalAll();
  32. } finally {
  33. lock.unlock();
  34. }
  35. }
  36. }

解决问题示例

  1. //处理浏览器发来的请求
  2. Respond handleWebReq(){
  3. //创建一消息
  4. Message msg1 = new
  5. Message("1","{...}");
  6. //发送消息
  7. send(msg1);
  8. //利用GuardedObject实现等待
  9. GuardedObject<Message> go
  10. =new GuardObjec<>();
  11. Message r = go.get(
  12. t->t != null);
  13. }
  14. void onMessage(Message msg){
  15. //如何找到匹配的go?
  16. GuardedObject<Message> go=???
  17. go.onChanged(msg);
  18. }

扩展 Guarded Suspension 模式

  1. class GuardedObject<T>{
  2. //受保护的对象
  3. T obj;
  4. final Lock lock =
  5. new ReentrantLock();
  6. final Condition done =
  7. lock.newCondition();
  8. final int timeout=2;
  9. //保存所有GuardedObject
  10. final static Map<Object, GuardedObject>
  11. gos=new ConcurrentHashMap<>();
  12. //静态方法创建GuardedObject
  13. static <K> GuardedObject
  14. create(K key){
  15. GuardedObject go=new GuardedObject();
  16. gos.put(key, go);
  17. return go;
  18. }
  19. static <K, T> void
  20. fireEvent(K key, T obj){
  21. GuardedObject go=gos.remove(key);
  22. if (go != null){
  23. go.onChanged(obj);
  24. }
  25. }
  26. //获取受保护对象
  27. T get(Predicate<T> p) {
  28. lock.lock();
  29. try {
  30. //MESA管程推荐写法
  31. while(!p.test(obj)){
  32. done.await(timeout,
  33. TimeUnit.SECONDS);
  34. }
  35. }catch(InterruptedException e){
  36. throw new RuntimeException(e);
  37. }finally{
  38. lock.unlock();
  39. }
  40. //返回非空的受保护对象
  41. return obj;
  42. }
  43. //事件通知方法
  44. void onChanged(T obj) {
  45. lock.lock();
  46. try {
  47. this.obj = obj;
  48. done.signalAll();
  49. } finally {
  50. lock.unlock();
  51. }
  52. }
  53. }

解决问题

  1. //处理浏览器发来的请求
  2. Respond handleWebReq(){
  3. int id=序号生成器.get();
  4. //创建一消息
  5. Message msg1 = new
  6. Message(id,"{...}");
  7. //创建GuardedObject实例
  8. GuardedObject<Message> go=
  9. GuardedObject.create(id);
  10. //发送消息
  11. send(msg1);
  12. //等待MQ消息
  13. Message r = go.get(
  14. t->t != null);
  15. }
  16. void onMessage(Message msg){
  17. //唤醒等待的线程
  18. GuardedObject.fireEvent(
  19. msg.id, msg);
  20. }

Balking模式:再谈线程安全的单例模式

编辑器提供的自动保存功能。自动保存功能的实现逻辑一般都是隔一定时间自动执行存盘操作,存盘操作的前提是文件做过修改,如果文件没有执行过修改操作,就需要快速放弃存盘操作。

下面的示例代码将自动保存功能代码化了,很显然 AutoSaveEditor 这个类不是线程安全的,因为对共享变量 changed 的读写没有使用同步,那如何保证 AutoSaveEditor 的线程安全性呢?

  1. class AutoSaveEditor{
  2. //文件是否被修改过
  3. boolean changed=false;
  4. //定时任务线程池
  5. ScheduledExecutorService ses =
  6. Executors.newSingleThreadScheduledExecutor();
  7. //定时执行自动保存
  8. void startAutoSave(){
  9. ses.scheduleWithFixedDelay(()->{
  10. autoSave();
  11. }, 5, 5, TimeUnit.SECONDS);
  12. }
  13. //自动存盘操作
  14. void autoSave(){
  15. if (!changed) {
  16. return;
  17. }
  18. changed = false;
  19. //执行存盘操作
  20. //省略且实现
  21. this.execSave();
  22. }
  23. //编辑操作
  24. void edit(){
  25. //省略编辑逻辑
  26. ......
  27. changed = true;
  28. }
  29. }

Balking 模式的经典实现

  1. boolean changed=false;
  2. //自动存盘操作
  3. void autoSave(){
  4. synchronized(this){
  5. if (!changed) {
  6. return;
  7. }
  8. changed = false;
  9. }
  10. //执行存盘操作
  11. //省略且实现
  12. this.execSave();
  13. }
  14. //编辑操作
  15. void edit(){
  16. //省略编辑逻辑
  17. ......
  18. change();
  19. }
  20. //改变状态
  21. void change(){
  22. synchronized(this){
  23. changed = true;
  24. }
  25. }

用 volatile 实现 Balking 模式

使用 volatile 的前提是对原子性没有要求。
示例代码: volatile关键字只能保证可见性,无法保证原子性和互斥性。所以calc方法有可能被重复执行。

  1. class Test{
  2. volatile boolean inited = false;
  3. int count = 0;
  4. void init(){
  5. if(inited){
  6. return;
  7. }
  8. inited = true;
  9. //计算count的值
  10. count = calc();
  11. }
  12. }

Balking 模式实现单次初始化

  1. class InitTest{
  2. boolean inited = false;
  3. synchronized void init(){
  4. if(inited){
  5. return;
  6. }
  7. //省略doInit的实现
  8. doInit();
  9. inited=true;
  10. }
  11. }

单例

  1. class Singleton{
  2. private static
  3. Singleton singleton;
  4. //构造方法私有化
  5. private Singleton(){}
  6. //获取实例(单例)
  7. public synchronized static
  8. Singleton getInstance(){
  9. if(singleton == null){
  10. singleton=new Singleton();
  11. }
  12. return singleton;
  13. }
  14. }

使用双重检查来优化性能

  1. class Singleton{
  2. private static volatile
  3. Singleton singleton;
  4. //构造方法私有化
  5. private Singleton() {}
  6. //获取实例(单例)
  7. public static Singleton
  8. getInstance() {
  9. //第一次检查
  10. if(singleton==null){
  11. synchronize{Singleton.class){
  12. //获取锁后二次检查
  13. if(singleton==null){
  14. singleton=new Singleton();
  15. }
  16. }
  17. }
  18. return singleton;
  19. }
  20. }

总结
Balking 模式和 Guarded Suspension 模式从实现上看似乎没有多大的关系,Balking 模式只需要用互斥锁就能解决,而 Guarded Suspension 模式则要用到管程这种高级的并发原语;但是从应用的角度来看,它们解决的都是“线程安全的 if”语义,不同之处在于,Guarded Suspension 模式会等待 if 条件为真,而 Balking 模式不会等待。

Balking 模式的经典实现是使用互斥锁,你可以使用 Java 语言内置 synchronized,也可以使用 SDK 提供 Lock;如果你对互斥锁的性能不满意,可以尝试采用 volatile 方案,不过使用 volatile 方案需要你更加谨慎。

当然你也可以尝试使用双重检查方案来优化性能,双重检查中的第一次检查,完全是出于对性能的考量:避免执行加锁操作,因为加锁操作很耗时。而加锁之后的二次检查,则是出于对安全性负责。


Thread-Per-Message模式:最简单实用的分工方法

Thread-Per-Message 模式,简言之就是为每个任务分配一个独立的线程

用 Thread 实现 Thread-Per-Message 模式

示例代码:

  1. final ServerSocketChannel ssc =
  2. ServerSocketChannel.open().bind(
  3. new InetSocketAddress(8080));
  4. //处理请求
  5. try {
  6. while (true) {
  7. // 接收请求
  8. SocketChannel sc = ssc.accept();
  9. // 每个请求都创建一个线程
  10. new Thread(()->{
  11. try {
  12. // 读Socket
  13. ByteBuffer rb = ByteBuffer
  14. .allocateDirect(1024);
  15. sc.read(rb);
  16. //模拟处理请求
  17. Thread.sleep(2000);
  18. // 写Socket
  19. ByteBuffer wb =
  20. (ByteBuffer)rb.flip();
  21. sc.write(wb);
  22. // 关闭Socket
  23. sc.close();
  24. }catch(Exception e){
  25. throw new UncheckedIOException(e);
  26. }
  27. }).start();
  28. }
  29. } finally {
  30. ssc.close();
  31. }

如果你熟悉网络编程,相信你一定会提出一个很尖锐的问题:上面这个 echo 服务的实现方案是不具备可行性的。原因在于 Java 中的线程是一个重量级的对象,创建成本很高,一方面创建线程比较耗时,另一方面线程占用的内存也比较大。所以,为每个请求创建一个新的线程并不适合高并发场景。

于是,你开始质疑 Thread-Per-Message 模式,而且开始重新思索解决方案,这时候很可能你会想到 Java 提供的线程池。你的这个思路没有问题,但是引入线程池难免会增加复杂度。

Java 语言里,Java 线程是和操作系统线程一一对应的,这种做法本质上是将 Java 线程的调度权完全委托给操作系统,而操作系统在这方面非常成熟,所以这种做法的好处是稳定、可靠,但是也继承了操作系统线程的缺点:创建成本高。

用 Fiber 实现 Thread-Per-Message 模式

OpenJDK 有个 Loom 项目,就是要解决 Java 语言的轻量级线程问题,在这个项目中,轻量级线程被叫做 Fiber。

示例代码:

  1. final ServerSocketChannel ssc =
  2. ServerSocketChannel.open().bind(
  3. new InetSocketAddress(8080));
  4. //处理请求
  5. try{
  6. while (true) {
  7. // 接收请求
  8. final SocketChannel sc =
  9. serverSocketChannel.accept();
  10. Fiber.schedule(()->{
  11. try {
  12. // 读Socket
  13. ByteBuffer rb = ByteBuffer
  14. .allocateDirect(1024);
  15. sc.read(rb);
  16. //模拟处理请求
  17. LockSupport.parkNanos(2000*1000000);
  18. // 写Socket
  19. ByteBuffer wb =
  20. (ByteBuffer)rb.flip()
  21. sc.write(wb);
  22. // 关闭Socket
  23. sc.close();
  24. } catch(Exception e){
  25. throw new UncheckedIOException(e);
  26. }
  27. });
  28. }//while
  29. }finally{
  30. ssc.close();
  31. }

总结:
并发编程领域的分工问题,指的是如何高效地拆解任务并分配给线程


Worker Thread模式:如何避免重复创建线程?

Worker Thread 模式及其实现

Worker Thread 模式可以类比现实世界里车间的工作模式:车间里的工人,有活儿了,大家一起干,没活儿了就聊聊天等着。你可以参考下面的示意图来理解,Worker Thread 模式中 Worker Thread 对应到现实世界里,其实指的就是车间里的工人。不过这里需要注意的是,车间里的工人数量往往是确定的。这个方案就是 Java 语言提供的线程池。

并发设计模式(二) - 图2

正确地创建线程池

  • 创建有界的队列来接收任务
  • 清晰地指明拒绝策略
  • 在实际工作中给线程赋予一个业务相关的名字。

示例代码:

  1. ExecutorService es = new ThreadPoolExecutor(
  2. 50, 500,
  3. 60L, TimeUnit.SECONDS,
  4. //注意要创建有界队列
  5. new LinkedBlockingQueue<Runnable>(2000),
  6. //建议根据业务需求实现ThreadFactory
  7. r->{
  8. return new Thread(r, "echo-"+ r.hashCode());
  9. },
  10. //建议根据业务需求实现RejectedExecutionHandler
  11. new ThreadPoolExecutor.CallerRunsPolicy());

避免线程死锁

使用线程池过程中,还要注意一种线程死锁的场景。如果提交到相同线程池的任务不是相互独立的,而是有依赖关系的,那么就有可能导致线程死锁。实际工作中,我就亲历过这种线程死锁的场景。具体现象是应用每运行一段时间偶尔就会处于无响应的状态,监控数据看上去一切都正常,但是实际上已经不能正常工作了。
并发设计模式(二) - 图3
示例代码: 如果你执行下面的这段代码,会发现它永远执行不到最后一行。执行过程中没有任何异常,但是应用已经停止响应了。

  1. //L1、L2阶段共用的线程池
  2. ExecutorService es = Executors.
  3. newFixedThreadPool(2);
  4. //L1阶段的闭锁
  5. CountDownLatch l1=new CountDownLatch(2);
  6. for (int i=0; i<2; i++){
  7. System.out.println("L1");
  8. //执行L1阶段任务
  9. es.execute(()->{
  10. //L2阶段的闭锁
  11. CountDownLatch l2=new CountDownLatch(2);
  12. //执行L2阶段子任务
  13. for (int j=0; j<2; j++){
  14. es.execute(()->{
  15. System.out.println("L2");
  16. l2.countDown();
  17. });
  18. }
  19. //等待L2阶段任务执行完
  20. l2.await();
  21. l1.countDown();
  22. });
  23. }
  24. //等着L1阶段任务执行完
  25. l1.await();
  26. System.out.println("end");

当应用出现类似问题时,首选的诊断方法是查看线程栈。下图是上面示例代码停止响应后的线程栈,你会发现线程池中的两个线程全部都阻塞在 l2.await(); 这行代码上了,也就是说,线程池里所有的线程都在等待 L2 阶段的任务执行完,那 L2 阶段的子任务什么时候能够执行完呢?永远都没那一天了,为什么呢?因为线程池里的线程都阻塞了,没有空闲的线程执行 L2 阶段的任务了。

并发设计模式(二) - 图4

原因找到了,那如何解决就简单了,最简单粗暴的办法就是将线程池的最大线程数调大,如果能够确定任务的数量不是非常多的话,这个办法也是可行的,否则这个办法就行不通了。其实这种问题通用的解决方案是为不同的任务创建不同的线程池。对于上面的这个应用,L1 阶段的任务和 L2 阶段的任务如果各自都有自己的线程池,就不会出现这种问题了。

最后再次强调一下:提交到相同线程池中的任务一定是相互独立的,否则就一定要慎重。

总结:
Worker Thread 模式和 Thread-Per-Message 模式的区别有哪些呢?
从现实世界的角度看,你委托代办人做事,往往是和代办人直接沟通的;对应到编程领域,其实现也是主线程直接创建了一个子线程,主子线程之间是可以直接通信的。而车间工人的工作方式则是完全围绕任务展开的,一个具体的任务被哪个工人执行,预先是无法知道的;对应到编程领域,则是主线程提交任务到线程池,但主线程并不关心任务被哪个线程执行。