1 接受异步消息的主动对象
Active是“主动”的意思Active Object是“主动对象”的意思,所谓主动对象就是指其拥有自己的独立线程, 比如java.lang.Thread实例就是一个主动对象, 不过Active Object Pattern不仅仅是拥有独立的线程,它还可以接受异步消息,并且能够返回处理的结果。
我们在本书中频繁使用的System.gc() 方法就是一个“接受异步消息的主动对象”,调用gc方法的线程和gc自身的执行线程并不是同一个线程,在本章中,我们将实现一个类似于System.gc的可接受异步消息的主动对象。接受异步消息的主动对象工作原理如图所示。
2 标准Active Objects 模式设计
在本节中, 我们首先从标准的Active Objects设计入手, 将一个接口的方法调用转换成可接受异步消息的主动对象,也就是说方法的执行和方法的调用是在不同的线程中进行的,那么如何使得执行线程知道应该如何正确执行接口方法呢?我们需要将接口方法的参数以及具体实现封装成特定的Message告知执行线程。如果该接口方法需要返回值, 则必须得设计成Future的返回形式, 图为标准Active Objects模型设计的结构类图。
通过图27-3我们可以看出, 当某个线程调用Order Service接口的find Order Details方法时, 事实上是发送了一个包含find Order Details方法参数以及Order Service具体实现的Message至Message队列, 执行线程通过从队列中获取Message来调用具体的实现, 接口方法的调用和接口方法的执行分别处于不同的线程中, 因此我们称该接口为Active Objects。
2.1 OrderService接口设计
Order Service是一个比较简单的接口, 包含两个方法, 其中第一个为有返回值的方法,第二个方法则没有返回值,代码如所示:
public interface OrderService {
/**
* 根据订单编码查询订单明细,有入惨也有返回值,但是返回类型必须时Future
*/
Future<String> findOrderDetails(long orderId);
/**
* 提交订单,没有返回值
* @param account
* @param orderId
*/
void order(String account, long orderId);
}
- �find OrderDetails(long orderId) :通过订单编号获取订单详情,有返回值的方法必须是Future类型的,因为方法的执行是在其他线程中进行的,势必不会立即得到正确的最终结果,通过Future可以立即得到返回。
- Order(String account, long orderId) :提交用户的订单信息,是一种无返回值的方法。
2.2 OrderServiceImpl详解
import java.util.concurrent.TimeUnit;
public class OrderServiceImpl implements OrderService{
@Override
public Future<String> findOrderDetails(long orderId) {
return FutureService.<Long,String>newService().submit(input -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "The order Details Information";
}, orderId, null);
}
@Override
public void order(String account, long orderId) {
try {
TimeUnit.SECONDS.sleep(10);
System.out.println(" process the order for account " + account + ",orderId " + orderId);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
�OrderServiceImpl类是OrderService的一个具体实现,该类是在执行线程中将要被使用的类,其中findOrderDetails方法通过第19章中我们开发的Future立即返回一个结果,order方法则通过休眠来模拟该方法的执行比较耗时。
2.3 OrderServiceProxy详解
�OrderServiceProxy是OrderService的子类,它的作用是将OrderService的每一个方法都封装成MethodMessage,然后提交给ActiveMessage队列,在使用OrderService接口方法的时候,实际上是在调用OrderServiceProxy中的方法,代码如所示。
import java.util.HashMap;
import java.util.Map;
public class OrderServiceProxy implements OrderService {
private final OrderService orderService;
private final ActiveMessageQueue activeMessageQueue;
public OrderServiceProxy(OrderService orderService, ActiveMessageQueue activeMessageQueue) {
this.orderService = orderService;
this.activeMessageQueue = activeMessageQueue;
}
@Override
public Future<String> findOrderDetails(long orderId) {
// 定义一个ActiveFuture,并且可支持立即返回
final ActiveFuture<String> activeFuture = new ActiveFuture<>();
// 收集方法入惨以及返回的ActiveFuture 封装成MethodMessage
Map<String, Object> params = new HashMap<>();
params.put("orderId", activeFuture);
params.put("activeFuture", activeFuture);
MethodMessage message = new FindOrderDetailsMessage(params, orderService);
// 将methodmessage保存值activeMessageQueue中
activeMessageQueue.offer(message);
return activeFuture;
}
@Override
public void order(String account, long orderId) {
// 收集方法参数, 并且封装成MethodMessage,然后offer至队列中
Map<String, Object> params = new HashMap<>();
params.put("account", account);
params.put("orderId", orderId);
MethodMessage message = new OrderMessage(params, orderService);
activeMessageQueue.offer(message);
}
}
OrderServiceProxy作为OrderService的一个实现,看上去与OrderService没多大关系,其主要作用是将OrderService接口定义的方法封装成MethodMessage,然后offer给Active-MessageQueue。若是无返回值的方法,则只需要提交Message到ActiveMessageQueue中即可, 但若是有返回值的方法,findOrderDetails是比较特殊的,它需要返回一个Active-Future,该Future的作用是可以立即返回,当调用线程获取结果时将进入阻塞状态, 代码所示。
public class ActiveFuture<T> extends FutureTask {
@Override
public void finish(Object result) {
super.finish(result);
}
}
ActiveFuture非常简单,是FutureTask的直接子类,其主要作用是重写finish方法,并且将protected的权限换成public, 可以使得执行线程完成任务之后传递最终结果。
2.4 MethodMessage
MethodMessage的主要作用是收集每一个接口的方法参数,并且提供execute方法供ActiveDaemonThread直接调用,该对象就是典型的WorkerThread模型中的Product(附有使用说明书的半成品,等待流水线工人的加工) , execute方法则是加工该产品的说明书。MethodMessage的代码如所示。
import java.util.Map;
public abstract class MethodMessage {
// 用于收集方法参数,如果有返回Future类型则一并收集
protected final Map<String, Object> param;
protected final OrderService orderService;
protected MethodMessage(Map<String, Object> param, OrderService orderService) {
this.param = param;
this.orderService = orderService;
}
// 抽象方法,扮演work thread的说明书
public abstract void execute();
}
� 其中,params主要用来收集方法参数,orderService是具体的接口实现,每一个方法都会被拆分成不同的Message。在OrderService中,我们定义了两个方法,因此需要实现两个MethodMessage。
(1) FindOrderDetailsMessage
FindOrderDetailsMessage的代码如所示
import java.util.Map;
public class FindOrderDetailsMessage extends MethodMessage {
public FindOrderDetailsMessage(Map<String, Object> params, OrderService orderService) {
super(params, orderService);
}
@Override
public void execute() {
Future<String> realFuture = orderService.findOrderDetails((Long)param.get("orderId"));
ActiveFuture<String> activeFuture = (ActiveFuture<String>)param.get("activeFuture");
try {
String result = realFuture.get();
activeFuture.finish(result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在上述代码中:
①执行orderService的findOrderDetails方法。
②调用orderServiceImpl返回的Future.get() ,此方法会导致阻塞直到findOrderDetails方法完全执行结束。
③当findOrderDetails执行结束时,将结果通过finish的方法传递给activeFuture。
(2) OrderMessage
OrderMessage的代码如所示。
import java.util.Map;
public class OrderMessage extends MethodMessage {
public OrderMessage(Map<String, Object> params, OrderService orderService) {
super(params, orderService);
}
@Override
public void execute() {
//获取参数
String account = (String) param.get("account");
long orderId = (long)param.get("orderId");
// 执行真正的order方法
orderService.order(account, orderId);
}
}
OrderMessage主要处理order方法, 从param中获取接口参数, 然后执行真正的OrderService的order方法。
2.5 ActiveMessageQueue
ActiveMessageQueue对应于Worker-Thread模式中的传送带,主要用于传送调用线程通过Proxy提交过来的MethodMessage,但是这个传送带允许存放无限的MethodMessage(没有limit的约束, 理论上可以放无限多个MethodMessage直到发生堆内存溢出的异常) ,代码所示。
import java.util.LinkedList;
public class ActiveMessageQueue {
// 用于存放提交的MethodMessage消息
private final LinkedList<MethodMessage> messages = new LinkedList<>();
public ActiveMessageQueue() {
// 启动Worker线程
new ActiveDaemonThread(this).start();
}
public void offer(MethodMessage methodMessage) {
synchronized (this) {
messages.addLast(methodMessage);
this.notify();
}
}
protected MethodMessage take() {
synchronized (this) {
// 当MethodMessage 队列中没有Message的时候,执行线程进入阻塞
while(messages.isEmpty() ) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 获取其中一个MethodMessage 并且从队列中移除
return messages.removeFirst();
}
}
}
�上述代码中:
- 在创建Activ MessageQueue的同时启动ActiveDaemonThread线程, ActiveDaemonThread主要用来进行异步的方法执行,后面我们会介绍。
- 执行offer方法没有进行limit的判断,允许提交无限个MethodMessage(直到发生堆内存溢出) ,并且当有新的Message加入时会通知ActiveDaemonThread线程。
take方法主要是被Active Daemon Thread线程使用,当message队列为空时ActiveDaemonThread线程将会被挂起(Guarded Suspension) ,如代码所示。 ```java public class ActiveDaemonThread extends Thread{
private final ActiveMessageQueue queue;
public ActiveDaemonThread(ActiveMessageQueue queue) {
super("ActiveDaemonThread");
this.queue = queue;
// ActiveDaemonThread 为守护线程
setDaemon(true);
}
@Override
public void run() {
for(;;) {
/*
从MethodMessage队列中获取一个MethodMessage,然后执行execute方法
*/
MethodMessage methodMessage = this.queue.take();
methodMessage.execute();
}
}
}
ActiveDaemonThread是一个守护线程,主要是从queue中获取Message然后执行execute方法(注意:保持为线程命名的习惯是一个比较好的编程习惯) 。
<a name="DkHn8"></a>
### 2.6 OrderServiceFactory及测试
我们基本上已经完成了一个标准ActiveObjects的设计,接口方法的每一次调用实际上都是向Queue中提交一个对应的Message信息,当然这个工作主要是由Proxy完成的,但是为了让Proxy的构造透明化,我们需要设计一个Factory工具类,代码如所示。
```java
public class OrderServiceFactory {
// 将ActiveMessage定义成static的目的是,保持其整个JVM进程中是唯一的,并且ActiveDaemonThread 会在此刻启动
private final static ActiveMessageQueue activeMessageQueue = new ActiveMessageQueue();
private OrderServiceFactory(){}
// 返回OrderServiceProxy
public static OrderService toActiveObject(OrderService orderService) {
return new OrderServiceProxy(orderService, activeMessageQueue);
}
public static void main(String[] args) throws InterruptedException {
// 在创建OrderService时需要传递OrderService接口的具体实现
OrderService orderService = OrderServiceFactory.toActiveObject(new OrderServiceImpl());
orderService.order("hello", 1234);
System.out.println("return immediately");
Thread.currentThread().join();
}
}
3.通用Active Objects 框架设计
标准的ActiveObjects要将每一个方法都封装成Message(比如27.2节中定义的FindOrderDetailsMessage, OrderMessage) ,然后提交至Message队列中,这样的做法有点类似于远程方法调用(RPC:RemoteProcessCall) 。如果某个接口的方法很多, 那么需要封装很多的Message类; 同样如果有很多接口需要成为ActiveObject,则需要封装成非常多的Message类,这样显然不是很友好。在本节中,我们将设计一个更加通用的ActiveObject框架, 可以将任意的接口转换成ActiveObject。
在本节中,我们将使用JDK动态代理的方式实现一个更为通用的ActiveObjects,可以将任意接口方法转换为ActiveObjects,当然如果接口方法有返回值,则必须要求返回Future类型才可以,否则将会抛出IllegalActiveMethod异常, 代码如清单27-11所示。
// 若方法不符合则其被转换为Active方法时抛出异常
public class IllegalActiveMethod extends Exception{
public IllegalActiveMethod(String message) {
super(message);
}
}
通用的Active Objects设计消除了为每一个接口方法定义Method Message的过程, 同时也摒弃掉了为每一个接口创建定义Proxy的实现, 所有的操作都会被支持动态代理的工厂类Active Service Factory所替代, 通用Active Objects框架详细类图如图27-4所示
3.1 Active Message详解
相比较于Method Message, Active Message更加通用, 其可以满足所有Active Objects接口方法的要求, 与Method Message类似, Active Message也是用于收集接口方法信息和具体的调用方法的。Active Message的代码如所示。
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
public class ActiveMessage {
// 接口方法的参数
private final Object[] objects;
// 接口方法
private final Method method;
// 有返回值的方法 会返回ActiveFuture<?>类型
private final ActiveFuture<Object> future;
// 具体的Service接口
private final Object service;
// 构造ActiveMessage 是由Builder 来完成的
private ActiveMessage(Builder builder) {
this.objects = builder.objects;
this.method = builder.method;
this.future = builder.future;
this.service = builder.service;
}
// ActiveMessage的方法通过反射的方式调用执行的具体实现
public void execute() {
try {
// 执行接口的方法
Object result = method.invoke(service, objects);
if ( future != null ) {
// 如果是有返回值的接口方法,则需要通过get方法获得最终的结果
Future<?> realFuture = (Future<?>) result;
Object realResult = realFuture.get();
// 将结果交给ActiveFuture,接口方法的线程会得到返回
future.finish(realResult);
}
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// Builder主要负责对ActiveMessage的构建,是一种典型的Gof Builder设计模式
static class Builder {
private Object[] objects;
private Method method;
private ActiveFuture<Object> future;
private Object service;
public Builder userMethod(Method method) {
this.method = method;
return this;
}
public Builder returnFuture(ActiveFuture<Object> future) {
this.future = future;
return this;
}
public Builder withObjects(Object[] objects) {
this.objects = objects;
return this;
}
public Builder forService(Object service) {
this.service = service;
return this;
}
// 构建ActiveMessage实例
public MethodMessage build() {
return new ActiveMessage(this);
}
}
}
构造Active Message必须使用Builder方式进行build, 其中包含了调用某个方法必需的入参(objects) , 代表该方法的java.lang.reflect Method实例, 将要执行的Active Service实例(service) , 以及如果该接口方法有返回值, 需要返回的Future实例(future) 。
3.2 @ActiveMethod
通用的Active Objects更加灵活, 它允许你将某个接口的任意方法转换为ActiveMethod, 如果不需要转换, 则需要按照普通方法来执行, 而不会被单独的线程执行, 要做到这一点,就需要使用 @ActiveMethod注解来进行标记, 代码如清单27-13所示
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface ActiveMethod {
}
3.3 ActiveServiceFactory详解
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
public class ActiveServiceFactory {
// 定义ActiveMessageQueue,用于存放ActiveMessage
private final static ActiveMessageQueue queue = new ActiveMessageQueue();
private static <T> T active(T instance) {
Object proxy = Proxy.newProxyInstance(instance.getClass().getClassLoader(),
instance.getClass().getInterfaces(), new ActiveInvocationHandler<>(instance));
return (T) proxy;
}
// ActiveInvocationHandler是InvocationHandler的子类,生产Proxy时需要使用到
private static class ActiveInvocationHandler<T> implements InvocationHandler {
private final T instance;
ActiveInvocationHandler(T instance) {
this.instance = instance;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 如果接口方法被@ActiveMessage 标记,则会转换为ActiveMessage
if ( method.isAnnotationPresent(ActiveMethod.class)) {
// 检查该方法是符合规范
this.checkMethod(method);
ActiveMessage.Builder builder = new ActiveMessage.Builder();
builder.userMethod(method).withObjects(args).forService(instance);
Object result = null;
if ( this.isReturnFutureType(method)) {
result = new ActiveFuture<>();
builder.returnFuture((ActiveFuture)result);
}
//将ActiveMessage加入至队列中
queue.offer(builder.build());
return result;
} else {
return method.invoke(instance,args);
}
}
// 检查有返回值的方法是否为Future,否则将会抛出 IllegalActiveMethod异常
private void checkMethod(Method method) throws IllegalActiveMethod {
// 有返回值,必须是ActiveFuture类型的返回值
if ( !isReturnVoidType(method) && !isReturnFutureType(method)) {
try {
throw new IllegalActiveMethod("the method");
} catch (IllegalActiveMethod illegalActiveMethod) {
illegalActiveMethod.printStackTrace();
}
}
}
// 判断方法是否为Future返回类型
private boolean isReturnFutureType(Method method) {
return method.getReturnType().isAssignableFrom(Future.class);
}
// 判断方法是否无返回类型
private boolean isReturnVoidType(Method method) {
return method.getReturnType().equals(Void.TYPE);
}
}
}
在上述代码中:
- 静态方法active() 会根据ActiveService实例生成一个动态代理实例,其中会用到ActiveInvocationHandler作为newProxyInstance的InvocationHandler。
- 在Active Invocation Handler的invoke方法中, 首先会判断该方法是否被Active Method标记, 如果没有则被当作正常方法来使用。
- 如果接口方法被@Active Method标记, 则需要判断方法是否符合规范:有返回类型,必须是Future类型。
- 定义Active Message.Builder分别使用method、方法参数数组以及Active Service实例, 如果该方法是Future的返回类型, 则还需要定义Active Future。
- 最后将Active Message插入Active MessageQueue中, 并且返回method方法invoke结果。
4 Active MessageQueue及其他