akka是一个工具包,试用于可扩展、弹性的,多核并发和跨网络交互的系统。
可以简单理解为是一个实现了actor model的工具包和运行时。
Akka actor实现
actor system:
The actor system as a collaborating ensemble of actors is the natural unit for managing shared facilities like scheduling services, configuration, logging, etc. Several actor systems with different configurations may co-exist within the same JVM without problems, there is no global shared state within Akka itself, however the most common scenario will only involve a single actor system per JVM.
An actor is a container for State, Behavior, a Mailbox, Child Actors and a Supervisor Strategy. All of this is encapsulated behind an Actor Reference. One noteworthy aspect is that actors have an explicit lifecycle, they are not automatically destroyed when no longer referenced; after having created one, it is your responsibility to make sure that it will eventually be terminated as well—which also gives you control over how resources are released When an Actor Terminates.
actor lifecycle:
Demo代码:
flink使用的是旧版的 akka API,也就是官网上的classic akka。demo也是用classic。
public class EchoActor extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
String.class,
s -> {
log.info("Received String message: {}", s);
if(!getSender().isTerminated()){
getSender().tell("echo:" + s, getSelf());
}
})
.matchAny(o -> log.info("received unknown message"))
.build();
}
}
resouces目录添加配置文件application.conf
akka {
actor {
# provider=remote is possible, but prefer cluster
provider = remote
}
remote.artery.enabled = false
remote.classic {
enabled-transports = ["akka.remote.classic.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
#0动态选择port
port = 0
}
}
}
public class Server {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ActorSystem actorSystem = ActorSystem.create("server");
final ActorRef deadLettersActor =
actorSystem.actorOf(DeadLettersActor.getProps(), "deadLettersActor");
actorSystem.eventStream().subscribe(deadLettersActor, DeadLetter.class);
//创建Actor并获取ActorRef
ActorRef myactorRef = actorSystem.actorOf(Props.create(EchoActor.class), "EchoServer");
System.out.println(myactorRef.path());
//tell非阻塞异步通信
myactorRef.tell("ssss", null);
//tell非阻塞异步通信,获取结果future
CompletableFuture<Object> future1 = Patterns.ask(myactorRef, "aaaa", Duration.ofMillis(1000)).toCompletableFuture();
actorSystem.log().info("response:{}", future1.get());
// actorSystem.terminate();
}
}
public class Client {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ActorSystem actorSystem = ActorSystem.create("client");
//根据url获取ActorSelection
ActorSelection echoServer = actorSystem.actorSelection("akka.tcp://server@127.0.0.1:11880/user/EchoServer");
System.out.println(echoServer.path());
echoServer.tell("ssss", null);
CompletableFuture<Object> future1 = Patterns.ask(echoServer, "aaaa", Duration.ofMillis(1000)).toCompletableFuture();
actorSystem.log().info("response:{}", future1.get());
actorSystem.terminate();
}
}