1 接受异步消息的主动对象

Active是“主动”的意思Active Object是“主动对象”的意思,所谓主动对象就是指其拥有自己的独立线程, 比如java.lang.Thread实例就是一个主动对象, 不过Active Object Pattern不仅仅是拥有独立的线程,它还可以接受异步消息,并且能够返回处理的结果。
我们在本书中频繁使用的System.gc() 方法就是一个“接受异步消息的主动对象”,调用gc方法的线程和gc自身的执行线程并不是同一个线程,在本章中,我们将实现一个类似于System.gc的可接受异步消息的主动对象。接受异步消息的主动对象工作原理如图所示。
image.png

2 标准Active Objects 模式设计

在本节中, 我们首先从标准的Active Objects设计入手, 将一个接口的方法调用转换成可接受异步消息的主动对象,也就是说方法的执行和方法的调用是在不同的线程中进行的,那么如何使得执行线程知道应该如何正确执行接口方法呢?我们需要将接口方法的参数以及具体实现封装成特定的Message告知执行线程。如果该接口方法需要返回值, 则必须得设计成Future的返回形式, 图为标准Active Objects模型设计的结构类图。

image.png

通过图27-3我们可以看出, 当某个线程调用Order Service接口的find Order Details方法时, 事实上是发送了一个包含find Order Details方法参数以及Order Service具体实现的Message至Message队列, 执行线程通过从队列中获取Message来调用具体的实现, 接口方法的调用和接口方法的执行分别处于不同的线程中, 因此我们称该接口为Active Objects。
image.png

2.1 OrderService接口设计

Order Service是一个比较简单的接口, 包含两个方法, 其中第一个为有返回值的方法,第二个方法则没有返回值,代码如所示:

  1. public interface OrderService {
  2. /**
  3. * 根据订单编码查询订单明细,有入惨也有返回值,但是返回类型必须时Future
  4. */
  5. Future<String> findOrderDetails(long orderId);
  6. /**
  7. * 提交订单,没有返回值
  8. * @param account
  9. * @param orderId
  10. */
  11. void order(String account, long orderId);
  12. }
  • �find OrderDetails(long orderId) :通过订单编号获取订单详情,有返回值的方法必须是Future类型的,因为方法的执行是在其他线程中进行的,势必不会立即得到正确的最终结果,通过Future可以立即得到返回。
  • Order(String account, long orderId) :提交用户的订单信息,是一种无返回值的方法。

2.2 OrderServiceImpl详解

  1. import java.util.concurrent.TimeUnit;
  2. public class OrderServiceImpl implements OrderService{
  3. @Override
  4. public Future<String> findOrderDetails(long orderId) {
  5. return FutureService.<Long,String>newService().submit(input -> {
  6. try {
  7. TimeUnit.SECONDS.sleep(10);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. return "The order Details Information";
  12. }, orderId, null);
  13. }
  14. @Override
  15. public void order(String account, long orderId) {
  16. try {
  17. TimeUnit.SECONDS.sleep(10);
  18. System.out.println(" process the order for account " + account + ",orderId " + orderId);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. }

�OrderServiceImpl类是OrderService的一个具体实现,该类是在执行线程中将要被使用的类,其中findOrderDetails方法通过第19章中我们开发的Future立即返回一个结果,order方法则通过休眠来模拟该方法的执行比较耗时。

2.3 OrderServiceProxy详解

�OrderServiceProxy是OrderService的子类,它的作用是将OrderService的每一个方法都封装成MethodMessage,然后提交给ActiveMessage队列,在使用OrderService接口方法的时候,实际上是在调用OrderServiceProxy中的方法,代码如所示。

  1. import java.util.HashMap;
  2. import java.util.Map;
  3. public class OrderServiceProxy implements OrderService {
  4. private final OrderService orderService;
  5. private final ActiveMessageQueue activeMessageQueue;
  6. public OrderServiceProxy(OrderService orderService, ActiveMessageQueue activeMessageQueue) {
  7. this.orderService = orderService;
  8. this.activeMessageQueue = activeMessageQueue;
  9. }
  10. @Override
  11. public Future<String> findOrderDetails(long orderId) {
  12. // 定义一个ActiveFuture,并且可支持立即返回
  13. final ActiveFuture<String> activeFuture = new ActiveFuture<>();
  14. // 收集方法入惨以及返回的ActiveFuture 封装成MethodMessage
  15. Map<String, Object> params = new HashMap<>();
  16. params.put("orderId", activeFuture);
  17. params.put("activeFuture", activeFuture);
  18. MethodMessage message = new FindOrderDetailsMessage(params, orderService);
  19. // 将methodmessage保存值activeMessageQueue中
  20. activeMessageQueue.offer(message);
  21. return activeFuture;
  22. }
  23. @Override
  24. public void order(String account, long orderId) {
  25. // 收集方法参数, 并且封装成MethodMessage,然后offer至队列中
  26. Map<String, Object> params = new HashMap<>();
  27. params.put("account", account);
  28. params.put("orderId", orderId);
  29. MethodMessage message = new OrderMessage(params, orderService);
  30. activeMessageQueue.offer(message);
  31. }
  32. }

OrderServiceProxy作为OrderService的一个实现,看上去与OrderService没多大关系,其主要作用是将OrderService接口定义的方法封装成MethodMessage,然后offer给Active-MessageQueue。若是无返回值的方法,则只需要提交Message到ActiveMessageQueue中即可, 但若是有返回值的方法,findOrderDetails是比较特殊的,它需要返回一个Active-Future,该Future的作用是可以立即返回,当调用线程获取结果时将进入阻塞状态, 代码所示。

  1. public class ActiveFuture<T> extends FutureTask {
  2. @Override
  3. public void finish(Object result) {
  4. super.finish(result);
  5. }
  6. }

ActiveFuture非常简单,是FutureTask的直接子类,其主要作用是重写finish方法,并且将protected的权限换成public, 可以使得执行线程完成任务之后传递最终结果。

2.4 MethodMessage

MethodMessage的主要作用是收集每一个接口的方法参数,并且提供execute方法供ActiveDaemonThread直接调用,该对象就是典型的WorkerThread模型中的Product(附有使用说明书的半成品,等待流水线工人的加工) , execute方法则是加工该产品的说明书。MethodMessage的代码如所示。

  1. import java.util.Map;
  2. public abstract class MethodMessage {
  3. // 用于收集方法参数,如果有返回Future类型则一并收集
  4. protected final Map<String, Object> param;
  5. protected final OrderService orderService;
  6. protected MethodMessage(Map<String, Object> param, OrderService orderService) {
  7. this.param = param;
  8. this.orderService = orderService;
  9. }
  10. // 抽象方法,扮演work thread的说明书
  11. public abstract void execute();
  12. }

� 其中,params主要用来收集方法参数,orderService是具体的接口实现,每一个方法都会被拆分成不同的Message。在OrderService中,我们定义了两个方法,因此需要实现两个MethodMessage。

(1) FindOrderDetailsMessage

FindOrderDetailsMessage的代码如所示

  1. import java.util.Map;
  2. public class FindOrderDetailsMessage extends MethodMessage {
  3. public FindOrderDetailsMessage(Map<String, Object> params, OrderService orderService) {
  4. super(params, orderService);
  5. }
  6. @Override
  7. public void execute() {
  8. Future<String> realFuture = orderService.findOrderDetails((Long)param.get("orderId"));
  9. ActiveFuture<String> activeFuture = (ActiveFuture<String>)param.get("activeFuture");
  10. try {
  11. String result = realFuture.get();
  12. activeFuture.finish(result);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. }

在上述代码中:
①执行orderService的findOrderDetails方法。
②调用orderServiceImpl返回的Future.get() ,此方法会导致阻塞直到findOrderDetails方法完全执行结束。
③当findOrderDetails执行结束时,将结果通过finish的方法传递给activeFuture。

(2) OrderMessage
OrderMessage的代码如所示。

  1. import java.util.Map;
  2. public class OrderMessage extends MethodMessage {
  3. public OrderMessage(Map<String, Object> params, OrderService orderService) {
  4. super(params, orderService);
  5. }
  6. @Override
  7. public void execute() {
  8. //获取参数
  9. String account = (String) param.get("account");
  10. long orderId = (long)param.get("orderId");
  11. // 执行真正的order方法
  12. orderService.order(account, orderId);
  13. }
  14. }

OrderMessage主要处理order方法, 从param中获取接口参数, 然后执行真正的OrderService的order方法。

2.5 ActiveMessageQueue

ActiveMessageQueue对应于Worker-Thread模式中的传送带,主要用于传送调用线程通过Proxy提交过来的MethodMessage,但是这个传送带允许存放无限的MethodMessage(没有limit的约束, 理论上可以放无限多个MethodMessage直到发生堆内存溢出的异常) ,代码所示。

  1. import java.util.LinkedList;
  2. public class ActiveMessageQueue {
  3. // 用于存放提交的MethodMessage消息
  4. private final LinkedList<MethodMessage> messages = new LinkedList<>();
  5. public ActiveMessageQueue() {
  6. // 启动Worker线程
  7. new ActiveDaemonThread(this).start();
  8. }
  9. public void offer(MethodMessage methodMessage) {
  10. synchronized (this) {
  11. messages.addLast(methodMessage);
  12. this.notify();
  13. }
  14. }
  15. protected MethodMessage take() {
  16. synchronized (this) {
  17. // 当MethodMessage 队列中没有Message的时候,执行线程进入阻塞
  18. while(messages.isEmpty() ) {
  19. try {
  20. this.wait();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. // 获取其中一个MethodMessage 并且从队列中移除
  26. return messages.removeFirst();
  27. }
  28. }
  29. }

�上述代码中:

  • 在创建Activ MessageQueue的同时启动ActiveDaemonThread线程, ActiveDaemonThread主要用来进行异步的方法执行,后面我们会介绍。
  • 执行offer方法没有进行limit的判断,允许提交无限个MethodMessage(直到发生堆内存溢出) ,并且当有新的Message加入时会通知ActiveDaemonThread线程。
  • take方法主要是被Active Daemon Thread线程使用,当message队列为空时ActiveDaemonThread线程将会被挂起(Guarded Suspension) ,如代码所示。 ```java public class ActiveDaemonThread extends Thread{

    private final ActiveMessageQueue queue;

    public ActiveDaemonThread(ActiveMessageQueue queue) {

    1. super("ActiveDaemonThread");
    2. this.queue = queue;
    3. // ActiveDaemonThread 为守护线程
    4. setDaemon(true);

    }

  1. @Override
  2. public void run() {
  3. for(;;) {
  4. /*
  5. 从MethodMessage队列中获取一个MethodMessage,然后执行execute方法
  6. */
  7. MethodMessage methodMessage = this.queue.take();
  8. methodMessage.execute();
  9. }
  10. }

}

  1. ActiveDaemonThread是一个守护线程,主要是从queue中获取Message然后执行execute方法(注意:保持为线程命名的习惯是一个比较好的编程习惯)
  2. <a name="DkHn8"></a>
  3. ### 2.6 OrderServiceFactory及测试
  4. 我们基本上已经完成了一个标准ActiveObjects的设计,接口方法的每一次调用实际上都是向Queue中提交一个对应的Message信息,当然这个工作主要是由Proxy完成的,但是为了让Proxy的构造透明化,我们需要设计一个Factory工具类,代码如所示。
  5. ```java
  6. public class OrderServiceFactory {
  7. // 将ActiveMessage定义成static的目的是,保持其整个JVM进程中是唯一的,并且ActiveDaemonThread 会在此刻启动
  8. private final static ActiveMessageQueue activeMessageQueue = new ActiveMessageQueue();
  9. private OrderServiceFactory(){}
  10. // 返回OrderServiceProxy
  11. public static OrderService toActiveObject(OrderService orderService) {
  12. return new OrderServiceProxy(orderService, activeMessageQueue);
  13. }
  14. public static void main(String[] args) throws InterruptedException {
  15. // 在创建OrderService时需要传递OrderService接口的具体实现
  16. OrderService orderService = OrderServiceFactory.toActiveObject(new OrderServiceImpl());
  17. orderService.order("hello", 1234);
  18. System.out.println("return immediately");
  19. Thread.currentThread().join();
  20. }
  21. }

3.通用Active Objects 框架设计

标准的ActiveObjects要将每一个方法都封装成Message(比如27.2节中定义的FindOrderDetailsMessage, OrderMessage) ,然后提交至Message队列中,这样的做法有点类似于远程方法调用(RPC:RemoteProcessCall) 。如果某个接口的方法很多, 那么需要封装很多的Message类; 同样如果有很多接口需要成为ActiveObject,则需要封装成非常多的Message类,这样显然不是很友好。在本节中,我们将设计一个更加通用的ActiveObject框架, 可以将任意的接口转换成ActiveObject。
在本节中,我们将使用JDK动态代理的方式实现一个更为通用的ActiveObjects,可以将任意接口方法转换为ActiveObjects,当然如果接口方法有返回值,则必须要求返回Future类型才可以,否则将会抛出IllegalActiveMethod异常, 代码如清单27-11所示。

  1. // 若方法不符合则其被转换为Active方法时抛出异常
  2. public class IllegalActiveMethod extends Exception{
  3. public IllegalActiveMethod(String message) {
  4. super(message);
  5. }
  6. }

通用的Active Objects设计消除了为每一个接口方法定义Method Message的过程, 同时也摒弃掉了为每一个接口创建定义Proxy的实现, 所有的操作都会被支持动态代理的工厂类Active Service Factory所替代, 通用Active Objects框架详细类图如图27-4所示
image.png

3.1 Active Message详解

相比较于Method Message, Active Message更加通用, 其可以满足所有Active Objects接口方法的要求, 与Method Message类似, Active Message也是用于收集接口方法信息和具体的调用方法的。Active Message的代码如所示。

  1. import java.lang.reflect.InvocationTargetException;
  2. import java.lang.reflect.Method;
  3. public class ActiveMessage {
  4. // 接口方法的参数
  5. private final Object[] objects;
  6. // 接口方法
  7. private final Method method;
  8. // 有返回值的方法 会返回ActiveFuture<?>类型
  9. private final ActiveFuture<Object> future;
  10. // 具体的Service接口
  11. private final Object service;
  12. // 构造ActiveMessage 是由Builder 来完成的
  13. private ActiveMessage(Builder builder) {
  14. this.objects = builder.objects;
  15. this.method = builder.method;
  16. this.future = builder.future;
  17. this.service = builder.service;
  18. }
  19. // ActiveMessage的方法通过反射的方式调用执行的具体实现
  20. public void execute() {
  21. try {
  22. // 执行接口的方法
  23. Object result = method.invoke(service, objects);
  24. if ( future != null ) {
  25. // 如果是有返回值的接口方法,则需要通过get方法获得最终的结果
  26. Future<?> realFuture = (Future<?>) result;
  27. Object realResult = realFuture.get();
  28. // 将结果交给ActiveFuture,接口方法的线程会得到返回
  29. future.finish(realResult);
  30. }
  31. } catch (IllegalAccessException e) {
  32. e.printStackTrace();
  33. } catch (InvocationTargetException e) {
  34. e.printStackTrace();
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. // Builder主要负责对ActiveMessage的构建,是一种典型的Gof Builder设计模式
  40. static class Builder {
  41. private Object[] objects;
  42. private Method method;
  43. private ActiveFuture<Object> future;
  44. private Object service;
  45. public Builder userMethod(Method method) {
  46. this.method = method;
  47. return this;
  48. }
  49. public Builder returnFuture(ActiveFuture<Object> future) {
  50. this.future = future;
  51. return this;
  52. }
  53. public Builder withObjects(Object[] objects) {
  54. this.objects = objects;
  55. return this;
  56. }
  57. public Builder forService(Object service) {
  58. this.service = service;
  59. return this;
  60. }
  61. // 构建ActiveMessage实例
  62. public MethodMessage build() {
  63. return new ActiveMessage(this);
  64. }
  65. }
  66. }

构造Active Message必须使用Builder方式进行build, 其中包含了调用某个方法必需的入参(objects) , 代表该方法的java.lang.reflect Method实例, 将要执行的Active Service实例(service) , 以及如果该接口方法有返回值, 需要返回的Future实例(future) 。

3.2 @ActiveMethod

通用的Active Objects更加灵活, 它允许你将某个接口的任意方法转换为ActiveMethod, 如果不需要转换, 则需要按照普通方法来执行, 而不会被单独的线程执行, 要做到这一点,就需要使用 @ActiveMethod注解来进行标记, 代码如清单27-13所示

  1. import java.lang.annotation.ElementType;
  2. import java.lang.annotation.Retention;
  3. import java.lang.annotation.RetentionPolicy;
  4. import java.lang.annotation.Target;
  5. @Retention(RetentionPolicy.RUNTIME)
  6. @Target(ElementType.METHOD)
  7. public @interface ActiveMethod {
  8. }

3.3 ActiveServiceFactory详解

  1. import java.lang.reflect.InvocationHandler;
  2. import java.lang.reflect.Method;
  3. import java.lang.reflect.Proxy;
  4. public class ActiveServiceFactory {
  5. // 定义ActiveMessageQueue,用于存放ActiveMessage
  6. private final static ActiveMessageQueue queue = new ActiveMessageQueue();
  7. private static <T> T active(T instance) {
  8. Object proxy = Proxy.newProxyInstance(instance.getClass().getClassLoader(),
  9. instance.getClass().getInterfaces(), new ActiveInvocationHandler<>(instance));
  10. return (T) proxy;
  11. }
  12. // ActiveInvocationHandler是InvocationHandler的子类,生产Proxy时需要使用到
  13. private static class ActiveInvocationHandler<T> implements InvocationHandler {
  14. private final T instance;
  15. ActiveInvocationHandler(T instance) {
  16. this.instance = instance;
  17. }
  18. @Override
  19. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  20. // 如果接口方法被@ActiveMessage 标记,则会转换为ActiveMessage
  21. if ( method.isAnnotationPresent(ActiveMethod.class)) {
  22. // 检查该方法是符合规范
  23. this.checkMethod(method);
  24. ActiveMessage.Builder builder = new ActiveMessage.Builder();
  25. builder.userMethod(method).withObjects(args).forService(instance);
  26. Object result = null;
  27. if ( this.isReturnFutureType(method)) {
  28. result = new ActiveFuture<>();
  29. builder.returnFuture((ActiveFuture)result);
  30. }
  31. //将ActiveMessage加入至队列中
  32. queue.offer(builder.build());
  33. return result;
  34. } else {
  35. return method.invoke(instance,args);
  36. }
  37. }
  38. // 检查有返回值的方法是否为Future,否则将会抛出 IllegalActiveMethod异常
  39. private void checkMethod(Method method) throws IllegalActiveMethod {
  40. // 有返回值,必须是ActiveFuture类型的返回值
  41. if ( !isReturnVoidType(method) && !isReturnFutureType(method)) {
  42. try {
  43. throw new IllegalActiveMethod("the method");
  44. } catch (IllegalActiveMethod illegalActiveMethod) {
  45. illegalActiveMethod.printStackTrace();
  46. }
  47. }
  48. }
  49. // 判断方法是否为Future返回类型
  50. private boolean isReturnFutureType(Method method) {
  51. return method.getReturnType().isAssignableFrom(Future.class);
  52. }
  53. // 判断方法是否无返回类型
  54. private boolean isReturnVoidType(Method method) {
  55. return method.getReturnType().equals(Void.TYPE);
  56. }
  57. }
  58. }

在上述代码中:

  • 静态方法active() 会根据ActiveService实例生成一个动态代理实例,其中会用到ActiveInvocationHandler作为newProxyInstance的InvocationHandler。
  • 在Active Invocation Handler的invoke方法中, 首先会判断该方法是否被Active Method标记, 如果没有则被当作正常方法来使用。
  • 如果接口方法被@Active Method标记, 则需要判断方法是否符合规范:有返回类型,必须是Future类型。
  • 定义Active Message.Builder分别使用method、方法参数数组以及Active Service实例, 如果该方法是Future的返回类型, 则还需要定义Active Future。
  • 最后将Active Message插入Active MessageQueue中, 并且返回method方法invoke结果。

    4 Active MessageQueue及其他