Actor

Actix是一个Rust库,为开发并发应用程序提供了框架。

Actix建立在Actor Model上。允许将应用程序编写为一组独立执行但合作的应用程序 通过消息进行通信的”Actor”。 Actor是封装的对象状态和行为,并在actix库提供的Actor System中运行。

Actor在特定的执行上下文Context中运行,上下文对象仅在执行期间可用。每个Actor都有一个单独的执行上下文。执行上下文还控制actor的生命周期。

Actor通过交换消息进行通信。分派Actor可以可选择等待响应。Actor不是直接引用,而是通过引用地址

任何Rust类型都可以是一个actor,它只需要实现Actor trait。

为了能够处理特定消息actor必须提供的此消息的Handler实现。所有消息是静态类型的。消息可以以异步方式处理。Actor可以生成其他actor或将future/stream添加到执行上下文。Actor trait提供了几种允许控制actor生命周期的方法。

Actor生命周期

Started

actor总是以Started状态开始。在这种状态下,actorstarted()方法被调用。 Actor trait为此方法提供了默认实现。在此状态期间可以使用actor上下文,并且actor可以启动更多actor或注册异步流或执行任何其他所需的配置。

Running

调用Actor的started()方法后,actor转换为Running状态。actor可以一直处于running状态。

Stopping

在以下情况下,Actor的执行状态将更改为stopping状态:

  • Context :: stop由actor本身调用
  • actor的所有地址都被消毁。即没有其他Actor引用它。
  • 在上下文中没有注册事件对象。

一个actor可以通过创建一个新的地址或添加事件对象,并返回Running :: Continue,从而使stopped状态恢复到running状态,。

如果一个actor因为调用了Context :: stop()状态转换为stop,则上下文立即停止处理传入的消息并调用Actor::stopping()。如果actor没有恢复到running状态,那么全部未处理的消息被删除。

默认情况下,此方法返回Running :: Stop,确认停止操作。

Stopped

如果actor在停止状态期间没有修改执行上下文,则actor状态会转换到Stopped。这种状态被认为是最终状态,此时actor被消毁

Message

Actor通过发送消息与其他actor通信。在actix中所有消息是typed。消息可以是实现Message trait的任何rust类型。 Message :: Result定义返回类型。让我们定义一个简单的Ping消息 - 接受此消息的actor需要返回io::Result<bool>

  1. extern crate actix;
  2. use std::io;
  3. use actix::prelude::*;
  4. struct Ping;
  5. impl Message for Ping {
  6. type Result = Result<bool, io::Error>;
  7. }
  8. fn main() {}

生成actor

如何开始一个actor取决于它的上下文(context)。产生一个新的异步actor是通过实现Actortrait 的startcreate方法。它提供了几种不同的方式创造Actor;有关详细信息,请查看文档。

完整的例子

  1. use std::io;
  2. use actix::prelude::*;
  3. use futures::Future;
  4. /// Define message
  5. struct Ping;
  6. impl Message for Ping {
  7. type Result = Result<bool, io::Error>;
  8. }
  9. // Define actor
  10. struct MyActor;
  11. // Provide Actor implementation for our actor
  12. impl Actor for MyActor {
  13. type Context = Context<Self>;
  14. fn started(&mut self, ctx: &mut Context<Self>) {
  15. println!("Actor is alive");
  16. }
  17. fn stopped(&mut self, ctx: &mut Context<Self>) {
  18. println!("Actor is stopped");
  19. }
  20. }
  21. /// Define handler for `Ping` message
  22. impl Handler<Ping> for MyActor {
  23. type Result = Result<bool, io::Error>;
  24. fn handle(&mut self, msg: Ping, ctx: &mut Context<Self>) -> Self::Result {
  25. println!("Ping received");
  26. Ok(true)
  27. }
  28. }
  29. fn main() {
  30. let sys = System::new("example");
  31. // Start MyActor in current thread
  32. let addr = MyActor.start();
  33. // Send Ping message.
  34. // send() message returns Future object, that resolves to message result
  35. let result = addr.send(Ping);
  36. // spawn future to reactor
  37. Arbiter::spawn(
  38. result.map(|res| {
  39. match res {
  40. Ok(result) => println!("Got result: {}", result),
  41. Err(err) => println!("Got error: {}", err),
  42. }
  43. })
  44. .map_err(|e| {
  45. println!("Actor is probably died: {}", e);
  46. }));
  47. sys.run();
  48. }

使用MessageResponse进行响应

让我们看看上面例子中为impl Handler定义的Result类型。看看我们如何返回Result <bool,io :: Error>?我们能够用这种类型响应我们的actor的传入消息,因为它具有为该类型实现的MessageResponse trait。这是该 trait的定义:

  1. pub trait MessageResponse <AActorMMessage> {
  2. fn handle <RResponseChannel <M >>(selfctx:&mut A :: ContexttxOption <R>);
  3. }

有时,使用没有为其实现此 trait的类型响应传入消息是有意义的。当发生这种情况时,我们可以自己实现这一 trait。这是一个例子,我们用GotPing回复Ping消息,并用GotPong回复Pong消息。

  1. use actix::dev::{MessageResponse, ResponseChannel};
  2. use actix::prelude::*;
  3. use futures::Future;
  4. enum Messages {
  5. Ping,
  6. Pong,
  7. }
  8. enum Responses {
  9. GotPing,
  10. GotPong,
  11. }
  12. impl<A, M> MessageResponse<A, M> for Responses
  13. where
  14. A: Actor,
  15. M: Message<Result = Responses>,
  16. {
  17. fn handle<R: ResponseChannel<M>>(self, _: &mut A::Context, tx: Option<R>) {
  18. if let Some(tx) = tx {
  19. tx.send(self);
  20. }
  21. }
  22. }
  23. impl Message for Messages {
  24. type Result = Responses;
  25. }
  26. // Define actor
  27. struct MyActor;
  28. // Provide Actor implementation for our actor
  29. impl Actor for MyActor {
  30. type Context = Context<Self>;
  31. fn started(&mut self, ctx: &mut Context<Self>) {
  32. println!("Actor is alive");
  33. }
  34. fn stopped(&mut self, ctx: &mut Context<Self>) {
  35. println!("Actor is stopped");
  36. }
  37. }
  38. /// Define handler for `Messages` enum
  39. impl Handler<Messages> for MyActor {
  40. type Result = Responses;
  41. fn handle(&mut self, msg: Messages, ctx: &mut Context<Self>) -> Self::Result {
  42. match msg {
  43. Messages::Ping => Responses::GotPing,
  44. Messages::Pong => Responses::GotPong,
  45. }
  46. }
  47. }
  48. fn main() {
  49. let sys = System::new("example");
  50. // Start MyActor in current thread
  51. let addr = MyActor.start();
  52. // Send Ping message.
  53. // send() message returns Future object, that resolves to message result
  54. let ping_future = addr.send(Messages::Ping);
  55. let pong_future = addr.send(Messages::Pong);
  56. // Spawn pong_future onto event loop
  57. Arbiter::spawn(
  58. pong_future
  59. .map(|res| {
  60. match res {
  61. Responses::GotPing => println!("Ping received"),
  62. Responses::GotPong => println!("Pong received"),
  63. }
  64. })
  65. .map_err(|e| {
  66. println!("Actor is probably died: {}", e);
  67. }),
  68. );
  69. // Spawn ping_future onto event loop
  70. Arbiter::spawn(
  71. ping_future
  72. .map(|res| {
  73. match res {
  74. Responses::GotPing => println!("Ping received"),
  75. Responses::GotPong => println!("Pong received"),
  76. }
  77. })
  78. .map_err(|e| {
  79. println!("Actor is probably died: {}", e);
  80. }),
  81. );
  82. sys.run();
  83. }