注:本文大部分文字摘抄《Akka 实战:快速构建高可用分布式应用》中内容,但此书中所用的akka版本比较老,所以本文中的代码是本文作者基于最新版本的akka以及官网所实现。
文中涉及到的所有Demo都在github上 网页地址为:https://github.com/boomblog/AkkaDemo clone地址为:https://github.com/boomblog/AkkaDemo.git
Akka简介
Akka 是一款高性能、高容错性的分布式&并行应用框架,遵循 Apache 2 开源许可,底层通过 JVM 上另外一个流行的语言 Scala 实现,提供Java&Scala API 。它基于经典的 Actor 并发模型(即所有的消息都是基于 Actor 组件进行传递),拥有如下特点:
- 并行与并发:提供对并行与并发的高度抽象。
- 异步非阻塞:Akka-Actor 消息通信都是基于异步非阻塞。
- 高容错性:为跨多 JVM 的分布式模型提供强劲的容错处理,号称永不宕机。
- 持久化:Actor 携带的状态或消息可以被持久化,以便于在 JVM 崩溃后能恢复状态。
- 轻量级:每个 Actor 大约只占 300bytes,即 1G 内存可容纳接近 300 万个 Actor。
- 基本上,Akka 从底层就解决了我们大多数分布式&并行程序常见的难题,让工程师更专注于业务实现,同时,它也保留了多个扩展接口及配置,便于满足个性化定制的需要!
Akka应用场景
目前 Akka 已经在多家互联网&软件公司广泛使用,比如 eBay、Amazon、VMWare、PayPal、阿里、惠普、豌豆荚等,涉及行业包括游戏、金融投资、医疗保健、数据分析等。
使用场景包括:
- 服务后端:比如 rest web,websocket 服务,分布式消息处理等。
- 并发&并行:比如日志异步处理,密集数据计算等。
总之,对高并发和密集计算的系统,Akka 都是适用的!
Akka架构体系
Akka 采用 Scala 开发,运行于 JVM 之上,提供了 Scala 和 Java 两种 API,目前属于 Lightbend 公司(原名 Typesafe)。它实现了经典的 Actor 模型,同时也提供了丰富的组件,比如邮箱(MailBox)、路由(Routing)、持久化( Persistence )、网络(包括远程、集群)等,在底层对分布式&并行模式进行了高度且统一的抽象,使工程师用很少的代码就可以实现一个完整的分布式应用。
Actor模型
Actor 模型最早在 1973 年由 Carl Hewitt 提出,它高度抽象了分布式并行程序的运行模式,从底层屏蔽了线程和锁机制的管理,为开发者提供了简单可依赖的开发方式。
Actor 模型认为,并行计算的最小单元就是一个 Actor 实例,而每个实例拥有自己的状态和行为,在一个大型系统中,可能存在成千上万个 Actor 实例,它们之间通过消息的方式进行通信,每个 Actor 都能发送消息给其他 Actor,也能从其他 Actor 接收消息。当我们在执行某个计算任务时,会给对应的 Actor 实例发送一个相关的消息,该 Actor 在接收消息后开始执行计算任务,由于整个消息通信的过程是异步的,所以不用等到 Actor 执行完整个过程就能执行下一步(发送消息后会马上返回),这种异步通信的方式大大提高了程序的响应性。
体系结构
Actor 是 Akka 最核心的概念,也是最基本的执行单元,所以对 Actor 管理和监控的有效性是极为重要的。在 Akka 中,每个 Actor 都有自己的监管对象,即该 Actor 的创建者,它们通常会负责子 Actor 的失败处理,另外,某些 Actor 也需要对生命周期进行监控(比如该 Actor 的终止),以便及时响应并做正确处理,这些监督和监控者本身也都是一个 Actor。在 Akka 中,整个 Actor 体系被抽象成一个 ActorSystem ,它是一个层级的结构,拥有公共行为的配置和管理。
在 ActorSystem 基础上,Akka 也提供了一些配套的组件,比如持久化,HTTP 服务,网络服务等,它们都是构建高可用分布式应用不可或缺的部分,基本架构体系和周边产品如下图所示。
Actor组件
在 Akka 中,Actor 是一个高度抽象的对象引用,它包含以下几个要素:
- 引用( Actor Reference ):Actor 的引用不同于普通对象的引用,也不能通过传统「new」的方式直接创建一个 Actor 对象。很多时候需要通过 actorOf 或者 actor-Selection 等方式返回一个ActorRef 对象,该对象有可能存在于本地,也可能存在于远程节点,对我们来说,它是位置透明的。Actor 之间的通信和执行任务都是通过消息驱动的(而不是 API 的调用)。
- 状态(State):Actor 在不同时刻可能有着不同的状态,这些状态用变量来表示。在底层实现上,Actor 是运行于线程池之上的,肯定会存在多个 Actor 共享同一个线程的情况,那么会不会出现并发问题呢?实际上,Akka 为每个 Actor 都抽象出一个轻量级的执行「线程」(不是真的线程),在底层已经实现了隔离,所以基本上不用担心该问题的出现。另外,当 JVM 崩溃时,为了避免 Actor 状态的丢失,我们可以借助持久化方案来对状态进行持久化操作。
- 行为(Behavior):Actor 有接收和发送消息的能力,每当它接收一条消息后,就可以执行某个业务操作,同时也可以把消息转发到其他节点进行处理。
- 监管策略( Supervisor Strategy ):Actor 系统是一个层级结构,当任务被某个 Actor 分摊到子 Actor 时,父 Actor 就拥有监管子 Actor 的义务。在监管时,我们需要根据不同的情况选择不同的处理方案(比如停止、重启、恢复或者失败上溯)和策略(比如 1 vs 1、1 vs N 策略)。
邮箱(Mailbox)
每个 Actor 都有自己的邮箱,所有其他 Actor 发送过来的消息都会进入该邮箱。Akka 自带多种邮箱类型,也提供自定义邮箱的接口。
路由(Routing)
消息除了通过普通的 Actor 发送之外,也可以通过路由发送。当通过路由发送消息时,我们可以根据需求来选择不同的路由策略,比如轮询、广播等。路由也可以是一个 Actor。
状态持久化( Persistence )
任何程序都有失败的可能,即便是 JVM 如此强大稳定的平台也一样。当程序出错、JVM 崩溃时,任何关键状态的丢失,对我们后续的业务来讲都可能是致命的打击,所以状态数据的持久化变得非常重要。Akka 提供了 Actor 状态的持久化方案,以便我们在必要时恢复数据。
网络(远程和分布式集群)
网络功能是实现远程 Actor 和分布式集群的基础,这其中包含 I/O、网络通信(TCP/UDP)、序列化配置、分布式通信协议(Gossip)、节点(node)管理、集群分片等内容。
HTTP 模块
Akka 提供了简单易用的 HTTP 模块,支持完整的 HTTP 服务端与客户端开发,可以帮助我们快速构建性能极强的 Rest Web 服务。
相关开源项目
Akka 具有高性能、可扩展、设计友好等诸多优点,非常适合用来作为分布式应用的基础框架,而且由于对 HTTP 有非常好的支持,也让它在 Web 服务领域占有一席之地。目前业界已经有多个基于 Akka 实现的开源项目,项目类型涵盖了 Web 开发、微服务、分布式文件或计算服务等。下面是 Akka 中两个具有代表性的开源项目:
- Play 框架:一款大名鼎鼎的 Web 开发框架。它不同于其他 Servlet 系的框架,比如 Struts 或者 SpringMVC ,底层基于 Akka 构建,天生拥有异步的特点,具有极佳的性能。它默认提供 RESTful 风格的 API,同时也对 WebSocket 有不错的支持。
- Lagom 框架:在目前 IT 界,最火爆的概念要属「微服务」了,微服务的理念是,把业务功能拆成小的、独立的单元,它们之间能够互相通信而且支持水平扩展。Lagom 就是这样一款微服务框架,它基于异步的消息驱动,对分布式集群、持久化(如 JPA、NoSQL)都有良好的支持。同时,它也拥有完整的集成开发环境,非常便于在线部署和管理。
基本使用
环境搭建
JDK版本最好是在 1.6 以上,不过后续会介绍Akka的一些新特性,所以最好采用1.8的版本。
Maven依赖
使用最新版akka依赖。
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>2.5.23</version>
</dependency>
注意Akka中不同组件分为了不同的模块,我们这里暂时只是引用了核心模块,通过它可以使用akka的最基本功能。除开核心模块还包括akka-persistence 、akka-remote 、akka-cluster 等模块,分别用于实现持久化、远程、集群等功能。
创建一个Actor
public class ActorDemo extends AbstractActor {
private LoggingAdapter log = Logging.getLogger(this.getContext().getSystem(), this);
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
String.class,
s -> log.info(s))
.matchAny(o -> {
log.info("any" + o.toString());
})
.build();
}
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("sys");
ActorRef actorRef = system.actorOf(Props.create(ActorDemo.class), "actorDemo");
actorRef.tell("hello world", ActorRef.noSender()); // ActorRef.noSender()实际上就是叫做deadLetters的actor
}
}
- 通常每个应用程序只需要创建一个ActorSystem对象
- 在创建 ActorSystem 和 Actor 时,建议都指定名字,如果不知道akka将自动生成,在该示例中,它们的名字分别是 sys 和 actorDemo ,在同一个 ActorSystem 中 Actor 不能重名。
- actorOf方法返回的不是Actor对象本身,而是ActorRef,表示Actor对象的引用。我们在向某个Actor发送消息时都是通过该引用进行的。
- 在Actor内部,可以使用getContext()来创建该Actor的子Actor,比如
ActorRef childActor=getContext().actorOf(Props.create(ChildActor.class),"childActor" );
工厂模式
ActorSystem 和 ActorContext 通过接收一个 Props 实例来创建 Actor,而 Props 实例本身有两种方式可以创建:
- Props.create(ActorDemo.class, Object… args)
- 指定一个Props工厂。
对于比较简单的场景使用第一种就足够了,而如果需要按统一的规则创建Actor,我们可以在Actor内定义一个创建Props的方法,比如:
public static Props createProps() {
return Props.create(new Creator<Actor>() {
@Override
public Actor create() throws Exception {
return new HelloWorldActor();
}
});
}
然后可以使用一下代码来创建Actor:
ActorRef actorRef = system.actorOf(ActorDemo.createProps(), "actorDemo");
可以看出,如果我们使用工厂模式,在创建Actor时调用工厂方法,我们不用关心Actor创建的具体过程了,有Actor自己封装即可。
发送与接收消息
Actor中的createReceive方法是用来接收并处理消息的。
createReceive方法需要放回Receive对象,而Receive对象就是真正用来处理消息的逻辑。
Receive对象可以定义规则使接收到的消息进入不同的处理逻辑:
- match(Class
type):消息类型为指定type的进入此分支
- matchEquals(P object):消息值与object相等的进入此分支
- matchAny():任何消息都可以进入到此分支
当然,一个消息只会进入到一个分支,如果一个消息匹配多个条件优先进入定义靠前的条件。
可以使用tell和ask两种方式发送消息,它们都以异步的方式发送消息,不同的是,前者发完后立即返回,而后者期待得到一个返回结果,假如在设置的时间(Timeout)内没有得到返回结果,发送方会收到一个超时异常。
tell方法
第一个参数即”消息”,它可以是任何可序列化的数据或对象,第二个参数表示发送者,noSender()表示无发送者(实际上是一个叫做 deadLetters 的 Actor,后面介绍)。假如想在Actor内部得到发送者,可以调用getSender ()方法。
在调用 tell 方法后,Actor 就会异步的去处理该消息,并不会阻塞后续代码的运行。
ask方法
这是一种“请求-响应”模式,ask方法会将返回结果包装在scala.concurrent.Future中。
对于一个Actor如何将响应返回给消息发送者呢,其实就是想响应作为消息发送给请求消息的发送者:
getSender().tell( "hello " +msg , getSelf());
使用样例:
public class AskDemo extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().matchAny(o -> {
System.out.println("发送者是" + getSender());
Thread.sleep(1000);
getSender().tell("hello " + o, getSelf());
}).build();
}
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("sys");
ActorRef askActor = system.actorOf(Props.create(AskDemo.class), "askActorDemo");
Timeout timeout = new Timeout(Duration.create(2, TimeUnit.SECONDS));
Future<Object> f = Patterns.ask(askActor, "Akka Ask", timeout);
System.out.println("ask ...");
f.onSuccess(new OnSuccess<Object>() {
@Override
public void onSuccess(Object result) throws Throwable {
System.out.println("收到消息:" + result);
}
}, system.getDispatcher());
}
}
消息转发
Actor可以将接收到的消息转发给其他Actor,语法为:
static class ForwardActor extends AbstractActor {
private ActorRef target = getContext().actorOf(Props.create(TargetActor.class), "targetActor");
@Override
public Receive createReceive() {
return receiveBuilder().matchAny((message) -> {
target.forward(message, getContext());
}).build();
}
}
TargetActor接收到转发过来的消息后,发送者任然是消息原始的发送者,而不是转发者。
查找Actor
对于已存在的 Actor,我们可以根据路径来进行查找,API 如下:ActorSelection as = [ActorSystem/ActorContext].actorSelection([path]);
在调用它时,必须指定一个绝对或者相对路径(包括远程路径),比如:
- /user/parentActor/childActor
- ../parentActor/childActor
- akka.tcp://testuser@127.0.0.1:2554/user/parentActor/childActor
查找结果为ActorSelection,表示一个或多个Actor,所以向ActorSelection发送消息时,对应的一个或多个Actor都将接收到消息。
消息不可变
Akka中都是消息驱动的,所以消息是非常重要。理论上,Actor 可以传递或接收任何类型的消息,但是为了避免出现竞态条件(对共享资源的并发写)和不可预知的数据篡改,最好把消息设计成不可变的对象。在该对象中,我们通常会把所有变量设计成只读,在该对象的整个生命周期内,这些数据将不可修改。所以在 Java 中,我们借助final关键字来实现。
对于集合框架,有必要通过 Collections.unmodifiable 系列的 API 来保证不变性。Collections.unmodifiable 内部使用 UnmodifiableList 、UnmodifiableMap 等 API 对现有的集合进行包装,它们通过重写其中的 set、add、put 等方法来达到限制数据修改的目的。
Actor行为切换
在业务处理过程中,往往会面临一些关键数据的变化,而这些变化也会进一步影响着程序的执行逻辑,很多时候,我们把这些关键数据称之为「状态」。状态的处理需要依赖于具体的业务,而业务本身也可能充满不确定性,一种比较好的做法是,我们把多个状态的处理过程(即行为)封装起来,形成一个个独立的组件,在使用时就可以很方便地组装、重用或切换。在 Actor 中,行为可以通过Receive来定义,一旦抽取完我们所需要的Receive,就可以使用 become/unbecome 方法来切换他们,其中 become 表示切换为某个行为,unbecome 表示修改回上一个行为。
Actor生命周期
Actor 在运行时中会经历不同的阶段,比如创建、运行、重启和销毁等,这一系列的过程或状态,我们称之为生命周期。在理想情况下,Actor 应该像一头老黄牛一样任劳任怨地不停工作,但实际情况往往是,当某个不算常态的错误发生时(比如网络偶尔超时),我们想让它重启一下,然后重复之前的动作;又或者,当一个程序异常被抛出时,我们不得不停止该 Actor,让它不能继续走下去,这些行为有可能是手动触发的,也可能是 Actor 的监督-容错机制导致的。无论怎样,当这些行为发生时,我们都希望能及时感知并做有效处理。
Actor 生命周期主要包括启动(Start)、恢复(Resume)、重启(Restart)、停止(Stop)这四个阶段,每个阶段都会伴随着自身状态信息的变化。
创建并启动
当通过 actorOf 创建并启动 Actor 时,该 Actor 除了默认拥有已指定的 path 外,也会被分配一个 UID(可以通过 getSelf().path().uid()得到该值),作为它的唯一标识。在 Actor 启动后,会默认调用 preStart 方法,在该方法里,我们可以做些资源初始化的操作。
恢复运行
当 Actor 出现某种异常后,通过容错机制,可以让该 Actor 恢复并继续运行,此时 Actor 会延用之前的实例,状态也会被保留下来。
重启
当 Actor 出现某种异常后,通过容错机制,可以让该 Actor 执行重启。重启的具体过程如下:
- 调用旧实例的 preRestart 方法,该方法默认情况下会停掉所有子级 Actor,并调用 postStop 方法;
- 创建新实例,并在新实例上调用 postRestart 方法,该方法默认情况下会调用 preStart 方法。
Actor 重启后,path 和 UID 不变,这意味着假如要继续使用该 Actor,不需要重新获取 ActorRef 对象,使用之前那个就可以了。同时也得注意到:Actor 重启后不会保留自身状态。
停止
当 Actor 停止时,会调用 postStop 方法,同时会发送一条 Terminated信息给自己的监控者,告知自己已处于停止状态。
停掉一个Actor
停止 Actor 大致有三种方式,它们分别是:
- 调用 ActorSystem 或者 getContext() 的 stop 方法;
- 给 Actor 发送一个 PoisonPill (毒丸)消息;
- 给 Actor 发送一个 Kill 的消息,此时会抛出ActorKilledException 异常,并上报到父级 supervisor 处理。
Actor 在停止时都会遵循一套比较可靠的流程:
- 当停止 Actor 时,正在处理的消息会在完全停止之前处理完毕,后续信息将不再进行处理,邮箱(用来保存 Actor 的消息)将被挂起;
- 给所有子级 Actor 发送终止指令,当子级都停掉后,再停掉自己,停止完毕后会调用 postStop 方法,在这里可以清理或释放资源;
- 向生命周期监控者( DeathWatch )发送 Terminated 消息,以便监控者做相应的处理。
可以使用getContext().watch();方法监控某个节点的停止信息,当该节点停止后,监控者会接收到一个Terminated消息。
监督与容错处理
Actor 系统采用”父监督”的模式进行管理,即父 Actor 会监督子 Actor 的异常情况,然后根据默认或者预设的处理逻辑来确定到底是该恢复 Actor、停止 Actor、重启 Actor 还是把错误上溯到父级。
Akka 提供了两种监督策略,分别是 One-For-One Strategy 和 All-For-One Strategy ,前者表示当一个子 Actor 出现异常时,只对该 Actor 做处理,后者则表示对所有子 Actor 都做处理,大部分时候应该选择 One-For-One Strategy (这也是默认的策略),除非子 Actor 之间的业务有很强的关联或者互相依赖。当程序中没有显式指定策略时,会启动一个默认策略:
- 当抛出 ActorInitializationException 和 ActorKilledException时,会终止子 Actor;
- 当抛出 Exception 时,会重启子 Actor;
- 抛出其他类型的 Throwable 异常时,会上溯失败到父级。
在 Actor 执行任务时可能会抛出不同类型的异常,很多时候,我们应该知道,哪些异常需要让 Actor 停止或重启,哪些异常可以「视而不见」。为了更加细化这种判断,我们需要自定义监督逻辑。
自定义一个监督逻辑非常简单,只需要在父 Actor 中创建一个SupervisorStrategy 对象,并通过 supervisorStrategy ()方法返回出来.
熔断(circuit breaker)
当一个Actor出现异常时,如果用户还继续向这个Actor发送消息,很有可能这个消息还是无法被处理,还额外的增加了服务器的压力,Akka中提供了熔断机制(circuit breaker)。
Circuit Breaker 状态图(图片来自 Akka 官网)
circuit breaker有三个状态,分别是:Closed、Open、Half-Open。
Close 状态
- 正常情况下,circuit breaker是closed状态:Actor出现异常或调用超过配置的callTimeout,就增加一次失败计数;
- 成功则重置失败计数为 0;
- 当失败次数达到maxFailures后,circuit breaker会进入Open状态。
Open 状态
- Actor接收到的所有调用将抛出 CircuitBreakerOpenException ,立即失败;
- 在resetTimeout后,circuit breaker 进入Half-Open状态。
Half-Open 状态
- 第一次调用将尝试运行而不会快速失败;
- 如果第一个调用成功,会重回 Closed 状态;
- 如果第一个调用失败,会进入 Open 状态,然后等待下一次resetTimeout 。
配置
Akka 程序在启动时会加载一些默认的配置项,所以我们不需要显式地提供配置文件,假如要个性化配置某些行为,则需要新建配置文件。在默认情况下,程序会加载 classpath 下的 application.conf 、application.json 、application.properties 文件。
在创建ActorSystem 时,可以显式加载自定义的配置文件,比如app.conf ,代码如下:
ActorSystem system = ActorSystem.create( "myapp",ConfigFactory.load( "app.conf" ));
Akka 提供的配置项涵盖了几乎所有的核心功能,比如日志、远程、路由、消息序列化、分布式集群等.