Agents

注:本节未经校验,如有问题欢迎提issue

Akka中的Agent是受 Clojure agent启发的.

Agent 提供对独立位置的异步修改. Agent在其生命周期中绑定到一个单独的存储位置,对这个存储位置的数据的修改(到一个新的状态)仅允许作为一个操作的结果发生。 对其进行修改的操作是函数,该函数被异步地应用于Agent的状态,其返回值成为Agent的新状态。 Agent的状态应该是不可变的。

虽然对Agent的修改是异步的,但是其状态总是可以随时被任何线程 来获得(通过 getapply)而不需要发送消息。

Agent是响应式的(reactive). 对所有agent的更新操作在一个ExecutionContext的不同线程中并发执行。在每一个时刻,每一个Agent最多只有一个 send 被执行. 从某个线程派发到agent上的操作的执行次序与其发送的次序一致,但有可能与从其它线程源派发来的操作交织在一起。

注意

Agent对创建它们的节点是本地的。这意味着你一般不应包括它们在消息中,因为可能会被传递到远程actor或作为远程actor的构造函数参数;那些远程Actor不能读取或更新Agent。

创建Agent

创建Agent时,调用 Agent(value) ,传入它的初始值并提供一个隐式的ExecutionContext供其使用,在这些例子中我们将使用默认的全局量,不过你的方法可能不同(YMMV : Your Method May Vary):

  1. import scala.concurrent.ExecutionContext.Implicits.global
  2. import akka.agent.Agent
  3. val agent = Agent(5)

读取 Agent 的值

Agent可以用括号调用来去引用 (你可以获取一个Agent的值) ,像这样:

  1. val result = agent()

或者使用 get 方法:

  1. val result = agent.get

读取Agent的当前值不包括任何消息传递,并立即执行。所以说虽然Agent的更新的异步的,对它的状态的读取却是同步的。

更新 Agent (send & alter)

更新Agent有两种方法:send一个函数来转换当前的值,或直接send一个新值。Agent会自动异步地应用新的值或函数。更新是以一种“发射后不管”的方式完成的,唯一的保证是它会被应用。 至于什么时候应用则没有保证,但是从同一个线程发到Agent的操作将被顺序应用。你通过调用send函数来应用一个值或函数。

  1. // send a value, enqueues this change
  2. // of the value of the Agent
  3. agent send 7
  4. // send a function, enqueues this change
  5. // to the value of the Agent
  6. agent send (_ + 1)
  7. agent send (_ * 2)

你也可以在一个独立的线程中派发一个函数来改变其内部状态。这样将不使用响应式线程池,并可以被用于长时间运行或阻塞的操作。 相应的方法是 sendOff 。 派发器不管使用 sendOff 还是 send 都会顺序执行。

  1. // the ExecutionContext you want to run the function on
  2. implicit val ec = someExecutionContext()
  3. // sendOff a function
  4. agent sendOff longRunningOrBlockingFunction

所有的send都有一个对应的alter方法来返回一个Future。参考Futures来获取Future的更多信息。

  1. // alter a value
  2. val f1: Future[Int] = agent alter 7
  3. // alter a function
  4. val f2: Future[Int] = agent alter (_ + 1)
  5. val f3: Future[Int] = agent alter (_ * 2)
  1. // the ExecutionContext you want to run the function on
  2. implicit val ec = someExecutionContext()
  3. // alterOff a function
  4. val f4: Future[Int] = agent alterOff longRunningOrBlockingFunction

等待Agent的返回值

也可以获得一个Agent值的Future,将在所有当前排队的更新请求都完成以后完成:

  1. val future = agent.future

参考Futures来获取Future的更多信息。

Monadic 用法

Agent 也支持 monadic 操作, 这样你就可以用for-comprehensions对操作进行组合. 在 monadic 用法中, 旧的Agent不会变化,而是创建新的Agent。 所以老的值(Agents)仍像原来一样可用。这就是所谓的‘持久’.

monadic 用法示例:

  1. import scala.concurrent.ExecutionContext.Implicits.global
  2. val agent1 = Agent(3)
  3. val agent2 = Agent(5)
  4. // uses foreach
  5. for (value <- agent1)
  6. println(value)
  7. // uses map
  8. val agent3 = for (value <- agent1) yield value + 1
  9. // or using map directly
  10. val agent4 = agent1 map (_ + 1)
  11. // uses flatMap
  12. val agent5 = for {
  13. value1 <- agent1
  14. value2 <- agent2
  15. } yield value1 + value2

配置

有一些配置属性是针对Agent模块的,请参阅参考配置

废弃的事务性Agent

Agent参与封闭 STM 事务是 2.3 废弃的功能。

如果Agent在一个封闭的事务中使用,然后它将参与该事务。如果你在一个事务内发送到Agent,然后对该Agent的派发将暂停直到该事务被提交,如果事务中止则丢弃该派发。下面是一个示例:

  1. import scala.concurrent.ExecutionContext.Implicits.global
  2. import akka.agent.Agent
  3. import scala.concurrent.duration._
  4. import scala.concurrent.stm._
  5. def transfer(from: Agent[Int], to: Agent[Int], amount: Int): Boolean = {
  6. atomic { txn =>
  7. if (from.get < amount) false
  8. else {
  9. from send (_ - amount)
  10. to send (_ + amount)
  11. true
  12. }
  13. }
  14. }
  15. val from = Agent(100)
  16. val to = Agent(20)
  17. val ok = transfer(from, to, 50)
  18. val fromValue = from.future // -> 50
  19. val toValue = to.future // -> 70