熟悉ES6的同学可能知道,promise是JS在ES6中引入的新特性,其主要目的是将回调转变成链式调动。

当我们不确定 future 何时会完成时,可以会借助 Promise 许下一个 “承诺” ,它表示:在某个未来的时间点,一定能够得到值。
当然scala的promise和ES6的promise还是不一样的

基本使用

  1. 定义Promise对象
  2. 添加回调函数(成功的回调或者失败的都行)
  3. 为Promise对象赋值(成功值或者失败值)

注意:方法一赋值会抛出异常,方法二不会

  1. import scala.concurrent.ExecutionContext.Implicits.global
  2. val promise = Promise[String]
  3. promise.future.foreach(str => print("我的祖国是: " + str))
  4. promise.future.failed.foreach(e => println(e))
  5. /*
  6. 赋值方法一:
  7. 使用success和failure。complete方式相当于success和failure的抽象
  8. 这种赋值方法有个问题就是我们的promise只允许赋值一次,重复赋值会被抛出异常
  9. */
  10. promise.success("china")
  11. promise.failure(new Exception("not china"))
  12. promise.complete(Success[String]("china"))
  13. promise.complete(Failure[String](new Exception("not china")))
  14. /*
  15. 赋值方法二:
  16. 使用try方法赋值,这种赋值和方法一类似,但是这种方法赋值不会抛出异常
  17. */
  18. promise.trySuccess("china")
  19. promise.tryFailure(new Exception("not china"))
  20. promise.tryComplete(Success("china"))
  21. promise.tryComplete(Failure(new Exception("not china")))
  22. Thread.sleep(3000)

Promise和future结合使用

PromisedInt 在这里充当着代理的作用。它承诺提供的值具体要由哪个 future 来计算并提供,程序的调用者可能并不关心:它也许是 intFuture ,也许是 IntFuture2 。因此,我们仅需要为代理( PromisedInt.future )设置回调函数,而不是其它的 future

  1. import scala.concurrent.ExecutionContext.Implicits.global
  2. // 当我们不确定 future 何时会完成时,可以会借助 Promise 许下一个 “承诺” ,它表示:在某个未来的时间点,一定能够得到值。
  3. val promisedInt: Promise[Int] = Promise[Int]
  4. // 然而,这个 Int 值的计算实际上委托给了其它的 future 来完成。受托的 Future 在计算完结果之后会调用该 promise 的 success 方法来 “兑现” 这个承诺
  5. // promisedInt 承诺的值由 intFuture 真正实现
  6. val intFuture = Future {
  7. println("正在计算…")
  8. println("执行此计算任务的线程是:" + Thread.currentThread().getName)
  9. Thread.sleep(1000)
  10. //一旦这样做,这个 promise 将和当前的 future 绑定。
  11. promisedInt.success(300)
  12. }
  13. //考虑到异常情况,除了 success 方法, Promise 还提供了 failure , Complete 等方法。无论调用哪种方法,一个 Promise 都只能被使用一次。
  14. // promisedInt.success(300)
  15. // promisedInt.failure(new Exception(“可能的错误”))
  16. // promisedInt.complete(Success(1))
  17. //onComplete 回调函数中 “兑现” 它的返回值
  18. promisedInt.future onComplete {
  19. case Success(value) => println(value)
  20. case _ => println("出现了意外的错误")
  21. }
  22. Thread.sleep(3000)

输出

  1. 正在计算…
  2. 执行此计算任务的线程是:scala-execution-context-global-11
  3. 300

Promise 的使用模式

基于回调函数的API转换

示例:自定义timeout

  1. import scala.concurrent.ExecutionContext.Implicits.global
  2. val timer = new Timer(true)
  3. def timeout(t: Int) = {
  4. val promisedUnit = Promise[Unit]
  5. timer.schedule(new TimerTask {
  6. override def run(): Unit = {
  7. promisedUnit.success()
  8. timer.cancel()
  9. }
  10. }, t)
  11. promisedUnit.future
  12. }
  13. println("延时之前")
  14. timeout(1000).foreach(_ => println("延时一秒"))
  15. println("延时之后")
  16. Thread.sleep(2000)
  17. //输出
  18. //延时之前
  19. //延时之后
  20. //延时一秒

扩展Future的方法

  1. import scala.concurrent.ExecutionContext.Implicits.global
  2. /**
  3. * 接收一个future,生成一个FutureOps,FutureOps具有or方法
  4. * 当future调用or方法的时候就会寻找该隐式类生成一个FutureOps
  5. *
  6. * @param self
  7. * @tparam T
  8. */
  9. implicit class FutureOps[T](val self: Future[T]) {
  10. def or(that: Future[T]): Future[T] = {
  11. val promisedT = Promise[T]
  12. // 下面两个赋值,只能成功一个,哪个Future先完善,哪个成功
  13. self.onComplete(x => promisedT tryComplete (x))
  14. that.onComplete(y => promisedT tryComplete (y))
  15. promisedT.future
  16. }
  17. }
  18. val future1 = Future {
  19. Thread.sleep(1000)
  20. 1
  21. }
  22. val future2 = Future(2)
  23. future1.or(future2).foreach(println)
  24. Thread.sleep(2000)

和Future通信,结束Future

type Cancellable[T] = (Promise[Unit], Future[T])

/**
 * 这个方法接收首一个方法参数b,b接收一个future,返回一个T
 * 我们在主线程里完善Future就可以在返回T的代码块里感知到
 * 然后通过判断我们的Future是否完善来抛出异常来结束我们的异步程序
 *
 * @param b
 * @tparam T
 * @return
 */

def cancellable[T](b: Future[Unit] => T): Cancellable[T] = {
    import scala.concurrent.ExecutionContext.Implicits.global
    val cancel = Promise[Unit]
    val f = Future {
        val r = b(cancel.future)

        // 在这里向cancel对象执行赋值操作,如果我们的cancel已经被赋值
        // 如果我们正常结束,这里就不会被赋值
        // 如果从外部调用cancel结束,那这里就会抛出异常
        if (!cancel.tryFailure(new Exception)) {
            throw new CancellationException
        }
        r
    }
    (cancel, f)
}

使用await进行同步

基本用法

import scala.concurrent.ExecutionContext.Implicits.global
val future = Future {
    Thread.sleep(1000)
    1
}
// 使用await.ready,返回完善返回值的Future对象
Await.ready(future,Duration.apply(3,TimeUnit.SECONDS)).foreach(println)
// 使用await.result,返回Future里面的完善值
println(Await.result(future, Duration(3, TimeUnit.SECONDS)))

使用blocking语句,增加异步回调线程提高速度

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking

//当前时间,单位是纳秒
val startTime = System.nanoTime()
// 如果异步程序数大于默认线程数,异步程序会分批次执行
val futures = for (- <- 0 until 10) yield Future {
    Thread.sleep(1000)
}


// 如果异步程序里,使用block结构,会创建多余的线程执行程序,防止阻塞
//        val futures = for (- <- 0 until 10) yield Future {
//            blocking {
//                Thread.sleep(1000)
//            }
//        }

for (f <- futures) Await.ready(f, Duration.Inf)
val endTime = System.nanoTime()

println((endTime - startTime) / 1000000)

例子

import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global  

println("Step 1: Define a method which returns a Future")
  import scala.concurrent.ExecutionContext.Implicits.global
  def donutStock(donut: String): Int = {
    if(donut == "vanilla donut") 10
    else throw new IllegalStateException("Out of stock")
  }


  println(s"\nStep 2: Define a Promise of type Int")
  val donutStockPromise = Promise[Int]()



  println("\nStep 3: Define a future from Promise")
  val donutStockFuture = donutStockPromise.future
  donutStockFuture.onComplete {
    case Success(stock) => println(s"Stock for vanilla donut = $stock")
    case Failure(e)     => println(s"Failed to find vanilla donut stock, exception = $e")
  }



  println("\nStep 4: Use Promise.success or Promise.failure to control execution of your future")
  val donut = "vanilla donut"
  if(donut == "vanilla donut") {
    donutStockPromise.success(donutStock(donut))
  } else {
    donutStockPromise.failure(Try(donutStock(donut)).failed.get)
  }



  println("\nStep 5: Completing Promise using Promise.complete() method")
  val donutStockPromise2 = Promise[Int]()
  val donutStockFuture2 = donutStockPromise2.future
  donutStockFuture2.onComplete {
    case Success(stock) => println(s"Stock for vanilla donut = $stock")
    case Failure(e)     => println(s"Failed to find vanilla donut stock, exception = $e")
  }
  donutStockPromise2.complete(Try(donutStock("unknown donut")))

上面例子中我们使用了 Promise.success, Promise.failure, Promise.complete() 来控制程序的运行。