Scala makes asynchronous operations easy to implement through Future. It resembles Java's Future but is considerably more powerful.

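Using andThen to enforce callback ordering on a future

A future can have several onComplete callbacks registered. However, the execution context does not guarantee which of those onComplete callbacks fires first, whereas chaining callbacks with andThen guarantees the order in which they run.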

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success

val intFuture = Future {
    Thread.sleep(2000)
    println(Thread.currentThread().getName)
    200
}
// The two onComplete callbacks below may run in either order
intFuture onComplete {
    case Success(int) => println(s"this future returned $int")
    case _ => println("something wrong has happened.")
}
intFuture onComplete {
    case Success(int) => println(s"completed with the value of $int")
    case _ => println("something wrong has happened.")
}
// Keep the main thread alive until the callbacks have run
Thread.sleep(3000)

When this program runs, the console may print "this future returned 200" first, or it may print "completed with the value of 200" first.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success

val intFuture = Future {
    Thread.sleep(2000)
    println(Thread.currentThread().getName)
    200
}

// Chained andThen callbacks run in the order they are written
intFuture andThen {
    case Success(int) => println(s"this future returned $int")
    case _ => println("something wrong has happened.")
} andThen {
    case Success(int) => println(s"completed with the value of $int")
    case _ => println("something wrong has happened.")
}

// Keep the main thread alive until the callbacks have run
Thread.sleep(3000)

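andThen returns a new future that carries the same result as the original one and completes only after the supplied callback has run. Ordering is therefore guaranteed along a chain of andThen calls. A callback registered separately on the original future with onComplete is not ordered relative to that chain.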
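To see the ordering guarantee in isolation, here is a minimal sketch that chains two andThen callbacks and records the order in which they ran. The object name AndThenOrderDemo and the log buffer are illustrative and not part of the original example.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Success

object AndThenOrderDemo {
  // Returns the messages in the order the chained callbacks recorded them.
  def run(): List[String] = {
    val log = ListBuffer.empty[String]
    val intFuture = Future { 200 }

    // Each andThen returns a future that completes only after its callback ran,
    // so the second callback cannot start before the first has finished.
    val chained = intFuture
      .andThen { case Success(v) => log.synchronized { log += s"first saw $v" } }
      .andThen { case Success(v) => log.synchronized { log += s"second saw $v" } }

    // Blocking here is only for the demo; it waits for the end of the chain.
    Await.result(chained, 5.seconds)
    log.synchronized { log.toList }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Waiting on the future returned by the last andThen, rather than sleeping, is what makes it safe to read the log afterwards.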

Future chains

Future.sequence() VS Future.traverse()

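If we have many futures and want to wait for all of them, we can use Future.sequence(), which converts a List[Future[T]] into a single Future[List[T]]. The futures start running as soon as they are created, so they already execute concurrently; sequence only gathers their results.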

println(s"\nStep 2: Create a List of future operations")
val futureOperations = List(
  donutStock("vanilla donut"),
  donutStock("plain donut"),
  donutStock("chocolate donut")
)

println(s"\nStep 5: Call Future.sequence to run the future operations in parallel")
val futureSequenceResults = Future.sequence(futureOperations)
futureSequenceResults.onComplete {
  case Success(results) => println(s"Results $results")
  case Failure(e)       => println(s"Error processing future operations, error = ${e.getMessage}")
}
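The snippet above relies on a donutStock method defined elsewhere. As a self-contained sketch, the version below uses a hypothetical stand-in for donutStock (its body and the value 10 are assumptions) to show the type change that Future.sequence performs.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceDemo {
  // Hypothetical stand-in for the article's donutStock: always reports 10 in stock.
  def donutStock(donut: String): Future[Option[Int]] = Future { Option(10) }

  def run(): List[Option[Int]] = {
    // The three futures start running as soon as they are created.
    val futureOperations: List[Future[Option[Int]]] = List(
      donutStock("vanilla donut"),
      donutStock("plain donut"),
      donutStock("chocolate donut")
    )

    // List[Future[Option[Int]]] becomes Future[List[Option[Int]]]; element order is kept.
    val combined: Future[List[Option[Int]]] = Future.sequence(futureOperations)

    // Blocking is only for the demo.
    Await.result(combined, 5.seconds)
  }

  def main(args: Array[String]): Unit = println(s"Results ${run()}")
}
```

If any of the input futures fails, the combined future fails as well.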

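Future.traverse() is similar to Future.sequence(). The only difference is that Future.traverse() lets you apply a function to each element while the results are being collected, as shown below: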

println(s"\nStep 3: Call Future.traverse to convert all Option of Int into Int")
val futureTraverseResult = Future.traverse(futureOperations){ futureSomeQty =>
  futureSomeQty.map(someQty => someQty.getOrElse(0))
}

futureTraverseResult.onComplete {
  case Success(results) => println(s"Results $results")
  case Failure(e)       => println(s"Error processing future operations, error = ${e.getMessage}")
}
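Future.traverse is also commonly applied to a list of plain values, with the function creating one future per element. The sketch below assumes a hypothetical donutStock that only knows about "vanilla donut"; the names and values are illustrative.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TraverseDemo {
  // Hypothetical stand-in: only "vanilla donut" is in stock.
  def donutStock(donut: String): Future[Option[Int]] =
    Future { if (donut == "vanilla donut") Option(10) else None }

  def run(): List[Int] = {
    val names = List("vanilla donut", "unknown donut")

    // One future is created per name, and each Option[Int] is unwrapped to an Int.
    val quantities: Future[List[Int]] =
      Future.traverse(names)(name => donutStock(name).map(_.getOrElse(0)))

    // Blocking is only for the demo.
    Await.result(quantities, 5.seconds)
  }

  def main(args: Array[String]): Unit = println(s"Results ${run()}")
}
```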

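Future.foldLeft VS Future.reduceLeft

foldLeft and reduceLeft both combine the results of a collection of futures from left to right. The difference is that foldLeft takes an initial value. See the example below: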

println(s"\nStep 3: Call Future.foldLeft to fold over futures results from left to right")
val futureFoldLeft = Future.foldLeft(futureOperations)(0){ case (acc, someQty) =>
  acc + someQty.getOrElse(0)
}
futureFoldLeft.onComplete {
  case Success(results) => println(s"Results $results")
  case Failure(e)       => println(s"Error processing future operations, error = ${e.getMessage}")
}

Output:

Step 3: Call Future.foldLeft to fold over futures results from left to right
Results 20

The same operation with reduceLeft has no initial value, so the result stays wrapped in an Option:

println(s"\nStep 3: Call Future.reduceLeft to fold over futures results from left to right")
val futureReduceLeft = Future.reduceLeft(futureOperations){ case (acc, someQty) =>
  acc.map(qty => qty + someQty.getOrElse(0))
}
futureReduceLeft.onComplete {
  case Success(results) => println(s"Results $results")
  case Failure(e)       => println(s"Error processing future operations, error = ${e.getMessage}")
}

Output:

Step 3: Call Future.reduceLeft to fold over futures results from left to right
Results Some(20)
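To compare the two side by side, here is a small sketch over two already-completed futures. The stocks helper and its values are assumptions made for illustration; the explicit type arguments on reduceLeft are there only to keep type inference unambiguous.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FoldReduceDemo {
  // Hypothetical input: two futures that each report 10 donuts.
  def stocks(): List[Future[Option[Int]]] =
    List(Future.successful(Option(10)), Future.successful(Option(10)))

  // foldLeft starts from the initial value 0, so the result is a plain Int.
  def folded(): Int = {
    val f = Future.foldLeft(stocks())(0)((acc, qty) => acc + qty.getOrElse(0))
    Await.result(f, 5.seconds)
  }

  // reduceLeft starts from the first element, so the result keeps the element type.
  def reduced(): Option[Int] = {
    val f = Future.reduceLeft[Option[Int], Option[Int]](stocks()) { (acc, qty) =>
      Some(acc.getOrElse(0) + qty.getOrElse(0))
    }
    Await.result(f, 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    println(s"foldLeft   = ${folded()}")
    println(s"reduceLeft = ${reduced()}")
  }
}
```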

Future firstCompletedOf

When handling several futures, firstCompletedOf returns the result of whichever future completes first.

println(s"\nStep 3: Call Future.firstCompletedOf to get the results of the first future that completes")
val futureFirstCompletedResult = Future.firstCompletedOf(futureOperations)
futureFirstCompletedResult.onComplete {
  case Success(results) => println(s"Results $results")
  case Failure(e)       => println(s"Error processing future operations, error = ${e.getMessage}")
}
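The effect is easier to observe when the futures take visibly different amounts of time. The sketch below uses a hypothetical delayed helper with made-up labels and delays; the sleep is wrapped in blocking so that the global pool can add threads if it needs to.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstCompletedDemo {
  // Hypothetical helper: completes with its label after the given delay.
  def delayed(label: String, millis: Long): Future[String] = Future {
    blocking { Thread.sleep(millis) }
    label
  }

  def run(): String = {
    val fast = delayed("fast", 50)
    val slow = delayed("slow", 1000)

    // The order in the list does not matter; the earliest completion wins.
    val first = Future.firstCompletedOf(List(slow, fast))

    // Blocking is only for the demo.
    Await.result(first, 5.seconds)
  }

  def main(args: Array[String]): Unit = println(s"Results ${run()}")
}
```

The remaining futures are not cancelled; they keep running and their results are simply ignored.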

Future zip VS zipWith

zip combines the results of two futures into a tuple, while zipWith lets you supply a custom function to combine the two results.

println(s"\nStep 3: Zip the values of the first future with the second future")
val donutStockAndPriceOperation = donutStock("vanilla donut") zip donutPrice()
donutStockAndPriceOperation.onComplete {
  case Success(results) => println(s"Results $results")
  case Failure(e)       => println(s"Error processing future operations, error = ${e.getMessage}")
}

Output:

Step 3: Zip the values of the first future with the second future
checking donut stock
Results (Some(10),3.25)

An example using zipWith:

println(s"\nStep 4: Call Future.zipWith and pass-through function qtyAndPriceF")
val donutAndPriceOperation = donutStock("vanilla donut").zipWith(donutPrice())(qtyAndPriceF)
donutAndPriceOperation.onComplete {
  case Success(result) => println(s"Result $result")
  case Failure(e)      => println(s"Error processing future operations, error = ${e.getMessage}")
}

Output:

Step 4: Call Future.zipWith and pass-through function qtyAndPriceF
checking donut stock
Result (10,3.25)
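The snippets above do not show donutPrice or qtyAndPriceF. The sketch below fills them in with assumed definitions, chosen only so that they match the printed output, to make the difference between zip and zipWith concrete.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ZipDemo {
  // Hypothetical stand-ins for the article's helpers.
  def donutStock(donut: String): Future[Option[Int]] = Future.successful(Option(10))
  def donutPrice(): Future[Double] = Future.successful(3.25)

  // Assumed shape of qtyAndPriceF: unwrap the quantity and keep the price.
  val qtyAndPriceF: (Option[Int], Double) => (Int, Double) =
    (someQty, price) => (someQty.getOrElse(0), price)

  // zip pairs the two results as they are.
  def zipped(): (Option[Int], Double) =
    Await.result(donutStock("vanilla donut").zip(donutPrice()), 5.seconds)

  // zipWith passes both results through the supplied function.
  def zippedWith(): (Int, Double) =
    Await.result(donutStock("vanilla donut").zipWith(donutPrice())(qtyAndPriceF), 5.seconds)

  def main(args: Array[String]): Unit = {
    println(s"zip     = ${zipped()}")
    println(s"zipWith = ${zippedWith()}")
  }
}
```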

Future andThen

andThen takes a custom PartialFunction that is applied to the future's result (a Try) as a side effect, as shown below:

println(s"\nStep 2: Call Future.andThen with a PartialFunction")
val donutStockOperation = donutStock("vanilla donut")
donutStockOperation.andThen { case stockQty => println(s"Donut stock qty = $stockQty")}

Output:

Step 2: Call Future.andThen with a PartialFunction
checking donut stock
Donut stock qty = Success(10)

Custom thread pool

All the examples above use Scala's global ExecutionContext: scala.concurrent.ExecutionContext.Implicits.global.
We can also define our own ExecutionContext. Below is an example that uses java.util.concurrent.Executors:

  println("Step 1: Define an ExecutionContext")
  import java.util.concurrent.Executors
  import scala.concurrent.ExecutionContext
  import scala.util.{Failure, Success}
  val executor = Executors.newSingleThreadExecutor()
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)

  println("\nStep 2: Define a method which returns a Future")
  import scala.concurrent.Future
  def donutStock(donut: String): Future[Int] = Future {
    // assume some long running database operation
    println("checking donut stock")
    10
  }

  println("\nStep 3: Call method which returns a Future")
  val donutStockOperation = donutStock("vanilla donut")
  donutStockOperation.onComplete {
    case Success(donutStock)  => println(s"Results $donutStock")
    case Failure(e)           => println(s"Error processing future operations, error = ${e.getMessage}")
  }

  Thread.sleep(3000)
  executor.shutdownNow()
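As a variation on the example above, the sketch below uses a fixed pool of two threads and waits for the result instead of sleeping. The pool size and the object name are arbitrary choices for illustration; the future simply reports which thread ran it.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CustomPoolDemo {
  // Returns the name of the pool thread that executed the future.
  def run(): String = {
    val executor = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executor)
    try {
      val threadName = Future { Thread.currentThread().getName }
      // Blocking is only for the demo.
      Await.result(threadName, 5.seconds)
    } finally {
      // A custom pool must be shut down explicitly, or its threads keep the JVM alive.
      executor.shutdown()
    }
  }

  def main(args: Array[String]): Unit = println(s"Ran on thread ${run()}")
}
```

With the default thread factory of java.util.concurrent.Executors, the reported name has the form pool-N-thread-M, which makes it easy to confirm that the global pool was not used.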

recover(), recoverWith() and fallbackTo()

These three methods are mainly used to handle exceptions. recover lets you recover from a known exception, as shown below:

println("\nStep 3: Call Future.recover to recover from a known exception")
donutStock("unknown donut")
  .recover { case e: IllegalStateException if e.getMessage == "Out of stock" => 0 }
  .onComplete {
    case Success(donutStock)  => println(s"Results $donutStock")
    case Failure(e)           => println(s"Error processing future operations, error = ${e.getMessage}")
}

recoverWith() is similar to recover(), except that its recovery function returns a Future.

println("\nStep 3: Call Future.recoverWith to recover from a known exception")
donutStock("unknown donut")
  .recoverWith { case e: IllegalStateException if e.getMessage == "Out of stock" => Future.successful(0) }
  .onComplete {
    case Success(donutStock)  => println(s"Results $donutStock")
    case Failure(e)           => println(s"Error processing future operations, error = ${e.getMessage}")
}

fallbackTo() supplies an alternative future whose result is used if the original future fails:

println("\nStep 3: Call Future.fallbackTo")
// onComplete returns Unit, so the chain is not assigned to a val
donutStock("plain donut")
  .fallbackTo(similarDonutStock("vanilla donut"))
  .onComplete {
    case Success(donutStock)  => println(s"Results $donutStock")
    case Failure(e)           => println(s"Error processing future operations, error = ${e.getMessage}")
  }
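The three snippets above depend on donutStock and similarDonutStock definitions that are not shown. The sketch below puts the three methods side by side using a hypothetical donutStock that fails with IllegalStateException("Out of stock") for anything other than "vanilla donut"; that behavior is an assumption made for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoveryDemo {
  // Hypothetical stand-in: only "vanilla donut" is in stock, everything else fails.
  def donutStock(donut: String): Future[Int] =
    if (donut == "vanilla donut") Future.successful(10)
    else Future.failed(new IllegalStateException("Out of stock"))

  // recover maps a matching exception to a plain value.
  def recovered(): Int = {
    val f = donutStock("unknown donut").recover {
      case e: IllegalStateException if e.getMessage == "Out of stock" => 0
    }
    Await.result(f, 5.seconds)
  }

  // recoverWith maps a matching exception to another future.
  def recoveredWith(): Int = {
    val f = donutStock("unknown donut").recoverWith {
      case _: IllegalStateException => donutStock("vanilla donut")
    }
    Await.result(f, 5.seconds)
  }

  // fallbackTo takes the alternative future directly, without matching on the exception.
  def fellBack(): Int = {
    val f = donutStock("unknown donut").fallbackTo(donutStock("vanilla donut"))
    Await.result(f, 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    println(s"recover     = ${recovered()}")
    println(s"recoverWith = ${recoveredWith()}")
    println(s"fallbackTo  = ${fellBack()}")
  }
}
```

One design difference worth noting: the argument to fallbackTo is an ordinary value, so the alternative future is created whether or not the original fails, while the body of recoverWith runs only when a matching exception occurs.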