1. 定义

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果

要点

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式

image.png


2. 实现

  1. class GuardedObject {
  2. private Object response;
  3. private final Object lock = new Object();
  4. public Object get() {
  5. synchronized (lock) {
  6. // 条件不满足则等待
  7. while (response == null) {
  8. try {
  9. lock.wait();
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. return response;
  15. }
  16. }
  17. public void complete(Object response) {
  18. synchronized (lock) {
  19. // 条件满足,通知等待线程
  20. this.response = response;
  21. lock.notifyAll();
  22. }
  23. }
  24. }

应用代码
一个线程等待另一个线程的执行结果

public class TestGuardedObject {
    public static void main(String[] args) {
        GuardedObject guardedObject = new GuardedObject();
        new Thread(() -> {
            //子线程执行下载
            try {
                List<String> response = download();
                log.debug("download complete...");
                guardedObject.complete(response);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
        //主线程阻塞等待
        log.debug("waiting...");
        Object response = guardedObject.get();
        log.debug("get response: [{}] lines", ((List<String>) response).size());
    }
}

执行结果
image.png


3. 带超时版 GuardedObject

如果要控制超时时间呢

/**
 * 添加超时处理
 */
@Slf4j(topic = "c.GuardedObjectV2")
class GuardedObjectV2 {

    private Object response;
    private final Object lock = new Object();

    public Object get(long millis) {
        synchronized (lock) {
            // 1) 记录最初时间
            long last = System.currentTimeMillis();
            // 2) 已经经历的时间
            long timePassed = 0;
            while (response == null) {
                // 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
                long waitTime = millis - timePassed;
                log.debug("waitTime: {}", waitTime);
                if (waitTime <= 0) {
                    log.debug("break...");
                    break;
                }
                try {
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 3) 如果提前被唤醒,这时已经经历的时间假设为 400
                timePassed = System.currentTimeMillis() - last;
                log.debug("timePassed: {}, object is null {}", timePassed, response == null);
            }
            return response;
        }
    }

    public void complete(Object response) {
        synchronized (lock) {
            // 条件满足,通知等待线程
            this.response = response;
            log.debug("notify...");
            lock.notifyAll();
        }
    }
}

测试,没有超时

public class TestGuardedObjectV2 {
    public static void main(String[] args) {
        GuardedObjectV2 v2 = new GuardedObjectV2();
        new Thread(() -> {
            sleep(1);
            v2.complete(null);
            sleep(1);
            v2.complete(Arrays.asList("a", "b", "c"));
        }).start();

        Object response = v2.get(2500);
        if (response != null) {
            log.debug("get response: [{}] lines", ((List<String>) response).size());
        } else {
            log.debug("can't get response");
        }
    }
}

输出

13:48:27.485 [main]  c.GuardedObjectV2 - waitTime: 2500
13:48:28.493 [Thread-0]  c.GuardedObjectV2 - notify...
13:48:28.493 [main]  c.GuardedObjectV2 - timePassed: 1009, object is null true
13:48:28.493 [main]  c.GuardedObjectV2 - waitTime: 1491
13:48:29.493 [Thread-0]  c.GuardedObjectV2 - notify...
13:48:29.493 [main]  c.GuardedObjectV2 - timePassed: 2009, object is null false
13:48:29.493 [main]  c.TestGuardedObjectV2 - get response: [3] lines

测试, 超时场景

// 等待时间不足
List<String> lines = v2.get(1000);

输出

13:47:31.293 [main]  c.GuardedObjectV2 - waitTime: 1000
13:47:32.292 [Thread-0]  c.GuardedObjectV2 - notify...
13:47:32.292 [main]  c.GuardedObjectV2 - timePassed: 1000, object is null true
13:47:32.292 [main]  c.GuardedObjectV2 - waitTime: 0
13:47:32.292 [main]  c.GuardedObjectV2 - break...
13:47:32.292 [main]  c.TestGuardedObjectV2 - can't get response
13:47:33.293 [Thread-0]  c.GuardedObjectV2 - notify...

4. 多任务版 GuardedObject

图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右 侧的 t1,t3,t5 就好比邮递员
如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,
这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理

image.png

新增 id 用来标识 Guarded Object

/**
 * 添加多任务处理
 */
// 增加超时效果
class GuardedObjectV3 {

    // 标识 Guarded Object
    private int id;

    public GuardedObjectV3(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    // 结果
    private Object response;

    // 获取结果
    // timeout 表示要等待多久 2000
    public Object get(long timeout) {
        synchronized (this) {
            // 开始时间 15:00:00
            long begin = System.currentTimeMillis();
            // 经历的时间
            long passedTime = 0;
            while (response == null) {
                // 这一轮循环应该等待的时间
                long waitTime = timeout - passedTime;
                // 经历的时间超过了最大等待时间时,退出循环
                if (timeout - passedTime <= 0) {
                    break;
                }
                try {
                    this.wait(waitTime); // 虚假唤醒 15:00:01
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 求得经历时间
                passedTime = System.currentTimeMillis() - begin; // 15:00:02  1s
            }
            return response;
        }
    }

    // 产生结果
    public void complete(Object response) {
        synchronized (this) {
            // 给结果成员变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}


中间解耦类

class Mailboxes {
    private static Map<Integer, GuardedObjectV3> boxes = new Hashtable<>();

    private static int id = 1;

    // 产生唯一 id
    private static synchronized int generateId() {
        return id++;
    }

    public static GuardedObjectV3 getGuardedObject(int id) {
        return boxes.remove(id);
    }

    public static GuardedObjectV3 createGuardedObject() {
        GuardedObjectV3 go = new GuardedObjectV3(generateId());
        boxes.put(go.getId(), go);
        return go;
    }

    public static Set<Integer> getIds() {
        return boxes.keySet();
    }
}

业务代码

@Slf4j(topic = "c.People")
class People extends Thread {
    @Override
    public void run() {
        // 收信
        GuardedObjectV3 guardedObject = Mailboxes.createGuardedObject();
        log.debug("开始收信 id:{}", guardedObject.getId());
        Object mail = guardedObject.get(5000);
        log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
    }
}

@Slf4j(topic = "c.Postman")
class Postman extends Thread {
    private int id;
    private String mail;

    public Postman(int id, String mail) {
        this.id = id;
        this.mail = mail;
    }

    @Override
    public void run() {
        GuardedObjectV3 guardedObject = Mailboxes.getGuardedObject(id);
        log.debug("送信 id:{}, 内容:{}", id, mail);
        guardedObject.complete(mail);
    }
}

测试

public static void main(String[] args) {
  for (int i = 0; i < 3; i++) {
    new People().start();
    }
    Sleeper.sleep(1);
    for (Integer id : Mailboxes.getIds()) {
      new Postman(id, "内容" + id).start();
      }
}

某次运行结果

14:05:58.869 [Thread-2]  c.People - 开始收信 id:2
14:05:58.870 [Thread-1]  c.People - 开始收信 id:1
14:05:58.871 [Thread-0]  c.People - 开始收信 id:3
14:05:59.858 [Thread-4]  c.Postman - 送信 id:2, 内容:内容2
14:05:59.858 [Thread-5]  c.Postman - 送信 id:1, 内容:内容1
14:05:59.860 [Thread-1]  c.People - 收到信 id:1, 内容:内容1
14:05:59.860 [Thread-2]  c.People - 收到信 id:2, 内容:内容2
14:05:59.857 [Thread-3]  c.Postman - 送信 id:3, 内容:内容3
14:05:59.861 [Thread-0]  c.People - 收到信 id:3, 内容:内容3