原文链接:https://java-design-patterns.com/patterns/promise/

Promise 模式:一种异步编程模式,它允许我们可以先开始一个任务的执行,并得到一个用于获取该任务执行结果的凭据对象,而不必等待该任务执行完毕就可以执行其他操作,等到我们需要该任务的执行结果时,再调用凭据对象的相关方法来获取,这样可以避免不必要的等待,增加了系统的并发性。

在Promise模式中,客户端代码调用某个异步方法所得到的返回值仅是一个凭据对象(该对象被称为Promise,意为“承诺”),凭借该对象,客户端代码可以获取异步方法相应的真正任务的执行结果。

Promise 模式

Promise 模式的支持类 PromiseSupport:

  1. public class PromiseSupport<V> implements Future<V> {
  2. // 定义运行状态
  3. protected static final int RUNNING = 1;
  4. protected static final int FAILED = 2;
  5. protected static final int COMPLETED = 3;
  6. // 锁对象
  7. protected final Object lock;
  8. // 当前状态
  9. protected volatile int state = RUNNING;
  10. // 返回值
  11. protected V value;
  12. // 异常返回值
  13. protected Exception exception;
  14. public PromiseSupport() {
  15. this.lock = new Object();
  16. }
  17. /**
  18. * 执行成功,将方法返回值回写
  19. * fulfill:履行
  20. *
  21. * @param value 返回值
  22. */
  23. protected void fulfill(V value) {
  24. this.value = value;
  25. this.state = COMPLETED;
  26. synchronized (lock) {
  27. // 方法执行完成,唤醒其他阻塞线程
  28. // 比如阻塞在get()方法上的线程
  29. lock.notifyAll();
  30. }
  31. }
  32. /**
  33. * 执行失败,异常回写
  34. *
  35. * @param exception 所执行方法抛出的异常
  36. */
  37. protected void fulfillExceptionally(Exception exception) {
  38. this.exception = exception;
  39. this.state = FAILED;
  40. synchronized (lock) {
  41. // 任务执行过程抛出异常,执行结束
  42. // 唤醒阻塞在get()方法上的线程
  43. lock.notifyAll();
  44. }
  45. }
  46. @Override
  47. public boolean cancel(boolean mayInterruptIfRunning) {
  48. return false;
  49. }
  50. @Override
  51. public boolean isCancelled() {
  52. return false;
  53. }
  54. @Override
  55. public boolean isDone() {
  56. return state > RUNNING;
  57. }
  58. @Override
  59. public V get() throws InterruptedException, ExecutionException {
  60. synchronized (lock) {
  61. // 任务未执行完
  62. while (state == RUNNING) {
  63. // 阻塞调用线程
  64. lock.wait();
  65. }
  66. }
  67. // 任务正常结束,将任务返回值返回
  68. if (state == COMPLETED) {
  69. return value;
  70. }
  71. // 任务异常结束,将异常抛出
  72. throw new ExecutionException(exception);
  73. }
  74. @Override
  75. public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
  76. synchronized (lock) {
  77. // 任务未执行完
  78. while (state == RUNNING) {
  79. try {
  80. // 定时阻塞调用线程
  81. lock.wait(unit.toMillis(timeout));
  82. } catch (InterruptedException e) {
  83. // 打印异常日志
  84. System.out.println("Interrupted:" + e.getMessage());
  85. // wait()被中断后会清楚线程的中断标识
  86. // 重新设置当前线程的中断标志
  87. Thread.currentThread().interrupt();
  88. }
  89. }
  90. }
  91. // 任务正常结束,将任务返回值返回
  92. if (state == COMPLETED) {
  93. return value;
  94. }
  95. // 任务异常结束,将异常抛出
  96. throw new ExecutionException(exception);
  97. }
  98. }

Promise 支持:

  1. public class Promise<V> extends PromiseSupport<V> {
  2. // 要履行的动作
  3. private Runnable fulfillmentAction;
  4. // 动作执行过程中异常的处理
  5. private Consumer<? super Throwable> exceptionHandler;
  6. public Promise() {
  7. }
  8. @Override
  9. protected void fulfill(V value) {
  10. super.fulfill(value);
  11. // 拦截器,进行后续处理
  12. postFulfillment();
  13. }
  14. @Override
  15. protected void fulfillExceptionally(Exception exception) {
  16. super.fulfillExceptionally(exception);
  17. // 针对异常的处理
  18. handlerException();
  19. // 执行后续处理
  20. postFulfillment();
  21. }
  22. /**
  23. * 处理任务执行过程中产生的异常
  24. */
  25. private void handlerException() {
  26. if (exception == null) {
  27. return;
  28. }
  29. exceptionHandler.accept(exception);
  30. }
  31. /**
  32. * 任务执行完毕后需要执行的动作
  33. */
  34. private void postFulfillment() {
  35. if (null == fulfillmentAction) {
  36. return;
  37. }
  38. fulfillmentAction.run();
  39. }
  40. /**
  41. * 异步任务执行
  42. *
  43. * @param task 待执行的任务
  44. * @param executor 执行器
  45. * @return
  46. */
  47. public Promise<V> fulfillInAsync(final Callable<V> task, Executor executor) {
  48. executor.execute(() -> {
  49. try {
  50. // 执行任务并将返回值回写
  51. fulfill(task.call());
  52. } catch (Exception e) {
  53. // 执行任务产生异常,将异常回写
  54. fulfillExceptionally(e);
  55. }
  56. });
  57. return this;
  58. }
  59. /**
  60. * 任务执行完后对返回值进行处理
  61. *
  62. * @param action
  63. * @return
  64. */
  65. public Promise<Void> thenAccept(Consumer<? super V> action) {
  66. Promise<Void> dest = new Promise<>();
  67. fulfillmentAction = new ConsumerAction(this, dest, action);
  68. return dest;
  69. }
  70. /**
  71. * 任务执行过程中异常的处理
  72. *
  73. * @param exceptionHandler
  74. * @return
  75. */
  76. public Promise<V> onError(Consumer<? super Throwable> exceptionHandler) {
  77. this.exceptionHandler = exceptionHandler;
  78. return this;
  79. }
  80. /**
  81. * 将上一个Promise的处理结果传递给下一个Promise
  82. *
  83. * @param function
  84. * @param <T>
  85. * @return
  86. */
  87. public <T> Promise<T> thenApply(Function<? super V, T> function) {
  88. Promise<T> dest = new Promise<>();
  89. fulfillmentAction = new TransferAction<T>(this, dest, function);
  90. return dest;
  91. }
  92. private class ConsumerAction implements Runnable {
  93. private final Promise<V> src;
  94. private final Promise<Void> dest;
  95. private final Consumer<? super V> action;
  96. public ConsumerAction(Promise<V> src, Promise<Void> dest, Consumer<? super V> action) {
  97. this.src = src;
  98. this.dest = dest;
  99. this.action = action;
  100. }
  101. @Override
  102. public void run() {
  103. try {
  104. // 异步获取返回值
  105. action.accept(src.get());
  106. // 将空值回写
  107. dest.fulfill(null);
  108. } catch (Throwable t) {
  109. // 异常
  110. dest.fulfillExceptionally((Exception) t.getCause());
  111. }
  112. }
  113. }
  114. private class TransferAction<T> implements Runnable {
  115. private final Promise<V> src;
  116. private final Promise<T> dest;
  117. private final Function<? super V, T> func;
  118. public TransferAction(Promise<V> src, Promise<T> dest, Function<? super V, T> func) {
  119. this.src = src;
  120. this.dest = dest;
  121. this.func = func;
  122. }
  123. @Override
  124. public void run() {
  125. try {
  126. dest.fulfill(func.apply(src.get()));
  127. } catch (Throwable t) {
  128. dest.fulfillExceptionally((Exception) t.getCause());
  129. }
  130. }
  131. }
  132. }

应用

下载一个文件并计算其行数,计算的行数将被消耗并打印到控制台上:
下载工具类:Utility

  1. public class Utility {
  2. // 下载文件并返回文件路径
  3. public static String downloadFile(String urlString) throws IOException {
  4. System.out.println("正在下载url的文件" + urlString);
  5. URL url = new URL(urlString);
  6. File file = File.createTempFile("promise_pattern", null);
  7. try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(url.openStream()));
  8. FileWriter writer = new FileWriter(file);) {
  9. String line;
  10. while ((line = bufferedReader.readLine()) != null) {
  11. writer.write(line);
  12. writer.write("\n");
  13. }
  14. System.out.println("下载文件存储位置:" + file.getAbsolutePath());
  15. return file.getAbsolutePath();
  16. }
  17. }
  18. // 返回文件的行数
  19. public static Integer countLines(String fileLocation) {
  20. try(BufferedReader reader = new BufferedReader(new FileReader(fileLocation))) {
  21. return Math.toIntExact(reader.lines().count());
  22. } catch (IOException e) {
  23. e.printStackTrace();
  24. }
  25. return 0;
  26. }
  27. // 计算每个字符出现的频率
  28. // frequency:频率
  29. public static Map<Character, Long> characterFrequency(String fileLocation) {
  30. try (final BufferedReader reader = new BufferedReader(new FileReader(fileLocation))){
  31. return reader.lines()
  32. .flatMapToInt(String::chars)
  33. .mapToObj(x->(char)x)
  34. .collect(Collectors.groupingBy(Function.identity(),Collectors.counting()));
  35. } catch (IOException e) {
  36. e.printStackTrace();
  37. }
  38. return Collections.emptyMap();
  39. }
  40. public static Optional<Character> lowestFrequencyChar(Map<Character, Long> characterFrequency) {
  41. return characterFrequency
  42. .entrySet()
  43. .stream()
  44. .min(Comparator.comparingLong(Map.Entry::getValue))
  45. .map(Map.Entry::getKey);
  46. }
  47. }

下载主类:App

  1. public class App {
  2. private static final String DEFAULT_URL =
  3. "https://github.com/ZhSMM/java-design-patterns/blob/master/abstract-document/pom.xml";
  4. private final ExecutorService executor;
  5. private final CountDownLatch stopLatch;
  6. public App() {
  7. this.executor = Executors.newFixedThreadPool(2);
  8. this.stopLatch = new CountDownLatch(2);
  9. }
  10. public static void main(String[] args) throws InterruptedException {
  11. App app = new App();
  12. try {
  13. app.promiseUsage();
  14. } finally {
  15. app.stop();
  16. }
  17. }
  18. private void promiseUsage() {
  19. calculateLineCount();
  20. calculateLowestFrequency();
  21. }
  22. private void calculateLowestFrequency() {
  23. lowestFrequencyChar().thenAccept(character -> {
  24. System.out.println("出现最少的字符:" + character);
  25. taskCompleted();
  26. });
  27. }
  28. private void calculateLineCount() {
  29. countLines().thenAccept(lines -> {
  30. System.out.println("文件行数:" + lines);
  31. taskCompleted();
  32. });
  33. }
  34. private Promise<Optional<Character>> lowestFrequencyChar() {
  35. return characterFrequency().thenApply(Utility::lowestFrequencyChar);
  36. }
  37. private Promise<Map<Character, Long>> characterFrequency() {
  38. return download(DEFAULT_URL).thenApply(Utility::characterFrequency);
  39. }
  40. private Promise<Integer> countLines() {
  41. return download(DEFAULT_URL).thenApply(Utility::countLines);
  42. }
  43. // 异步下载
  44. private Promise<String> download(String urlString) {
  45. return new Promise<String>()
  46. .fulfillInAsync(() -> Utility.downloadFile(urlString), executor)
  47. .onError(throwable -> {
  48. throwable.printStackTrace();
  49. taskCompleted();
  50. });
  51. }
  52. private void taskCompleted() {
  53. // 相当于计数器建一
  54. stopLatch.countDown();
  55. }
  56. // 关闭
  57. private void stop() throws InterruptedException {
  58. stopLatch.await();
  59. executor.shutdownNow();
  60. }
  61. }