注:本文大部分文字摘抄《Akka 实战:快速构建高可用分布式应用》中内容,但此书中所用的akka版本比较老,所以本文中的代码是本文作者基于最新版本的akka以及官网所实现。

文中涉及到的所有Demo都在github上 网页地址为:https://github.com/boomblog/AkkaDemo clone地址为:https://github.com/boomblog/AkkaDemo.git

定时调度—Scheduler

Scheduler有两种:scheduleOnce 和 scheduler。
scheduleOnce表示延迟一段时间后执行,且只执行一次;
scheduler表示延迟后定时执行,并且可以取消定时执行。
Scheduler对象需要通过 Actor.scheduler()得到,它在整个 Actor 系统内都是单例的。
实际上,在 Actor 系统启动后,会读取 akka.scheduler.implementation 这个配置项,默认获取到的Scheduler 正是由此生成,当然,你也可以自定义该实现。

处理并发结果—Future

**使用 Future 的最简单方式就是通过 Patterns.ask 方法接收一个 Future 对象,然后使用同步或异步的方式去处理消息。同步的方式主要是通过Await.wait方法阻塞等待返回值来实现,异步主要通过各种回调函数实现。需要注意的是:在实现消息传递时,我们必须考虑到超时带来的影响。

  1. public class FutureDemo {
  2. static class FutureActor extends AbstractActor {
  3. @Override
  4. public Receive createReceive() {
  5. return receiveBuilder().matchAny(msg -> {
  6. Thread.sleep(4000);
  7. getSender().tell("reply", getSelf());
  8. }).build();
  9. }
  10. }
  11. public static void main(String[] args) {
  12. ActorSystem system = ActorSystem.create("sys");
  13. ActorRef ref = system.actorOf(Props.create(FutureActor.class), "fuActor");
  14. Timeout timeout = new Timeout(Duration.create(3, "seconds"));
  15. Future<Object> future = Patterns.ask(ref, "hello future", timeout);
  16. try {
  17. // Await同步获取响应,如果超时了则会抛出java.util.concurrent.TimeoutException
  18. String replymsg = (String) Await.result(future, timeout.duration());
  19. System.out.println(replymsg);
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. }

假如要异步处理消息,则需要显式地依赖 ExecutionContext 对象,并且调用回调函数。Future 提供的回调函数有:onComplete 、onSuccess和 onFailure ,它们分别表示完成、成功、失败的处理,下面是示例代码:

  1. future.onSuccess(new OnSuccess<Object>() {
  2. @Override
  3. public void onSuccess(Object msg) throws Throwable {
  4. System.out.println("receive: " + msg);
  5. }
  6. }, system.dispatcher());
  7. future.onFailure(new OnFailure() {
  8. @Override
  9. public void onFailure(Throwable ex) throws Throwable {
  10. if (ex instanceof AskTimeoutException) {
  11. System.out.println("超时异常");
  12. } else {
  13. System.out.println("其他异常 " + ex);
  14. }
  15. }
  16. }, system.dispatcher());
  17. future.onComplete(new OnComplete<Object>() {
  18. @Override
  19. public void onComplete(Throwable failure, Object success) throws Throwable {
  20. if (failure != null) {
  21. System.out.println("异常");
  22. } else {
  23. System.out.println(success);
  24. }
  25. }
  26. }, system.dispatcher());

当成功获取消息时,会进入 onSuccess 回调函数,并且将消息传给OnSuccess 方法;当出现异常时(比如超时),会进入 onFailure 回调函数,并且将异常传给 onFailure 方法,除了这两个函数外,还可以使用onComplete 函数,它相当于把 onSuccess 和 onFailure 的功能整合在一起。

事件总线

事件总线就是观察者模式,实现了基于发布—订阅的消息流处理。
在 Akka 中,事件总线被抽象成 EventBus 类型,它拥有发布(publish)、订阅(subscribe)、取消订阅( unsubscribe )等功能。具体来讲,一个完整的消息总线的过程是:发布者将 Event 发布到 EventBus上,事件订阅者( Subscriber )将会接收到相应的通知消息,通常来讲,订阅者会是一个 Actor,它通过原有的 onReceive 方法接收该消息。在这个过程中,还有一个概念非常重要,那就是 Classifier ,它用来描述一个事件分类,不同事件类型将有不同的订阅者,EventBus 将会通过 Classifier 来选择订阅者并向其发送消息。常见的 Classifier 有 LookupClassification 、SubchannelClassification 等,前者指定匹配的事件分类,由每个类别维护一个订阅者列表,后者可以匹配层级的事件分类。

Akka扩展机制

Akka 提供了一种非常好的扩展组件的方式,即 Akka Extensions (扩展)。实际上,Akka 内部有很多组件就是基于扩展实现的,比如 TypedActor 、序列化、集群指标等。

TypedActor使用

在 Akka 中,Actor 主要分为两类:AbstractActor(老版本叫UntypedActor) 和 TypedActor 。AbstractActor 更能体现经典 Actor 模型的优势,所以更加常用。而TypedActor 的使用方式比较接近 OOP,即通过接口——实现类和函数调用的方式来驱动任务的执行。

  1. public class TypedActorDemo {
  2. public interface UserService {
  3. public void saveUser(String id, String user);
  4. public Future<String> findUserForFuture(String id);
  5. public Option<String> findUserForOpt(String id);
  6. public String findUser(String id);
  7. }
  8. static class UserServiceImpl implements UserService {
  9. private static Map<String, String> map = new ConcurrentHashMap<String, String>();
  10. // 当方法无返回值(即 void)的时候,底层会采用 ActorRef.tell 的方式来调用,执行方式为异步;
  11. @Override
  12. public void saveUser(String id, String user) {
  13. map.put(id, user);
  14. }
  15. // 当方法返回 scala.concurrent.Future 时,会以 Patterns.ask 的方式来调用,然后将结果值包装到 Future 里并返回,它的执行方式也是异步
  16. @Override
  17. public Future<String> findUserForFuture(String id) {
  18. return Futures.successful(map.get(id));
  19. }
  20. // 当方法返回 akka.japi.Option 时,会以 Patterns.ask 的方式来调用,但是程序会一直阻塞,直到有返回值。假如返回值为 null,它会被包装成 Option.None 类型;
  21. @Override
  22. public Option<String> findUserForOpt(String id) {
  23. return Option.some(map.get(id));
  24. }
  25. // 当方法返回其他类型时,就和普通方法一样,程序会一直阻塞,直到有返回值。
  26. @Override
  27. public String findUser(String id) {
  28. return map.get(id);
  29. }
  30. }
  31. public static void main(String[] args) {
  32. ActorSystem system = ActorSystem.create("sys");
  33. TypedActorExtension typeActorExtension = TypedActor.get(system);
  34. UserService userService = typeActorExtension.typedActorOf(new TypedProps<UserServiceImpl>(UserService.class, UserServiceImpl.class));
  35. System.out.println("userService: " + userService);
  36. //异步执行?
  37. userService.saveUser("1", "afei");
  38. //异步执行
  39. Future<String> fu = userService.findUserForFuture("1");
  40. fu.onSuccess(new OnSuccess() {
  41. @Override
  42. public void onSuccess(Object result) throws Throwable {
  43. System.out.println("The future user is:" + result);
  44. }
  45. }, system.dispatcher());
  46. //阻塞直到有返回值
  47. Option<String> opt = userService.findUserForOpt("1");
  48. System.out.println("The Opt user is:" + opt.getClass());
  49. //阻塞直到有返回值
  50. String user = userService.findUser("1");
  51. System.out.println("The user is: " + user);
  52. }
  53. }

自定义扩展

Akka 的扩展需要依赖 Extensions API ,它包含两个概念:Extension和 ExtensionId 。Extension 表示一个可用的扩展组件,它对每一个ActorSystem 都是唯一的,即每次通过同一个 ActorSystem 得到的Extension 对象都是同一个。ExtensionId 是用来创建及查找 Extension 的组件,将其配置在*.conf 中可以让 ActorSystem 在创建阶段加载 Extension 。

接下来我们将要演示的示例场景是:某个 ActorSystem 需要和外界做 RPC 调用,而远程服务地址和端口是可配置的。

首先,我们需要在application.conf 文件中定义配置项:

  1. akkademo{
  2. server=serverdemo.io
  3. port=1234
  4. }

该配置包含 RPC 的 server 地址以及端口,它们会在加载时传入Extension 对象,Extension 的实现如下:

  1. static class RPCExtension implements Extension {
  2. private String server;
  3. private int port;
  4. public RPCExtension(String server, int port) {
  5. this.server = server;
  6. this.port = port;
  7. }
  8. //模拟RPC调用
  9. public void rpcCall(String cmd) {
  10. System.out.println("call " + cmd + "-->" + server + ":" + port);
  11. }
  12. }

它实现了 Extension 接口,其他地方和普通 Java Class 无异。为了使该类对象能被 Actor-System 查找并绑定,需要再定义一个 ExtensionId 类型:

  1. static class RPCExtProvider extends AbstractExtensionId<RPCExtension> {
  2. private static RPCExtProvider provider = new RPCExtProvider();
  3. @Override
  4. public RPCExtension createExtension(ExtendedActorSystem system) {
  5. Config config = system.settings().config();
  6. String server = config.getString("akkademo.server");
  7. int port = config.getInt("akkademo.port");
  8. return new RPCExtension(server, port);
  9. }
  10. public static RPCExtProvider getInstance() {
  11. return provider;
  12. }
  13. }

该类继承了 AbstractExtensionId 并实现了 createExtension 方法,该方法主要用于 Extension 组件的创建,它的 ExtendedActorSystem 参数即当前环境下的 ActorSystem 对象引用,所以可以直接通过它来获取 Config 对象,进而读取配置信息。在使用它查找 Extension 时,需要调用AbstractExtensionId 提供的 get 方法:

  1. RPCExtension rpcExt = RPCExtProvider.getInstance().get(system);
  2. rpcExt.rpcCall("hello");

当调用 get 方法时,RPCExtProvider.createExtension 会被自动调用一次,假如将该类配置在 akka.extensions 里,该方法会在 ActorSystem 创建阶段自动调用一次。

Akka Stream

实际项目中往往离不开对数据的操作,这些数据可能来自很多地方,比如文件、网络或者数据库,我们一般把这些地方叫作数据源。在得到这些数据后,我们通常需要对它们做诸如提取、转换的处理,然后将它们输出到某个目的地,或者展示给用户。在这个过程中,我们需要考虑一个很重要的问题:当数据源源不断地从外部「流」进来时,我们该如何利用有限的资源高效地处理它们。

Akka 的 Streams 模块为解决这类问题提供了很好的思路。Akka Streams 建立在现有的 Actor 模型之上,它将所有的处理过程抽象成异步&并行执行的函数,而数据将会在这些函数内流动并得到处理,这种做法极大提高了整个计算的效率。假如我们把数据的流向简单地理解成「输入——输出」,那么在一个「流」里,可能存在多个「输入——输出」的操作,这些操作可以作为独立的组件而存在,在你需要它们时,将它们一个个装配起来即可。从这点可以看出,Akka Streams 为我们提供了一个简洁并易于扩展的编程模型。Akka Streams 另外一个特点是它可以让程序在有限资源(内存)下高效地处理大量数据,这其中离不开对内部缓冲池的有效控制,假如把数据的两端看作生产者和消费者,那么当数据的消费速度跟不上生产速度时,它就会减慢生产速度,反之亦然,这个行为被称为「背压」(BackPressure ),它也是一个流量控制的手段。

Streams 组件

Akka Streams 的几个重要组件。

  • Source:产生一个输出,它的下游在处理时会接收它的数据。
  • Sink:需要一个输入,它通常是流处理的最后一个阶段。
  • Flow:拥有一个输入和输出,它包含一些类似集合的操作,可以做一些数据转换、过滤等操作。
  • RunnableGraph :当你拥有 Source、Sink 等结构后,还需要将它们连接在同一个管道上才能真正工作,此时我们需要构建一个RunnableGraph 对象,为最后的执行(run)做好准备工作。

一个流总是由 Source 开始,它是流的起点,也是数据的来源,你可以从集合或者文件等数据源中构建一个 Source,然后将其经过 Flow(可以有多个)的转换或过滤,最后使用 Sink 来进行最后的处理。

由于在 Akka 中,Streams 并不属于其核心模块,所以在使用它时需要先加入以下 maven 依赖:

  1. <dependency>
  2. <groupId>com.typesafe.akka</groupId>
  3. <artifactId>akka-stream_2.13</artifactId>
  4. <version>2.5.23</version>
  5. </dependency>

一个简单代码样例:

  1. ActorSystem system = ActorSystem.create("sys");
  2. // 数据来源、流的起点
  3. // Source 包括两个泛型:第一个泛型表示它产生的数据类型,第二个泛型表示运行时产生的其他辅助数据,假如没有则设置为 NotUsed。
  4. Source<Integer, NotUsed> source = Source.range(1, 5);
  5. // 定义sink,用来循环打印数据,此时
  6. Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(System.out::println);
  7. // 使用sink操作数据,此时会创建出一个RunnableGraph对象
  8. RunnableGraph<NotUsed> graph = source.to(sink);
  9. // 当执行 RunnableGraph.run 方法时,需要传入一个 Materializer 对象,它主要用来给流分配 Actor 并驱动其执行。
  10. Materializer materializer = ActorMaterializer.create(system);
  11. graph.run(materializer);

构建Source

1. 从集合中构建 Source
  1. List<String> list = new ArrayList<String>();
  2. list.add("sh");
  3. list.add("bj");
  4. list.add("nj");
  5. Source<String,NotUsed> s1=Source.from(list);
  6. s1.runForeach(System.out::println, materializer);

2. 从 Future 中构建 Source
  1. Source<String,NotUsed> s2=Source.fromFuture(Futures.successful("HelloAkka!"));

3. 重复生成元素,取前N个元素
  1. // 重复的生成元素
  2. Source<String, NotUsed> s3 = Source.repeat("Hello");
  3. // 取出前5个
  4. s3.limit(5).runForeach(System.out::println, materializer);

4. 使用 FileIO API 从文件中构建 Source,可将文件内容作为流处理的输入
  1. Source<ByteString, CompletionStage<IOResult>> source = FileIO.fromPath(Paths.get("demo_in.txt"));

构建 Sink

1. 使用 Sink 循环出每个元素。
  1. Sink<Integer,CompletionStage<Done>> sink1=Sink.foreach(System.out::println);

2. 使用 Sink 做 fold 运算,该运算会将 fold 的第一个参数作为初始值传入后面函数中的 x,每次计算后将结果继续作为下一次计算的参数输入。runWith 会将 Source 和 Sink 连接起来并运行。
  1. Sink<Integer, CompletionStage<Integer>> sink2 = Sink.fold(1, (x,y) -> x*y);
  2. CompletionStage<Integer> r1 = Source.range(1, 5).runWith(sink2,materializer);
  3. r1.thenAccept(System.out::println);// 1*2*3*4*5=120

3. 使用 Sink 做 reduce 运算,该运算和 fold 类似,不过没有初始参数。
  1. Sink<Integer,CompletionStage<Integer>> sink3=Sink.reduce((x,y)->x+y);
  2. CompletionStage<Integer> r2= Source.range(1, 5).runWith(sink3,materializer);
  3. r2.thenAccept(System.out::println);// 1+2+3+4+5=15

4. 使用 FileIO API 构建输出流的 Sink,结合 Source 可以实现简单的文件复制功能。
  1. Sink<ByteString, CompletionStage<IOResult>> sink = FileIO.toPath(Paths.get("demo_out.txt"));

构建 Flow

Flow 需要一个输入和输出,一般作为中间过程而存在,在一个流操作中,我们可以包装多个 Flow 一起使用

定时器、Future、事件总线、扩展机制、Stream - 图1

在使用时,Flow 组件通过调用 Source.via 方法附加在 Source 上,此时会构建一个新的 Source 对象,同时它也可以通过调用 Flow.to 方法附加在 Sink 上,此时会构建一个新的 Sink 对象。下面代码演示了这个过程:

  1. Flow<String, Integer, NotUsed> flow = Flow.of(String.class).map(x -> {
  2. return Integer.parseInt(x) * 3;
  3. });
  4. Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(System.out::println);
  5. //list是一个包含5个元素的集合,代码略...
  6. Source.from(list).via(flow).runWith(sink, materializer);

这段代码采用 via 方法将 Flow 组件附加在 Source 上,它实现的功能是:通过字符串集合构建出一个 Source 对象,然后将每个字符串取出来转换成数字并做相应计算,再使用 Sink.foreach 打印出来,其中转换的过程依赖于 Flow.map 方法来实现。如前面所说,我们也可以将 Flow 组件附加在 Sink 上以构建新的 Sink 组件,如:

  1. Source.from(list).runWith(flow.to(sink), materializer);

通过这个例子还可以看出,流操作和集合非常相似,但是大家要注意,我们仍然不能把流等同于集合,它们之间有个很大的区别是:流的大小往往是未知的,无界的,而集合的大小一般都是已知的。所以在处理流时,你事先不需要知道要处理多少数据,生产者可能会源源不断地流入数据进来,流的底层能保证在有限的资源下处理「无限」的数据。所以实际上,它们的底层实现也是有很大区别的。

组合 Source、Sink

在前面的例子中,我们分别将 Flow 附加在 Source 和 Sink 上,由此创建了一个新的 Source 和 Sink 对象,实际上这个过程可以看成是一种「输入输出」的组合。
由于 Source 是一个产生输出的组件,当它和 Flow 组合时,会生成一个拥有输出的 Source;同理,由于 Sink 是一个需要输入的组件,当它和 Flow 组合时,会生成一个需要输入的 Sink,这两种情况在前面已经实现过,这里不再给出示例。下图中的第三种情况是 Sink 和 Source 组合,根据它们的结构特点,可以生成一个拥有输入输出的 Flow,下面还有示例代码:

定时器、Future、事件总线、扩展机制、Stream - 图2

  1. Flow<String, Integer, NotUsed> flow = Flow.fromSinkAndSource(Sink.foreach(System.out::println), Source.range(1, 3));

实际上,还有一种常见的情况是多个 Flow 的组合,这通过 Flow.via 方法实现,很显然,这种组合会生成一个 Flow 组件。

日志处理小项目参见github。