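Akka is a toolkit for building scalable, resilient systems that run concurrently across multiple cores and communicate across networks.
    Put simply, it is a toolkit and runtime that implements the actor model.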

    Akka actor implementation
    actor system

    The actor system, as a collaborating ensemble of actors, is the natural unit for managing shared facilities such as scheduling services, configuration, and logging. Several actor systems with different configurations may coexist within the same JVM without problems, because Akka itself holds no global shared state. The most common scenario, however, involves only a single actor system per JVM.

    actor

    An actor is a container for State, Behavior, a Mailbox, Child Actors, and a Supervisor Strategy, all encapsulated behind an Actor Reference. Notably, actors have an explicit lifecycle: they are not automatically destroyed when no longer referenced. After creating one, it is your responsibility to make sure it is eventually terminated, which also gives you control over how resources are released when the actor terminates.
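To make the parts named above concrete, here is a minimal sketch in plain Java, with no Akka dependency. It is a toy model, not how Akka implements actors: the class name `MiniActor` and its methods are hypothetical, and the mailbox is approximated by a single-threaded executor. It shows private state, a behavior, a mailbox that processes one message at a time, and an explicit stop that the creator must call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Toy model of an actor (hypothetical, not Akka code). */
public class MiniActor {
    // State: only touched by the mailbox thread while the actor runs, so no locks are needed.
    private final List<String> seen = new ArrayList<>();

    // Mailbox: a single-threaded executor queues messages and runs them one at a time, in order.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    // Behavior: what the actor does with each message.
    private void onMessage(String msg) {
        seen.add(msg);
    }

    /** Fire-and-forget send: enqueue the message and return immediately. */
    public void tell(String msg) {
        mailbox.execute(() -> onMessage(msg));
    }

    /**
     * Explicit lifecycle: the mailbox thread keeps running until someone calls stop().
     * In this toy, already queued messages are processed before the actor stops.
     */
    public List<String> stop() {
        mailbox.shutdown();
        try {
            if (!mailbox.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("mailbox did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while stopping", e);
        }
        return seen;
    }

    public static void main(String[] args) {
        MiniActor actor = new MiniActor();
        actor.tell("a");
        actor.tell("b");
        System.out.println(actor.stop()); // prints [a, b]
    }
}
```

If `stop()` is never called, the executor thread stays alive and the JVM does not exit, which mirrors the point that an actor is not cleaned up just because nothing references it.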

    actor lifecycle:
    actor_lifecycle.png
    Demo code:
    Flink uses the older Akka API, which the official site calls classic Akka, so the demo uses classic as well.

    import akka.actor.AbstractActor;
    import akka.event.Logging;
    import akka.event.LoggingAdapter;

    public class EchoActor extends AbstractActor {
        private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                .match(
                    String.class,
                    s -> {
                        log.info("Received String message: {}", s);
                        // Skip the reply when the sender is already gone
                        // (for example, a tell sent with no sender).
                        if (!getSender().isTerminated()) {
                            getSender().tell("echo:" + s, getSelf());
                        }
                    })
                .matchAny(o -> log.info("received unknown message"))
                .build();
        }
    }

    Add the configuration file application.conf to the resources directory:

    akka {
      actor {
        # provider=remote is possible, but prefer cluster
        provider = remote
      }
      remote.artery.enabled = false
      remote.classic {
        enabled-transports = ["akka.remote.classic.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          # 0 selects a port dynamically
          port = 0
        }
      }
    }

    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.DeadLetter;
    import akka.actor.Props;
    import akka.pattern.Patterns;
    import java.time.Duration;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class Server {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ActorSystem actorSystem = ActorSystem.create("server");
            // Subscribe a dedicated actor to dead letters (DeadLettersActor is not shown here).
            final ActorRef deadLettersActor =
                actorSystem.actorOf(DeadLettersActor.getProps(), "deadLettersActor");
            actorSystem.eventStream().subscribe(deadLettersActor, DeadLetter.class);
            // Create the actor and obtain its ActorRef.
            ActorRef myactorRef = actorSystem.actorOf(Props.create(EchoActor.class), "EchoServer");
            System.out.println(myactorRef.path());
            // tell: non-blocking, asynchronous, fire-and-forget.
            myactorRef.tell("ssss", ActorRef.noSender());
            // ask: also non-blocking; returns a future that completes with the reply.
            CompletableFuture<Object> future1 =
                Patterns.ask(myactorRef, "aaaa", Duration.ofMillis(1000)).toCompletableFuture();
            actorSystem.log().info("response:{}", future1.get());
            // actorSystem.terminate();
        }
    }

    import akka.actor.ActorRef;
    import akka.actor.ActorSelection;
    import akka.actor.ActorSystem;
    import akka.pattern.Patterns;
    import java.time.Duration;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class Client {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ActorSystem actorSystem = ActorSystem.create("client");
            // Look up an ActorSelection by URL. The port must be the one the server actually bound (11880 here).
            ActorSelection echoServer =
                actorSystem.actorSelection("akka.tcp://server@127.0.0.1:11880/user/EchoServer");
            System.out.println(echoServer.path());
            echoServer.tell("ssss", ActorRef.noSender());
            CompletableFuture<Object> future1 =
                Patterns.ask(echoServer, "aaaa", Duration.ofMillis(1000)).toCompletableFuture();
            actorSystem.log().info("response:{}", future1.get());
            actorSystem.terminate();
        }
    }
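
The Server and Client code uses two ways of sending: tell, which is fire-and-forget, and ask, which returns a future for the reply. The sketch below illustrates how the two relate, in plain Java with no Akka dependency. It is a toy model under stated assumptions, not Akka's implementation: the class `AskSketch`, its methods, and the use of a `Consumer` as the reply address are all hypothetical. The idea shown is that ask can be viewed as a tell whose reply address completes a future, with a timeout in case no reply arrives.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Toy model of tell vs. ask (hypothetical, not Akka code). Requires Java 9+ for orTimeout. */
public class AskSketch {
    // Stand-in for the echo actor's mailbox: one thread, one message at a time.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    /** tell: enqueue the message and return at once. A null replyTo means no reply is wanted. */
    public void tell(String msg, Consumer<String> replyTo) {
        mailbox.execute(() -> {
            if (replyTo != null) {
                replyTo.accept("echo:" + msg);
            }
        });
    }

    /** ask: a tell whose reply address completes a future; the future fails if no reply arrives in time. */
    public CompletableFuture<String> ask(String msg, long timeoutMillis) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        tell(msg, reply::complete);
        return reply.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops the mailbox thread so the JVM can exit. */
    public void stop() {
        mailbox.shutdown();
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        AskSketch echo = new AskSketch();
        echo.tell("ssss", null);                          // fire and forget, no reply expected
        System.out.println(echo.ask("aaaa", 1000).get()); // prints echo:aaaa
        echo.stop();
    }
}
```

Neither call blocks the sender; only the final `get()` on the future waits, which matches how the demo uses `future1.get()`.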