注:本文大部分文字摘抄《Akka 实战:快速构建高可用分布式应用》中内容,但此书中所用的akka版本比较老,所以本文中的代码是本文作者基于最新版本的akka以及官网所实现。

文中涉及到的所有Demo都在github上 网页地址为:https://github.com/boomblog/AkkaDemo clone地址为:https://github.com/boomblog/AkkaDemo.git

远程

创建远程 ActorSystem

**要使用 Akka 的远程功能,需要先确保项目已经加入了 akka-remote 依赖,maven 配置如下:

  1. <dependency>
  2. <groupId>com.typesafe.akka</groupId>
  3. <artifactId>akka-remote_2.12</artifactId>
  4. <version>2.5.23</version>
  5. </dependency>

定义远程 ActorSystem 并不需要代码的支持,仅仅只需要在*.conf 文件中做如下配置即可:

  1. akka {
  2. actor {
  3. provider = remote
  4. }
  5. remote {
  6. enabled-transports = ["akka.remote.netty.tcp"]
  7. netty.tcp {
  8. hostname = "127.0.0.1"
  9. port = 2552
  10. }
  11. }
  12. }

远程和本地的配置大部分都相似,区别主要在以下几个地方:
1)将 provider 配置成 remote ,表示提供远程功能。而默认的 provider 是 local 。
2)配置 akka.remote 节点,其中 enabled-transports 表示传输实现,我们可以通过 akka.remote.transport.Transport 接口来自定义该实现。hostname 和 port 分别表示服务的 IP 和端口号。假如不显式的配置 hostname,系统会默认调用 InetAddress.getLocalHost().getHostAddress()设置该值。port 的默认值为 2552,假如设置为 0,系统会生成一个随机可用的端口。

按照上面的配置,一旦使用 ActorSystem.create 创建 ActorSystem 实例,将会在 127.0.0.1:2552 上启动一个监听,并打印如下日志:

  1. [INFO] [07/24/2019 16:24:54.795] [main] [akka.remote.Remoting] Starting remoting
  2. [INFO] [07/24/2019 16:24:55.093] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://sys@127.0.0.1:2552]
  3. [INFO] [07/24/2019 16:24:55.095] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://sys@127.0.0.1:2552]

此时该 ActorSystem 的远程访问地址是:akka.tcp://sys@127.0.0.1:2552,其中 sys 表示 ActorSystem 的名称。

Actor 远程访问

**当一个远程 ActorSystem 启动后,我们可以通过远程地址直接访问到该系统下的 Actor。这个操作看起来是一个「客户端-服务端」类型的操作,但是实际上不是,在 Akka 中并没有直接提供专门的客户端 API 来访问服务端,服务的提供者和消费者必须都在 ActorSystem 环境下进行,它们的地位是对等的。

假如远程 ActorSystem 启动在 127.0.0.1:2552 节点上,并创建了一个名为「rmtActor」的 Actor,那么该 Actor 的远程地址就是「akka.tcp://sys@127.0.0.1:2552/user/rmtActor」,此时可以使用 actorSelection 方法获取该 Actor:

  1. ActorSelection selection =
  2. getContext().actorSelection("akka.tcp://sys@127.0.0.1:2552/user/rmtActor");
  3. selection.tell("hello remoteActor", getSelf());

同之前讲的 RPC 一样,当向远程 Actor 发送对象数据时,该对象一定要能被序列化,最简单的方式是让该类型实现 java.io.Serializable 接口。

当远程的 Actor 不可用时,在远程系统上做 DeathWatch 时和本地系统类似,即通过 getContext().watch 方法监控目标 Actor,当目标 Actor 终止或者由于网络原因不能正确地引用到 Actor 时,监控者就会接收到Terminated 消息。这里大家应该注意到,Terminated 消息的产生与网络也是有关系的。在远程环境中,Akka 会通过发送心跳信息来检测远程节点是否可用,心跳间隔时间默认为 1s,该时间可以通过 akka.remote.watch-failure-detector.heartbeat-interval 来设置。但是这里有个问题:到底怎样的节点才会被判断为不可用?是通过超时设置来判断吗?实际上,Akka 在做远程故障检测时并没有简单地将超时作为判断依据。而是基于对历史心跳数据的统计,来计算出一个可信度的值,然后将这个值和akka.remote.watch-failure-detector.threshold 设定的值做对比,假如达到这个值,则表示失败,在默认情况下 threshold 的值为 10,基本上可以满足大部分场景。在这个过程中,还有一个参数会影响检测效果,即akka.remote.watch-failure-detector.acceptable-heartbeat-pause ,它表示在发送心跳过程中的延迟,比如当本地系统和远程系统进行通信时突然断开网络,假如你不希望立马检测出故障,可以将该值设置大一点,当网络恢复后可以继续正常工作。总之,通过这种方式可以让故障检测变得更加智能,同时也避免了「一刀切」式的处理。

创建远程 Actor

在很多时候,远程 Actor 的创建会由「客户端」主导。当「客户端」通过配置或者代码指定远程创建节点时,它会通过网络将请求发送给该远程,然后委托它创建 Actor,此时客户端将拿到远程 Actor 的引用,后续使用方式和普通 Actor 并没有太大区别。假如要通过配置的方式指定节点,需要在*.conf 下加上以下内容:

  1. actor {
  2. provider = "remote"
  3. deployment {
  4. /rmtCrtActor{
  5. remote = "akka.tcp://sys@127.0.0.1:2552"
  6. }
  7. }
  8. }
  9. #其他配置略...

在这段配置中,指定了 Actor 的名称为 rmtCrtActor ,节点为 127.0.0.1:2552 上的 sys 系统。在创建时,务必要保证远程系统已经启动,并且远程和本地都要有 Actor Class ,下面是部分测试代码:

  1. ActorRef ref=system.actorOf(Props.create(RmtCreateActor.class),"rmtCrtActor");
  2. ref.tell("hello rmt",ActorRef.noSender());

注意,Actor 的名字一定要和配置中的路径对应。假如创建的是具有层级关系的 Actor,那么可以将路径配置为/*/ ``rmtCrtActor

假如在项目中,远程节点信息是需要动态获取的,那么可以考虑使用远程部署 API 来实现:

  1. Address addr = new Address("akka.tcp", "sys", "127.0.0.1", 2552);
  2. ActorRef ref=getContext().actorOf(Props.create(RmtCreateActor.class).withDeploy(new Deploy(new RemoteScope(addr))));

Akka集群

Akka 的集群功能由 akka-cluster 模块提供,它基于去中心化的 P2P 模型,没有单点故障和单点瓶颈,可以满足多个应用场景。

创建 Akka 集群添加maven配置:

  1. <dependency>
  2. <groupId>com.typesafe.akka</groupId>
  3. <artifactId>akka-cluster_2.12</artifactId>
  4. <version>2.5.23</version>
  5. </dependency>

为了测试方便,我们将在本地创建三个节点,它们分别是:127.0.0.1:2550、127.0.0.1:2551、127.0.0.1:2552,在实际项目中,它们可能分别部署在不同的物理机上。按道理,每个节点的配置信息是不一样的,比如 IP 或者端口,那么给每个节点创建自己的配置文件会让程序结构看起来更加清晰,但是也会造成配置文件过多以及重复性内容,在我们这个示例中,可以让三个节点共用一个配置文件,然后通过传参的方式进行动态配置。下面是一个最基础的配置示例:

  1. akka {
  2. #loglevel=OFF
  3. actor {
  4. provider = "akka.cluster.ClusterActorRefProvider"
  5. }
  6. remote {
  7. log-remote-lifecycle-events = off
  8. netty.tcp {
  9. hostname = "127.0.0.1"
  10. port = 2550
  11. }
  12. }
  13. cluster {
  14. seed-nodes = [
  15. "akka.tcp://sys@127.0.0.1:2551",
  16. "akka.tcp://sys@127.0.0.1:2552"]
  17. }
  18. }

具体源码看github.

image.png

要注意的是,集群中所有成员的 ActorSystem 名称必须一样.

当新的节点启动时,它会发送消息给所有的 seed-nodes ,然后发送 join 命令给第一个应答节点,实际上,新节点也可以给任何一个集群中的节点发送 join 命令,不必非得是 seed-nodes 。当 seed-node 为空时,系统启动时不会加入任何节点,我们可以调用 Cluster.join 方法将当前节点加入指定的 Address,示例代码如下:

  1. Config config =ConfigFactory.parseString("akka.remote.netty.tcp.port=2555").withFallback(ConfigFactory.load("cluster.conf"));
  2. ActorSystem system = ActorSystem.create("sys", config);
  3. system.actorOf(Props.create(ClusterDemo.class),"scListener_JoinDemo");
  4. Cluster cluster = Cluster.get(system);
  5. Address address = new Address("akka.tcp", "sys", "127.0.0.1",2551);
  6. cluster.join(address);

其中 cluster.conf 中的 seed-nodes 已经设置为空数组[],其他配置和前面的示例完全相同,这里不再给出。当执行上面这段代码后,当前节点会向 127.0.0.1:2551 节点发送 join 请求并获得对方的欢迎提醒,此时它处于 joining 状态,当这个状态传播到整个集群之后,Leader 节点会将其设置为 up 状态,该节点便成功加入了集群。

Akka持久化

Akka 的持久化功能可以让 Actor 保存它们的内部状态,以便在程序出现故障或者 JVM 崩溃时进行自我恢复。要注意的是,Akka 持久化并不会直接保存 Actor 的当前状态,而是会保存所有变化,当需要恢复时,Actor 会通过回放这些变化来重建自己的状态,这个过程遵循 EventSourcing 的设计理念。

EventSourcing 是一种以事件为单位的架构模型,具体来讲就是:当业务对象在处理某个逻辑之后,会把这个处理过程相关的数据抽象并保存起来,这些数据包括时间、状态、动作等,而这些数据组成的对象我们可称之为事件。也就是说,事件对象描述的就是过去发生的每个行为以及这个行为产生的数据。当事件对象被保存成功后就不能再修改,只能继续追加。

一个复杂的业务逻辑可能会包含很多个事件对象,在必要的情况下,我们可以通过这些事件对象来回溯整个业务过程,但前提是,这些事件已经处于持久化的状态,而持久层既可以是文件,也可以是数据库。

在真正实现 EventSourcing 之前,我们需要根据业务来定义好事件命令和状态。

定义持久化 Actor

在 Akka 中,EventSourcing 已在持久化 Actor 中得到支持。由于 Akka 持久化是独立的模块,所以在使用时先确保已加入 maven 依赖:

  1. <dependency>
  2. <groupId>com.typesafe.akka</groupId>
  3. <artifactId>akka-persistence_2.12</artifactId>
  4. <version>2.5.23</version>
  5. </dependency>

该模块提供的 UntypedPersistentActor 即持久化 Actor,它拥有onReceiveCommand 和 onReceiveRecover 两个关键方法,前者用于接收外部消息(和普通 Actor 的 onReceive 方法类似),一般情况下,我们会在这个方法里面调用 persist 方法来保存事件对象;后者主要用于执行数据恢复逻辑,它会在 Actor 重启后自动调用。除了保存单个事件外,我们还可以调用 saveSnapshot 方法来保存当前状态下的快照,使用快照的好处是可以大幅度缩短恢复时间。

示例代码看github。

Akka 已经提供了用于保存事件日志和快照的插件,我们只需要在*.conf 文件中配置这些插件即可,比如:

  1. akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
  2. akka.persistence.snapshot-store.plugin ="akka.persistence.snapshot-store.local"
  3. akka.persistence.journal.leveldb.dir = "target/example/journal"
  4. akka.persistence.snapshot-store.local.dir = "target/example/snapshots"

akka.persistence.journal.plugin 指定了事件日志的插件,这里采用了 LevelDB 作为底层存储,当调用 persist 方法后,事件将被存储在akka.persistence.journal.leveldb.dir 指定的路径下。

akka.persistence.snapshot-store.plugin 指定了快照存储的插件,当调用 saveSnapshot 方法后,快照将被存储在 akka.persistence.snapshot-store.local.dir 指定的路径下。

另外,由于 LevelDB 并没有集成在 Akka-Persistence 模块中,所以在使用时还得在 Maven 中加入相关依赖:

  1. <dependency>
  2. <groupId>org.iq80.leveldb</groupId>
  3. <artifactId>leveldb</artifactId>
  4. <version>0.7</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.fusesource.leveldbjni</groupId>
  8. <artifactId>leveldbjni-all</artifactId>
  9. <version>1.8</version>
  10. </dependency>

假如大家觉得它自带的插件并不符合项目需要,可以考虑去http://akka.io/community/ 获取其他存储插件。当然,你也可以自定义插件,比如对于事件日志,你可以继承 AsyncWriteJournal 并实现其相关方法,然后将其配置在*.conf 文件里:

  1. akka.persistence.journal.plugin = "custom-journal"
  2. custom-journal{
  3. class = "com.book.CustomJournal"
  4. /*其他配置略*/
  5. }

假如要自定义快照插件,则需要继承 SnapshotStore ,它的配置方式可参照上面的做法。

Http

很简单,看demo吧。