定义返回Future的方法

下面我们看下如何定义一个返回Future的方法:

  1. println("Step 1: Define a method which returns a Future")
  2. import scala.concurrent.duration._
  3. import scala.util.{Failure, Success}
  4. import scala.concurrent.Future
  5. import scala.concurrent.ExecutionContext.Implicits.global
  6. def donutStock(donut: String): Future[Int] = Future {
  7. // assume some long running database operation
  8. println("checking donut stock")
  9. 10
  10. }

注意这里需要引入scala.concurrent.ExecutionContext.Implicits.global, 它会提供一个默认的线程池来异步执行Future。

有两种方法确定这个异步计算是否已经得到结果:

  1. 调用 future.isCompleted,如果异步计算还未执行完毕,则返回 false 。
  2. 调用 future.value,如果计算完毕,则返回 Some(Sussess(value)),否则返回 None 。

为什么 value 方法做了两层包裹呢?首先,需要考虑到这个异步计算是否执行完毕。因此最外层返回的是一个 Option 类型,如果有计算结果,则返回,否则 None。
另外,计算结果也包含了两种情况。如果计算时没有出现错误,则计算结果可以装入 Success 类中返回。反之,调用 value 将返回一个 Failure。

创建 Success,Failure

Future 提供诸多已经完成的 future 的工厂方法:successful, failed 以及 fromTry 。这些方法不需要手动导入上下文。
使用 successful 方法来创建一个已经完成的 future :

  1. val future: Future[Int] = Future.successful({
  2. println("返回一个已经完成的 Success[T]")
  3. 100
  4. })
  5. // Some(Success(100))
  6. println(future.value)

使用 failed 方法创建一个已经完成,但是出现异常的 future:

  1. val future: Future[Nothing] = Future.failed({
  2. println("该方法用于返回一个 Failure[T]")
  3. new Exception("Oops!")
  4. })
  5. //Some(Failure(java.lang.Exception: Oops!))
  6. println(future.value)

如果不确定抛出 Try[+T] 的哪一种情况,则调用 fromTry:

  1. val future: Future[Nothing] = Future.fromTry({
  2. println("可能返回 Success 或者 Failure")
  3. // Success(100)
  4. Failure(new Exception("Oops!"))
  5. })
  6. println(future.value)

Await阻塞方式获取Future的值

Await 是一种同步等待机制,主线程会在有限的时间内等待某个 Future 进行。
我们另引入一个包:scala.concurrent.duration._,这样就允许我们使用 2 second 这种方式来表示我们的最大等待时间了。
Await 主要有两个方法。第一个用法是调用 result 另主线程进入阻塞等待,直到获取该 future 的返回值。

  1. println("\nStep 2: Call method which returns a Future")
  2. import scala.concurrent.Await
  3. import scala.concurrent.duration._
  4. val vanillaDonutStock = Await.result(donutStock("vanilla donut"), 5 seconds)
  5. println(s"Stock of vanilla donut = $vanillaDonutStock")

donutStock() 是异步执行的,我们可以使用Await.result() 来阻塞主线程来等待donutStock()的执行结果。
下面是其输出:

  1. Step 2: Call method which returns a Future
  2. checking donut stock
  3. Stock of vanilla donut = 10

一般用于需要获取到该 future 的返回值才能做进一步操作的情况,如果只关心该 future 的完成状态,可以调用 ready 方法。当 future 仍处于工作状态时,主线程会等待至多 3 秒。

  1. Await.ready(intFuture, 3 second)

另外,通过 Thread.currentThread().getName 可以发现,此 future 是由另一个线程执行的:ForkJoinPool-X-worker-XX

不限制等待时间是

Await.result(future3,Duration.Inf)

onComplete异步非阻塞方式获取Future的值

如果你已经进入了 Future 空间内,就尽量不要再使用 Await 阻塞 future 的执行。Scala 提供注册 “回调函数” 的方式来令你通过函数副作用获取到某个 future 在未来返回的值。

我们可以使用Future.onComplete() 回调来实现非阻塞的通知:

println("\nStep 2: Non blocking future result")
import scala.util.{Failure, Success}

// onComplete 可以多个,按添加顺序依次执行
donutStock("vanilla donut").onComplete {
  // Success 和 Failures 是Try的两个子类
  case Success(stock) => println(s"Stock for vanilla donut = $stock")
  case Failure(e) => println(s"Failed to find vanilla donut stock, exception = $e")
}

// 不加这个 Stock for vanilla donut = 10 这段有时能输出, 有时不能输出
//    Thread.sleep(3000)

Future.onComplete() 有两种可能情况,Success 或者 Failure,需要引入: import scala.util.{Failure, Success}

val intFuture = Future {
    println("正在计算...")
    println("执行此计算任务的线程是:" + Thread.currentThread().getName)
    Thread.sleep(1000)
    30
}

//    Await.ready(intFuture, 3 second)
//    和刚才的情况不同,如果主线程不阻塞一会,那么这个程序会提前结束推出。
Thread.sleep(3000)

var intValue : Int = 0

intFuture onComplete {
    case Success(value) => 
      println(value)
      // 通过代码块副作用获取到这个 Future 的 value 返回值。
      intValue = value
    case _ => println("出现了意外的错误")
}

这种方式不会阻塞主线程,为了能看到程序运行结果,我们需要主动调用 Thread.sleep 让主线程休眠一会,否则程序会立刻结束。onComplete 的返回值是一个 Unit 数据类型。

isCompleted判断有没有完成

 // step 1 加载上下文
 import scala.concurrent.ExecutionContext.Implicits.global

 // step 2 创建Future计算
 val num = Future.apply({
     1 + 1
 })

 // step 3 轮询
 while (!num.isCompleted) {}

 println(num.value)

foreach方法

除了用类似onSuccess的方法,使用 foreach也是可行的,不过foreach会忽略异常情况。但是使用faild.foreach是可以处理失败情况的。

 // step 1 加载上下文
    import scala.concurrent.ExecutionContext.Implicits.global

    // step 2 创建Future计算
    val num = Future.apply({
      1 + 1
    })

    num.foreach(print)
    num.failed.foreach(print)