akka是一个工具包,试用于可扩展、弹性的,多核并发和跨网络交互的系统。
可以简单理解为是一个实现了actor model的工具包和运行时。
Akka actor实现
actor system:
The actor system as a collaborating ensemble of actors is the natural unit for managing shared facilities like scheduling services, configuration, logging, etc. Several actor systems with different configurations may co-exist within the same JVM without problems, there is no global shared state within Akka itself, however the most common scenario will only involve a single actor system per JVM.
An actor is a container for State, Behavior, a Mailbox, Child Actors and a Supervisor Strategy. All of this is encapsulated behind an Actor Reference. One noteworthy aspect is that actors have an explicit lifecycle, they are not automatically destroyed when no longer referenced; after having created one, it is your responsibility to make sure that it will eventually be terminated as well—which also gives you control over how resources are released When an Actor Terminates.
actor lifecycle:
Demo代码:
flink使用的是旧版的 akka API,也就是官网上的classic akka。demo也是用classic。
public class EchoActor extends AbstractActor {private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);@Overridepublic Receive createReceive() {return receiveBuilder().match(String.class,s -> {log.info("Received String message: {}", s);if(!getSender().isTerminated()){getSender().tell("echo:" + s, getSelf());}}).matchAny(o -> log.info("received unknown message")).build();}}resouces目录添加配置文件application.confakka {actor {# provider=remote is possible, but prefer clusterprovider = remote}remote.artery.enabled = falseremote.classic {enabled-transports = ["akka.remote.classic.netty.tcp"]netty.tcp {hostname = "127.0.0.1"#0动态选择portport = 0}}}public class Server {public static void main(String[] args) throws ExecutionException, InterruptedException {ActorSystem actorSystem = ActorSystem.create("server");final ActorRef deadLettersActor =actorSystem.actorOf(DeadLettersActor.getProps(), "deadLettersActor");actorSystem.eventStream().subscribe(deadLettersActor, DeadLetter.class);//创建Actor并获取ActorRefActorRef myactorRef = actorSystem.actorOf(Props.create(EchoActor.class), "EchoServer");System.out.println(myactorRef.path());//tell非阻塞异步通信myactorRef.tell("ssss", null);//tell非阻塞异步通信,获取结果futureCompletableFuture<Object> future1 = Patterns.ask(myactorRef, "aaaa", Duration.ofMillis(1000)).toCompletableFuture();actorSystem.log().info("response:{}", future1.get());// actorSystem.terminate();}}public class Client {public static void main(String[] args) throws ExecutionException, InterruptedException {ActorSystem actorSystem = ActorSystem.create("client");//根据url获取ActorSelectionActorSelection echoServer = actorSystem.actorSelection("akka.tcp://server@127.0.0.1:11880/user/EchoServer");System.out.println(echoServer.path());echoServer.tell("ssss", null);CompletableFuture<Object> future1 = Patterns.ask(echoServer, "aaaa", Duration.ofMillis(1000)).toCompletableFuture();actorSystem.log().info("response:{}", future1.get());actorSystem.terminate();}}
