1.场景描述

虽然Thread为我们提供了可获取状态, 以及判断是否alive的方法, 但是这些方法均是针对线程本身的, 而我们提交的任务Runnable在运行过程中所处的状态如何是无法直接获得的, 比如它什么时候开始, 什么时候结束, 最不好的一种体验是无法获得Runnable任务执行后的结果。一般情况下想要获得最终结果, 我们不得不为Thread或者Runnable传入共享变量,但是在多线程的情况下,共享变量将导致资源的竞争从而增加了数据不一致性的安全隐患。

2.当观察者模式遇到Thread

当某个对象发生状态改变需要通知第三方的时候,观察者模式就特别适合胜任这样的工作。观察者模式需要有事件源, 也就是引发状态改变的源头, 很明显Thread负责执行任务的逻辑单元,它最清楚整个过程的始末周期,而事件的接收者则是通知接受者一方,严格意义上的观察者模式是需要Observer的集合的, 我们在这里不需要完全遵守这样的规则,只需将执行任务的每一个阶段都通知给观察者即可。

2.1接口定义

1.Observable接口定义

Observable接口定义的代码如所示

  1. public interface Observable {
  2. // 任务生命周期的枚举类型
  3. enum Cycle{
  4. STARTED, RUNNING, DONE, ERROR
  5. }
  6. // 获取当前任务的生命周期状态
  7. Cycle getCycle();
  8. // 定义启动线程的方法,主要作用是为了屏蔽Thread的其他方法
  9. void start();
  10. // 定义线程的打断方法,作用与start方法一样,也是为了屏蔽Thread的其他方法
  11. void interrupt();
  12. }

该接口主要是暴露给调用者使用的,其中四个枚举类型分别代表了当前任务执行生命周期的各个阶段,具体如下

  • getCycle() 方法用于获取当前任务处于哪个执行阶段。
  • start() 方法的目的主要是为了屏蔽Thread类其他的API, 可通过Observable的start对线程进行启动。
  • interrupt(方法的作用与start一样, 可通过Observable的interrupt对当前线程进行中断。

2.TaskLifecycle接口定义

  1. public interface TaskLifecycle<T> {
  2. // 任务启动时会触发onStart方法
  3. void onStart(Thread thread);
  4. // 任务正在运行时会触发onRunning方法
  5. void onRunning(Thread thread);
  6. // 任务运行结束时会触发onFinish方法,其中result是任务执行结束后的结果
  7. void onFinish(Thread thread, T result);
  8. // 任务执行报错时会触发onError方法
  9. void onError(Thread thread, Exception e);
  10. //生命周期接口的空实现(Adapter)
  11. class EmptyLifecycle<T> implements TaskLifecycle<T> {
  12. @Override
  13. public void onStart(Thread thread) {
  14. }
  15. @Override
  16. public void onRunning(Thread thread) {
  17. }
  18. @Override
  19. public void onFinish(Thread thread, T result) {
  20. }
  21. @Override
  22. public void onError(Thread thread, Exception e) {
  23. }
  24. }
  25. }

3.Task函数接口定义

  1. @FunctionalInterface
  2. public interface Task<T> {
  3. // 任务执行接口,该接口允许有返回值
  4. T call();
  5. }

2.2 ObservableThread实现

image.png

  1. public class ObservableThread<T> extends Thread implements Observable {
  2. private final TaskLifecycle<T> lifecycle;
  3. private final Task<T> task;
  4. private Cycle cycle;
  5. // 指定Task的实现,默认情况下使用EmptyLifecycle
  6. public ObservableThread(Task<T> task) throws IllegalAccessException {
  7. this(new TaskLifecycle.EmptyLifecycle<>(), task);
  8. }
  9. // 指定TaskLifecycle的同时指定Task
  10. public ObservableThread(TaskLifecycle<T> lifecycle, Task<T> task) throws IllegalAccessException {
  11. super();
  12. // Task不允许为null
  13. if ( task == null )
  14. throw new IllegalAccessException("The task is required.");
  15. this.lifecycle = lifecycle;
  16. this.task = task;
  17. }
  18. @Override
  19. public void run() {
  20. // 在执行线程逻辑单元的时候,分别触发相应的事件
  21. this.update(Cycle.STARTED, null,null);
  22. try {
  23. this.update(Cycle.RUNNING, null, null);
  24. T result = this.task.call();
  25. this.update(Cycle.DONE, result, null);
  26. }catch (Exception e) {
  27. this.update(Cycle.ERROR, null, e);
  28. }
  29. }
  30. private void update(Cycle cycle, T result, Exception e) {
  31. this.cycle = cycle;
  32. if ( lifecycle == null )
  33. return;
  34. try {
  35. switch (cycle) {
  36. case STARTED:
  37. this.lifecycle.onError(currentThread());
  38. break;
  39. case RUNNING:
  40. this.lifecycle.onRunning(currentThread());
  41. break;
  42. case DONE:
  43. this.lifecycle.onFinish(currentThread(), result);
  44. break;
  45. case ERROR:
  46. this.lifecycle.onError(currentThread(), e);
  47. break;
  48. }
  49. }catch (Exception ex) {
  50. if ( cycle == Cycle.ERROR ) {
  51. throw ex;
  52. }
  53. }
  54. }
  55. @Override
  56. public Cycle getCycle() {
  57. return this.cycle;
  58. }
  59. }

重写父类的run方法, 并且将其修饰为final类型, 不允许子类再次对其进行重写, run方法在线程的运行期间,可监控任务在执行过程中的各个生命周期阶段,任务每经过一个阶段相当于发生了一次事件。

update方法用于通知时间的监听者, 此时任务在执行过程中发生了什么, 最主要的通知是异常的处理。如果监听者也就是Task Lifecycle, 在响应某个事件的过程中出现了意外,则会导致任务的正常执行受到影响,因此需要进行异常捕获,并忽略这些异常信息以保证Task Lifecycle的实现不影响任务的正确执行, 但是如果任务执行过程中出现错误并且抛出了异常, 那么update方法就不能忽略该异常, 需要继续抛出异常, 保持与call方法同样的意图。

2.3 测试用例代码实现

测试代码01:

  1. public class TaskClient {
  2. public static void main(String[] args) throws IllegalAccessException {
  3. Observable observable = new ObservableThread<>(
  4. () -> {
  5. try {
  6. TimeUnit.SECONDS.sleep(10);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. System.out.println("finished done.");
  11. return null;
  12. }
  13. );
  14. observable.start();
  15. }
  16. }

测试代码02:

  1. import java.util.concurrent.TimeUnit;
  2. public class TaskClient1 {
  3. public static void main(String[] args) throws IllegalAccessException {
  4. final TaskLifecycle<String> lifecycle = new TaskLifecycle.EmptyLifecycle<String>(){
  5. @Override
  6. public void onFinish(Thread thread, String result) {
  7. System.out.println("The result is " + result);
  8. }
  9. };
  10. Observable observable = new ObservableThread<>(lifecycle, () ->
  11. {
  12. try {
  13. TimeUnit.SECONDS.sleep(10);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. System.out.println(" finished done.");
  18. return "Hello Observer";
  19. });
  20. }
  21. }